# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test that ingests large amounts of data from Kafka/Postgres/MySQL and verifies
that Materialize can handle it correctly by comparing the results.
"""

import random
import time

from materialize import buildkite
from materialize.data_ingest.executor import (
    KafkaExecutor,
    MySqlExecutor,
)
from materialize.data_ingest.workload import *  # noqa: F401 F403
from materialize.data_ingest.workload import WORKLOADS, execute_workload
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.postgres import (
    CockroachOrPostgresMetadata,
    Postgres,
)
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper
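
# Services available to this composition; mzcompose brings them up on demand.
# The two Materialized entries are placeholders that workflow_default replaces
# with fully configured instances via c.override.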
SERVICES = [
    Postgres(),
    MySql(),
    Zookeeper(),
    Kafka(
        auto_create_topics=False,
        ports=["30123:30123"],
        allow_host_ports=True,
        environment_extra=[
            "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
        ],
    ),
    SchemaRegistry(),
    CockroachOrPostgresMetadata(),
    Minio(setup_materialize=True),
    Azurite(),
    # Overridden below
    Materialized(),
    Materialized(name="materialized2"),
    Clusterd(name="clusterd1", scratch_directory="/mzdata/source_data"),
]
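

# Entry point, invoked via `./mzcompose run default`. A sketch of a typical
# invocation (workload names come from WORKLOADS):
#
#   ./mzcompose run default --runtime 60 --seed 42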
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument(
        "--seed", metavar="SEED", type=str, default=str(int(time.time()))
    )
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument(
        "--runtime", default=600, type=int, help="Runtime in seconds per workload"
    )
    parser.add_argument(
        "--workload",
        metavar="WORKLOAD",
        type=str,
        action="append",
        help="Workload(s) to run.",
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    parser.add_argument(
        "--replicas",
        type=int,
        default=2,
        help="Number of replicas in the quickstart cluster",
    )
    args = parser.parse_args()
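
    # Split the workload list across Buildkite's parallel shards so that each
    # CI job runs a disjoint subset of the workloads.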
    workloads = buildkite.shard_list(
        (
            [globals()[workload] for workload in args.workload]
            if args.workload
            else list(WORKLOADS)
        ),
        lambda w: w.__name__,
    )
    print(
        f"Workloads in shard with index {buildkite.get_parallelism_index()}: {workloads}"
    )
    print(f"--- Random seed is {args.seed}")

    services = (
        "materialized",
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
    )

    # TODO: Re-enable when database-issues#8657 is fixed
    # executor_classes = [MySqlExecutor, KafkaRoundtripExecutor, KafkaExecutor]
    executor_classes = [MySqlExecutor, KafkaExecutor]
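
    # Two Materialized instances on distinct, fixed host ports: with
    # zero_downtime enabled, a workload can hand off from one instance to the
    # other during a deploy (see mz_service/deploy_generation below).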
    with c.override(
        # Fixed port so that we keep the same port after restarting Mz in disruptions
        Materialized(
            ports=["16875:6875", "16877:6877"],
            external_blob_store=True,
            blob_store_is_azure=args.azurite,
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
            additional_system_parameter_defaults={"unsafe_enable_table_keys": "true"},
            sanity_restart=False,
        ),
        Materialized(
            name="materialized2",
            ports=["26875:6875", "26877:6877"],
            external_blob_store=True,
            blob_store_is_azure=args.azurite,
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
            additional_system_parameter_defaults={"unsafe_enable_table_keys": "true"},
            sanity_restart=False,
        ),
    ):
        c.up(*services)

        if args.replicas > 1:
            c.sql(
                "ALTER SYSTEM SET max_replicas_per_cluster = 32",
                user="mz_system",
                port=6877,
            )
            c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
            replica_names = [f"r{replica_id}" for replica_id in range(args.replicas)]
            replica_string = ",".join(
                f"{replica_name} (SIZE '4')" for replica_name in replica_names
            )
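            # With the default --replicas 2 this expands to:
            #   CREATE CLUSTER quickstart REPLICAS (r0 (SIZE '4'),r1 (SIZE '4'))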
            c.sql(
                f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
                user="mz_system",
                port=6877,
            )
            c.sql(
                "GRANT ALL PRIVILEGES ON CLUSTER quickstart TO materialize",
                user="mz_system",
                port=6877,
            )

        c.sql(
            "CREATE CLUSTER singlereplica SIZE = '4', REPLICATION FACTOR = 1",
        )
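
        # Create the shared Kafka and Schema Registry connections up front
        # (with autocommit, since DDL cannot run in an explicit transaction);
        # the workloads presumably reference kafka_conn and csr_conn in their
        # CREATE SOURCE / CREATE SINK statements.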
        conn = c.sql_connection()
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(
                """CREATE CONNECTION IF NOT EXISTS kafka_conn
                FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"""
            )
            cur.execute(
                """CREATE CONNECTION IF NOT EXISTS csr_conn
                FOR CONFLUENT SCHEMA REGISTRY
                URL 'http://schema-registry:8081'"""
            )
        conn.autocommit = False
        conn.close()
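
        # Host ports the executors use to reach each service from outside the
        # composition; materialized2 is pinned to 26875 to match the port
        # override above.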
        ports = {s: c.default_port(s) for s in services}
        ports["materialized2"] = 26875
        mz_service = "materialized"
        deploy_generation = 0
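
        # Run every workload from the same seed for reproducibility. A
        # workload may finish with the other Materialized instance active
        # (zero-downtime deploy), so mz_service and deploy_generation carry
        # over into the next iteration.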
        for i, workload_class in enumerate(workloads):
            random.seed(args.seed)
            print(f"--- Testing workload {workload_class.__name__}")
            workload = workload_class(args.azurite, c, mz_service, deploy_generation)
            execute_workload(
                executor_classes,
                workload,
                i,
                ports,
                args.runtime,
                args.verbose,
            )
            mz_service = workload.mz_service
            deploy_generation = workload.deploy_generation