# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
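
"""Tests for Materialize recovery behavior when a k8s node hosting a cluster
replica or environmentd is suspended and later revived."""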

import time
from dataclasses import dataclass
from textwrap import dedent

import pytest

from materialize.cloudtest.app.materialize_application import MaterializeApplication
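
# Testdrive timeouts, in seconds.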
TD_TIMEOUT_SHORT = 10
TD_TIMEOUT_FULL_RECOVERY = 660


@dataclass
class ReplicaDefinition:
    cluster_name: str
    index: int
    availability_zone: str

    def get_name(self) -> str:
        return f"{self.cluster_name}_r_{self.index}"


@dataclass
class ClusterDefinition:
    name: str
    replica_definitions: list[ReplicaDefinition]

    def create_replica_definitions_sql(self) -> str:
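        """Render the replica list of a `CREATE CLUSTER ... REPLICAS (...)` statement."""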
        replica_definitions = []
        for replica in self.replica_definitions:
            replica_definitions.append(
                f"{replica.get_name()} (SIZE = '1', AVAILABILITY ZONE '{replica.availability_zone}')"
            )
        return ", ".join(replica_definitions)


def default_compute_cluster() -> ClusterDefinition:
    """Single cluster in availability zone 1."""
    compute_cluster = ClusterDefinition("c_compute", [])
    compute_cluster.replica_definitions.append(
        ReplicaDefinition(compute_cluster.name, index=1, availability_zone="1")
    )
    return compute_cluster


def default_storage_cluster() -> ClusterDefinition:
    """Single cluster in availability zone 2."""
    storage_cluster = ClusterDefinition("c_storage", [])
    storage_cluster.replica_definitions.append(
        ReplicaDefinition(storage_cluster.name, index=1, availability_zone="2")
    )
    return storage_cluster
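

# Minimum number of additional counter ticks the source must produce before we
# consider it caught up and live again.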
REQUIRED_PROGRESS = 5


def populate(
    mz: MaterializeApplication,
    compute_cluster: ClusterDefinition,
    storage_cluster: ClusterDefinition,
) -> None:
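    """
    Create the given clusters, a counter load generator source on the storage
    cluster, and an indexed materialized view on the compute cluster.
    """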
    # Make sure the `quickstart` cluster replica gets scheduled on its own
    # node, so queries still work when a node running a compute/storage replica
    # is suspended.
    mz.environmentd.sql(
        "ALTER CLUSTER quickstart SET (AVAILABILITY ZONES ('quickstart'))",
        port="internal",
        user="mz_system",
    )

    all_clusters = [compute_cluster, storage_cluster]

    drop_cluster_statements = [
        f"> DROP CLUSTER IF EXISTS {cluster.name};" for cluster in all_clusters
    ]
    # the string needs the same indentation as the testdrive script below
    drop_cluster_statement_sql = "\n".join(drop_cluster_statements)

    create_cluster_statements = [
        f"> CREATE CLUSTER {cluster.name} REPLICAS ({cluster.create_replica_definitions_sql()});"
        for cluster in all_clusters
    ]
    # the string needs the same indentation as the testdrive script below
    create_cluster_statement_sql = "\n".join(create_cluster_statements)

    mz.testdrive.run(
        input=dedent(
            """
            > DROP MATERIALIZED VIEW IF EXISTS mv;
            > DROP SOURCE IF EXISTS source CASCADE;
            """
        )
        + dedent(drop_cluster_statement_sql)
        + "\n"
        + dedent(create_cluster_statement_sql)
        + "\n"
        + dedent(
            f"""
            > SELECT name FROM mz_clusters WHERE name IN ('{compute_cluster.name}', '{storage_cluster.name}');
            {storage_cluster.name}
            {compute_cluster.name}
            > CREATE SOURCE source IN CLUSTER {storage_cluster.name}
              FROM LOAD GENERATOR COUNTER
              (TICK INTERVAL '500ms');
            > SELECT COUNT(*) FROM (SHOW SOURCES) WHERE name = 'source';
            1
            > CREATE MATERIALIZED VIEW mv (f1) IN CLUSTER {compute_cluster.name} AS SELECT counter + 1 FROM source;
            > CREATE DEFAULT INDEX IN CLUSTER {compute_cluster.name} ON mv;
            > SELECT COUNT(*) > 0 FROM mv;
            true
            """
        ),
        no_reset=True,
    )


def validate_state(
    mz: MaterializeApplication,
    reached_index: int,
    must_exceed_reached_index: bool,
    timeout_in_sec: int,
    expected_state: str,
    isolation_level: str = "STRICT SERIALIZABLE",
) -> None:
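    """
    Assert via testdrive that both the source and the mv report a count at (or
    beyond) `reached_index` under the given isolation level, retrying until
    `timeout_in_sec` is exhausted.
    """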
    comparison_operator = ">" if must_exceed_reached_index else ">="
    print(f"Expect '{expected_state}' within timeout of {timeout_in_sec}s")

    testdrive_run_timeout_in_sec = 10
    validation_succeeded = False
    last_error_message = None
    start_time = time.time()

    # re-run testdrive to make sure it connects to the most recent envd;
    # always attempt at least one run
    max_run_count = max(1, timeout_in_sec // testdrive_run_timeout_in_sec)

    for run in range(max_run_count):
        is_last_run = run + 1 == max_run_count
        try:
            mz.testdrive.run(
                input=dedent(
                    f"""
                    > SET TRANSACTION_ISOLATION TO '{isolation_level}';
                    > SELECT COUNT(*) {comparison_operator} {reached_index} FROM source; -- validate source with isolation {isolation_level}
                    true
                    > SELECT COUNT(*) {comparison_operator} {reached_index} FROM mv; -- validate mv with isolation {isolation_level}
                    true
                    """
                ),
                default_timeout=f"{testdrive_run_timeout_in_sec}s",
                no_reset=True,
                suppress_command_error_output=not is_last_run,
            )
            validation_succeeded = True
            break
        except Exception as e:
            try_info = f"{run + 1}/{max_run_count} with isolation {isolation_level}"
            # arbitrary errors can occur if envd is not yet ready after a restart
            if is_last_run:
                print(f"Validation failed in try {try_info}, aborting!")
                if last_error_message is not None:
                    print(f"Last error message was: {last_error_message}")
            else:
                print(f"Validation failed in try {try_info}, retrying.")
            last_error_message = str(e)

    end_time = time.time()

    if not validation_succeeded:
        # do not raise a FailedTestExecutionError because we are not in mzcompose;
        # do not use `fail()` because it comes with a verbose stack trace
        assert (
            False
        ), f"Failed to achieve '{expected_state}' using '{isolation_level}' within {timeout_in_sec}s!"

    duration = round(end_time - start_time, 1)
    print(
        f"Achieved '{expected_state}' within {duration} seconds (limit: {timeout_in_sec}s)"
    )


def get_current_counter_index(mz: MaterializeApplication) -> int:
    """
    This query has no timeout. Only use it if it is expected to deliver.
    """
    reached_value: int = mz.environmentd.sql_query("SELECT COUNT(*) FROM source")[0][0]
    return reached_value


def suspend_node_of_replica(
    mz: MaterializeApplication, cluster: ClusterDefinition
) -> str:
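    """
    Suspend the k8s node that hosts a replica of the given cluster and return
    the node's name.
    """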
    node_names = mz.get_cluster_node_names(cluster.name)
    assert len(node_names) > 0
    print(f"Cluster {cluster.name} uses nodes {node_names}")

    suspended_node_name = node_names[0]
    mz.suspend_k8s_node(suspended_node_name)
    return suspended_node_name


@pytest.mark.node_recovery
def test_unreplicated_storage_cluster_on_failing_node(
    mz: MaterializeApplication,
) -> None:
    """
    An unreplicated storage cluster is on the failed node. Queries of a
    downstream index in serializable mode should continue to work but return
    stale data. Staleness should resolve within a minute or two.
    """
    compute_cluster = default_compute_cluster()
    storage_cluster = default_storage_cluster()
    populate(mz, compute_cluster, storage_cluster)

    reached_index = get_current_counter_index(mz)
    suspended_node_name = suspend_node_of_replica(mz, storage_cluster)

    # with SERIALIZABLE, stale data should still be returned promptly
    validate_state(
        mz,
        reached_index,
        must_exceed_reached_index=False,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="stale data being delivered timely",
        isolation_level="SERIALIZABLE",
    )

    # with STRICT SERIALIZABLE, queries may block until the replica recovers
    validate_state(
        mz,
        reached_index,
        must_exceed_reached_index=False,
        timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
        expected_state="data being delivered",
        isolation_level="STRICT SERIALIZABLE",
    )

    # only request this index because the previous validation succeeded / did not block
    stalled_index = get_current_counter_index(mz)

    # expect live data to be delivered after at most two minutes in production
    # (or longer in k8s)
    validate_state(
        mz,
        stalled_index,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
        expected_state="live data after node recovery",
    )

    recovered_index = get_current_counter_index(mz)
    mz.revive_suspended_k8s_node(suspended_node_name)

    validate_state(
        mz,
        recovered_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="no issues after node recovery",
    )


@pytest.mark.node_recovery
def test_unreplicated_compute_cluster_on_failing_node(
    mz: MaterializeApplication,
) -> None:
    """
    An unreplicated compute cluster is on the failed node. Queries of indexes
    on the compute cluster should fail, but resolve within a minute or two.
    """
    compute_cluster = default_compute_cluster()
    storage_cluster = default_storage_cluster()
    populate(mz, compute_cluster, storage_cluster)

    reached_index = get_current_counter_index(mz)
    suspended_node_name = suspend_node_of_replica(mz, compute_cluster)

    # expect (live) data to be delivered after at most two minutes in production
    # (or longer in k8s)
    validate_state(
        mz,
        reached_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
        expected_state="node recovery and live data",
    )

    recovered_index = get_current_counter_index(mz)
    mz.revive_suspended_k8s_node(suspended_node_name)

    validate_state(
        mz,
        recovered_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="no issues after node recovery",
    )


@pytest.mark.node_recovery
def test_replicated_compute_cluster_on_failing_node(mz: MaterializeApplication) -> None:
    """
    A replicated compute cluster is on the failed node. Queries of indexes on
    the compute cluster should experience no disruption in latency, thanks to
    the second replica.
    """
    compute_cluster = default_compute_cluster()
    # add a second replica in a different availability zone
    compute_cluster.replica_definitions.append(
        ReplicaDefinition(compute_cluster.name, index=2, availability_zone="3")
    )
    assert (
        compute_cluster.replica_definitions[0].availability_zone
        != compute_cluster.replica_definitions[1].availability_zone
    ), "Test configuration error"

    storage_cluster = default_storage_cluster()
    populate(mz, compute_cluster, storage_cluster)

    reached_index = get_current_counter_index(mz)

    nodes_with_compute_clusters = set(mz.get_cluster_node_names(compute_cluster.name))
    nodes_with_storage_clusters = set(mz.get_cluster_node_names(storage_cluster.name))
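    # Pick a node that hosts only compute replicas, so that suspending it does
    # not also take down the storage cluster.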
    nodes_with_only_compute_clusters = (
        nodes_with_compute_clusters - nodes_with_storage_clusters
    )
    assert (
        len(nodes_with_only_compute_clusters) > 0
    ), "Every node hosting a compute replica also hosts a storage replica"
    suspended_node_name = next(iter(nodes_with_only_compute_clusters))

    print(
        f"Compute clusters on nodes {nodes_with_compute_clusters}, storage clusters on nodes {nodes_with_storage_clusters}"
    )
    print(f"Suspending {suspended_node_name}")
    mz.suspend_k8s_node(suspended_node_name)

    # the second replica should keep serving live results
    validate_state(
        mz,
        reached_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="live data without disruption in latency",
        isolation_level="SERIALIZABLE",
    )

    reached_index = get_current_counter_index(mz)
    mz.revive_suspended_k8s_node(suspended_node_name)

    validate_state(
        mz,
        reached_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="no issues after node recovery",
    )


@pytest.mark.node_recovery
def test_envd_on_failing_node(mz: MaterializeApplication) -> None:
    """
    environmentd is on the failed node. All connections should fail, but
    resolve within a minute or two.
    """
    compute_cluster = default_compute_cluster()
    storage_cluster = default_storage_cluster()
    populate(mz, compute_cluster, storage_cluster)

    reached_index = get_current_counter_index(mz)

    envd_node_name = mz.get_k8s_value(
        "app=environmentd", "{.items[*].spec.nodeName}", remove_quotes=True
    )
    mz.suspend_k8s_node(envd_node_name)
- print("Expecting connection timeout...")
- # all connections / queries should fail initially
- try:
- mz.testdrive.run(
- input=dedent(
- """
- > SELECT COUNT(*) > 0 FROM mz_tables;
- true
- """
- ),
- default_timeout=f"{TD_TIMEOUT_SHORT}s",
- no_reset=True,
- suppress_command_error_output=True,
- )
- raise RuntimeError("Expected timeout")
- except Exception:
- # OK
- print("Timeout is expected")
- print("Survived connection timeout.")

    # expect (live) data to be delivered after at most two minutes in production
    # (or longer in k8s)
    validate_state(
        mz,
        reached_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
        expected_state="node recovery and live data",
    )

    recovered_index = get_current_counter_index(mz)
    mz.revive_suspended_k8s_node(envd_node_name)

    validate_state(
        mz,
        recovered_index + REQUIRED_PROGRESS,
        must_exceed_reached_index=True,
        timeout_in_sec=TD_TIMEOUT_SHORT,
        expected_state="no issues after node recovery",
    )