123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import json
- from textwrap import dedent
- from typing import TYPE_CHECKING, Any
- from materialize.checks.actions import Action
- from materialize.checks.executors import Executor
- from materialize.mz_version import MzVersion
- from materialize.mzcompose.services.clusterd import Clusterd
- from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
- from materialize.mzcompose.services.ssh_bastion_host import (
- setup_default_ssh_test_connection,
- )
- if TYPE_CHECKING:
- from materialize.checks.scenarios import Scenario
- class MzcomposeAction(Action):
- def join(self, e: Executor) -> None:
- # Most of these actions are already blocking
- pass
- class StartMz(MzcomposeAction):
- def __init__(
- self,
- scenario: "Scenario",
- tag: MzVersion | None = None,
- environment_extra: list[str] = [],
- system_parameter_defaults: dict[str, str] | None = None,
- additional_system_parameter_defaults: dict[str, str] = {},
- system_parameter_version: MzVersion | None = None,
- mz_service: str | None = None,
- platform: str | None = None,
- healthcheck: list[str] | None = None,
- deploy_generation: int | None = None,
- restart: str | None = None,
- force_migrations: str | None = None,
- publish: bool | None = None,
- ) -> None:
- if healthcheck is None:
- healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]
- self.tag = tag
- self.environment_extra = environment_extra
- self.system_parameter_defaults = system_parameter_defaults
- self.additional_system_parameter_defaults = additional_system_parameter_defaults
- self.system_parameter_version = system_parameter_version or tag
- self.healthcheck = healthcheck
- self.mz_service = mz_service
- self.platform = platform
- self.deploy_generation = deploy_generation
- self.restart = restart
- self.force_migrations = force_migrations
- self.publish = publish
- self.scenario = scenario
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- image = f"materialize/materialized:{self.tag}" if self.tag is not None else None
- print(f"Starting Mz using image {image}, mz_service {self.mz_service}")
- mz = Materialized(
- name=self.mz_service,
- image=image,
- external_metadata_store=True,
- external_blob_store=True,
- blob_store_is_azure=self.scenario.features.azurite_enabled(),
- environment_extra=self.environment_extra,
- system_parameter_defaults=self.system_parameter_defaults,
- additional_system_parameter_defaults=self.additional_system_parameter_defaults,
- system_parameter_version=self.system_parameter_version,
- sanity_restart=False,
- platform=self.platform,
- healthcheck=self.healthcheck,
- deploy_generation=self.deploy_generation,
- restart=self.restart,
- force_migrations=self.force_migrations,
- publish=self.publish,
- default_replication_factor=2,
- )
- # Don't fail since we are careful to explicitly kill and collect logs
- # of the services thus started
- with c.override(mz, fail_on_new_service=False):
- c.up("materialized" if self.mz_service is None else self.mz_service)
- # If we start up Materialize with a deploy-generation , then it
- # stays in a stuck state when the preflight-check is completed. So
- # we can't connect to it yet to run any commands.
- if self.deploy_generation:
- return
- # This should live in ssh.py and alter_connection.py, but accessing the
- # ssh bastion host from inside a check is not possible currently.
- for i in range(4):
- ssh_tunnel_name = f"ssh_tunnel_{i}"
- setup_default_ssh_test_connection(
- c, ssh_tunnel_name, mz_service=self.mz_service
- )
- mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
- if self.tag:
- assert (
- self.tag == mz_version
- ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}"
- else:
- version_cargo = MzVersion.parse_cargo()
- assert (
- version_cargo == mz_version
- ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}"
- e.current_mz_version = mz_version
- class ConfigureMz(MzcomposeAction):
- def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
- self.handle: Any | None = None
- self.mz_service = mz_service
- self.scenario = scenario
- def execute(self, e: Executor) -> None:
- input = dedent(
- """
- # Run any query to have the materialize user implicitly created if
- # it didn't exist yet. Required for the GRANT later.
- > SELECT 1;
- 1
- """
- )
- system_settings = {
- "ALTER SYSTEM SET max_tables = 1000;",
- "ALTER SYSTEM SET max_sinks = 1000;",
- "ALTER SYSTEM SET max_sources = 1000;",
- "ALTER SYSTEM SET max_materialized_views = 1000;",
- "ALTER SYSTEM SET max_objects_per_schema = 1000;",
- "ALTER SYSTEM SET max_secrets = 1000;",
- "ALTER SYSTEM SET max_clusters = 1000;",
- }
- # Since we already test with RBAC enabled, we have to give materialize
- # user the relevant attributes so the existing tests keep working.
- system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;")
- # do not enable this by default for all checks
- system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;")
- # Since we already test with RBAC enabled, we have to give materialize
- # user the relevant privileges so the existing tests keep working.
- system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;")
- system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;")
- system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;")
- system_settings = system_settings - e.system_settings
- if system_settings:
- input += (
- "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n"
- + "\n".join(system_settings)
- )
- kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
- input += dedent(
- f"""
- > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker}
- > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
- """
- )
- self.handle = e.testdrive(input=input, mz_service=self.mz_service)
- e.system_settings.update(system_settings)
- def join(self, e: Executor) -> None:
- e.join(self.handle)
- class KillMz(MzcomposeAction):
- def __init__(
- self, mz_service: str = "materialized", capture_logs: bool = False
- ) -> None:
- self.mz_service = mz_service
- self.capture_logs = capture_logs
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- # Don't fail since we are careful to explicitly kill and collect logs
- # of the services thus started
- with c.override(Materialized(name=self.mz_service), fail_on_new_service=False):
- c.kill(self.mz_service, wait=True)
- if self.capture_logs:
- c.capture_logs(self.mz_service)
- class Stop(MzcomposeAction):
- def __init__(self, service: str = "materialized") -> None:
- self.service = service
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.stop(self.service, wait=True)
- class Down(MzcomposeAction):
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.down()
- class UseClusterdCompute(MzcomposeAction):
- def __init__(self, scenario: "Scenario") -> None:
- self.base_version = scenario.base_version()
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.sql(
- "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;",
- port=6877,
- user="mz_system",
- )
- c.sql(
- """
- ALTER CLUSTER quickstart SET (MANAGED = false);
- DROP CLUSTER REPLICA quickstart.r1;
- CREATE CLUSTER REPLICA quickstart.r1
- STORAGECTL ADDRESSES ['clusterd_compute_1:2100'],
- STORAGE ADDRESSES ['clusterd_compute_1:2103'],
- COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
- COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
- WORKERS 1;
- """,
- port=6877,
- user="mz_system",
- )
- class KillClusterdCompute(MzcomposeAction):
- def __init__(self, capture_logs: bool = False) -> None:
- self.capture_logs = capture_logs
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- with c.override(Clusterd(name="clusterd_compute_1")):
- c.kill("clusterd_compute_1")
- if self.capture_logs:
- c.capture_logs("clusterd_compute_1")
- class StartClusterdCompute(MzcomposeAction):
- def __init__(self, tag: MzVersion | None = None) -> None:
- self.tag = tag
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- clusterd = Clusterd(name="clusterd_compute_1")
- if self.tag:
- clusterd = Clusterd(
- name="clusterd_compute_1",
- image=f"materialize/clusterd:{self.tag}",
- )
- print(f"Starting Compute using image {clusterd.config.get('image')}")
- with c.override(clusterd):
- c.up("clusterd_compute_1")
- class RestartRedpandaDebezium(MzcomposeAction):
- """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- for service in ["redpanda", "debezium"]:
- c.kill(service)
- c.up(service)
- class RestartCockroach(MzcomposeAction):
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.kill(c.metadata_store())
- c.up(c.metadata_store())
- class RestartSourcePostgres(MzcomposeAction):
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.kill("postgres")
- c.up("postgres")
- class KillClusterdStorage(MzcomposeAction):
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- # Depending on the workload, clusterd may not be running, hence the || true
- c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")
- class DropCreateDefaultReplica(MzcomposeAction):
- def __init__(self, scenario: "Scenario") -> None:
- self.base_version = scenario.base_version()
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.sql(
- """
- ALTER CLUSTER quickstart SET (MANAGED = false);
- DROP CLUSTER REPLICA quickstart.r1;
- CREATE CLUSTER REPLICA quickstart.r1 SIZE '1';
- """,
- port=6877,
- user="mz_system",
- )
- class WaitReadyMz(MzcomposeAction):
- """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus"""
- def __init__(self, mz_service: str = "materialized") -> None:
- self.mz_service = mz_service
- def execute(self, e: Executor) -> None:
- e.mzcompose_composition().await_mz_deployment_status(
- DeploymentStatus.READY_TO_PROMOTE, self.mz_service
- )
- class PromoteMz(MzcomposeAction):
- """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote"""
- def __init__(self, mz_service: str = "materialized") -> None:
- self.mz_service = mz_service
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- result = json.loads(
- c.exec(
- self.mz_service,
- "curl",
- "-s",
- "-X",
- "POST",
- "http://127.0.0.1:6878/api/leader/promote",
- capture=True,
- ).stdout
- )
- assert result["result"] == "Success", f"Unexpected result {result}"
- # Wait until new Materialize is ready to handle queries
- c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service)
- mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
- e.current_mz_version = mz_version
- class SystemVarChange(MzcomposeAction):
- """Changes a system var."""
- def __init__(self, name: str, value: str):
- self.name = name
- self.value = value
- def execute(self, e: Executor) -> None:
- c = e.mzcompose_composition()
- c.sql(
- f"ALTER SYSTEM SET {self.name} = {self.value};",
- port=6877,
- user="mz_system",
- )
|