mzcompose.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Testdrive is the basic framework and language for defining product tests under
  11. the expected-result/actual-result (aka golden testing) paradigm. A query is
  12. retried until it produces the desired result.
  13. """
  14. import glob
  15. import os
  16. from materialize import MZ_ROOT, ci_util, spawn
  17. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  18. from materialize.mzcompose.services.azurite import Azurite
  19. from materialize.mzcompose.services.fivetran_destination import FivetranDestination
  20. from materialize.mzcompose.services.kafka import Kafka
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.minio import Minio
  23. from materialize.mzcompose.services.mysql import MySql
  24. from materialize.mzcompose.services.mz import Mz
  25. from materialize.mzcompose.services.postgres import (
  26. METADATA_STORE,
  27. CockroachOrPostgresMetadata,
  28. Postgres,
  29. )
  30. from materialize.mzcompose.services.redpanda import Redpanda
  31. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  32. from materialize.mzcompose.services.testdrive import Testdrive
  33. from materialize.mzcompose.services.zookeeper import Zookeeper
  34. from materialize.source_table_migration import (
  35. get_new_image_for_source_table_migration_test,
  36. get_old_image_for_source_table_migration_test,
  37. verify_sources_after_source_table_migration,
  38. )
# Services that make up the testdrive composition. Either the Zookeeper/Kafka/
# Schema Registry trio or Redpanda is actually started, depending on the
# --redpanda flag; Minio or Azurite serves as the external blob store
# (--azurite). The Materialized and Testdrive definitions here are defaults
# that the workflows override with argument-specific configurations.
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Redpanda(),
    Postgres(),
    MySql(),
    # Extra bucket used by COPY TO S3 tests.
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    Azurite(),
    Mz(app_password=""),
    Materialized(external_blob_store=True),
    CockroachOrPostgresMetadata(),
    # Shared tmp volume lets testdrive hand files to the Fivetran destination.
    FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
    Testdrive(external_blob_store=True),
]
  54. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  55. """Run testdrive."""
  56. parser.add_argument(
  57. "--redpanda",
  58. action="store_true",
  59. help="run against Redpanda instead of the Confluent Platform",
  60. )
  61. parser.add_argument(
  62. "--aws-region",
  63. help="run against the specified AWS region instead of localstack",
  64. )
  65. parser.add_argument(
  66. "--kafka-default-partitions",
  67. type=int,
  68. metavar="N",
  69. help="set the default number of kafka partitions per topic",
  70. )
  71. parser.add_argument(
  72. "--default-size",
  73. type=int,
  74. default=Materialized.Size.DEFAULT_SIZE,
  75. help="Use SIZE 'N-N' for replicas and SIZE 'N' for sources",
  76. )
  77. parser.add_argument(
  78. "--system-param",
  79. type=str,
  80. action="append",
  81. nargs="*",
  82. help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
  83. )
  84. parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
  85. parser.add_argument(
  86. "--default-timeout",
  87. type=str,
  88. help="set the default timeout for Testdrive",
  89. )
  90. parser.add_argument(
  91. "--rewrite-results",
  92. action="store_true",
  93. help="Rewrite results, disables junit reports",
  94. )
  95. parser.add_argument(
  96. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  97. )
  98. parser.add_argument(
  99. "files",
  100. nargs="*",
  101. default=["*.td"],
  102. help="run against the specified files",
  103. )
  104. (args, passthrough_args) = parser.parse_known_args()
  105. dependencies = [
  106. "fivetran-destination",
  107. "materialized",
  108. "postgres",
  109. "mysql",
  110. ]
  111. if args.redpanda:
  112. dependencies += ["redpanda"]
  113. else:
  114. dependencies += ["zookeeper", "kafka", "schema-registry"]
  115. sysparams = args.system_param
  116. if not args.system_param:
  117. sysparams = []
  118. additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"}
  119. for val in sysparams:
  120. x = val[0].split("=", maxsplit=1)
  121. assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
  122. key = x[0]
  123. val = x[1]
  124. additional_system_parameter_defaults[key] = val
  125. materialized = Materialized(
  126. default_size=args.default_size,
  127. external_blob_store=True,
  128. blob_store_is_azure=args.azurite,
  129. additional_system_parameter_defaults=additional_system_parameter_defaults,
  130. default_replication_factor=1,
  131. )
  132. testdrive = Testdrive(
  133. forward_buildkite_shard=True,
  134. kafka_default_partitions=args.kafka_default_partitions,
  135. aws_region=args.aws_region,
  136. validate_catalog_store=True,
  137. default_timeout=args.default_timeout,
  138. volumes_extra=["mzdata:/mzdata"],
  139. external_blob_store=True,
  140. blob_store_is_azure=args.azurite,
  141. fivetran_destination=True,
  142. fivetran_destination_files_path="/share/tmp",
  143. entrypoint_extra=[
  144. f"--var=uses-redpanda={args.redpanda}",
  145. ],
  146. )
  147. with c.override(testdrive, materialized):
  148. c.up(*dependencies)
  149. c.sql(
  150. "ALTER SYSTEM SET max_clusters = 50;",
  151. port=6877,
  152. user="mz_system",
  153. )
  154. non_default_testdrive_vars = []
  155. if args.replicas > 1:
  156. c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
  157. # Make sure a replica named 'r1' always exists
  158. replica_names = [
  159. "r1" if replica_id == 0 else f"replica{replica_id}"
  160. for replica_id in range(0, args.replicas)
  161. ]
  162. replica_string = ",".join(
  163. f"{replica_name} (SIZE '{materialized.default_replica_size}')"
  164. for replica_name in replica_names
  165. )
  166. c.sql(
  167. f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
  168. user="mz_system",
  169. port=6877,
  170. )
  171. # Note that any command that outputs SHOW CLUSTERS will have output
  172. # that depends on the number of replicas testdrive has. This means
  173. # it might be easier to skip certain tests if the number of replicas
  174. # is > 1.
  175. c.sql(
  176. f"""
  177. CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{materialized.default_replica_size}';
  178. GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize;
  179. """,
  180. user="mz_system",
  181. port=6877,
  182. )
  183. non_default_testdrive_vars.append(f"--var=replicas={args.replicas}")
  184. non_default_testdrive_vars.append(
  185. "--var=single-replica-cluster=testdrive_single_replica_cluster"
  186. )
  187. if args.default_size != 1:
  188. non_default_testdrive_vars.append(
  189. f"--var=default-replica-size={materialized.default_replica_size}"
  190. )
  191. non_default_testdrive_vars.append(
  192. f"--var=default-storage-size={materialized.default_storage_size}"
  193. )
  194. junit_report = ci_util.junit_report_filename(c.name)
  195. print(f"Passing through arguments to testdrive {passthrough_args}\n")
  196. # do not set default args, they should be set in the td file using set-arg-default to easen the execution
  197. # without mzcompose
  198. def process(file: str) -> None:
  199. c.run_testdrive_files(
  200. (
  201. "--rewrite-results"
  202. if args.rewrite_results
  203. else f"--junit-report={junit_report}"
  204. ),
  205. *non_default_testdrive_vars,
  206. *passthrough_args,
  207. file,
  208. persistent=False,
  209. )
  210. # Uploading successful junit files wastes time and contains no useful information
  211. os.remove(f"test/testdrive-old-kafka-src-syntax/{junit_report}")
  212. c.test_parts(args.files, process)
  213. c.sanity_restart_mz()
  214. def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
  215. """Run testdrive."""
  216. parser.add_argument(
  217. "--redpanda",
  218. action="store_true",
  219. help="run against Redpanda instead of the Confluent Platform",
  220. )
  221. parser.add_argument(
  222. "--aws-region",
  223. help="run against the specified AWS region instead of localstack",
  224. )
  225. parser.add_argument(
  226. "--kafka-default-partitions",
  227. type=int,
  228. metavar="N",
  229. help="set the default number of kafka partitions per topic",
  230. )
  231. parser.add_argument(
  232. "--system-param",
  233. type=str,
  234. action="append",
  235. nargs="*",
  236. help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
  237. )
  238. parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
  239. parser.add_argument(
  240. "--default-timeout",
  241. type=str,
  242. help="set the default timeout for Testdrive",
  243. )
  244. parser.add_argument(
  245. "--rewrite-results",
  246. action="store_true",
  247. help="Rewrite results, disables junit reports",
  248. )
  249. parser.add_argument(
  250. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  251. )
  252. parser.add_argument(
  253. "files",
  254. nargs="*",
  255. default=["*.td"],
  256. help="run against the specified files",
  257. )
  258. (args, _) = parser.parse_known_args()
  259. matching_files = []
  260. for filter in args.files:
  261. matching_files.extend(
  262. glob.glob(
  263. filter, root_dir=MZ_ROOT / "test" / "testdrive-old-kafka-src-syntax"
  264. )
  265. )
  266. # Exclude status-history.td because we added the replica_id column. This a)
  267. # makes it so we lose history across migrations and b) makes it hard to
  268. # write a test because the column is present in the newest version but not
  269. # older versions.
  270. matching_files = [
  271. file
  272. for file in matching_files
  273. if file != "session.td" and file != "status-history.td"
  274. ]
  275. matching_files: list[str] = sorted(matching_files)
  276. dependencies = [
  277. "fivetran-destination",
  278. "minio",
  279. "materialized",
  280. "postgres",
  281. "mysql",
  282. ]
  283. if args.redpanda:
  284. kafka_deps = ["redpanda"]
  285. else:
  286. kafka_deps = ["zookeeper", "kafka", "schema-registry"]
  287. dependencies += kafka_deps
  288. sysparams = args.system_param
  289. if not args.system_param:
  290. sysparams = []
  291. additional_system_parameter_defaults = {}
  292. for val in sysparams:
  293. x = val[0].split("=", maxsplit=1)
  294. assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
  295. key = x[0]
  296. val = x[1]
  297. additional_system_parameter_defaults[key] = val
  298. mz_old = Materialized(
  299. default_size=Materialized.Size.DEFAULT_SIZE,
  300. image=get_old_image_for_source_table_migration_test(),
  301. external_blob_store=True,
  302. blob_store_is_azure=args.azurite,
  303. additional_system_parameter_defaults=dict(additional_system_parameter_defaults),
  304. )
  305. testdrive = Testdrive(
  306. forward_buildkite_shard=True,
  307. kafka_default_partitions=args.kafka_default_partitions,
  308. aws_region=args.aws_region,
  309. default_timeout=args.default_timeout,
  310. volumes_extra=["mzdata:/mzdata"],
  311. external_blob_store=True,
  312. blob_store_is_azure=args.azurite,
  313. fivetran_destination=True,
  314. fivetran_destination_files_path="/share/tmp",
  315. entrypoint_extra=[
  316. f"--var=uses-redpanda={args.redpanda}",
  317. ],
  318. )
  319. x = dict(additional_system_parameter_defaults)
  320. additional_system_parameter_defaults["force_source_table_syntax"] = "true"
  321. mz_new = Materialized(
  322. default_size=Materialized.Size.DEFAULT_SIZE,
  323. image=get_new_image_for_source_table_migration_test(),
  324. external_blob_store=True,
  325. blob_store_is_azure=args.azurite,
  326. additional_system_parameter_defaults=additional_system_parameter_defaults,
  327. )
  328. for file in matching_files:
  329. with c.override(testdrive, mz_old):
  330. c.rm("testdrive")
  331. c.up(*dependencies)
  332. c.sql(
  333. "ALTER SYSTEM SET max_clusters = 50;",
  334. port=6877,
  335. user="mz_system",
  336. )
  337. non_default_testdrive_vars = []
  338. if args.replicas > 1:
  339. c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
  340. # Make sure a replica named 'r1' always exists
  341. replica_names = [
  342. "r1" if replica_id == 0 else f"replica{replica_id}"
  343. for replica_id in range(0, args.replicas)
  344. ]
  345. replica_string = ",".join(
  346. f"{replica_name} (SIZE '{mz_old.default_replica_size}')"
  347. for replica_name in replica_names
  348. )
  349. c.sql(
  350. f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
  351. user="mz_system",
  352. port=6877,
  353. )
  354. # Note that any command that outputs SHOW CLUSTERS will have output
  355. # that depends on the number of replicas testdrive has. This means
  356. # it might be easier to skip certain tests if the number of replicas
  357. # is > 1.
  358. c.sql(
  359. f"""
  360. CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{mz_old.default_replica_size}';
  361. GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize;
  362. """,
  363. user="mz_system",
  364. port=6877,
  365. )
  366. non_default_testdrive_vars.append(f"--var=replicas={args.replicas}")
  367. non_default_testdrive_vars.append(
  368. "--var=single-replica-cluster=testdrive_single_replica_cluster"
  369. )
  370. non_default_testdrive_vars.append(
  371. f"--var=default-replica-size={mz_old.default_replica_size}"
  372. )
  373. non_default_testdrive_vars.append(
  374. f"--var=default-storage-size={mz_old.default_storage_size}"
  375. )
  376. print(f"Running {file} with mz_old")
  377. c.run_testdrive_files(
  378. *non_default_testdrive_vars,
  379. "--no-reset",
  380. file,
  381. persistent=False,
  382. )
  383. c.kill("materialized", wait=True)
  384. with c.override(testdrive, mz_new):
  385. c.rm("testdrive")
  386. c.up("materialized")
  387. print("Running mz_new")
  388. verify_sources_after_source_table_migration(c, file)
  389. c.kill("materialized", wait=True)
  390. c.kill("postgres", wait=True)
  391. c.kill("mysql", wait=True)
  392. c.kill(METADATA_STORE, wait=True)
  393. for dep in kafka_deps:
  394. c.kill(dep, wait=True)
  395. for dep in kafka_deps:
  396. c.rm(dep)
  397. c.rm("materialized")
  398. c.rm(METADATA_STORE)
  399. c.rm("postgres")
  400. c.rm("mysql")
  401. # remove the testdrive container which uses the mzdata volume
  402. testdrive_container_id = spawn.capture(
  403. ["docker", "ps", "-a", "--filter", f"volume={c.name}_mzdata", "-q"]
  404. ).strip()
  405. spawn.runv(["docker", "rm", testdrive_container_id])
  406. c.rm_volumes("mzdata", force=True)