mzcompose.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Runs a randomized parallel workload stressing all parts of Materialize, can
  11. mostly find panics and unexpected errors. See zippy for a sequential randomized
  12. tests which can verify correctness.
  13. """

import os
import random

import requests

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.service import Service
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.cockroach import Cockroach
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Mc, Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.toxiproxy import Toxiproxy
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.parallel_workload.parallel_workload import parse_common_args, run
from materialize.parallel_workload.settings import Complexity, Scenario
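
# Services used by the workload. The second Materialized instance
# ("materialized2") is only given ports in workflow_default when the
# ZeroDowntimeDeploy scenario is selected.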
SERVICES = [
    Cockroach(setup_materialize=True),
    Postgres(),
    MySql(),
    Zookeeper(),
    Kafka(
        auto_create_topics=False,
        ports=["30123:30123"],
        allow_host_ports=True,
        environment_extra=[
            "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
        ],
    ),
    SchemaRegistry(),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    Azurite(),
    Mc(),
    Materialized(default_replication_factor=2),
    Materialized(name="materialized2", default_replication_factor=2),
    Service("sqlsmith", {"mzbuild": "sqlsmith"}),
    Service(
        name="persistcli",
        config={"mzbuild": "jobs"},
    ),
    Toxiproxy(),
]
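

# Default workflow: front Cockroach, Minio, and Azurite with toxiproxy, start
# all services, and run the randomized parallel workload against them.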
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parse_common_args(parser)
    args = parser.parse_args()

    print(f"--- Random seed is {args.seed}")

    service_names = [
        "cockroach",
        "postgres",
        "mysql",
        "zookeeper",
        "kafka",
        "schema-registry",
        # Still required for backups/s3 testing even when we use Azurite as blob store
        "minio",
        "materialized",
    ]

    random.seed(args.seed)

    scenario = Scenario(args.scenario)
    complexity = Complexity(args.complexity)

    sanity_restart = False
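
    # Run Materialized against an external blob store (Minio, or Azurite when
    # args.azurite is set) and reach the Cockroach metadata store through
    # toxiproxy so that connection latency can be injected.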
    with c.override(
        Materialized(
            # TODO: Retry with toxiproxy on azurite
            external_blob_store=True,
            blob_store_is_azure=args.azurite,
            external_metadata_store="toxiproxy",
            ports=["6975:6875", "6976:6876", "6977:6877"],
            sanity_restart=sanity_restart,
            metadata_store="cockroach",
            default_replication_factor=2,
        ),
        Toxiproxy(seed=random.randrange(2**63)),
    ):
        toxiproxy_start(c)
        c.up(*service_names)
        c.up({"name": "mc", "persistent": True})
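
        # Point the mc client at Minio and enable object versioning on the
        # persist bucket.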
        c.exec(
            "mc",
            "mc",
            "alias",
            "set",
            "persist",
            "http://minio:9000/",
            "minioadmin",
            "minioadmin",
        )
        c.exec("mc", "mc", "version", "enable", "persist/persist")
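
        # Host ports the workload connects to: the default port of each service
        # plus materialized's HTTP (6876) and internal mz_system (6877) ports.
        # The ZeroDowntimeDeploy scenario also needs the ports of the second
        # environment.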
        ports = {s: c.default_port(s) for s in service_names}
        ports["http"] = c.port("materialized", 6876)
        ports["mz_system"] = c.port("materialized", 6877)
        if scenario == Scenario.ZeroDowntimeDeploy:
            ports["materialized2"] = 7075
            ports["http2"] = 7076
            ports["mz_system2"] = 7077
        # try:
        run(
            "localhost",
            ports,
            args.seed,
            args.runtime,
            complexity,
            scenario,
            args.threads,
            args.naughty_identifiers,
            args.replicas,
            c,
            args.azurite,
            sanity_restart,
        )
        # Don't wait for potentially hanging threads that we are ignoring
        os._exit(0)
        # TODO: Only ignore errors that will be handled by parallel-workload, not others
        # except Exception:
        #     print("--- Execution of parallel-workload failed")
        #     print_exc()
        #     # Don't fail the entire run. We ran into a crash,
        #     # ci-annotate-errors will handle this if it's an unknown failure.
        #     return
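

# Set up toxiproxy proxies in front of Cockroach, Minio, and Azurite and add
# latency toxics (0 ms base latency, up to 100 ms jitter), so the workload also
# exercises slow metadata and blob stores.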
def toxiproxy_start(c: Composition) -> None:
    c.up("toxiproxy")

    port = c.default_port("toxiproxy")
    r = requests.post(
        f"http://localhost:{port}/proxies",
        json={
            "name": "cockroach",
            "listen": "0.0.0.0:26257",
            "upstream": "cockroach:26257",
            "enabled": True,
        },
    )
    assert r.status_code == 201, r
    r = requests.post(
        f"http://localhost:{port}/proxies",
        json={
            "name": "minio",
            "listen": "0.0.0.0:9000",
            "upstream": "minio:9000",
            "enabled": True,
        },
    )
    assert r.status_code == 201, r
    r = requests.post(
        f"http://localhost:{port}/proxies",
        json={
            "name": "azurite",
            "listen": "0.0.0.0:10000",
            "upstream": "azurite:10000",
            "enabled": True,
        },
    )
    assert r.status_code == 201, r
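
    # Add latency toxics: 0 ms base latency with up to 100 ms of jitter, so
    # requests are randomly delayed rather than uniformly slow.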
    r = requests.post(
        f"http://localhost:{port}/proxies/cockroach/toxics",
        json={
            "name": "cockroach",
            "type": "latency",
            "attributes": {"latency": 0, "jitter": 100},
        },
    )
    assert r.status_code == 200, r
    r = requests.post(
        f"http://localhost:{port}/proxies/minio/toxics",
        json={
            "name": "minio",
            "type": "latency",
            "attributes": {"latency": 0, "jitter": 100},
        },
    )
    assert r.status_code == 200, r
    r = requests.post(
        f"http://localhost:{port}/proxies/azurite/toxics",
        json={
            "name": "azurite",
            "type": "latency",
            "attributes": {"latency": 0, "jitter": 100},
        },
    )
    assert r.status_code == 200, r