mzcompose.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Testdrive is the basic framework and language for defining product tests under
  11. the expected-result/actual-result (aka golden testing) paradigm. A query is
  12. retried until it produces the desired result.
  13. """
  14. import glob
  15. import os
  16. from materialize import MZ_ROOT, buildkite, ci_util
  17. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  18. from materialize.mzcompose.services.azurite import Azurite
  19. from materialize.mzcompose.services.fivetran_destination import FivetranDestination
  20. from materialize.mzcompose.services.kafka import Kafka
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.minio import Minio
  23. from materialize.mzcompose.services.mysql import MySql
  24. from materialize.mzcompose.services.mz import Mz
  25. from materialize.mzcompose.services.postgres import Postgres
  26. from materialize.mzcompose.services.redpanda import Redpanda
  27. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  28. from materialize.mzcompose.services.testdrive import Testdrive
  29. from materialize.mzcompose.services.zookeeper import Zookeeper
  30. SERVICES = [
  31. Zookeeper(),
  32. Kafka(),
  33. SchemaRegistry(),
  34. Redpanda(),
  35. Postgres(),
  36. MySql(),
  37. Azurite(),
  38. Mz(app_password=""),
  39. Minio(setup_materialize=True, additional_directories=["copytos3"]),
  40. Materialized(external_blob_store=True),
  41. FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
  42. Testdrive(external_blob_store=True),
  43. ]
  44. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  45. """Run testdrive."""
  46. parser.add_argument(
  47. "--slow",
  48. action="store_true",
  49. help="include slow tests (usually only in Nightly)",
  50. )
  51. parser.add_argument(
  52. "--redpanda",
  53. action="store_true",
  54. help="run against Redpanda instead of the Confluent Platform",
  55. )
  56. parser.add_argument(
  57. "--aws-region",
  58. help="run against the specified AWS region instead of localstack",
  59. )
  60. parser.add_argument(
  61. "--kafka-default-partitions",
  62. type=int,
  63. metavar="N",
  64. help="set the default number of kafka partitions per topic",
  65. )
  66. parser.add_argument(
  67. "--default-size",
  68. type=int,
  69. default=Materialized.Size.DEFAULT_SIZE,
  70. help="Use SIZE 'N-N' for replicas and SIZE 'N' for sources",
  71. )
  72. parser.add_argument(
  73. "--system-param",
  74. type=str,
  75. action="append",
  76. nargs="*",
  77. help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
  78. )
  79. parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
  80. parser.add_argument(
  81. "--default-timeout",
  82. type=str,
  83. help="set the default timeout for Testdrive",
  84. )
  85. parser.add_argument(
  86. "--rewrite-results",
  87. action="store_true",
  88. help="Rewrite results, disables junit reports",
  89. )
  90. parser.add_argument(
  91. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  92. )
  93. parser.add_argument(
  94. "files",
  95. nargs="*",
  96. default=["*.td"],
  97. help="run against the specified files",
  98. )
  99. (args, passthrough_args) = parser.parse_known_args()
  100. dependencies = [
  101. "fivetran-destination",
  102. "materialized",
  103. "postgres",
  104. "mysql",
  105. "minio",
  106. ]
  107. if args.redpanda:
  108. dependencies += ["redpanda"]
  109. else:
  110. dependencies += ["zookeeper", "kafka", "schema-registry"]
  111. additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"}
  112. for val in args.system_param or []:
  113. x = val[0].split("=", maxsplit=1)
  114. assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
  115. additional_system_parameter_defaults[x[0]] = x[1]
  116. materialized = Materialized(
  117. default_size=args.default_size,
  118. external_blob_store=True,
  119. blob_store_is_azure=args.azurite,
  120. additional_system_parameter_defaults=additional_system_parameter_defaults,
  121. default_replication_factor=1,
  122. )
  123. testdrive = Testdrive(
  124. kafka_default_partitions=args.kafka_default_partitions,
  125. aws_region=args.aws_region,
  126. validate_catalog_store=True,
  127. default_timeout=args.default_timeout,
  128. volumes_extra=["mzdata:/mzdata"],
  129. external_blob_store=True,
  130. blob_store_is_azure=args.azurite,
  131. fivetran_destination=True,
  132. fivetran_destination_files_path="/share/tmp",
  133. entrypoint_extra=[
  134. f"--var=uses-redpanda={args.redpanda}",
  135. ],
  136. )
  137. with c.override(testdrive, materialized):
  138. c.up(*dependencies, {"name": "testdrive", "persistent": True})
  139. c.sql(
  140. "ALTER SYSTEM SET max_clusters = 50;",
  141. port=6877,
  142. user="mz_system",
  143. )
  144. non_default_testdrive_vars = []
  145. if args.replicas > 1:
  146. c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
  147. # Make sure a replica named 'r1' always exists
  148. replica_names = [
  149. "r1" if replica_id == 0 else f"replica{replica_id}"
  150. for replica_id in range(0, args.replicas)
  151. ]
  152. replica_string = ",".join(
  153. f"{replica_name} (SIZE '{materialized.default_replica_size}')"
  154. for replica_name in replica_names
  155. )
  156. c.sql(
  157. f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
  158. user="mz_system",
  159. port=6877,
  160. )
  161. # Note that any command that outputs SHOW CLUSTERS will have output
  162. # that depends on the number of replicas testdrive has. This means
  163. # it might be easier to skip certain tests if the number of replicas
  164. # is > 1.
  165. c.sql(
  166. f"""
  167. CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{materialized.default_replica_size}';
  168. GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize;
  169. """,
  170. user="mz_system",
  171. port=6877,
  172. )
  173. non_default_testdrive_vars.append(f"--var=replicas={args.replicas}")
  174. non_default_testdrive_vars.append(
  175. "--var=single-replica-cluster=testdrive_single_replica_cluster"
  176. )
  177. if args.default_size != 1:
  178. non_default_testdrive_vars.append(
  179. f"--var=default-replica-size={materialized.default_replica_size}"
  180. )
  181. non_default_testdrive_vars.append(
  182. f"--var=default-storage-size={materialized.default_storage_size}"
  183. )
  184. print(f"Passing through arguments to testdrive {passthrough_args}\n")
  185. # do not set default args, they should be set in the td file using set-arg-default to easen the execution
  186. # without mzcompose
  187. def process(file: str) -> None:
  188. if not args.slow and file in (
  189. "fivetran-destination.td",
  190. "materialized-view-refresh-options.td",
  191. "upsert-source-race.td",
  192. ):
  193. return
  194. junit_report = ci_util.junit_report_filename(f"{c.name}_{file}")
  195. c.run_testdrive_files(
  196. (
  197. "--rewrite-results"
  198. if args.rewrite_results
  199. else f"--junit-report={junit_report}"
  200. ),
  201. *non_default_testdrive_vars,
  202. *passthrough_args,
  203. file,
  204. )
  205. # Uploading successful junit files wastes time and contains no useful information
  206. os.remove(f"test/testdrive/{junit_report}")
  207. files = buildkite.shard_list(
  208. sorted(
  209. [
  210. file
  211. for pattern in args.files
  212. for file in glob.glob(
  213. pattern, root_dir=MZ_ROOT / "test" / "testdrive"
  214. )
  215. ]
  216. ),
  217. lambda file: file,
  218. )
  219. c.test_parts(files, process)
  220. c.sanity_restart_mz()