materialize_application.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import logging
  10. import os
  11. import subprocess
  12. import time
  13. from datetime import datetime, timedelta
  14. from pg8000.exceptions import InterfaceError
  15. from materialize.cloudtest.app.cloudtest_application_base import (
  16. CloudtestApplicationBase,
  17. )
  18. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  19. from materialize.cloudtest.k8s.cockroach import cockroach_resources
  20. from materialize.cloudtest.k8s.debezium import debezium_resources
  21. from materialize.cloudtest.k8s.environmentd import (
  22. EnvironmentdSecret,
  23. EnvironmentdService,
  24. EnvironmentdStatefulSet,
  25. ListenersConfigMap,
  26. MaterializedAliasService,
  27. )
  28. from materialize.cloudtest.k8s.minio import Minio
  29. from materialize.cloudtest.k8s.mysql import mysql_resources
  30. from materialize.cloudtest.k8s.persist_pubsub import PersistPubSubService
  31. from materialize.cloudtest.k8s.postgres import postgres_resources
  32. from materialize.cloudtest.k8s.redpanda import redpanda_resources
  33. from materialize.cloudtest.k8s.role_binding import AdminRoleBinding
  34. from materialize.cloudtest.k8s.ssh import ssh_resources
  35. from materialize.cloudtest.k8s.testdrive import TestdrivePod
  36. from materialize.cloudtest.k8s.vpc_endpoints_cluster_role import VpcEndpointsClusterRole
  37. from materialize.cloudtest.util.wait import wait
  38. LOGGER = logging.getLogger(__name__)
  39. class MaterializeApplication(CloudtestApplicationBase):
  40. def __init__(
  41. self,
  42. release_mode: bool = True,
  43. tag: str | None = None,
  44. aws_region: str | None = None,
  45. log_filter: str | None = None,
  46. apply_node_selectors: bool = False,
  47. ) -> None:
  48. self.tag = tag
  49. self.secret = EnvironmentdSecret()
  50. self.listeners_configmap = ListenersConfigMap()
  51. self.environmentd = EnvironmentdService()
  52. self.materialized_alias = MaterializedAliasService()
  53. self.testdrive = TestdrivePod(
  54. release_mode=release_mode,
  55. aws_region=aws_region,
  56. apply_node_selectors=apply_node_selectors,
  57. )
  58. self.apply_node_selectors = apply_node_selectors
  59. super().__init__(release_mode, aws_region, log_filter)
  60. # Register the VpcEndpoint CRD.
  61. self.register_vpc_endpoint()
  62. self.start_metrics_server()
  63. self.create_resources_and_wait()
  64. def get_resources(self, log_filter: str | None) -> list[K8sResource]:
  65. return [
  66. # Run first so it's available for Debezium, which gives up too quickly otherwise
  67. *redpanda_resources(apply_node_selectors=self.apply_node_selectors),
  68. *cockroach_resources(apply_node_selectors=self.apply_node_selectors),
  69. *postgres_resources(apply_node_selectors=self.apply_node_selectors),
  70. *mysql_resources(apply_node_selectors=self.apply_node_selectors),
  71. *debezium_resources(apply_node_selectors=self.apply_node_selectors),
  72. *ssh_resources(apply_node_selectors=self.apply_node_selectors),
  73. Minio(apply_node_selectors=self.apply_node_selectors),
  74. VpcEndpointsClusterRole(),
  75. AdminRoleBinding(),
  76. self.secret,
  77. self.listeners_configmap,
  78. EnvironmentdStatefulSet(
  79. release_mode=self.release_mode,
  80. tag=self.tag,
  81. log_filter=log_filter,
  82. coverage_mode=self.coverage_mode(),
  83. apply_node_selectors=self.apply_node_selectors,
  84. ),
  85. PersistPubSubService(),
  86. self.environmentd,
  87. self.materialized_alias,
  88. self.testdrive,
  89. ]
  90. def get_images(self) -> list[str]:
  91. return ["environmentd", "clusterd", "testdrive", "postgres"]
  92. def register_vpc_endpoint(self) -> None:
  93. self.kubectl(
  94. "apply",
  95. "-f",
  96. os.path.join(
  97. os.path.abspath(self.mz_root),
  98. "src/cloud-resources/src/crd/generated/vpcendpoints.json",
  99. ),
  100. )
  101. def start_metrics_server(self) -> None:
  102. self.kubectl(
  103. "apply",
  104. "-f",
  105. "https://github.com/kubernetes-sigs/metrics-server/releases/download/metrics-server-helm-chart-3.8.2/components.yaml",
  106. )
  107. self.kubectl(
  108. "patch",
  109. "deployment",
  110. "metrics-server",
  111. "--namespace",
  112. "kube-system",
  113. "--type",
  114. "json",
  115. "-p",
  116. '[{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--kubelet-insecure-tls" }]',
  117. )
  118. def wait_resource_creation_completed(self) -> None:
  119. wait(
  120. condition="condition=Ready",
  121. resource="pod",
  122. label="cluster.environmentd.materialize.cloud/cluster-id=u1",
  123. )
  124. def wait_replicas(self) -> None:
  125. for cluster_id in ("u1", "s1", "s2"):
  126. wait(
  127. condition="condition=Ready",
  128. resource="pod",
  129. label=f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
  130. )
  131. def wait_for_sql(self) -> None:
  132. """Wait until environmentd pod is ready and can accept SQL connections"""
  133. wait(condition="condition=Ready", resource="pod/environmentd-0")
  134. start = datetime.now()
  135. while datetime.now() - start < timedelta(seconds=300):
  136. try:
  137. self.environmentd.sql("SELECT 1")
  138. break
  139. except InterfaceError as e:
  140. # Since we crash environmentd, we expect some errors that we swallow.
  141. LOGGER.info(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
  142. time.sleep(2)
  143. def set_environmentd_failpoints(self, failpoints: str) -> None:
  144. """Set the FAILPOINTS environmentd variable in the stateful set. This
  145. will most likely restart environmentd"""
  146. stateful_set = [
  147. resource
  148. for resource in self.resources
  149. if type(resource) == EnvironmentdStatefulSet
  150. ]
  151. assert len(stateful_set) == 1
  152. stateful_set = stateful_set[0]
  153. stateful_set.env["FAILPOINTS"] = failpoints
  154. stateful_set.replace()
  155. self.wait_for_sql()
  156. def get_k8s_value(
  157. self, selector: str, json_path: str, remove_quotes: bool = True
  158. ) -> str:
  159. value = self.kubectl(
  160. "get",
  161. "pods",
  162. f"--selector={selector}",
  163. "-o",
  164. f"jsonpath='{json_path}'",
  165. )
  166. if remove_quotes:
  167. value = value.replace("'", "")
  168. return value
  169. def get_pod_value(
  170. self, cluster_id: str, json_path: str, remove_quotes: bool = True
  171. ) -> str:
  172. return self.get_k8s_value(
  173. f"cluster.environmentd.materialize.cloud/cluster-id={cluster_id}",
  174. json_path,
  175. remove_quotes,
  176. )
  177. def get_pod_label_value(
  178. self, cluster_id: str, label: str, remove_quotes: bool = True
  179. ) -> str:
  180. return self.get_pod_value(
  181. cluster_id, "{.items[*].metadata.labels." + label + "}", remove_quotes
  182. )
  183. def get_cluster_node_names(self, cluster_name: str) -> list[str]:
  184. cluster_id = self.get_cluster_id(cluster_name)
  185. print(f"Cluster with name '{cluster_name}' has ID {cluster_id}")
  186. value_string = self.get_pod_value(
  187. cluster_id, "{.items[*].spec.nodeName}", remove_quotes=True
  188. )
  189. values = value_string.split(" ")
  190. return values
  191. def get_cluster_id(self, cluster_name: str) -> str:
  192. cluster_id: str = self.environmentd.sql_query(
  193. f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
  194. )[0][0]
  195. return cluster_id
  196. def get_cluster_and_replica_id(self, mz_table: str, name: str) -> tuple[str, str]:
  197. [cluster_id, replica_id] = self.environmentd.sql_query(
  198. f"SELECT s.cluster_id, r.id FROM {mz_table} s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = '{name}'"
  199. )[0]
  200. return cluster_id, replica_id
  201. def suspend_k8s_node(self, node_name: str) -> None:
  202. print(f"Suspending node {node_name}...")
  203. result = subprocess.run(
  204. ["docker", "pause", node_name], stderr=subprocess.STDOUT, text=True
  205. )
  206. assert result.returncode == 0, f"Got return code {result.returncode}"
  207. print(f"Suspended node {node_name}.")
  208. def revive_suspended_k8s_node(self, node_name: str) -> None:
  209. print(f"Reviving node {node_name}...")
  210. result = subprocess.run(
  211. ["docker", "unpause", node_name], stderr=subprocess.STDOUT, text=True
  212. )
  213. assert result.returncode == 0, f"Got return code {result.returncode}"
  214. print(f"Node {node_name} is running again.")