- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Introduce a second Mz instance while a concurrent workload is running for the
- purpose of exercising fencing.
- """
- import argparse
- import random
- import time
- from concurrent import futures
- from dataclasses import dataclass
- from enum import Enum
- from materialize import buildkite
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.azurite import Azurite
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.minio import Minio
- from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
class Operation(Enum):
    """A unit of work that the workload driver can schedule on a thread."""

    # Bring up the second Materialize instance (which fences out the first).
    START_SECOND_MZ = 0
    # Run an INSERT, possibly inside an explicit transaction.
    INSERT = 1
@dataclass
class Workload:
    """Parameters of one fencing scenario.

    The trailing tunables previously had no type annotations, so the
    dataclass machinery treated them as plain class attributes rather than
    fields; they are now annotated, which makes them proper fields that can
    be overridden per-workload via the constructor. The defaults are
    unchanged, so existing `Workload(...)` call sites behave identically.
    """

    # Human-readable identifier; used for Buildkite sharding and logging.
    name: str
    # txn-wal setting intended for the first / second Materialize instance.
    txn_wal_first: str = "off"
    txn_wal_second: str = "eager"
    # Number of worker threads issuing operations concurrently.
    concurrency: int = 100
    # Number of tables (each with a matching materialized view) to create.
    tables: int = 1
    # The operation each worker performs.
    operation: Operation = Operation.INSERT
    # Seconds to wait before bringing up the second instance.
    second_mz_delay: int = 5
    # Total number of operations scheduled across all workers.
    operation_count: int = 3000
    # Upper bound on the number of rows per explicit transaction.
    max_transaction_size: int = 100
@dataclass
class SuccessfulCommit:
    """Record of an INSERT that committed; checked during verification."""

    # Index of the table that received the rows.
    table_id: int
    # Value of the `id` column shared by every row of this commit.
    row_id: int
    # Number of rows inserted (1 unless an explicit transaction was used).
    transaction_size: int
# The scenarios to exercise: every interesting pair of txn-wal settings gets a
# "simple" variant, plus variants that stress many tables or many connections.
WORKLOADS = [
    Workload(name="off_to_eager_simple"),
    Workload(name="off_to_lazy_simple", txn_wal_second="lazy"),
    Workload(name="eager_to_lazy_simple", txn_wal_first="eager", txn_wal_second="lazy"),
    Workload(name="eager_to_off_simple", txn_wal_first="eager", txn_wal_second="off"),
    Workload(name="off_to_eager_many_tables", tables=100),
    Workload(name="off_to_eager_many_connections", concurrency=512),
    Workload(
        name="eager_to_lazy_many_tables",
        tables=100,
        txn_wal_first="eager",
        txn_wal_second="lazy",
    ),
    Workload(
        name="eager_to_lazy_many_connections",
        concurrency=512,
        txn_wal_first="eager",
        txn_wal_second="lazy",
    ),
]
# Static service definitions. The two Materialized instances are re-declared
# with workload-specific configuration via `c.override` in `run_workload`.
SERVICES = [
    Minio(setup_materialize=True),
    Azurite(),
    CockroachOrPostgresMetadata(),
    Materialized(name="mz_first"),  # Overriden below
    Materialized(name="mz_second"),  # Overriden below
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run the fencing workloads that belong to this Buildkite shard."""
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    args = parser.parse_args()

    # Restrict WORKLOADS to the subset owned by this parallelism shard.
    workloads = buildkite.shard_list(WORKLOADS, lambda w: w.name)
    print(
        f"Workloads in shard with index {buildkite.get_parallelism_index()}: {[w.name for w in workloads]}"
    )

    for workload in workloads:
        run_workload(c, workload, args)
def execute_operation(
    args: tuple[Composition, Workload, Operation, int]
) -> SuccessfulCommit | None:
    """Execute one scheduled operation against a randomly chosen Mz instance.

    Takes a single tuple argument so it can be used with `Executor.map`.

    Returns a `SuccessfulCommit` describing the rows that were inserted, or
    `None` when the operation was the second-instance start-up, or when the
    INSERT hit an instance that was down or died mid-query (both expected
    while fencing is in progress).
    """
    c, workload, operation, op_id = args

    if operation == Operation.START_SECOND_MZ:
        print(
            f"Will sleep {workload.second_mz_delay} before bringing up 'mz_second' ..."
        )
        time.sleep(workload.second_mz_delay)
        print("+++ Bringing up 'mz_second'...")
        c.up("mz_second")
        print("+++ 'mz_second' is now up.")
        return None
    elif operation == Operation.INSERT:
        table_id = op_id % workload.tables
        # Bias towards the first instance so it keeps receiving traffic both
        # before and after it is fenced out by the second one.
        mz_service = random.choices(["mz_first", "mz_second"], weights=(66, 33))[0]

        # Half of the inserts run as a multi-row explicit transaction.
        if random.choice([True, False]):
            transaction_size = random.randrange(workload.max_transaction_size) + 1
        else:
            transaction_size = 1

        # Error messages indicating the targeted container is gone or not up;
        # expected during fencing, so they are not treated as failures.
        benign_errors = (
            # The query targeted a Mz container that is not up
            "running docker compose failed",
            # Container died while query was in progress
            "server closed the connection unexpectedly",
            # Container died before the SQL connection was established
            "Connection refused",
        )

        try:
            cursor = c.sql_cursor(service=mz_service)
            if transaction_size > 1:
                cursor.execute("BEGIN")
                for i in range(transaction_size):
                    cursor.execute(
                        f"INSERT INTO table{table_id} VALUES ({op_id}, {i}, '{mz_service}')".encode()
                    )
                cursor.execute("COMMIT")
            else:
                cursor.execute(
                    f"INSERT INTO table{table_id} VALUES ({op_id}, 0, '{mz_service}')".encode()
                )
        except Exception as e:
            if any(msg in str(e) for msg in benign_errors):
                return None
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(f"unexpected exception: {e}") from e

        # No error, so we assume the INSERT successfully committed
        return SuccessfulCommit(
            table_id=table_id, row_id=op_id, transaction_size=transaction_size
        )
def run_workload(c: Composition, workload: Workload, args: argparse.Namespace) -> None:
    """Run a single fencing workload end to end.

    Brings up `mz_first`, creates the tables and views, then runs the
    workload's operations concurrently while `mz_second` is brought up
    partway through (fencing out `mz_first`). Finally asserts that
    `mz_first` was fenced and that every commit that reported success is
    fully visible from `mz_second`.
    """
    print(f"+++ Running workload {workload.name} ...")
    c.silent = True
    # Start from a clean slate so workloads do not interfere with each other.
    c.down(destroy_volumes=True)
    c.up(c.metadata_store())
    # NOTE(review): only the keys of this mapping (the service names) are used
    # by the overrides below; the txn-wal values are never passed to
    # Materialized. Confirm whether the per-instance txn-wal settings still
    # need to be wired up.
    mzs = {
        "mz_first": workload.txn_wal_first,
        "mz_second": workload.txn_wal_second,
    }
    with c.override(
        *[
            Materialized(
                name=mz_name,
                external_metadata_store=True,
                external_blob_store=True,
                blob_store_is_azure=args.azurite,
                sanity_restart=False,
            )
            for mz_name in mzs
        ]
    ):
        c.up("mz_first")
        # Raise the object limits so the many-tables workloads can create 100
        # tables plus one materialized view per table.
        c.sql(
            """
            ALTER SYSTEM SET max_tables = 1000;
            ALTER SYSTEM SET max_materialized_views = 1000;
            """,
            port=6877,
            user="mz_system",
            service="mz_first",
        )
        print("+++ Creating database objects ...")
        for table_id in range(workload.tables):
            c.sql(
                f"""
                CREATE TABLE IF NOT EXISTS table{table_id}(id INTEGER, subid INTEGER, mz_service STRING);
                CREATE MATERIALIZED VIEW view{table_id} AS SELECT DISTINCT id, subid, mz_service FROM table{table_id};
                """,
                service="mz_first",
            )
        print("+++ Running workload ...")
        start = time.time()
        # Schedule the start of the second Mz instance
        operations = [(c, workload, Operation.START_SECOND_MZ, 0)]
        # As well as all the other operations in the workload
        operations = operations + [
            (c, workload, workload.operation, id)
            for id in range(workload.operation_count)
        ]
        # Exiting the `with` block waits for all scheduled operations; the
        # iterator returned by `map` is consumed during verification below.
        with futures.ThreadPoolExecutor(
            workload.concurrency,
        ) as executor:
            commits = executor.map(execute_operation, operations)
        elapsed = time.time() - start
        # The second Mz instance can come up slightly faster
        assert elapsed > (
            workload.second_mz_delay * 2
        ), f"Workload completed too soon - elapsed {elapsed}s is less than 2 x second_mz_delay({workload.second_mz_delay}s)"
        print(
            f"Workload completed in {elapsed} seconds, with second_mz_delay being {workload.second_mz_delay} seconds."
        )
        # Confirm that the first Mz has properly given up the ghost
        mz_first_log = c.invoke("logs", "mz_first", capture=True)
        assert (
            "unable to confirm leadership" in mz_first_log.stdout
            or "unexpected fence epoch" in mz_first_log.stdout
            or "fenced by new catalog upper" in mz_first_log.stdout
            or "fenced by envd" in mz_first_log.stdout
        )
        print("+++ Verifying committed transactions ...")
        cursor = c.sql_cursor(service="mz_second")
        for commit in commits:
            if commit is None:
                continue
            # Every successful commit must be fully visible in both the table
            # and the materialized view derived from it.
            for target in ["table", "view"]:
                cursor.execute(
                    f"""
                    SELECT id, COUNT(*) AS transaction_size
                    FROM {target}{commit.table_id}
                    WHERE id = {commit.row_id}
                    GROUP BY id
                    """.encode()
                )
                result = cursor.fetchall()
                assert len(result) == 1
                assert (
                    result[0][0] == commit.row_id
                ), f"Unexpected result {result}; commit: {commit}; target {target}"
                assert (
                    result[0][1] == commit.transaction_size
                ), f"Unexpected result {result}; commit: {commit}; target {target}"
        print("Verification complete.")