# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Write a single set of .td fragments for a particular feature or functionality
  11. and then execute them in upgrade, 0dt-upgrade, restart, recovery and failure
  12. contexts.
  13. """

import os
from enum import Enum

from materialize import buildkite
from materialize.checks.all_checks import *  # noqa: F401 F403
from materialize.checks.checks import Check
from materialize.checks.executors import MzcomposeExecutor, MzcomposeExecutorParallel
from materialize.checks.features import Features
from materialize.checks.scenarios import *  # noqa: F401 F403
from materialize.checks.scenarios import Scenario, SystemVarChange
from materialize.checks.scenarios_backup_restore import *  # noqa: F401 F403
from materialize.checks.scenarios_upgrade import *  # noqa: F401 F403
from materialize.checks.scenarios_zero_downtime import *  # noqa: F401 F403
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.debezium import Debezium
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Mc, Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.persistcli import Persistcli
from materialize.mzcompose.services.postgres import (
    CockroachOrPostgresMetadata,
    Postgres,
)
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.sql_server import SqlServer
from materialize.mzcompose.services.ssh_bastion_host import SshBastionHost
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive as TestdriveService
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import all_subclasses
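
# The Testdrive timeout can be raised from the environment for slow setups,
# e.g. (hypothetical value): PLATFORM_CHECKS_TD_TIMEOUT=600s ./mzcompose run default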
TESTDRIVE_DEFAULT_TIMEOUT = os.environ.get("PLATFORM_CHECKS_TD_TIMEOUT", "300s")


def create_mzs(
    azurite: bool,
    default_replication_factor: int,
    additional_system_parameter_defaults: dict[str, str] | None = None,
) -> list[TestdriveService | Materialized]:
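    """Instantiate the Materialized services ("materialized" plus mz_1..mz_5
    for scenarios that need several instances) and a single Testdrive service,
    all sharing the secrets volume and an external metadata/blob store."""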
    return [
        Materialized(
            name=mz_name,
            external_metadata_store=True,
            external_blob_store=True,
            blob_store_is_azure=azurite,
            sanity_restart=False,
            volumes_extra=["secrets:/share/secrets"],
            additional_system_parameter_defaults=additional_system_parameter_defaults,
            default_replication_factor=default_replication_factor,
        )
        for mz_name in ["materialized", "mz_1", "mz_2", "mz_3", "mz_4", "mz_5"]
    ] + [
        TestdriveService(
            default_timeout=TESTDRIVE_DEFAULT_TIMEOUT,
            materialize_params={"statement_timeout": f"'{TESTDRIVE_DEFAULT_TIMEOUT}'"},
            external_blob_store=True,
            blob_store_is_azure=azurite,
            no_reset=True,
            seed=1,
            entrypoint_extra=[
                "--var=replicas=1",
                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
            ],
            volumes_extra=["secrets:/share/secrets"],
        )
    ]
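

# Every service used by any scenario must be listed here so that teardown()
# below can stop it and remove its volumes.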
SERVICES = [
    TestCerts(),
    CockroachOrPostgresMetadata(
        # Workaround for database-issues#5899
        restart="on-failure:5",
    ),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    Azurite(),
    Mc(),
    Postgres(),
    MySql(),
    SqlServer(),
    Zookeeper(),
    Kafka(
        auto_create_topics=True,
        depends_on_extra=["test-certs"],
        advertised_listeners=[
            # Using lowercase listener names here bypasses some too-helpful
            # checks in the Docker entrypoint that (incorrectly) attempt to
            # assess the validity of the authentication configuration.
            "plaintext://kafka:9092",
            "ssl://kafka:9093",
            "mssl://kafka:9094",
            "sasl_plaintext://kafka:9095",
            "sasl_ssl://kafka:9096",
            "sasl_mssl://kafka:9097",
        ],
        environment_extra=[
            "ZOOKEEPER_SASL_ENABLED=FALSE",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,ssl:SSL,mssl:SSL,sasl_plaintext:SASL_PLAINTEXT,sasl_ssl:SASL_SSL,sasl_mssl:SASL_SSL",
            "KAFKA_INTER_BROKER_LISTENER_NAME=plaintext",
            "KAFKA_SASL_ENABLED_MECHANISMS=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512",
            "KAFKA_SSL_KEY_PASSWORD=mzmzmz",
            "KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/secrets/kafka.keystore.jks",
            "KAFKA_SSL_KEYSTORE_PASSWORD=mzmzmz",
            "KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.truststore.jks",
            "KAFKA_SSL_TRUSTSTORE_PASSWORD=mzmzmz",
            "KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/jaas.config",
            "KAFKA_LISTENER_NAME_MSSL_SSL_CLIENT_AUTH=required",
            "KAFKA_LISTENER_NAME_SASL__MSSL_SSL_CLIENT_AUTH=required",
            "KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer",
            "KAFKA_SUPER_USERS=User:materialize;User:CN=materialized;User:ANONYMOUS",
        ],
        volumes=[
            "secrets:/etc/kafka/secrets",
            "./kafka.jaas.config:/etc/kafka/jaas.config",
        ],
    ),
    SchemaRegistry(),
    Debezium(),
    # Started by some Scenarios, defined here only for the teardown
    Clusterd(name="clusterd_compute_1"),
    *create_mzs(azurite=False, default_replication_factor=1),
    Persistcli(),
    SshBastionHost(),
]


class ExecutionMode(Enum):
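    """How checks are scheduled within a scenario: all together (sequentially
    or in parallel), or each check in its own scenario run."""
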
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    ONEATATIME = "oneatatime"

    def __str__(self) -> str:
        return self.value


def setup(c: Composition) -> None:
    c.up(
        "test-certs",
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
        "debezium",
        "ssh-bastion-host",
        {"name": "testdrive", "persistent": True},
    )
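
    # Assumption: the backup/restore scenarios rely on versioned Minio buckets.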
    c.enable_minio_versioning()

    # Add `materialize` SCRAM user to Kafka.
    c.exec(
        "kafka",
        "kafka-configs",
        "--bootstrap-server=localhost:9092",
        "--alter",
        "--add-config=SCRAM-SHA-256=[password=sekurity],SCRAM-SHA-512=[password=sekurity]",
        "--entity-type=users",
        "--entity-name=materialize",
    )


def teardown(c: Composition) -> None:
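    """Stop all services and drop their volumes so the next run starts clean."""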
    c.rm(*[s.name for s in SERVICES], stop=True, destroy_volumes=True)
    c.rm_volumes("mzdata", "tmp", force=True)


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    # c.silent = True

    parser.add_argument(
        "--scenario", metavar="SCENARIO", type=str, help="Scenario to run."
    )
    parser.add_argument(
        "--check", metavar="CHECK", type=str, action="append", help="Check(s) to run."
    )
    parser.add_argument(
        "--execution-mode",
        type=ExecutionMode,
        choices=list(ExecutionMode),
        default=ExecutionMode.SEQUENTIAL,
    )
    parser.add_argument(
        "--seed",
        metavar="SEED",
        type=str,
        help="Seed for shuffling checks in a sequential run.",
    )
    parser.add_argument(
        "--system-param",
        type=str,
        action="append",
        nargs="*",
        help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`.",
    )
    parser.add_argument(
        "--features",
        nargs="*",
        help="A list of features to enable (e.g. azurite, sql_server).",
    )
    parser.add_argument(
        "--default-replication-factor",
        type=int,
        default=2,
        help="Default replication factor for clusters.",
    )

    args = parser.parse_args()

    features = Features(args.features)
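
    # When running all scenarios, exclude the base scenarios (currently just
    # SystemVarChange) rather than running them directly.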
    if args.scenario:
        assert args.scenario in globals(), f"scenario {args.scenario} does not exist"
        scenarios = [globals()[args.scenario]]
    else:
        base_scenarios = {SystemVarChange}
        scenarios = all_subclasses(Scenario) - base_scenarios

    if args.check:
        all_checks = {check.__name__: check for check in all_subclasses(Check)}
        for check in args.check:
            assert check in all_checks, f"check {check} does not exist"
        checks = [all_checks[check] for check in args.check]
    else:
        checks = list(all_subclasses(Check))

    if features.sql_server_enabled():
        c.up("sql-server")
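
    # Order the checks deterministically, then keep only the slice assigned to
    # this Buildkite shard (a no-op outside parallel CI runs).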
    checks.sort(key=lambda ch: ch.__name__)
    checks = buildkite.shard_list(checks, lambda ch: ch.__name__)
    if buildkite.get_parallelism_index() != 0 or buildkite.get_parallelism_count() != 1:
        print(
            f"Checks in shard with index {buildkite.get_parallelism_index()}: {[check.__name__ for check in checks]}"
        )
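
    # argparse delivers each --system-param occurrence as a list (action="append"
    # plus nargs="*"), so only its first token is parsed. Hypothetical example:
    # --system-param foo=bar yields {"foo": "bar"}.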
    additional_system_parameter_defaults = {}
    for val in args.system_param or []:
        x = val[0].split("=", maxsplit=1)
        assert len(x) == 2, f"--system-param '{val}' should be in the format <key>=<val>"
        additional_system_parameter_defaults[x[0]] = x[1]

    with c.override(
        *create_mzs(
            features.azurite_enabled(),
            args.default_replication_factor,
            additional_system_parameter_defaults,
        )
    ):
        for scenario_class in scenarios:
            assert issubclass(
                scenario_class, Scenario
            ), f"{scenario_class} is not a Scenario. Maybe you meant to specify a Check via --check?"

            print(f"Testing scenario {scenario_class}...")

            executor_class = (
                MzcomposeExecutorParallel
                if args.execution_mode is ExecutionMode.PARALLEL
                else MzcomposeExecutor
            )
            executor = executor_class(composition=c)

            execution_mode = args.execution_mode
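
            # SEQUENTIAL and PARALLEL run the whole check list within a single
            # scenario pass; ONEATATIME repeats setup and the scenario once per
            # check so that each check runs in isolation.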
            if execution_mode in [ExecutionMode.SEQUENTIAL, ExecutionMode.PARALLEL]:
                setup(c)
                scenario = scenario_class(
                    checks=checks,
                    executor=executor,
                    features=features,
                    seed=args.seed,
                )
                scenario.run()
            elif execution_mode is ExecutionMode.ONEATATIME:
                for check in checks:
                    print(
                        f"Running individual check {check}, scenario {scenario_class}"
                    )
                    c.override_current_testcase_name(
                        f"Check '{check}' with scenario '{scenario_class}'"
                    )
                    setup(c)
                    scenario = scenario_class(
                        checks=[check],
                        executor=executor,
                        features=features,
                        seed=args.seed,
                    )
                    scenario.run()
            else:
                raise RuntimeError(f"Unsupported execution mode: {execution_mode}")