# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Zippy generates a pseudo-random sequence of DDLs, DMLs, failures, data
validation and other events and runs it sequentially. By keeping track of the
expected state it can verify results for correctness.
"""
import random
import re
import time
from datetime import timedelta
from enum import Enum

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.cockroach import Cockroach
from materialize.mzcompose.services.debezium import Debezium
from materialize.mzcompose.services.grafana import Grafana
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Mc, Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.persistcli import Persistcli
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.prometheus import Prometheus
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.ssh_bastion_host import (
    SshBastionHost,
    setup_default_ssh_test_connection,
)
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.zippy.framework import Test
from materialize.zippy.mz_actions import Mz0dtDeploy
from materialize.zippy.scenarios import *  # noqa: F401 F403
def create_mzs(
    azurite: bool,
    transaction_isolation: "TransactionIsolation | bool",
    additional_system_parameter_defaults: dict[str, str] | None = None,
) -> list[Testdrive | Materialized]:
    """Build the two Materialized services plus the Testdrive client service.

    Args:
        azurite: if True, use Azurite instead of S3/Minio as the external
            blob store for both Materialized and Testdrive.
        transaction_isolation: isolation level for Testdrive sessions. It is
            str()-interpolated below, so both a TransactionIsolation enum
            member and the placeholder ``False`` (used by the static SERVICES
            list) are accepted; `workflow_default` overrides the placeholder
            with the real CLI value before anything starts.
        additional_system_parameter_defaults: extra system parameter defaults
            passed to both Materialized instances.

    Two Materialized instances ("materialized", "materialized2") are defined —
    presumably so deploy-style scenarios (see Mz0dtDeploy) can alternate
    between them; TODO confirm.
    """
    return [
        Materialized(
            name=mz_name,
            external_blob_store=True,
            blob_store_is_azure=azurite,
            external_metadata_store=True,
            sanity_restart=False,
            metadata_store="cockroach",
            additional_system_parameter_defaults=additional_system_parameter_defaults,
            default_replication_factor=2,
        )
        for mz_name in ["materialized", "materialized2"]
    ] + [
        Testdrive(
            # Testdrive connects through balancerd, not directly to materialized.
            materialize_url="postgres://materialize@balancerd:6875",
            no_reset=True,
            seed=1,
            # Timeout increased since Large Zippy occasionally runs into them
            default_timeout="1200s",
            materialize_params={
                "statement_timeout": "'1800s'",
                "transaction_isolation": f"'{transaction_isolation}'",
            },
            metadata_store="cockroach",
            external_blob_store=True,
            blob_store_is_azure=azurite,
        ),
    ]
# Static service topology for this composition. `workflow_default` overrides
# Cockroach and the Materialize/Testdrive entries (via create_mzs) with
# CLI-derived settings before bringing anything up; the
# `azurite=False, transaction_isolation=False` values here are placeholders
# that the override replaces.
SERVICES = [
    Zookeeper(),
    Redpanda(auto_create_topics=True),
    Debezium(redpanda=True),
    Postgres(),
    Cockroach(),
    # `copytos3`: extra bucket/directory, presumably for COPY TO S3 tests —
    # TODO confirm against the scenarios that use it.
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    Azurite(),
    Mc(),
    Balancerd(),
    *create_mzs(azurite=False, transaction_isolation=False),
    # Dedicated storage clusterd; workflow_default registers it as replica r2
    # of the `storage` cluster (addresses storaged:2100-2103).
    Clusterd(name="storaged", workers=4),
    # Observability stack; only started when --observability is passed.
    Grafana(),
    Prometheus(),
    SshBastionHost(),
    Persistcli(),
    MySql(),
]
  93. class TransactionIsolation(Enum):
  94. SERIALIZABLE = "serializable"
  95. STRICT_SERIALIZABLE = "strict serializable"
  96. def __str__(self) -> str:
  97. return self.value
  98. def parse_timedelta(arg: str) -> timedelta:
  99. p = re.compile(
  100. (r"((?P<days>-?\d+)d)?" r"((?P<hours>-?\d+)h)?" r"((?P<minutes>-?\d+)m)?"),
  101. re.IGNORECASE,
  102. )
  103. m = p.match(arg)
  104. assert m is not None
  105. parts = {k: int(v) for k, v in m.groupdict().items() if v}
  106. td = timedelta(**parts)
  107. assert td > timedelta(0), f"timedelta '{td}' from arg '{arg}' is not positive"
  108. return td
  109. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  110. """A general framework for longevity and stress testing"""
  111. c.silent = True
  112. parser.add_argument(
  113. "--scenario",
  114. metavar="SCENARIO",
  115. type=str,
  116. help="Scenario to run",
  117. required=True,
  118. )
  119. parser.add_argument(
  120. "--seed", metavar="N", type=int, help="Random seed", default=int(time.time())
  121. )
  122. parser.add_argument(
  123. "--actions",
  124. metavar="N",
  125. type=int,
  126. help="Number of actions to run",
  127. default=1000,
  128. )
  129. parser.add_argument(
  130. "--max-execution-time", metavar="XhYmZs", type=parse_timedelta, default="1d"
  131. )
  132. parser.add_argument(
  133. "--transaction-isolation",
  134. type=TransactionIsolation,
  135. choices=list(TransactionIsolation),
  136. default=TransactionIsolation.STRICT_SERIALIZABLE,
  137. )
  138. parser.add_argument(
  139. "--cockroach-tag",
  140. type=str,
  141. default=Cockroach.DEFAULT_COCKROACH_TAG,
  142. help="Cockroach DockerHub tag to use.",
  143. )
  144. parser.add_argument(
  145. "--observability",
  146. action="store_true",
  147. help="Start Prometheus and Grafana",
  148. )
  149. parser.add_argument(
  150. "--system-param",
  151. type=str,
  152. action="append",
  153. nargs="*",
  154. help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
  155. )
  156. parser.add_argument(
  157. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  158. )
  159. args = parser.parse_args()
  160. scenario_class = globals()[args.scenario]
  161. c.up("zookeeper", "redpanda", "ssh-bastion-host")
  162. if args.azurite:
  163. c.up("azurite")
  164. else:
  165. del c.compose["services"]["azurite"]
  166. # Required for backups, even with azurite
  167. c.enable_minio_versioning()
  168. if args.observability:
  169. c.up("prometheus", "grafana")
  170. print(f"Using seed {args.seed}")
  171. random.seed(args.seed)
  172. additional_system_parameter_defaults = {}
  173. for val in args.system_param or []:
  174. x = val[0].split("=", maxsplit=1)
  175. assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
  176. additional_system_parameter_defaults[x[0]] = x[1]
  177. with c.override(
  178. Cockroach(
  179. image=f"cockroachdb/cockroach:{args.cockroach_tag}",
  180. # Workaround for database-issues#5719
  181. restart="on-failure:5",
  182. setup_materialize=True,
  183. ),
  184. *create_mzs(
  185. args.azurite,
  186. args.transaction_isolation,
  187. additional_system_parameter_defaults,
  188. ),
  189. ):
  190. c.up("materialized")
  191. c.sql(
  192. "ALTER SYSTEM SET max_replicas_per_cluster = 10;",
  193. port=6877,
  194. user="mz_system",
  195. )
  196. c.sql(
  197. "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
  198. port=6877,
  199. user="mz_system",
  200. )
  201. scenario = scenario_class()
  202. # Separate clusterd not supported by Mz0dtDeploy yet
  203. if Mz0dtDeploy in scenario.actions_with_weight():
  204. c.sql(
  205. """
  206. CREATE CLUSTER storage (SIZE = '2-2');
  207. """
  208. )
  209. else:
  210. c.sql(
  211. """
  212. CREATE CLUSTER storage REPLICAS (r2 (
  213. STORAGECTL ADDRESSES ['storaged:2100'],
  214. STORAGE ADDRESSES ['storaged:2103'],
  215. COMPUTECTL ADDRESSES ['storaged:2101'],
  216. COMPUTE ADDRESSES ['storaged:2102'],
  217. WORKERS 4
  218. ))
  219. """
  220. )
  221. setup_default_ssh_test_connection(c, "zippy_ssh")
  222. c.rm("materialized")
  223. c.up({"name": "testdrive", "persistent": True})
  224. print("Generating test...")
  225. test = Test(
  226. scenario=scenario,
  227. actions=args.actions,
  228. max_execution_time=args.max_execution_time,
  229. )
  230. print("Running test...")
  231. test.run(c)