mzcompose.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Functional test for Kafka with real-time recency enabled. Queries should block
  11. until results are available instead of returning out of date results.
  12. """
  13. import random
  14. import threading
  15. import time
  16. from textwrap import dedent
  17. from psycopg import Cursor
  18. from materialize import buildkite
  19. from materialize.mzcompose.composition import Composition
  20. from materialize.mzcompose.services.kafka import Kafka
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.mz import Mz
  23. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  24. from materialize.mzcompose.services.testdrive import Testdrive
  25. from materialize.mzcompose.services.toxiproxy import Toxiproxy
  26. from materialize.mzcompose.services.zookeeper import Zookeeper
  27. from materialize.util import PropagatingThread
  28. SERVICES = [
  29. Zookeeper(),
  30. Kafka(),
  31. SchemaRegistry(),
  32. Mz(app_password=""),
  33. Materialized(default_replication_factor=2),
  34. Toxiproxy(),
  35. Testdrive(no_reset=True, seed=1),
  36. ]
  37. def workflow_default(c: Composition) -> None:
  38. def process(name: str) -> None:
  39. if name == "default":
  40. return
  41. # TODO: Reenable when database-issues#8657 is fixed
  42. if name == "multithreaded":
  43. return
  44. with c.test_case(name):
  45. c.workflow(name)
  46. workflows = buildkite.shard_list(list(c.workflows), lambda w: w)
  47. c.test_parts(workflows, process)
  48. #
  49. # Test that real-time recency works w/ slow ingest of upstream data.
  50. #
  51. def workflow_simple(c: Composition) -> None:
  52. c.down(destroy_volumes=True)
  53. c.up("zookeeper", "kafka", "schema-registry", "materialized", "toxiproxy")
  54. seed = random.getrandbits(16)
  55. c.run_testdrive_files(
  56. "--max-errors=1",
  57. f"--seed={seed}",
  58. f"--temp-dir=/share/tmp/kafka-resumption-{seed}",
  59. "simple/toxiproxy-setup.td",
  60. "simple/mz-setup.td",
  61. "simple/verify-rtr.td",
  62. )
  63. def workflow_resumption(c: Composition) -> None:
  64. c.down(destroy_volumes=True)
  65. c.up("zookeeper", "kafka", "schema-registry", "materialized", "toxiproxy")
  66. priv_cursor = c.sql_cursor(service="materialized", user="mz_system", port=6877)
  67. priv_cursor.execute("ALTER SYSTEM SET allow_real_time_recency = true;")
  68. def run_verification_query() -> Cursor:
  69. cursor = c.sql_cursor()
  70. cursor.execute("SET TRANSACTION_ISOLATION = 'STRICT SERIALIZABLE'")
  71. cursor.execute("SET REAL_TIME_RECENCY TO TRUE")
  72. cursor.execute("SET statement_timeout = '600s'")
  73. cursor.execute(
  74. """
  75. SELECT sum(count)
  76. FROM (
  77. SELECT count(*) FROM input_1_tbl
  78. UNION ALL SELECT count(*) FROM input_2_tbl
  79. UNION ALL SELECT count(*) FROM t
  80. ) AS x;"""
  81. )
  82. return cursor
  83. def verify_ok():
  84. cursor = run_verification_query()
  85. result = cursor.fetchall()
  86. assert result[0][0] == 2000204, f"Unexpected sum: {result[0][0]}"
  87. def verify_broken():
  88. try:
  89. run_verification_query()
  90. except Exception as e:
  91. assert (
  92. "timed out before ingesting the source's visible frontier when real-time-recency query issued"
  93. in str(e)
  94. )
  95. seed = random.getrandbits(16)
  96. for i, failure_mode in enumerate(
  97. [
  98. "toxiproxy-close-connection.td",
  99. "toxiproxy-limit-connection.td",
  100. "toxiproxy-timeout.td",
  101. "toxiproxy-timeout-hold.td",
  102. ]
  103. ):
  104. print(f"Running failure mode {failure_mode}...")
  105. c.run_testdrive_files(
  106. f"--seed={seed}{i}",
  107. f"--temp-dir=/share/tmp/kafka-resumption-{seed}",
  108. "resumption/toxiproxy-setup.td", # without toxify
  109. "resumption/mz-setup.td",
  110. f"resumption/{failure_mode}",
  111. "resumption/ingest-data.td",
  112. )
  113. t1 = PropagatingThread(target=verify_broken)
  114. t1.start()
  115. time.sleep(10)
  116. c.run_testdrive_files(
  117. "resumption/toxiproxy-restore-connection.td",
  118. )
  119. t1.join()
  120. t2 = PropagatingThread(target=verify_ok)
  121. t2.start()
  122. time.sleep(10)
  123. t2.join()
  124. # reset toxiproxy
  125. c.kill("toxiproxy")
  126. c.up("toxiproxy")
  127. c.run_testdrive_files(
  128. "resumption/mz-reset.td",
  129. )
  130. def workflow_multithreaded(c: Composition) -> None:
  131. c.down(destroy_volumes=True)
  132. c.up(
  133. "zookeeper",
  134. "kafka",
  135. "schema-registry",
  136. "materialized",
  137. {"name": "testdrive", "persistent": True},
  138. )
  139. value = [201]
  140. lock = threading.Lock()
  141. running = True
  142. def run(value):
  143. cursor = c.sql_cursor()
  144. cursor.execute("SET TRANSACTION_ISOLATION = 'STRICT SERIALIZABLE'")
  145. cursor.execute("SET REAL_TIME_RECENCY TO TRUE")
  146. repeat = 1
  147. while running:
  148. with lock:
  149. c.testdrive(
  150. dedent(
  151. f"""
  152. $ kafka-ingest topic=input_1 format=bytes repeat={repeat}
  153. A,B,0
  154. $ kafka-ingest topic=input_2 format=bytes repeat={repeat}
  155. A,B,0
  156. """
  157. )
  158. )
  159. value[0] += repeat * 2
  160. expected = value[0]
  161. repeat *= 2
  162. cursor.execute("BEGIN")
  163. cursor.execute(
  164. """
  165. SELECT sum(count)
  166. FROM (
  167. SELECT count(*) FROM input_1_tbl
  168. UNION ALL SELECT count(*) FROM input_2_tbl
  169. UNION ALL SELECT count(*) FROM t
  170. ) AS x;"""
  171. )
  172. result = cursor.fetchall()
  173. assert result[0][0] >= expected, f"Expected {expected}, got {result[0][0]}"
  174. cursor.execute("COMMIT")
  175. c.testdrive(
  176. dedent(
  177. """
  178. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  179. ALTER SYSTEM SET allow_real_time_recency = true
  180. $ kafka-create-topic topic=input_1
  181. $ kafka-ingest topic=input_1 format=bytes repeat=100
  182. A,B,0
  183. $ kafka-create-topic topic=input_2
  184. $ kafka-ingest topic=input_2 format=bytes repeat=100
  185. A,B,0
  186. > CREATE CONNECTION IF NOT EXISTS kafka_conn_1 TO KAFKA (BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT);
  187. > CREATE CONNECTION IF NOT EXISTS kafka_conn_2 TO KAFKA (BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT);
  188. > CREATE SOURCE input_1
  189. FROM KAFKA CONNECTION kafka_conn_1 (TOPIC 'testdrive-input_1-${testdrive.seed}')
  190. > CREATE TABLE input_1_tbl (city, state, zip) FROM SOURCE input_1 (REFERENCE "testdrive-input_1-${testdrive.seed}")
  191. FORMAT CSV WITH 3 COLUMNS
  192. > CREATE SOURCE input_2
  193. FROM KAFKA CONNECTION kafka_conn_2 (TOPIC 'testdrive-input_2-${testdrive.seed}')
  194. > CREATE TABLE input_2_tbl (city, state, zip) FROM SOURCE input_2 (REFERENCE "testdrive-input_2-${testdrive.seed}")
  195. FORMAT CSV WITH 3 COLUMNS
  196. > CREATE TABLE t (a int);
  197. > INSERT INTO t VALUES (1);
  198. > CREATE MATERIALIZED VIEW sum AS
  199. SELECT sum(count)
  200. FROM (
  201. SELECT count(*) FROM input_1_tbl
  202. UNION ALL SELECT count(*) FROM input_2_tbl
  203. UNION ALL SELECT count(*) FROM t
  204. ) AS x;
  205. """
  206. )
  207. )
  208. threads = [PropagatingThread(target=run, args=(value,)) for i in range(10)]
  209. for thread in threads:
  210. thread.start()
  211. time.sleep(600)
  212. running = False
  213. for thread in threads:
  214. thread.join()