# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import os
import subprocess
import time
from datetime import datetime, timedelta

from pg8000.exceptions import InterfaceError

from materialize.cloudtest.app.cloudtest_application_base import (
    CloudtestApplicationBase,
)
from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
from materialize.cloudtest.k8s.cockroach import cockroach_resources
from materialize.cloudtest.k8s.debezium import debezium_resources
from materialize.cloudtest.k8s.environmentd import (
    EnvironmentdSecret,
    EnvironmentdService,
    EnvironmentdStatefulSet,
    ListenersConfigMap,
    MaterializedAliasService,
)
from materialize.cloudtest.k8s.minio import Minio
from materialize.cloudtest.k8s.mysql import mysql_resources
from materialize.cloudtest.k8s.persist_pubsub import PersistPubSubService
from materialize.cloudtest.k8s.postgres import postgres_resources
from materialize.cloudtest.k8s.redpanda import redpanda_resources
from materialize.cloudtest.k8s.role_binding import AdminRoleBinding
from materialize.cloudtest.k8s.ssh import ssh_resources
from materialize.cloudtest.k8s.testdrive import TestdrivePod
from materialize.cloudtest.k8s.vpc_endpoints_cluster_role import (
    VpcEndpointsClusterRole,
)

LOGGER = logging.getLogger(__name__)


class MaterializeApplication(CloudtestApplicationBase):
    def __init__(
        self,
        release_mode: bool = True,
        tag: str | None = None,
        aws_region: str | None = None,
        log_filter: str | None = None,
        apply_node_selectors: bool = False,
    ) -> None:
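        """Bring up a complete Materialize environment in the cluster and
        block until all of its resources have been created."""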
        self.tag = tag
        self.secret = EnvironmentdSecret()
        self.listeners_configmap = ListenersConfigMap()
        self.environmentd = EnvironmentdService()
        self.materialized_alias = MaterializedAliasService()
        self.testdrive = TestdrivePod(
            release_mode=release_mode,
            aws_region=aws_region,
            apply_node_selectors=apply_node_selectors,
        )
        self.apply_node_selectors = apply_node_selectors
        super().__init__(release_mode, aws_region, log_filter)

        # Register the VpcEndpoint CRD.
        self.register_vpc_endpoint()

        self.start_metrics_server()

        self.create_resources_and_wait()

    def get_resources(self, log_filter: str | None) -> list[K8sResource]:
        return [
            # Start Redpanda first so that it is available to Debezium,
            # which otherwise gives up too quickly.
            *redpanda_resources(apply_node_selectors=self.apply_node_selectors),
            *cockroach_resources(apply_node_selectors=self.apply_node_selectors),
            *postgres_resources(apply_node_selectors=self.apply_node_selectors),
            *mysql_resources(apply_node_selectors=self.apply_node_selectors),
            *debezium_resources(apply_node_selectors=self.apply_node_selectors),
            *ssh_resources(apply_node_selectors=self.apply_node_selectors),
            Minio(apply_node_selectors=self.apply_node_selectors),
            VpcEndpointsClusterRole(),
            AdminRoleBinding(),
            self.secret,
            self.listeners_configmap,
            EnvironmentdStatefulSet(
                release_mode=self.release_mode,
                tag=self.tag,
                log_filter=log_filter,
                coverage_mode=self.coverage_mode(),
                apply_node_selectors=self.apply_node_selectors,
            ),
            PersistPubSubService(),
            self.environmentd,
            self.materialized_alias,
            self.testdrive,
        ]

    def get_images(self) -> list[str]:
        return ["environmentd", "clusterd", "testdrive", "postgres"]

    def register_vpc_endpoint(self) -> None:
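        """Install the VpcEndpoint CRD so that VPC endpoint objects can be
        created in the cluster."""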
        self.kubectl(
            "apply",
            "-f",
            os.path.join(
                os.path.abspath(self.mz_root),
                "src/cloud-resources/src/crd/generated/vpcendpoints.json",
            ),
        )

    def start_metrics_server(self) -> None:
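        """Deploy the Kubernetes metrics server and patch it to accept the
        self-signed kubelet certificates used by local test clusters."""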
        self.kubectl(
            "apply",
            "-f",
            "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
        )

        self.kubectl(
            "patch",
            "deployment",
            "metrics-server",
            "--namespace",
            "kube-system",
            "--type",
            "json",
            "-p",
            '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls"}]',
        )

    def wait_resource_creation_completed(self) -> None:
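        """Wait until the pods of the default cluster (u1) report Ready."""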
        wait(
            condition="condition=Ready",
            resource="pod",
            label="cluster.environmentd.materialize.cloud/cluster-id=u1",
        )

    def wait_replicas(self) -> None:
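        """Wait until the replicas of the default cluster (u1) and the
        system clusters (s1, s2) report Ready."""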
        for cluster_id in ("u1", "s1", "s2"):
            wait(
                condition="condition=Ready",
                resource="pod",
                label=f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            )

    def wait_for_sql(self) -> None:
        """Wait until the environmentd pod is ready and accepts SQL connections."""
        wait(condition="condition=Ready", resource="pod/environmentd-0")

        start = datetime.now()
        while datetime.now() - start < timedelta(seconds=300):
            try:
                self.environmentd.sql("SELECT 1")
                break
            except InterfaceError as e:
                # Since some tests deliberately crash environmentd, connection
                # errors are expected here and are swallowed.
                LOGGER.info(f"SQL interface not ready ({e} during SELECT 1); waiting...")
                time.sleep(2)
        else:
            raise RuntimeError(
                "environmentd did not accept SQL connections within 300 seconds"
            )

    def set_environmentd_failpoints(self, failpoints: str) -> None:
        """Set the FAILPOINTS environment variable in the stateful set. This
        will most likely restart environmentd."""
        stateful_sets = [
            resource
            for resource in self.resources
            if isinstance(resource, EnvironmentdStatefulSet)
        ]
        assert len(stateful_sets) == 1
        stateful_set = stateful_sets[0]

        stateful_set.env["FAILPOINTS"] = failpoints
        stateful_set.replace()
        self.wait_for_sql()

    def get_k8s_value(
        self, selector: str, json_path: str, remove_quotes: bool = True
    ) -> str:
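        """Return the output of a JSONPath query against the pods matching
        the given label selector."""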
        value = self.kubectl(
            "get",
            "pods",
            f"--selector={selector}",
            "-o",
            f"jsonpath='{json_path}'",
        )

        if remove_quotes:
            value = value.replace("'", "")

        return value

    def get_pod_value(
        self, cluster_id: str, json_path: str, remove_quotes: bool = True
    ) -> str:
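        """Return the output of a JSONPath query against the pods of the
        given cluster."""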
        return self.get_k8s_value(
            f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
            json_path,
            remove_quotes,
        )

    def get_pod_label_value(
        self, cluster_id: str, label: str, remove_quotes: bool = True
    ) -> str:
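        """Return the value of a label on the pods of the given cluster."""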
        return self.get_pod_value(
            cluster_id, "{.items[*].metadata.labels." + label + "}", remove_quotes
        )

    def get_cluster_node_names(self, cluster_name: str) -> list[str]:
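        """Return the names of the Kubernetes nodes that the pods of the
        named cluster are scheduled on."""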
        cluster_id = self.get_cluster_id(cluster_name)
        LOGGER.info(f"Cluster with name '{cluster_name}' has ID {cluster_id}")

        value_string = self.get_pod_value(
            cluster_id, "{.items[*].spec.nodeName}", remove_quotes=True
        )
        return value_string.split(" ")

    def get_cluster_id(self, cluster_name: str) -> str:
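        """Look up the ID of the named cluster in the mz_clusters catalog table."""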
        cluster_id: str = self.environmentd.sql_query(
            f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
        )[0][0]
        return cluster_id

    def get_cluster_and_replica_id(self, mz_table: str, name: str) -> tuple[str, str]:
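        """Return the cluster ID and replica ID of the named object in the
        given catalog relation (one with a cluster_id column, e.g. mz_sources)."""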
        [cluster_id, replica_id] = self.environmentd.sql_query(
            f"SELECT s.cluster_id, r.id FROM {mz_table} s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = '{name}'"
        )[0]
        return cluster_id, replica_id

    def suspend_k8s_node(self, node_name: str) -> None:
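        """Pause the Docker container backing the given node. This assumes
        the cluster nodes run as Docker containers (e.g. a kind cluster)."""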
        LOGGER.info(f"Suspending node {node_name}...")

        result = subprocess.run(
            ["docker", "pause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        LOGGER.info(f"Suspended node {node_name}.")

    def revive_suspended_k8s_node(self, node_name: str) -> None:
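        """Unpause the Docker container backing the given node, undoing
        suspend_k8s_node."""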
        LOGGER.info(f"Reviving node {node_name}...")

        result = subprocess.run(
            ["docker", "unpause", node_name], stderr=subprocess.STDOUT, text=True
        )
        assert result.returncode == 0, f"Got return code {result.returncode}"

        LOGGER.info(f"Node {node_name} is running again.")
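

# A minimal usage sketch. The cluster name below is illustrative, and a local
# kind-style cluster with the required Materialize images is assumed;
# constructing the class already creates all resources and waits for them:
#
#     app = MaterializeApplication(release_mode=True)
#     app.wait_for_sql()
#     assert app.environmentd.sql_query("SELECT 1")[0][0] == 1
#
#     # Simulate a node failure and recover from it.
#     node_names = app.get_cluster_node_names("quickstart")
#     app.suspend_k8s_node(node_names[0])
#     app.revive_suspended_k8s_node(node_names[0])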