# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

- """
- Test that ingests large amounts of data from Kafka/Postgres/MySQL and verifies
- that Materialize can handle it correctly by comparing the results.
- """

import random
import time

from materialize import buildkite
from materialize.data_ingest.executor import (
    KafkaExecutor,
    MySqlExecutor,
)
from materialize.data_ingest.workload import *  # noqa: F401 F403
from materialize.data_ingest.workload import WORKLOADS, execute_workload
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.postgres import (
    CockroachOrPostgresMetadata,
    Postgres,
)
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.zookeeper import Zookeeper
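
# All services used by the workloads. Kafka additionally advertises a HOST
# listener on port 30123 so that data-ingest executors running on the host
# (outside the compose network) can reach the broker.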
SERVICES = [
    Postgres(),
    MySql(),
    Zookeeper(),
    Kafka(
        auto_create_topics=False,
        ports=["30123:30123"],
        allow_host_ports=True,
        environment_extra=[
            "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
        ],
    ),
    SchemaRegistry(),
    CockroachOrPostgresMetadata(),
    Minio(setup_materialize=True),
    Azurite(),
    # Overridden below
    Materialized(),
    Materialized(name="materialized2"),
    Clusterd(name="clusterd1", scratch_directory="/mzdata/source_data"),
]
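

# Entry point invoked by mzcompose: parses the workflow arguments, brings up
# the services, provisions the clusters, and runs each selected workload.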
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument(
        "--seed", metavar="SEED", type=str, default=str(int(time.time()))
    )
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument(
        "--runtime", default=600, type=int, help="Runtime in seconds per workload"
    )
    parser.add_argument(
        "--workload",
        metavar="WORKLOAD",
        type=str,
        action="append",
        help="Workload(s) to run.",
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    parser.add_argument(
        "--replicas",
        type=int,
        default=2,
        help="Number of replicas in the quickstart cluster",
    )
    args = parser.parse_args()
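
    # Shard the selected workloads across parallel Buildkite jobs so that each
    # job runs a deterministic subset of them.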
    workloads = buildkite.shard_list(
        (
            [globals()[workload] for workload in args.workload]
            if args.workload
            else list(WORKLOADS)
        ),
        lambda w: w.__name__,
    )
    print(
        f"Workloads in shard with index {buildkite.get_parallelism_index()}: {workloads}"
    )
    print(f"--- Random seed is {args.seed}")

    services = (
        "materialized",
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
    )

    # TODO: Reenable when database-issues#8657 is fixed
    # executor_classes = [MySqlExecutor, KafkaRoundtripExecutor, KafkaExecutor]
    executor_classes = [MySqlExecutor, KafkaExecutor]
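
    # Both Mz instances share external metadata and blob stores and run with
    # zero_downtime enabled, so a workload can promote materialized2 to take
    # over from materialized (tracked via deploy_generation below).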
    with c.override(
        # Fixed port so that we keep the same port after restarting Mz in disruptions
        Materialized(
            ports=["16875:6875", "16877:6877"],
            external_blob_store=True,
            blob_store_is_azure=args.azurite,
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
            additional_system_parameter_defaults={"unsafe_enable_table_keys": "true"},
            sanity_restart=False,
        ),
        Materialized(
            name="materialized2",
            ports=["26875:6875", "26877:6877"],
            external_blob_store=True,
            blob_store_is_azure=args.azurite,
            external_metadata_store=True,
            system_parameter_defaults=get_default_system_parameters(zero_downtime=True),
            additional_system_parameter_defaults={"unsafe_enable_table_keys": "true"},
            sanity_restart=False,
        ),
    ):
        c.up(*services)
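
        # With --replicas > 1, replace the default single-replica quickstart
        # cluster with one that has the requested number of replicas.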
        if args.replicas > 1:
            c.sql(
                "ALTER SYSTEM SET max_replicas_per_cluster = 32",
                user="mz_system",
                port=6877,
            )
            c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
            replica_names = [f"r{replica_id}" for replica_id in range(args.replicas)]
            replica_string = ",".join(
                f"{replica_name} (SIZE '4')" for replica_name in replica_names
            )
            c.sql(
                f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
                user="mz_system",
                port=6877,
            )
            c.sql(
                "GRANT ALL PRIVILEGES ON CLUSTER quickstart TO materialize",
                user="mz_system",
                port=6877,
            )

        c.sql(
            "CREATE CLUSTER singlereplica SIZE = '4', REPLICATION FACTOR = 1",
        )
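
        # Create the Kafka and Schema Registry connections the workloads rely
        # on. Materialize DDL generally cannot run inside an explicit
        # transaction, which is presumably why autocommit is toggled on here.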
        conn = c.sql_connection()
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(
                """CREATE CONNECTION IF NOT EXISTS kafka_conn
                   FOR KAFKA BROKER 'kafka:9092', SECURITY PROTOCOL PLAINTEXT"""
            )
            cur.execute(
                """CREATE CONNECTION IF NOT EXISTS csr_conn
                   FOR CONFLUENT SCHEMA REGISTRY
                   URL 'http://schema-registry:8081'"""
            )
        conn.autocommit = False
        conn.close()
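
        # Host ports for each service; materialized2's SQL port is pinned to
        # 26875 by the override above.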
        ports = {s: c.default_port(s) for s in services}
        ports["materialized2"] = 26875

        mz_service = "materialized"
        deploy_generation = 0
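
        # Run the workloads with a fixed seed for reproducibility. A workload
        # may switch the active Mz instance (e.g. after a 0dt deployment), so
        # the service name and deploy generation are carried into the next one.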
        for i, workload_class in enumerate(workloads):
            random.seed(args.seed)
            print(f"--- Testing workload {workload_class.__name__}")
            workload = workload_class(args.azurite, c, mz_service, deploy_generation)
            execute_workload(
                executor_classes,
                workload,
                i,
                ports,
                args.runtime,
                args.verbose,
            )
            mz_service = workload.mz_service
            deploy_generation = workload.deploy_generation