# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test that real-time recency works w/ slow ingest of upstream data from Kafka+MySQL+Postgres
"""

import random
import threading
import time

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import PropagatingThread
# Services this composition brings up: the Kafka stack (Zookeeper, Kafka,
# Schema Registry), the MySQL and Postgres upstream databases, Materialize
# itself, and testdrive for driving the .td test scripts.
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    MySql(),
    Postgres(),
    Materialized(),
    Testdrive(
        entrypoint_extra=[
            # Testdrive scripts connect to MySQL as root and need its password.
            f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
        ],
    ),
]
  37. def workflow_default(c: Composition) -> None:
  38. c.down(destroy_volumes=True)
  39. c.up("zookeeper", "kafka", "schema-registry", "postgres", "mysql", "materialized")
  40. seed = random.getrandbits(16)
  41. c.run_testdrive_files(
  42. "--no-reset",
  43. "--max-errors=1",
  44. f"--seed={seed}",
  45. "rtr/mz-setup.td",
  46. )
  47. running = True
  48. def query():
  49. with c.sql_cursor() as cursor:
  50. cursor.execute("SET TRANSACTION_ISOLATION = 'STRICT SERIALIZABLE'")
  51. cursor.execute("SET REAL_TIME_RECENCY TO TRUE")
  52. queries = [
  53. """SELECT sum(count) FROM (
  54. SELECT count(*) FROM table_mysql
  55. UNION ALL SELECT count(*) FROM table_pg
  56. UNION ALL SELECT count(*) FROM input_kafka)""",
  57. "SELECT sum FROM sum",
  58. ]
  59. thread_name = threading.current_thread().getName()
  60. while running:
  61. for query in queries:
  62. start_time = time.time()
  63. cursor.execute(query.encode())
  64. results = cursor.fetchone()
  65. assert results
  66. runtime = time.time() - start_time
  67. print(f"{thread_name}: {results[0]} ({runtime} s)")
  68. threads = [PropagatingThread(target=query, name=f"verify{i}") for i in range(10)]
  69. for thread in threads:
  70. thread.start()
  71. end_time = time.time() + 150
  72. while time.time() < end_time:
  73. # Only reaches one execution every 4 seconds currently, instead of the targetted 1 second, so probably need to parallelize this
  74. start_time = time.time()
  75. c.run_testdrive_files(
  76. "--no-reset",
  77. "--max-errors=1",
  78. f"--seed={seed}",
  79. "rtr/ingest.td",
  80. )
  81. runtime = time.time() - start_time
  82. print(f"ingest: {runtime} s")
  83. running = False
  84. for thread in threads:
  85. thread.join()