123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Runs a randomized parallel workload stressing all parts of Materialize, can
- mostly find panics and unexpected errors. See zippy for a sequential randomized
- tests which can verify correctness.
- """
- import os
- import random
- import requests
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.service import Service
- from materialize.mzcompose.services.azurite import Azurite
- from materialize.mzcompose.services.cockroach import Cockroach
- from materialize.mzcompose.services.kafka import Kafka
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.minio import Mc, Minio
- from materialize.mzcompose.services.mysql import MySql
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.toxiproxy import Toxiproxy
- from materialize.mzcompose.services.zookeeper import Zookeeper
- from materialize.parallel_workload.parallel_workload import parse_common_args, run
- from materialize.parallel_workload.settings import Complexity, Scenario
- SERVICES = [
- Cockroach(setup_materialize=True),
- Postgres(),
- MySql(),
- Zookeeper(),
- Kafka(
- auto_create_topics=False,
- ports=["30123:30123"],
- allow_host_ports=True,
- environment_extra=[
- "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
- "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
- ],
- ),
- SchemaRegistry(),
- Minio(setup_materialize=True, additional_directories=["copytos3"]),
- Azurite(),
- Mc(),
- Materialized(default_replication_factor=2),
- Materialized(name="materialized2", default_replication_factor=2),
- Service("sqlsmith", {"mzbuild": "sqlsmith"}),
- Service(
- name="persistcli",
- config={"mzbuild": "jobs"},
- ),
- Toxiproxy(),
- ]
- def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
- parse_common_args(parser)
- args = parser.parse_args()
- print(f"--- Random seed is {args.seed}")
- service_names = [
- "cockroach",
- "postgres",
- "mysql",
- "zookeeper",
- "kafka",
- "schema-registry",
- # Still required for backups/s3 testing even when we use Azurite as blob store
- "minio",
- "materialized",
- ]
- random.seed(args.seed)
- scenario = Scenario(args.scenario)
- complexity = Complexity(args.complexity)
- sanity_restart = False
- with c.override(
- Materialized(
- # TODO: Retry with toxiproxy on azurite
- external_blob_store=True,
- blob_store_is_azure=args.azurite,
- external_metadata_store="toxiproxy",
- ports=["6975:6875", "6976:6876", "6977:6877"],
- sanity_restart=sanity_restart,
- metadata_store="cockroach",
- default_replication_factor=2,
- ),
- Toxiproxy(seed=random.randrange(2**63)),
- ):
- toxiproxy_start(c)
- c.up(*service_names)
- c.up({"name": "mc", "persistent": True})
- c.exec(
- "mc",
- "mc",
- "alias",
- "set",
- "persist",
- "http://minio:9000/",
- "minioadmin",
- "minioadmin",
- )
- c.exec("mc", "mc", "version", "enable", "persist/persist")
- ports = {s: c.default_port(s) for s in service_names}
- ports["http"] = c.port("materialized", 6876)
- ports["mz_system"] = c.port("materialized", 6877)
- if scenario == Scenario.ZeroDowntimeDeploy:
- ports["materialized2"] = 7075
- ports["http2"] = 7076
- ports["mz_system2"] = 7077
- # try:
- run(
- "localhost",
- ports,
- args.seed,
- args.runtime,
- complexity,
- scenario,
- args.threads,
- args.naughty_identifiers,
- args.replicas,
- c,
- args.azurite,
- sanity_restart,
- )
- # Don't wait for potentially hanging threads that we are ignoring
- os._exit(0)
- # TODO: Only ignore errors that will be handled by parallel-workload, not others
- # except Exception:
- # print("--- Execution of parallel-workload failed")
- # print_exc()
- # # Don't fail the entire run. We ran into a crash,
- # # ci-annotate-errors will handle this if it's an unknown failure.
- # return
- def toxiproxy_start(c: Composition) -> None:
- c.up("toxiproxy")
- port = c.default_port("toxiproxy")
- r = requests.post(
- f"http://localhost:{port}/proxies",
- json={
- "name": "cockroach",
- "listen": "0.0.0.0:26257",
- "upstream": "cockroach:26257",
- "enabled": True,
- },
- )
- assert r.status_code == 201, r
- r = requests.post(
- f"http://localhost:{port}/proxies",
- json={
- "name": "minio",
- "listen": "0.0.0.0:9000",
- "upstream": "minio:9000",
- "enabled": True,
- },
- )
- assert r.status_code == 201, r
- r = requests.post(
- f"http://localhost:{port}/proxies",
- json={
- "name": "azurite",
- "listen": "0.0.0.0:10000",
- "upstream": "azurite:10000",
- "enabled": True,
- },
- )
- assert r.status_code == 201, r
- r = requests.post(
- f"http://localhost:{port}/proxies/cockroach/toxics",
- json={
- "name": "cockroach",
- "type": "latency",
- "attributes": {"latency": 0, "jitter": 100},
- },
- )
- assert r.status_code == 200, r
- r = requests.post(
- f"http://localhost:{port}/proxies/minio/toxics",
- json={
- "name": "minio",
- "type": "latency",
- "attributes": {"latency": 0, "jitter": 100},
- },
- )
- assert r.status_code == 200, r
- r = requests.post(
- f"http://localhost:{port}/proxies/minio/toxics",
- json={
- "name": "azurite",
- "type": "latency",
- "attributes": {"latency": 0, "jitter": 100},
- },
- )
- assert r.status_code == 200, r
|