mz_actions.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.mzcompose.composition import Composition
  10. from materialize.mzcompose.services.materialized import (
  11. LEADER_STATUS_HEALTHCHECK,
  12. DeploymentStatus,
  13. Materialized,
  14. )
  15. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  16. from materialize.zippy.blob_store_capabilities import BlobStoreIsRunning
  17. from materialize.zippy.crdb_capabilities import CockroachIsRunning
  18. from materialize.zippy.framework import (
  19. Action,
  20. ActionFactory,
  21. Capabilities,
  22. Capability,
  23. Mz0dtDeployBaseAction,
  24. State,
  25. )
  26. from materialize.zippy.mz_capabilities import MzIsRunning
  27. from materialize.zippy.view_capabilities import ViewExists
  28. class MzStartParameterized(ActionFactory):
  29. """Starts a Mz instance with custom paramters."""
  30. @classmethod
  31. def requires(cls) -> set[type[Capability]]:
  32. return {CockroachIsRunning, BlobStoreIsRunning}
  33. @classmethod
  34. def incompatible_with(cls) -> set[type[Capability]]:
  35. return {MzIsRunning}
  36. def __init__(
  37. self, additional_system_parameter_defaults: dict[str, str] = {}
  38. ) -> None:
  39. self.additional_system_parameter_defaults = additional_system_parameter_defaults
  40. def new(self, capabilities: Capabilities) -> list[Action]:
  41. return [
  42. MzStart(
  43. capabilities=capabilities,
  44. additional_system_parameter_defaults=self.additional_system_parameter_defaults,
  45. )
  46. ]
  47. class MzStart(Action):
  48. """Starts a Mz instance (all components are running in the same container)."""
  49. @classmethod
  50. def requires(cls) -> set[type[Capability]]:
  51. return {CockroachIsRunning, BlobStoreIsRunning}
  52. @classmethod
  53. def incompatible_with(cls) -> set[type[Capability]]:
  54. return {MzIsRunning}
  55. def __init__(
  56. self,
  57. capabilities: Capabilities,
  58. additional_system_parameter_defaults: dict[str, str] = {},
  59. ) -> None:
  60. self.additional_system_parameter_defaults = additional_system_parameter_defaults
  61. super().__init__(capabilities)
  62. def run(self, c: Composition, state: State) -> None:
  63. print(
  64. f"Starting Mz with additional_system_parameter_defaults = {self.additional_system_parameter_defaults}"
  65. )
  66. with c.override(
  67. Materialized(
  68. name=state.mz_service,
  69. external_blob_store=True,
  70. blob_store_is_azure=c.blob_store() == "azurite",
  71. external_metadata_store=True,
  72. deploy_generation=state.deploy_generation,
  73. system_parameter_defaults=state.system_parameter_defaults,
  74. sanity_restart=False,
  75. restart="on-failure",
  76. additional_system_parameter_defaults=self.additional_system_parameter_defaults,
  77. metadata_store="cockroach",
  78. default_replication_factor=2,
  79. )
  80. ):
  81. c.up(state.mz_service)
  82. for config_param in [
  83. "max_tables",
  84. "max_sources",
  85. "max_objects_per_schema",
  86. "max_materialized_views",
  87. "max_sinks",
  88. ]:
  89. c.sql(
  90. f"ALTER SYSTEM SET {config_param} TO 1000",
  91. user="mz_system",
  92. port=6877,
  93. print_statement=False,
  94. service=state.mz_service,
  95. )
  96. c.sql(
  97. """
  98. ALTER CLUSTER quickstart SET (MANAGED = false);
  99. """,
  100. user="mz_system",
  101. port=6877,
  102. service=state.mz_service,
  103. )
  104. # Make sure all eligible LIMIT queries use the PeekPersist optimization
  105. c.sql(
  106. "ALTER SYSTEM SET persist_fast_path_limit = 1000000000",
  107. user="mz_system",
  108. port=6877,
  109. service=state.mz_service,
  110. )
  111. def provides(self) -> list[Capability]:
  112. return [MzIsRunning()]
  113. class MzStop(Action):
  114. """Stops the entire Mz instance (all components are running in the same container)."""
  115. @classmethod
  116. def requires(cls) -> set[type[Capability]]:
  117. # Technically speaking, we do not need balancerd to be up in order to kill Mz
  118. # However, without this protection we frequently end up in a situation where
  119. # both are down and Zippy enters a prolonged period of restarting one or the
  120. # other and no other useful work can be performed in the meantime.
  121. return {MzIsRunning, BalancerdIsRunning}
  122. def run(self, c: Composition, state: State) -> None:
  123. c.kill(state.mz_service)
  124. def withholds(self) -> set[type[Capability]]:
  125. return {MzIsRunning}
  126. class MzRestart(Action):
  127. """Restarts the entire Mz instance (all components are running in the same container)."""
  128. @classmethod
  129. def requires(cls) -> set[type[Capability]]:
  130. return {MzIsRunning}
  131. def run(self, c: Composition, state: State) -> None:
  132. with c.override(
  133. Materialized(
  134. name=state.mz_service,
  135. external_blob_store=True,
  136. blob_store_is_azure=c.blob_store() == "azurite",
  137. external_metadata_store=True,
  138. deploy_generation=state.deploy_generation,
  139. system_parameter_defaults=state.system_parameter_defaults,
  140. sanity_restart=False,
  141. restart="on-failure",
  142. metadata_store="cockroach",
  143. default_replication_factor=2,
  144. )
  145. ):
  146. c.kill(state.mz_service)
  147. c.up(state.mz_service)
  148. class Mz0dtDeploy(Mz0dtDeployBaseAction):
  149. """Switches Mz to a new deployment using 0dt."""
  150. @classmethod
  151. def requires(cls) -> set[type[Capability]]:
  152. return {MzIsRunning}
  153. def run(self, c: Composition, state: State) -> None:
  154. state.deploy_generation += 1
  155. state.mz_service = (
  156. "materialized" if state.deploy_generation % 2 == 0 else "materialized2"
  157. )
  158. print(f"Deploying generation {state.deploy_generation} on {state.mz_service}")
  159. with c.override(
  160. Materialized(
  161. name=state.mz_service,
  162. external_blob_store=True,
  163. blob_store_is_azure=c.blob_store() == "azurite",
  164. external_metadata_store=True,
  165. deploy_generation=state.deploy_generation,
  166. system_parameter_defaults=state.system_parameter_defaults,
  167. sanity_restart=False,
  168. restart="on-failure",
  169. healthcheck=LEADER_STATUS_HEALTHCHECK,
  170. metadata_store="cockroach",
  171. default_replication_factor=2,
  172. ),
  173. ):
  174. c.up(state.mz_service, detach=True)
  175. c.await_mz_deployment_status(
  176. DeploymentStatus.READY_TO_PROMOTE, state.mz_service
  177. )
  178. c.promote_mz(state.mz_service)
  179. c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, state.mz_service)
  180. c.stop(
  181. (
  182. "materialized2"
  183. if state.mz_service == "materialized"
  184. else "materialized"
  185. ),
  186. wait=True,
  187. )
  188. class KillClusterd(Action):
  189. """Kills the clusterd processes in the environmentd container. The process orchestrator will restart them."""
  190. @classmethod
  191. def requires(cls) -> set[type[Capability]]:
  192. return {MzIsRunning, ViewExists}
  193. def run(self, c: Composition, state: State) -> None:
  194. # Depending on the workload, clusterd may not be running, hence the || true
  195. c.exec(state.mz_service, "bash", "-c", "kill -9 `pidof clusterd` || true")