# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Basic tests for the persistence layer.
"""

import os
import time
from argparse import Namespace
from textwrap import dedent

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper

SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Redpanda(),
    Mz(app_password=""),
    Materialized(),
    Testdrive(no_reset=True),
]
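
# Both Kafka backends are declared up front: the Confluent stack (Zookeeper,
# Kafka, Schema Registry) and Redpanda. start_deps() below starts only one of
# the two, selected at runtime via --redpanda.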

# TD_TEST narrows the set of testdrive files each workflow runs; by default
# everything ("*") is matched.
td_test = os.environ.pop("TD_TEST", "*")
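
# A hypothetical invocation that exercises only the Avro-related files (the
# exact entry-point path depends on the checkout layout):
#
#   TD_TEST=avro ./mzcompose run kafka-sources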


def start_deps(
    c: Composition, args_or_parser: WorkflowArgumentParser | Namespace
) -> None:
    """Bring up the source dependencies: Redpanda, or the Confluent stack."""
    if isinstance(args_or_parser, Namespace):
        args = args_or_parser
    else:
        args_or_parser.add_argument(
            "--redpanda",
            action="store_true",
            help="run against Redpanda instead of the Confluent Platform",
        )
        args = args_or_parser.parse_args()

    if args.redpanda:
        dependencies = ["redpanda"]
    else:
        dependencies = ["zookeeper", "kafka", "schema-registry"]

    c.up(*dependencies)
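
# start_deps() accepts either a parser or a pre-parsed Namespace so workflows
# can be invoked from the command line or composed from other workflows that
# have already parsed their arguments, e.g. (hypothetical call):
#
#   workflow_kafka_sources(c, Namespace(redpanda=True))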


def workflow_kafka_sources(
    c: Composition, args_or_parser: WorkflowArgumentParser | Namespace
) -> None:
    start_deps(c, args_or_parser)

    seed = round(time.time())

    c.up("materialized")
    c.run_testdrive_files(f"--seed={seed}", f"kafka-sources/*{td_test}*-before.td")

    c.kill("materialized")
    c.up("materialized")

    # And restart again, for extra stress
    c.kill("materialized")
    c.up("materialized")

    c.run_testdrive_files(f"--seed={seed}", f"kafka-sources/*{td_test}*-after.td")

    # Do one more restart, just to confirm that Mz is still able to come up
    c.kill("materialized")
    c.up("materialized")

    c.kill("materialized")
    c.rm("materialized", "testdrive", destroy_volumes=True)
    c.rm_volumes("mzdata")
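
# Reusing the same --seed across the -before and -after runs matters:
# testdrive derives its randomized names (e.g. Kafka topic suffixes) from the
# seed, so the -after files can find the objects created before the restarts.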


def workflow_user_tables(
    c: Composition, args_or_parser: WorkflowArgumentParser | Namespace
) -> None:
    start_deps(c, args_or_parser)

    seed = round(time.time())

    c.up("materialized")
    c.run_testdrive_files(
        f"--seed={seed}",
        f"user-tables/table-persistence-before-{td_test}.td",
    )

    c.kill("materialized")
    c.up("materialized")
    c.kill("materialized")
    c.up("materialized")

    c.run_testdrive_files(
        f"--seed={seed}",
        f"user-tables/table-persistence-after-{td_test}.td",
    )

    c.kill("materialized")
    c.rm("materialized", "testdrive", destroy_volumes=True)
    c.rm_volumes("mzdata")
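
# Note that td_test is spliced into the file names differently here: the
# kafka-sources workflow matches it as an infix (*{td_test}*), whereas these
# names end in -{td_test}.td, so a TD_TEST value must match the trailing
# portion of the file name.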


def workflow_failpoints(c: Composition, parser: WorkflowArgumentParser) -> None:
    start_deps(c, parser)

    for failpoint in [
        "fileblob_set_sync",
        "fileblob_delete_before",
        "fileblob_delete_after",
        "insert_timestamp_bindings_before",
        "insert_timestamp_bindings_after",
    ]:
        for action in ["return", "panic", "sleep(1000)"]:
            run_one_failpoint(c, failpoint, action)
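
# Each failpoint is exercised with the three standard fail-rs actions:
# "return" fails the call early, "panic" aborts the process, and
# "sleep(1000)" delays it for a second; 5 failpoints x 3 actions gives 15
# scenarios per run.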


def run_one_failpoint(c: Composition, failpoint: str, action: str) -> None:
    print(f">>> Running failpoint test for failpoint {failpoint} with action {action}")

    seed = round(time.time())

    c.up("materialized")
    c.run_testdrive_files(
        f"--seed={seed}",
        f"--var=failpoint={failpoint}",
        f"--var=action={action}",
        "failpoints/before.td",
    )

    time.sleep(2)
    # Kill Mz in case the failpoint has not already killed it
    c.kill("materialized")
    c.up("materialized")

    c.run_testdrive_files(f"--seed={seed}", "failpoints/after.td")

    c.kill("materialized")
    c.rm("materialized", "testdrive", destroy_volumes=True)
    c.rm_volumes("mzdata")
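
# The failpoint name and action reach the testdrive script as variables;
# before.td presumably arms the failpoint with something along the lines of
# SET failpoints = '${failpoint}=${action}' and then triggers the affected
# code path (a sketch; the literal .td contents are not shown here).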


def workflow_compaction(c: Composition) -> None:
    with c.override(
        Materialized(options=["--metrics-scraping-interval=1s"]),
    ):
        c.up("materialized")

        c.run_testdrive_files("compaction/compaction.td")

        c.kill("materialized")

    c.rm("materialized", "testdrive", destroy_volumes=True)
    c.rm_volumes("mzdata")
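
# c.override() swaps in the replacement Materialized definition only for the
# duration of the with block; the 1s metrics scraping interval makes the
# introspection data that compaction.td presumably inspects update quickly.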


def workflow_inspect_shard(c: Composition) -> None:
    """Regression test for https://github.com/MaterializeInc/materialize/pull/21098"""
    c.up("materialized")
    c.sql(
        dedent(
            """
            CREATE TABLE foo (
                big0 string, big1 string, big2 string, big3 string, big4 string, big5 string,
                barTimestamp string,
                big6 string, big7 string
            );
            INSERT INTO foo VALUES (
                repeat('x', 1024), repeat('x', 1024), repeat('x', 1024), repeat('x', 1024), repeat('x', 1024), repeat('x', 1024),
                repeat('SENTINEL', 2048),
                repeat('x', 1024), repeat('x', 1024)
            );
            SELECT * FROM foo;
            """
        )
    )

    json_dict = c.sql_query("INSPECT SHARD 'u1'", port=6877, user="mz_system")[0][0]
    parts = [
        part
        for batch in json_dict["batches"]
        for part_run in batch["part_runs"]
        for part in part_run[1]
    ]
    non_empty_part = next(part for part in parts if part["encoded_size_bytes"] > 0)
    cols = non_empty_part["stats"]["cols"]["ok"]

    # Leading columns are present in the stats
    assert "SENTINEL" in cols["bartimestamp"]["lower"]
    assert "SENTINEL" in cols["bartimestamp"]["upper"]
    for col_name in ["big0", "big1", "big2"]:
        assert cols[col_name]["lower"].endswith("xxx")
        assert cols[col_name]["upper"].endswith("xxy")

    # Trailing columns are not represented, because of the stats size limits
    for col_name in ["big3", "big4", "big5"]:
        assert col_name not in cols
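
# Two details worth noting: barTimestamp is declared with mixed case but is
# unquoted, so SQL case folding makes it appear as bartimestamp in the stats;
# and 'u1' assumes foo is the first user object created in a fresh
# environment.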


def workflow_default(c: Composition) -> None:
    def process(name: str) -> None:
        if name == "default":
            return

        if name in ["failpoints", "compaction"]:
            # Legacy tests, not currently operational
            return

        with c.test_case(name):
            c.workflow(name)

    c.test_parts(list(c.workflows.keys()), process)
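
# workflow_default runs every other workflow as its own test case; test_parts
# additionally lets the harness split the workflow list across parallel CI
# jobs (an assumption about Composition.test_parts; it is defined in
# materialize.mzcompose.composition).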