# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
import datetime
import gc
import os
import random
import sys
import threading
import time
from collections import Counter, defaultdict

import psycopg

from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.parallel_workload.action import (
    Action,
    ActionList,
    BackupRestoreAction,
    CancelAction,
    KillAction,
    StatisticsAction,
    ZeroDowntimeDeployAction,
    action_lists,
    ddl_action_list,
    dml_nontrans_action_list,
    fetch_action_list,
    read_action_list,
    write_action_list,
)
from materialize.parallel_workload.database import (
    MAX_CLUSTER_REPLICAS,
    MAX_CLUSTERS,
    MAX_KAFKA_SINKS,
    MAX_KAFKA_SOURCES,
    MAX_POSTGRES_SOURCES,
    MAX_ROLES,
    MAX_SCHEMAS,
    MAX_TABLES,
    MAX_VIEWS,
    MAX_WEBHOOK_SOURCES,
    Database,
)
from materialize.parallel_workload.executor import Executor, initialize_logging
from materialize.parallel_workload.settings import Complexity, Scenario
from materialize.parallel_workload.worker import Worker
from materialize.parallel_workload.worker_exception import WorkerFailedException
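
# SEED_RANGE bounds the seeds drawn for the per-run and per-worker RNGs;
# REPORT_TIME is the interval in seconds between QPS reports.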
SEED_RANGE = 1_000_000
REPORT_TIME = 10


def run(
    host: str,
    ports: dict[str, int],
    seed: str,
    runtime: int,
    complexity: Complexity,
    scenario: Scenario,
    num_threads: int | None,
    naughty_identifiers: bool,
    replicas: int,
    composition: Composition | None,
    azurite: bool,
    sanity_restart: bool,
) -> None:
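    """Run randomized worker threads against Materialize for `runtime` seconds,
    then tear the database down and print per-action and error statistics."""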
    num_threads = num_threads or os.cpu_count() or 10
    rng = random.Random(random.randrange(SEED_RANGE))

    print(
        f"+++ Running with: --seed={seed} --threads={num_threads} --runtime={runtime} --complexity={complexity.value} --scenario={scenario.value} {'--naughty-identifiers ' if naughty_identifiers else ''} --replicas={replicas} (--host={host})"
    )
    initialize_logging()

    end_time = (
        datetime.datetime.now() + datetime.timedelta(seconds=runtime)
    ).timestamp()

    database = Database(
        rng, seed, host, ports, complexity, scenario, naughty_identifiers
    )

    system_conn = psycopg.connect(
        host=host, port=ports["mz_system"], user="mz_system", dbname="materialize"
    )
    system_conn.autocommit = True
    with system_conn.cursor() as system_cur:
        system_exe = Executor(rng, system_cur, None, database)
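        # Raise the system limits well above the nominal maxima: concurrent
        # workers can temporarily overshoot them (see the rename note below).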
        system_exe.execute(
            f"ALTER SYSTEM SET max_schemas_per_database = {MAX_SCHEMAS * 40 + num_threads}"
        )
        # The presence of ALTER TABLE RENAME can cause the total number of tables to exceed MAX_TABLES
        system_exe.execute(
            f"ALTER SYSTEM SET max_tables = {MAX_TABLES * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_materialized_views = {MAX_VIEWS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_sources = {(MAX_WEBHOOK_SOURCES + MAX_KAFKA_SOURCES + MAX_POSTGRES_SOURCES) * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_sinks = {MAX_KAFKA_SINKS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_roles = {MAX_ROLES * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_clusters = {MAX_CLUSTERS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_replicas_per_cluster = {MAX_CLUSTER_REPLICAS * 40 + num_threads}"
        )
        system_exe.execute("ALTER SYSTEM SET max_secrets = 1000000")
        system_exe.execute("ALTER SYSTEM SET idle_in_transaction_session_timeout = 0")
        # Most queries should not fail because of privileges
        for object_type in [
            "TABLES",
            "TYPES",
            "SECRETS",
            "CONNECTIONS",
            "DATABASES",
            "SCHEMAS",
            "CLUSTERS",
        ]:
            system_exe.execute(
                f"ALTER DEFAULT PRIVILEGES FOR ALL ROLES GRANT ALL PRIVILEGES ON {object_type} TO PUBLIC"
            )

        if replicas > 1:
            system_exe.execute("DROP CLUSTER quickstart CASCADE")
            replica_names = [f"r{replica_id}" for replica_id in range(0, replicas)]
            replica_string = ",".join(
                f"{replica_name} (SIZE '4')" for replica_name in replica_names
            )
            system_exe.execute(
                f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
            )
    system_conn.close()
    conn = psycopg.connect(
        host=host,
        port=ports["materialized"],
        user="materialize",
        dbname="materialize",
    )
    conn.autocommit = True
    with conn.cursor() as cur:
        assert composition
        database.create(Executor(rng, cur, None, database), composition)
    conn.close()
    workers = []
    threads = []
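    # One thread per requested worker: each picks an action list according to
    # the complexity-dependent weights and runs it until end_time.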
    for i in range(num_threads):
        weights: list[float]
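        # Weights correspond to [read, fetch, write, dml_nontrans, ddl],
        # matching the candidate lists passed to choices() below.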
        if complexity == Complexity.DDL:
            weights = [60, 30, 30, 30, 100]
        elif complexity == Complexity.DML:
            weights = [60, 30, 30, 30, 0]
        elif complexity == Complexity.Read:
            weights = [60, 30, 0, 0, 0]
        elif complexity == Complexity.DDLOnly:
            weights = [0, 0, 0, 0, 100]
        else:
            raise ValueError(f"Unknown complexity {complexity}")
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        action_list = worker_rng.choices(
            [
                read_action_list,
                fetch_action_list,
                write_action_list,
                dml_nontrans_action_list,
                ddl_action_list,
            ],
            weights,
        )[0]
        actions = [
            action_class(worker_rng, composition)
            for action_class in action_list.action_classes
        ]
        worker = Worker(
            worker_rng,
            actions,
            action_list.weights,
            end_time,
            action_list.autocommit,
            system=False,
            composition=composition,
            action_list=action_list,
        )
        thread_name = f"worker_{i}"
        print(
            f"{thread_name}: {', '.join(action_class.__name__.removesuffix('Action') for action_class in action_list.action_classes)}"
        )
        workers.append(worker)

        thread = threading.Thread(
            name=thread_name,
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
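
    # Some scenarios add one extra, dedicated worker on top of the regular
    # workload threads.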
    if scenario == Scenario.Cancel:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        worker = Worker(
            worker_rng,
            [CancelAction(worker_rng, composition, workers)],
            [1],
            end_time,
            autocommit=False,
            system=True,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="cancel",
            target=worker.run,
            args=(host, ports["mz_system"], ports["http"], "mz_system", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.Kill:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "Kill scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [KillAction(worker_rng, composition, azurite, sanity_restart)],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="kill",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.ZeroDowntimeDeploy:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "ZeroDowntimeDeploy scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [
                ZeroDowntimeDeployAction(
                    worker_rng,
                    composition,
                    azurite,
                    sanity_restart,
                )
            ],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="zero-downtime-deploy",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.BackupRestore:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "Backup & Restore scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [BackupRestoreAction(worker_rng, composition, database)],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
- name="kill",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario in (Scenario.Regression, Scenario.Rename):
        pass
    else:
        raise ValueError(f"Unknown scenario {scenario}")

    if False:  # sanity check for debugging
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        worker = Worker(
            worker_rng,
            [StatisticsAction(worker_rng, composition)],
            [1],
            end_time,
            autocommit=False,
            system=True,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="statistics",
            target=worker.run,
            args=(host, ports["mz_system"], ports["http"], "mz_system", database),
        )
        thread.start()
        threads.append(thread)

    num_queries = defaultdict(Counter)
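    # Monitor the worker threads until end_time: fail fast if any thread
    # dies, and report per-worker QPS every REPORT_TIME seconds.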
    try:
        while time.time() < end_time:
            for thread in threads:
                if not thread.is_alive():
                    occurred_exception = None
                    for worker in workers:
                        worker.end_time = time.time()
                        occurred_exception = (
                            occurred_exception or worker.occurred_exception
                        )
                    raise WorkerFailedException(
                        f"^^^ +++ Thread {thread.name} failed, exiting",
                        occurred_exception,
                    )
            time.sleep(REPORT_TIME)
            print(
                "QPS: "
                + " ".join(
                    f"{worker.num_queries.total() / REPORT_TIME:05.1f}"
                    for worker in workers
                )
            )
            for worker in workers:
                # Counter.update() adds the counts once; iterating elements()
                # and adding the count per element would square them.
                num_queries[worker.action_list].update(worker.num_queries)
                worker.num_queries.clear()
    except KeyboardInterrupt:
        print("Keyboard interrupt, exiting")
        for worker in workers:
            worker.end_time = time.time()

    stopping_time = (
        datetime.datetime.now() + datetime.timedelta(seconds=300)
    ).timestamp()
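    # Give the threads up to 5 minutes to wind down; the while/else below
    # only triggers if they never all stopped.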
    while time.time() < stopping_time:
        for thread in threads:
            thread.join(timeout=1)
        if all([not thread.is_alive() for thread in threads]):
            break
    else:
        for worker, thread in zip(workers, threads):
            if thread.is_alive():
                print(
                    f"{thread.name} still running ({worker.exe.mz_service}): {worker.exe.last_log} ({worker.exe.last_status})"
                )
        print_stats(num_queries, workers, num_threads)
        if num_threads >= 50:
            # Under high load some queries can't finish quickly, especially UPDATE/DELETE
            os._exit(0)
        if scenario == Scenario.ZeroDowntimeDeploy:
            # With 0dt deploys, connections against the currently-fenced-out
            # environmentd will be stuck forever; the promoted environmentd can
            # take > 5 minutes to become responsive as well
            os._exit(0)
        print("Threads have not stopped within 5 minutes, exiting hard")
        os._exit(1)
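
    # Tear down: reconnect (the workload may have killed or replaced the
    # server), drop the database, and wait for all sessions to drain.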
    try:
        conn = psycopg.connect(
            host=host, port=ports["materialized"], user="materialize"
        )
    except Exception as e:
        if scenario == Scenario.ZeroDowntimeDeploy:
            print(f"Failed connecting to materialized, using materialized2: {e}")
            conn = psycopg.connect(
                host=host, port=ports["materialized2"], user="materialize"
            )
        else:
            raise
    conn.autocommit = True
    with conn.cursor() as cur:
        # Dropping the database also releases the long running connections
        # used by database objects.
        database.drop(Executor(rng, cur, None, database))

        # Make sure all unreachable connections are closed too
        gc.collect()

        stopping_time = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < stopping_time:
            cur.execute(
                "SELECT * FROM mz_internal.mz_sessions WHERE connection_id <> pg_backend_pid()"
            )
            sessions = cur.fetchall()
            if len(sessions) == 0:
                break
            print(
                f"Sessions are still running even though all threads are done: {sessions}"
            )
        # TODO(def-): Why is this failing with psycopg?
        # else:
        #     raise ValueError("Sessions did not clean up within 30s of threads stopping")
    conn.close()

    print_stats(num_queries, workers, num_threads)


def print_stats(
    num_queries: defaultdict[ActionList, Counter[type[Action]]],
    workers: list[Worker],
    num_threads: int,
) -> None:
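    """Print aggregate query counts per action list and per ignored error,
    and assert that the failure rate stays within bounds."""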
    ignored_errors: defaultdict[str, Counter[type[Action]]] = defaultdict(Counter)
    num_failures = 0
    for worker in workers:
        for error, counter in worker.ignored_errors.items():
            ignored_errors[error].update(counter)
    for counter in ignored_errors.values():
        for count in counter.values():
            num_failures += count

    total_queries = sum(sub.total() for sub in num_queries.values())
    failed = 100.0 * num_failures / total_queries if total_queries else 0
    print(f"Queries executed: {total_queries} ({failed:.0f}% failed)")
    print("--- Action statistics:")
    for action_list in action_lists:
        text = ", ".join(
            [
                f"{action_class.__name__.removesuffix('Action')}: {num_queries[action_list][action_class]}"
                for action_class in action_list.action_classes
            ]
        )
        print(f" {text}")
    print("--- Error statistics:")
    for error, counter in ignored_errors.items():
        text = ", ".join(
            f"{action_class.__name__}: {count}"
            for action_class, count in counter.items()
        )
        print(f" {error}: {text}")
    assert failed < (50 if num_threads < 50 else 75)


def parse_common_args(parser: argparse.ArgumentParser) -> None:
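    """Register the arguments shared by the different workload entry points."""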
- parser.add_argument("--seed", type=str, default=str(int(time.time())))
- parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
- parser.add_argument(
- "--complexity",
- default="ddl",
- type=str,
- choices=[elem.value for elem in Complexity] + ["random"],
- )
- parser.add_argument(
- "--scenario",
- default="regression",
- type=str,
- choices=[elem.value for elem in Scenario] + ["random"],
- )
- parser.add_argument(
- "--threads",
- type=int,
- help="Number of threads to run, by default number of SMT threads",
- )
- parser.add_argument(
- "--naughty-identifiers",
- action="store_true",
- help="Whether to use naughty strings as identifiers, makes the queries unreadable",
- )
    parser.add_argument(
        "--fast-startup",
        action="store_true",
        help="Skip initializing expensive parts like SQLsmith, sources, and sinks (for fast local testing, reduces coverage)",
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    parser.add_argument("--replicas", type=int, default=2, help="Number of replicas")


def main() -> int:
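    """CLI entry point: parse arguments, apply the default system parameters,
    and run the workload against an already-running Materialize instance."""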
    parser = argparse.ArgumentParser(
        prog="parallel-workload",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Run a parallel workload against Materialize",
    )

    parser.add_argument("--host", default="localhost", type=str)
    parser.add_argument("--port", default=6875, type=int)
    parser.add_argument("--system-port", default=6877, type=int)
    parser.add_argument("--http-port", default=6876, type=int)
    parse_common_args(parser)

    args = parser.parse_args()

    ports: dict[str, int] = {
        "materialized": args.port,
        "mz_system": args.system_port,
- "http": 6876,
- "kafka": 9092,
- "schema-registry": 8081,
- }
- system_conn = psycopg.connect(
- host=args.host,
- port=ports["mz_system"],
- user="mz_system",
- dbname="materialize",
- )
- system_conn.autocommit = True
- with system_conn.cursor() as cur:
- # TODO: Currently the same as mzcompose default settings, add
- # more settings and shuffle them
- for key, value in get_default_system_parameters().items():
- cur.execute(f"ALTER SYSTEM SET {key} = '{value}'".encode())
- system_conn.close()
- random.seed(args.seed)
- run(
- args.host,
- ports,
- args.seed,
- args.runtime,
- Complexity(args.complexity),
- Scenario(args.scenario),
- args.threads,
- args.naughty_identifiers,
- args.replicas,
- composition=None, # only works in mzcompose
- azurite=args.azurite,
- sanity_restart=False, # only works in mzcompose
- )
- return 0
- if __name__ == "__main__":
- sys.exit(main())