123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import copy
- import json
- import logging
- import time
- import pytest
- from pg8000.exceptions import InterfaceError
- from materialize.cloudtest.app.materialize_application import MaterializeApplication
- from materialize.cloudtest.k8s.environmentd import EnvironmentdStatefulSet
- from materialize.cloudtest.util.cluster import cluster_pod_name, cluster_service_name
- from materialize.cloudtest.util.exists import exists, not_exists
- from materialize.cloudtest.util.wait import wait
- LOGGER = logging.getLogger(__name__)
- def test_cluster_sizing(mz: MaterializeApplication) -> None:
- """Test that a SIZE N cluster indeed creates N clusterd instances."""
- SIZE = 2
- mz.environmentd.sql(
- f"CREATE CLUSTER sized1 REPLICAS (sized_replica1 (SIZE '{SIZE}-1'))"
- )
- cluster_id = mz.environmentd.sql_query(
- "SELECT id FROM mz_clusters WHERE name = 'sized1'"
- )[0][0]
- assert cluster_id is not None
- replica_id = mz.environmentd.sql_query(
- "SELECT id FROM mz_cluster_replicas WHERE name = 'sized_replica1'"
- )[0][0]
- assert replica_id is not None
- for compute_id in range(0, SIZE):
- compute_pod = cluster_pod_name(cluster_id, replica_id, compute_id)
- wait(condition="condition=Ready", resource=compute_pod)
- mz.environmentd.sql("DROP CLUSTER sized1 CASCADE")
- @pytest.mark.parametrize(
- "failpoint",
- ["", "after_catalog_drop_replica=panic", "after_sequencer_drop_replica=panic"],
- )
- @pytest.mark.skip(reason="Failpoints mess up the Mz instance database-issues#5263")
- def test_cluster_shutdown(mz: MaterializeApplication, failpoint: str) -> None:
- """Test that dropping a cluster or replica causes the associated clusterds to shut down."""
- LOGGER.info(f"Testing cluster shutdown with failpoint={failpoint}")
- mz.set_environmentd_failpoints(failpoint)
- def sql_expect_crash(sql: str) -> None:
- # We expect executing `sql` will crash environmentd. To ensure it is actually `sql`
- # wait until the SQL interface is available.
- mz.wait_for_sql()
- try:
- mz.environmentd.sql(sql)
- except InterfaceError as e:
- LOGGER.error(f"Expected SQL error: {e}")
- mz.environmentd.sql(
- "CREATE CLUSTER shutdown1 REPLICAS (shutdown_replica1 (SIZE '1'), shutdown_replica2 (SIZE '1'))"
- )
- cluster_id = mz.environmentd.sql_query(
- "SELECT id FROM mz_clusters WHERE name = 'shutdown1'"
- )[0][0]
- assert cluster_id is not None
- compute_pods = {}
- compute_svcs = {}
- for replica_name in ["shutdown_replica1", "shutdown_replica2"]:
- replica_id = mz.environmentd.sql_query(
- f"SELECT id FROM mz_cluster_replicas WHERE name = '{replica_name}'"
- )[0][0]
- assert replica_id is not None
- compute_pod = cluster_pod_name(cluster_id, replica_id)
- compute_pods[replica_name] = compute_pod
- wait(condition="condition=Ready", resource=compute_pod)
- compute_svc = cluster_service_name(cluster_id, replica_id)
- compute_svcs[replica_name] = compute_svc
- exists(resource=compute_svc)
- sql_expect_crash("DROP CLUSTER REPLICA shutdown1.shutdown_replica1")
- wait(condition="delete", resource=compute_pods["shutdown_replica1"])
- not_exists(resource=compute_svcs["shutdown_replica1"])
- sql_expect_crash("DROP CLUSTER shutdown1 CASCADE")
- wait(condition="delete", resource=compute_pods["shutdown_replica2"])
- not_exists(resource=compute_svcs["shutdown_replica2"])
- mz.set_environmentd_failpoints("")
- def get_value_from_label(
- mz: MaterializeApplication, cluster_id: str, replica_id: str, jsonpath: str
- ) -> str:
- return mz.kubectl(
- "get",
- "pods",
- f"--selector=cluster.environmentd.materialize.cloud/cluster-id={cluster_id},cluster.environmentd.materialize.cloud/replica-id={replica_id}",
- "-o",
- "jsonpath='{.items[*]." + jsonpath + "}'",
- )
- def get_node_selector(
- mz: MaterializeApplication, cluster_id: str, replica_id: str
- ) -> str:
- return get_value_from_label(mz, cluster_id, replica_id, "spec.nodeSelector")
- def test_disk_label(mz: MaterializeApplication) -> None:
- """Test that cluster replicas have the correct materialize.cloud/disk labels"""
- # If cluster_always_use_disk is set to true, it will take precedence over the DISK keyword in CREATE CLUSTER.
- mz.environmentd.sql(
- "ALTER SYSTEM SET cluster_always_use_disk = false;",
- port="internal",
- user="mz_system",
- )
- for value in ("true", "false"):
- mz.environmentd.sql(
- f"CREATE CLUSTER disk_{value} MANAGED, SIZE = '2-no-disk', DISK = {value}"
- )
- (cluster_id, replica_id) = mz.environmentd.sql_query(
- f"SELECT mz_clusters.id, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'disk_{value}'"
- )[0]
- assert cluster_id is not None
- assert replica_id is not None
- node_selectors = get_node_selector(mz, cluster_id, replica_id)
- if value == "true":
- assert (
- node_selectors == '\'{"materialize.cloud/disk":"true"}\''
- ), node_selectors
- else:
- assert node_selectors == "''"
- mz.environmentd.sql(f"DROP CLUSTER disk_{value} CASCADE")
- # Reset
- mz.environmentd.sql(
- "ALTER SYSTEM SET cluster_always_use_disk = true;",
- port="internal",
- user="mz_system",
- )
- @pytest.mark.skip(reason="Keeps flaking, see database-issues#8299")
- def test_cluster_replica_sizes(mz: MaterializeApplication) -> None:
- """Test that --cluster-replica-sizes mapping is respected"""
- # Some time for existing cluster drops to complete so we don't try to spin them up again
- time.sleep(5)
- cluster_replica_size_map = {
- "small": {
- "scale": 1,
- "workers": 1,
- "cpu_limit": None,
- "memory_limit": None,
- "disk_limit": None,
- "credits_per_hour": "1",
- "disabled": False,
- "selectors": {"key1": "value1"},
- },
- "medium": {
- "scale": 1,
- "workers": 1,
- "cpu_limit": None,
- "memory_limit": None,
- "disk_limit": None,
- "credits_per_hour": "1",
- "disabled": False,
- "selectors": {"key2": "value2", "key3": "value3"},
- },
- # for existing clusters
- "1": {
- "scale": 1,
- "workers": 1,
- "cpu_limit": None,
- "memory_limit": None,
- "disk_limit": None,
- "disabled": False,
- "credits_per_hour": "1",
- },
- "2-1": {
- "scale": 2,
- "workers": 2,
- "cpu_limit": None,
- "memory_limit": None,
- "disk_limit": None,
- "disabled": False,
- "credits_per_hour": "2",
- },
- }
- stateful_set = [
- resource
- for resource in mz.resources
- if type(resource) == EnvironmentdStatefulSet
- ]
- assert len(stateful_set) == 1
- stateful_set = copy.deepcopy(stateful_set[0])
- stateful_set.env["MZ_CLUSTER_REPLICA_SIZES"] = json.dumps(cluster_replica_size_map)
- stateful_set.extra_args.append("--bootstrap-default-cluster-replica-size=1")
- stateful_set.replace()
- mz.wait_for_sql()
- for key, value in {
- "small": cluster_replica_size_map["small"],
- "medium": cluster_replica_size_map["medium"],
- }.items():
- mz.environmentd.sql(f"CREATE CLUSTER scale_{key} MANAGED, SIZE = '{key}'")
- (cluster_id, replica_id) = mz.environmentd.sql_query(
- f"SELECT mz_clusters.id, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'scale_{key}'"
- )[0]
- assert cluster_id is not None
- assert replica_id is not None
- expected = value.get("selectors", {}) | {"materialize.cloud/disk": "true"}
- node_selectors_raw = ""
- for i in range(1, 10):
- node_selectors_raw = get_node_selector(mz, cluster_id, replica_id)
- if node_selectors_raw:
- break
- print("No node selectors available yet, sleeping")
- time.sleep(5)
- node_selectors = json.loads(node_selectors_raw[1:-1])
- assert (
- node_selectors == expected
- ), f"actual: {node_selectors}, but expected {expected}"
- mz.environmentd.sql(f"DROP CLUSTER scale_{key} CASCADE")
- # Cleanup
- stateful_set = [
- resource
- for resource in mz.resources
- if type(resource) == EnvironmentdStatefulSet
- ]
- assert len(stateful_set) == 1
- stateful_set = stateful_set[0]
- stateful_set.replace()
- mz.wait_for_sql()
|