mzcompose_actions.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. from textwrap import dedent
  11. from typing import TYPE_CHECKING, Any
  12. from materialize.checks.actions import Action
  13. from materialize.checks.executors import Executor
  14. from materialize.mz_version import MzVersion
  15. from materialize.mzcompose.services.clusterd import Clusterd
  16. from materialize.mzcompose.services.materialized import DeploymentStatus, Materialized
  17. from materialize.mzcompose.services.ssh_bastion_host import (
  18. setup_default_ssh_test_connection,
  19. )
  20. if TYPE_CHECKING:
  21. from materialize.checks.scenarios import Scenario
  22. class MzcomposeAction(Action):
  23. def join(self, e: Executor) -> None:
  24. # Most of these actions are already blocking
  25. pass
  26. class StartMz(MzcomposeAction):
  27. def __init__(
  28. self,
  29. scenario: "Scenario",
  30. tag: MzVersion | None = None,
  31. environment_extra: list[str] = [],
  32. system_parameter_defaults: dict[str, str] | None = None,
  33. additional_system_parameter_defaults: dict[str, str] = {},
  34. system_parameter_version: MzVersion | None = None,
  35. mz_service: str | None = None,
  36. platform: str | None = None,
  37. healthcheck: list[str] | None = None,
  38. deploy_generation: int | None = None,
  39. restart: str | None = None,
  40. force_migrations: str | None = None,
  41. publish: bool | None = None,
  42. ) -> None:
  43. if healthcheck is None:
  44. healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]
  45. self.tag = tag
  46. self.environment_extra = environment_extra
  47. self.system_parameter_defaults = system_parameter_defaults
  48. self.additional_system_parameter_defaults = additional_system_parameter_defaults
  49. self.system_parameter_version = system_parameter_version or tag
  50. self.healthcheck = healthcheck
  51. self.mz_service = mz_service
  52. self.platform = platform
  53. self.deploy_generation = deploy_generation
  54. self.restart = restart
  55. self.force_migrations = force_migrations
  56. self.publish = publish
  57. self.scenario = scenario
  58. def execute(self, e: Executor) -> None:
  59. c = e.mzcompose_composition()
  60. image = f"materialize/materialized:{self.tag}" if self.tag is not None else None
  61. print(f"Starting Mz using image {image}, mz_service {self.mz_service}")
  62. mz = Materialized(
  63. name=self.mz_service,
  64. image=image,
  65. external_metadata_store=True,
  66. external_blob_store=True,
  67. blob_store_is_azure=self.scenario.features.azurite_enabled(),
  68. environment_extra=self.environment_extra,
  69. system_parameter_defaults=self.system_parameter_defaults,
  70. additional_system_parameter_defaults=self.additional_system_parameter_defaults,
  71. system_parameter_version=self.system_parameter_version,
  72. sanity_restart=False,
  73. platform=self.platform,
  74. healthcheck=self.healthcheck,
  75. deploy_generation=self.deploy_generation,
  76. restart=self.restart,
  77. force_migrations=self.force_migrations,
  78. publish=self.publish,
  79. default_replication_factor=2,
  80. )
  81. # Don't fail since we are careful to explicitly kill and collect logs
  82. # of the services thus started
  83. with c.override(mz, fail_on_new_service=False):
  84. c.up("materialized" if self.mz_service is None else self.mz_service)
  85. # If we start up Materialize with a deploy-generation , then it
  86. # stays in a stuck state when the preflight-check is completed. So
  87. # we can't connect to it yet to run any commands.
  88. if self.deploy_generation:
  89. return
  90. # This should live in ssh.py and alter_connection.py, but accessing the
  91. # ssh bastion host from inside a check is not possible currently.
  92. for i in range(4):
  93. ssh_tunnel_name = f"ssh_tunnel_{i}"
  94. setup_default_ssh_test_connection(
  95. c, ssh_tunnel_name, mz_service=self.mz_service
  96. )
  97. mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
  98. if self.tag:
  99. assert (
  100. self.tag == mz_version
  101. ), f"Materialize version mismatch, expected {self.tag}, but got {mz_version}"
  102. else:
  103. version_cargo = MzVersion.parse_cargo()
  104. assert (
  105. version_cargo == mz_version
  106. ), f"Materialize version mismatch, expected {version_cargo}, but got {mz_version}"
  107. e.current_mz_version = mz_version
  108. class ConfigureMz(MzcomposeAction):
  109. def __init__(self, scenario: "Scenario", mz_service: str | None = None) -> None:
  110. self.handle: Any | None = None
  111. self.mz_service = mz_service
  112. self.scenario = scenario
  113. def execute(self, e: Executor) -> None:
  114. input = dedent(
  115. """
  116. # Run any query to have the materialize user implicitly created if
  117. # it didn't exist yet. Required for the GRANT later.
  118. > SELECT 1;
  119. 1
  120. """
  121. )
  122. system_settings = {
  123. "ALTER SYSTEM SET max_tables = 1000;",
  124. "ALTER SYSTEM SET max_sinks = 1000;",
  125. "ALTER SYSTEM SET max_sources = 1000;",
  126. "ALTER SYSTEM SET max_materialized_views = 1000;",
  127. "ALTER SYSTEM SET max_objects_per_schema = 1000;",
  128. "ALTER SYSTEM SET max_secrets = 1000;",
  129. "ALTER SYSTEM SET max_clusters = 1000;",
  130. }
  131. # Since we already test with RBAC enabled, we have to give materialize
  132. # user the relevant attributes so the existing tests keep working.
  133. system_settings.add("GRANT ALL PRIVILEGES ON SYSTEM TO materialize;")
  134. # do not enable this by default for all checks
  135. system_settings.add("ALTER SYSTEM SET enable_rbac_checks TO false;")
  136. # Since we already test with RBAC enabled, we have to give materialize
  137. # user the relevant privileges so the existing tests keep working.
  138. system_settings.add("GRANT CREATE ON DATABASE materialize TO materialize;")
  139. system_settings.add("GRANT CREATE ON SCHEMA materialize.public TO materialize;")
  140. system_settings.add("GRANT CREATE ON CLUSTER quickstart TO materialize;")
  141. system_settings = system_settings - e.system_settings
  142. if system_settings:
  143. input += (
  144. "$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}\n"
  145. + "\n".join(system_settings)
  146. )
  147. kafka_broker = "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
  148. input += dedent(
  149. f"""
  150. > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA {kafka_broker}
  151. > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
  152. """
  153. )
  154. self.handle = e.testdrive(input=input, mz_service=self.mz_service)
  155. e.system_settings.update(system_settings)
  156. def join(self, e: Executor) -> None:
  157. e.join(self.handle)
  158. class KillMz(MzcomposeAction):
  159. def __init__(
  160. self, mz_service: str = "materialized", capture_logs: bool = False
  161. ) -> None:
  162. self.mz_service = mz_service
  163. self.capture_logs = capture_logs
  164. def execute(self, e: Executor) -> None:
  165. c = e.mzcompose_composition()
  166. # Don't fail since we are careful to explicitly kill and collect logs
  167. # of the services thus started
  168. with c.override(Materialized(name=self.mz_service), fail_on_new_service=False):
  169. c.kill(self.mz_service, wait=True)
  170. if self.capture_logs:
  171. c.capture_logs(self.mz_service)
  172. class Stop(MzcomposeAction):
  173. def __init__(self, service: str = "materialized") -> None:
  174. self.service = service
  175. def execute(self, e: Executor) -> None:
  176. c = e.mzcompose_composition()
  177. c.stop(self.service, wait=True)
  178. class Down(MzcomposeAction):
  179. def execute(self, e: Executor) -> None:
  180. c = e.mzcompose_composition()
  181. c.down()
  182. class UseClusterdCompute(MzcomposeAction):
  183. def __init__(self, scenario: "Scenario") -> None:
  184. self.base_version = scenario.base_version()
  185. def execute(self, e: Executor) -> None:
  186. c = e.mzcompose_composition()
  187. c.sql(
  188. "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = on;",
  189. port=6877,
  190. user="mz_system",
  191. )
  192. c.sql(
  193. """
  194. ALTER CLUSTER quickstart SET (MANAGED = false);
  195. DROP CLUSTER REPLICA quickstart.r1;
  196. CREATE CLUSTER REPLICA quickstart.r1
  197. STORAGECTL ADDRESSES ['clusterd_compute_1:2100'],
  198. STORAGE ADDRESSES ['clusterd_compute_1:2103'],
  199. COMPUTECTL ADDRESSES ['clusterd_compute_1:2101'],
  200. COMPUTE ADDRESSES ['clusterd_compute_1:2102'],
  201. WORKERS 1;
  202. """,
  203. port=6877,
  204. user="mz_system",
  205. )
  206. class KillClusterdCompute(MzcomposeAction):
  207. def __init__(self, capture_logs: bool = False) -> None:
  208. self.capture_logs = capture_logs
  209. def execute(self, e: Executor) -> None:
  210. c = e.mzcompose_composition()
  211. with c.override(Clusterd(name="clusterd_compute_1")):
  212. c.kill("clusterd_compute_1")
  213. if self.capture_logs:
  214. c.capture_logs("clusterd_compute_1")
  215. class StartClusterdCompute(MzcomposeAction):
  216. def __init__(self, tag: MzVersion | None = None) -> None:
  217. self.tag = tag
  218. def execute(self, e: Executor) -> None:
  219. c = e.mzcompose_composition()
  220. clusterd = Clusterd(name="clusterd_compute_1")
  221. if self.tag:
  222. clusterd = Clusterd(
  223. name="clusterd_compute_1",
  224. image=f"materialize/clusterd:{self.tag}",
  225. )
  226. print(f"Starting Compute using image {clusterd.config.get('image')}")
  227. with c.override(clusterd):
  228. c.up("clusterd_compute_1")
  229. class RestartRedpandaDebezium(MzcomposeAction):
  230. """Restarts Redpanda and Debezium. Debezium is unable to survive Redpanda restarts so the two go together."""
  231. def execute(self, e: Executor) -> None:
  232. c = e.mzcompose_composition()
  233. for service in ["redpanda", "debezium"]:
  234. c.kill(service)
  235. c.up(service)
  236. class RestartCockroach(MzcomposeAction):
  237. def execute(self, e: Executor) -> None:
  238. c = e.mzcompose_composition()
  239. c.kill(c.metadata_store())
  240. c.up(c.metadata_store())
  241. class RestartSourcePostgres(MzcomposeAction):
  242. def execute(self, e: Executor) -> None:
  243. c = e.mzcompose_composition()
  244. c.kill("postgres")
  245. c.up("postgres")
  246. class KillClusterdStorage(MzcomposeAction):
  247. def execute(self, e: Executor) -> None:
  248. c = e.mzcompose_composition()
  249. # Depending on the workload, clusterd may not be running, hence the || true
  250. c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd` || true")
  251. class DropCreateDefaultReplica(MzcomposeAction):
  252. def __init__(self, scenario: "Scenario") -> None:
  253. self.base_version = scenario.base_version()
  254. def execute(self, e: Executor) -> None:
  255. c = e.mzcompose_composition()
  256. c.sql(
  257. """
  258. ALTER CLUSTER quickstart SET (MANAGED = false);
  259. DROP CLUSTER REPLICA quickstart.r1;
  260. CREATE CLUSTER REPLICA quickstart.r1 SIZE '1';
  261. """,
  262. port=6877,
  263. user="mz_system",
  264. )
  265. class WaitReadyMz(MzcomposeAction):
  266. """Wait until environmentd is ready, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#get-apileaderstatus"""
  267. def __init__(self, mz_service: str = "materialized") -> None:
  268. self.mz_service = mz_service
  269. def execute(self, e: Executor) -> None:
  270. e.mzcompose_composition().await_mz_deployment_status(
  271. DeploymentStatus.READY_TO_PROMOTE, self.mz_service
  272. )
  273. class PromoteMz(MzcomposeAction):
  274. """Promote environmentd to leader, see https://github.com/MaterializeInc/cloud/blob/main/doc/design/20230418_upgrade_orchestration.md#post-apileaderpromote"""
  275. def __init__(self, mz_service: str = "materialized") -> None:
  276. self.mz_service = mz_service
  277. def execute(self, e: Executor) -> None:
  278. c = e.mzcompose_composition()
  279. result = json.loads(
  280. c.exec(
  281. self.mz_service,
  282. "curl",
  283. "-s",
  284. "-X",
  285. "POST",
  286. "http://127.0.0.1:6878/api/leader/promote",
  287. capture=True,
  288. ).stdout
  289. )
  290. assert result["result"] == "Success", f"Unexpected result {result}"
  291. # Wait until new Materialize is ready to handle queries
  292. c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, self.mz_service)
  293. mz_version = MzVersion.parse_mz(c.query_mz_version(service=self.mz_service))
  294. e.current_mz_version = mz_version
  295. class SystemVarChange(MzcomposeAction):
  296. """Changes a system var."""
  297. def __init__(self, name: str, value: str):
  298. self.name = name
  299. self.value = value
  300. def execute(self, e: Executor) -> None:
  301. c = e.mzcompose_composition()
  302. c.sql(
  303. f"ALTER SYSTEM SET {self.name} = {self.value};",
  304. port=6877,
  305. user="mz_system",
  306. )