replica_actions.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.framework import Action, Capabilities, Capability, State
  13. from materialize.zippy.mz_capabilities import MzIsRunning
  14. from materialize.zippy.replica_capabilities import ReplicaExists, ReplicaSizeType
  15. class DropDefaultReplica(Action):
  16. """Drops the default replica."""
  17. @classmethod
  18. def requires(cls) -> set[type[Capability]]:
  19. return {MzIsRunning}
  20. def run(self, c: Composition, state: State) -> None:
  21. # Default cluster is not owned by materialize, thus can't be dropped by
  22. # it if enable_rbac_checks is on.
  23. c.testdrive(
  24. dedent(
  25. """
  26. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  27. ALTER CLUSTER quickstart SET (MANAGED = false)
  28. DROP CLUSTER REPLICA quickstart.r1
  29. """
  30. ),
  31. mz_service=state.mz_service,
  32. )
  33. class CreateReplica(Action):
  34. """Creates a replica on the quickstart cluster."""
  35. @classmethod
  36. def requires(cls) -> set[type[Capability]]:
  37. return {MzIsRunning}
  38. def __init__(self, capabilities: Capabilities) -> None:
  39. this_replica = ReplicaExists(name="replica" + str(random.randint(1, 4)))
  40. existing_replicas = [
  41. t for t in capabilities.get(ReplicaExists) if t.name == this_replica.name
  42. ]
  43. if len(existing_replicas) == 0:
  44. self.new_replica = True
  45. size_types = [
  46. ReplicaSizeType.Nodes,
  47. ReplicaSizeType.Workers,
  48. ReplicaSizeType.Both,
  49. ]
  50. size_type = random.choice(size_types)
  51. size = str(random.choice([2, 4]))
  52. if size_type is ReplicaSizeType.Nodes:
  53. this_replica.size = size + "-1"
  54. elif size_type is ReplicaSizeType.Workers:
  55. this_replica.size = size
  56. elif size_type is ReplicaSizeType.Both:
  57. this_replica.size = f"{size}-{size}"
  58. else:
  59. raise RuntimeError(f"Unsupported size type: {size_type}")
  60. if this_replica.size == "1-1":
  61. this_replica.size = "1"
  62. self.replica = this_replica
  63. elif len(existing_replicas) == 1:
  64. self.new_replica = False
  65. self.replica = existing_replicas[0]
  66. else:
  67. raise RuntimeError("More than one replica exists")
  68. super().__init__(capabilities)
  69. def run(self, c: Composition, state: State) -> None:
  70. if self.new_replica:
  71. # Default cluster is not owned by materialize, thus can't have a replica
  72. # added if enable_rbac_checks is on.
  73. c.testdrive(
  74. dedent(
  75. f"""
  76. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  77. CREATE CLUSTER REPLICA quickstart.{self.replica.name} SIZE '{self.replica.size}'
  78. """
  79. ),
  80. mz_service=state.mz_service,
  81. )
  82. def provides(self) -> list[Capability]:
  83. return [self.replica] if self.new_replica else []
  84. class DropReplica(Action):
  85. """Drops a replica from the quickstart cluster."""
  86. replica: ReplicaExists | None
  87. @classmethod
  88. def requires(cls) -> set[type[Capability]]:
  89. return {MzIsRunning, ReplicaExists}
  90. def __init__(self, capabilities: Capabilities) -> None:
  91. existing_replicas = capabilities.get(ReplicaExists)
  92. if len(existing_replicas) > 1:
  93. self.replica = random.choice(existing_replicas)
  94. capabilities.remove_capability_instance(self.replica)
  95. else:
  96. self.replica = None
  97. super().__init__(capabilities)
  98. def run(self, c: Composition, state: State) -> None:
  99. if self.replica is not None:
  100. # Default cluster is not owned by materialize, thus can't have a replica
  101. # removed if enable_rbac_checks is on.
  102. c.testdrive(
  103. dedent(
  104. f"""
  105. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  106. DROP CLUSTER REPLICA IF EXISTS quickstart.{self.replica.name}
  107. """
  108. ),
  109. mz_service=state.mz_service,
  110. )