# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test replica isolation by introducing faults of various kinds in replica1 and
then making sure that the cluster continues to operate properly.
"""

import time
from collections.abc import Callable
from dataclasses import dataclass
from textwrap import dedent
from typing import Any

from psycopg import Cursor

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import selected_by_name

SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Localstack(),
    Materialized(
        additional_system_parameter_defaults={
            "log_filter": "mz_cluster::client=debug,info",
        },
    ),
    Clusterd(name="clusterd_1_1"),
    Clusterd(name="clusterd_1_2"),
    Clusterd(name="clusterd_2_1"),
    Clusterd(name="clusterd_2_2"),
    Testdrive(),
]
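
# The clusterd services above back a single cluster with two replicas of two
# processes each, named clusterd_<replica>_<process>: replica1 runs on
# clusterd_1_1 and clusterd_1_2, replica2 on clusterd_2_1 and clusterd_2_2
# (see the CREATE CLUSTER statement in `run_test`). Disruptions target
# replica1; replica2 must keep the cluster operational.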


class AllowCompactionCheck:
    """Base class for checks that AllowCompaction commands for certain IDs
    show up in a replica's clusterd log."""

    # replica: a string describing the SQL accessible name of the replica. Example: "cluster1.replica1"
    # host: docker container name from which to check the log. Example: "clusterd_1_1"
    def __init__(self, replica: str, host: str):
        assert "." in replica
        self.replica = replica
        self.host = host
        self.ids: list[str] | None = None
        self.missing_ids: list[str] = []
        self.satisfied = False

    def find_ids(self, c: Composition) -> None:
        """Populate `self.ids` with the IDs whose compaction should be checked."""
        raise NotImplementedError

    def print_error(self) -> None:
        raise NotImplementedError

    def check_log(self, c: Composition) -> None:
        self.find_ids(c)
        assert self.ids is not None
        log: str = c.invoke("logs", self.host, capture=True).stdout
        self.missing_ids = []
        self.satisfied = all([self._log_contains_id(log, x) for x in self.ids])

    def replica_id(self, c: Composition) -> str:
        cursor = c.sql_cursor()
        (cluster, replica) = self.replica.split(".")
        cursor.execute(
            f"""
            SELECT mz_cluster_replicas.id FROM mz_clusters, mz_cluster_replicas
            WHERE cluster_id = mz_clusters.id AND mz_clusters.name = '{cluster}'
            AND mz_cluster_replicas.name = '{replica}'""".encode(),
        )
        return str(get_single_value_from_cursor(cursor))

    def cluster_id(self, c: Composition) -> str:
        cursor = c.sql_cursor()
        cluster = self.replica.split(".")[0]
        cursor.execute(
            f"SELECT id FROM mz_clusters WHERE mz_clusters.name = '{cluster}'".encode(),
        )
        return str(get_single_value_from_cursor(cursor))

    def _log_contains_id(self, log: str, the_id: str) -> bool:
        for line in [
            x for x in log.splitlines() if "ClusterClient send=AllowCompaction" in x
        ]:
            if the_id in line:
                return True
        self.missing_ids.append(the_id)
        return False
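
    # Catalog IDs as shown in SQL ("u42", "s123", "si456") appear in the
    # clusterd log in a parenthesized form ("User(42)", "System(123)",
    # "IntrospectionSourceIndex(456)"); `_format_id` below translates from the
    # SQL spelling to the log spelling so `_log_contains_id` can match them.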
    @staticmethod
    def _format_id(iid: str) -> str:
        if iid.startswith("si"):
            return "IntrospectionSourceIndex(" + iid[2:] + ")"
        elif iid.startswith("s"):
            return "System(" + iid[1:] + ")"
        elif iid.startswith("u"):
            return "User(" + iid[1:] + ")"
        raise RuntimeError(f"Unexpected iid: {iid}")

    @staticmethod
    def all_checks(replica: str, host: str) -> list["AllowCompactionCheck"]:
        return [
            MaterializedView(replica, host),
            ArrangedIntro(replica, host),
            ArrangedIndex(replica, host),
        ]


class MaterializedView(AllowCompactionCheck):
    """
    Checks that clusterd receives AllowCompaction commands for materialized views.

    For materialized views we hold back compaction until slow replicas have
    caught up. Hence we don't expect these messages if there is another failing
    replica in the cluster.
    """

    def find_ids(self, c: Composition) -> None:
        cursor = c.sql_cursor()
        cursor.execute(
            """
            SELECT id, shard_id FROM mz_internal.mz_storage_shards, mz_catalog.mz_materialized_views
            WHERE object_id = id AND name = 'v3';
            """
        )
        self.ids = [self._format_id(get_single_value_from_cursor(cursor))]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for materialized view with ids {self.missing_ids}"
        )


class ArrangedIntro(AllowCompactionCheck):
    """
    Checks that clusterd receives AllowCompaction commands for introspection sources.

    This is a purely per-replica property: other failing replicas in the same
    cluster should not influence the result of this check.
    """

    def find_ids(self, c: Composition) -> None:
        cluster_id = self.cluster_id(c)
        cursor = c.sql_cursor()
        cursor.execute(
            f"""
            SELECT idx.id FROM mz_catalog.mz_sources AS src, mz_catalog.mz_indexes AS idx
            WHERE src.id = idx.on_id AND idx.cluster_id = '{cluster_id}'""".encode()
        )
        self.ids = [self._format_id(x[0]) for x in cursor.fetchall()]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for introspection with ids {self.missing_ids}"
        )


class ArrangedIndex(AllowCompactionCheck):
    """
    Checks that the arrangement backing an index receives AllowCompaction commands.

    For arrangements, we hold back compaction until all replicas have caught
    up, so with a failing replica in the cluster these messages are no longer
    guaranteed to arrive.
    """

    def find_ids(self, c: Composition) -> None:
        cursor = c.sql_cursor()
        cursor.execute(
            """
            SELECT idx.id FROM mz_catalog.mz_views AS views, mz_catalog.mz_indexes AS idx
            WHERE views.name = 'ct1' AND views.id = idx.on_id
            """
        )
        self.ids = [self._format_id(x[0]) for x in cursor.fetchall()]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for index arrangement with ids {self.missing_ids}"
        )
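

# How the checks react to a failing peer replica, per the docstrings above:
#
#   MaterializedView - compaction is held back until slow replicas catch up,
#                      so a failing peer suppresses these commands.
#   ArrangedIntro    - purely per-replica; unaffected by failing peers.
#   ArrangedIndex    - like MaterializedView, held back by a failing peer.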


def populate(c: Composition) -> None:
    # Create some database objects
    c.testdrive(
        dedent(
            """
            > CREATE TABLE t1 (f1 INTEGER);
            > INSERT INTO t1 SELECT * FROM generate_series(1, 10);
            > CREATE VIEW ct1 AS SELECT COUNT(*) AS c1 FROM t1;
            > CREATE DEFAULT INDEX ON ct1;
            > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c1 FROM t1;
            > CREATE TABLE ten (f1 INTEGER);
            > INSERT INTO ten SELECT * FROM generate_series(1, 10);
            > CREATE MATERIALIZED VIEW expensive AS SELECT (a1.f1 * 1) +
              (a2.f1 * 10) +
              (a3.f1 * 100) +
              (a4.f1 * 1000) +
              (a5.f1 * 10000) +
              (a6.f1 * 100000) +
              (a7.f1 * 1000000)
              FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4, ten AS a5, ten AS a6, ten AS a7;
            $ kafka-create-topic topic=source1
            $ kafka-ingest format=bytes topic=source1 repeat=1000000
            A${kafka-ingest.iteration}
            > CREATE CLUSTER c SIZE '1';
            > CREATE CONNECTION IF NOT EXISTS kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
            > CREATE SOURCE source1
              IN CLUSTER c
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source1-${testdrive.seed}')
              FORMAT BYTES
            > CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(*) FROM source1
            """
        ),
    )


def restart_replica(c: Composition) -> None:
    c.kill("clusterd_1_1", "clusterd_1_2")
    c.up("clusterd_1_1", "clusterd_1_2")


def restart_environmentd(c: Composition) -> None:
    c.kill("materialized")
    c.up("materialized")


def drop_create_replica(c: Composition) -> None:
    """Replace replica1 with a new replica3 backed by the same clusterd
    processes."""
    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.testdrive(
        dedent(
            """
            > DROP CLUSTER REPLICA cluster1.replica1
            > CREATE CLUSTER REPLICA cluster1.replica3
              STORAGECTL ADDRESSES ['clusterd_1_1:2100', 'clusterd_1_2:2100'],
              STORAGE ADDRESSES ['clusterd_1_1:2103', 'clusterd_1_2:2103'],
              COMPUTECTL ADDRESSES ['clusterd_1_1:2101', 'clusterd_1_2:2101'],
              COMPUTE ADDRESSES ['clusterd_1_1:2102', 'clusterd_1_2:2102']
            """
        )
    )


def create_invalid_replica(c: Composition) -> None:
    """Add a replica whose processes can never be reached, leaving it
    permanently unhealthy."""
    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.testdrive(
        dedent(
            """
            > CREATE CLUSTER REPLICA cluster1.replica3
              STORAGECTL ADDRESSES ['no_such_host:2100'],
              STORAGE ADDRESSES ['no_such_host:2103'],
              COMPUTECTL ADDRESSES ['no_such_host:2101'],
              COMPUTE ADDRESSES ['no_such_host:2102']
            """
        )
    )


def validate(c: Composition) -> None:
    # Validate that the cluster continues to operate
    c.testdrive(
        dedent(
            """
            # Dataflows
            > SELECT * FROM ct1;
            10
            > SELECT * FROM v1;
            10

            # Existing sources
            $ kafka-ingest format=bytes topic=source1 repeat=1000000
            B${kafka-ingest.iteration}
            > SELECT * FROM v2;
            2000000

            # Existing tables
            > INSERT INTO t1 VALUES (20);
            > SELECT * FROM ct1;
            11
            > SELECT * FROM v1;
            11

            # New materialized views
            > CREATE MATERIALIZED VIEW v3 AS SELECT COUNT(*) AS c1 FROM t1;
            > SELECT * FROM v3;
            11

            # New tables
            > CREATE TABLE t2 (f1 INTEGER);
            > INSERT INTO t2 SELECT * FROM t1;
            > SELECT COUNT(*) FROM t2;
            11

            > CREATE CONNECTION IF NOT EXISTS kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

            # New sources
            > CREATE SOURCE source2
              IN CLUSTER c
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source1-${testdrive.seed}')
              FORMAT BYTES
            > SELECT COUNT(*) FROM source2
            2000000
            """
        ),
    )


def validate_introspection_compaction(
    c: Composition, checks: list[AllowCompactionCheck]
) -> None:
    # Validate that the AllowCompaction commands arrive at the corresponding
    # replicas. Allow up to 5 seconds for the commands to appear in the logs.
    start = time.time()
    while time.time() < start + 5:
        for check in checks:
            check.check_log(c)
        if all([check.satisfied for check in checks]):
            return
    for check in checks:
        if not check.satisfied:
            check.print_error()
    assert all([check.satisfied for check in checks])


@dataclass
class Disruption:
    name: str
    disruption: Callable
    compaction_checks: list[AllowCompactionCheck]


disruptions = [
    Disruption(
        name="none",
        disruption=lambda c: None,
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
    Disruption(
        name="drop-create-replica",
        disruption=lambda c: drop_create_replica(c),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="create-invalid-replica",
        disruption=lambda c: create_invalid_replica(c),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="restart-replica",
        disruption=lambda c: restart_replica(c),
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
    Disruption(
        name="pause-one-clusterd",
        disruption=lambda c: c.pause("clusterd_1_1"),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="kill-replica",
        disruption=lambda c: c.kill("clusterd_1_1", "clusterd_1_2"),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="drop-replica",
        disruption=lambda c: c.testdrive("> DROP CLUSTER REPLICA cluster1.replica1"),
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica2", "clusterd_2_1"
        ),
    ),
    Disruption(
        name="restart-environmentd",
        disruption=restart_environmentd,
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
]
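
# Note the pattern: scenarios that may leave the first replica unable to catch
# up (e.g. create-invalid-replica, pause-one-clusterd, kill-replica) only
# assert the per-replica ArrangedIntro check on the healthy replica2, since
# the cross-replica checks are not guaranteed with a lagging replica (see the
# check docstrings above).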


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
    args = parser.parse_args()

    c.up("zookeeper", "kafka", "schema-registry", "localstack")
    for id, disruption in enumerate(selected_by_name(args.disruptions, disruptions)):
        run_test(c, disruption, id)


def run_test(c: Composition, disruption: Disruption, id: int) -> None:
    # Clean up here instead of at the end of the test so that we keep state
    # and logs around in case something goes wrong.
    cleanup_list = [
        "materialized",
        "testdrive",
        "clusterd_1_1",
        "clusterd_1_2",
        "clusterd_2_1",
        "clusterd_2_2",
    ]
    c.kill(*cleanup_list)
    c.rm(*cleanup_list, destroy_volumes=True)
    c.rm_volumes("mzdata")

    print(f"+++ Running disruption scenario {disruption.name}")

    with c.override(
        Testdrive(
            no_reset=True,
            materialize_params={"cluster": "cluster1"},
            seed=id,
            default_timeout="300s",
        ),
        Clusterd(
            name="clusterd_1_1",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_1_2",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_2_1",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
        Clusterd(
            name="clusterd_2_2",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
    ):
        c.up(
            "materialized",
            "clusterd_1_1",
            "clusterd_1_2",
            "clusterd_2_1",
            "clusterd_2_2",
            {"name": "testdrive", "persistent": True},
        )

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        if any(
            isinstance(check, ArrangedIntro) for check in disruption.compaction_checks
        ):
            # Disable introspection subscribes because they break the
            # `ArrangedIntro` check by disabling compaction of logging indexes
            # on all replicas if one of the replicas is failing. That's because
            # of a defect of replica-targeted subscribes: they get installed on
            # all replicas, but only the targeted replica can drive the write
            # frontier forward. If the targeted replica is crashing, the write
            # frontier cannot advance, and thus the read frontier cannot
            # either.
            #
            # TODO(database-issues#8091): Fix this by installing targeted
            # subscribes only on the targeted replica.
            c.sql(
                "ALTER SYSTEM SET enable_introspection_subscribes = false;",
                port=6877,
                user="mz_system",
            )

        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                replica1 (
                    STORAGECTL ADDRESSES ['clusterd_1_1:2100', 'clusterd_1_2:2100'],
                    STORAGE ADDRESSES ['clusterd_1_1:2103', 'clusterd_1_2:2103'],
                    COMPUTECTL ADDRESSES ['clusterd_1_1:2101', 'clusterd_1_2:2101'],
                    COMPUTE ADDRESSES ['clusterd_1_1:2102', 'clusterd_1_2:2102']
                ),
                replica2 (
                    STORAGECTL ADDRESSES ['clusterd_2_1:2100', 'clusterd_2_2:2100'],
                    STORAGE ADDRESSES ['clusterd_2_1:2103', 'clusterd_2_2:2103'],
                    COMPUTECTL ADDRESSES ['clusterd_2_1:2101', 'clusterd_2_2:2101'],
                    COMPUTE ADDRESSES ['clusterd_2_1:2102', 'clusterd_2_2:2102']
                )
            )
            """
        )

        populate(c)

        # Disrupt replica1 by some means.
        disruption.disruption(c)

        validate(c)
        validate_introspection_compaction(c, disruption.compaction_checks)


def get_single_value_from_cursor(cursor: Cursor) -> Any:
    result = cursor.fetchone()
    assert result is not None
    return result[0]