123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition
- from materialize.zippy.framework import Action, Capabilities, Capability, State
- from materialize.zippy.mz_capabilities import MzIsRunning
- from materialize.zippy.replica_capabilities import ReplicaExists, ReplicaSizeType
- class DropDefaultReplica(Action):
- """Drops the default replica."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {MzIsRunning}
- def run(self, c: Composition, state: State) -> None:
- # Default cluster is not owned by materialize, thus can't be dropped by
- # it if enable_rbac_checks is on.
- c.testdrive(
- dedent(
- """
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER CLUSTER quickstart SET (MANAGED = false)
- DROP CLUSTER REPLICA quickstart.r1
- """
- ),
- mz_service=state.mz_service,
- )
- class CreateReplica(Action):
- """Creates a replica on the quickstart cluster."""
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {MzIsRunning}
- def __init__(self, capabilities: Capabilities) -> None:
- this_replica = ReplicaExists(name="replica" + str(random.randint(1, 4)))
- existing_replicas = [
- t for t in capabilities.get(ReplicaExists) if t.name == this_replica.name
- ]
- if len(existing_replicas) == 0:
- self.new_replica = True
- size_types = [
- ReplicaSizeType.Nodes,
- ReplicaSizeType.Workers,
- ReplicaSizeType.Both,
- ]
- size_type = random.choice(size_types)
- size = str(random.choice([2, 4]))
- if size_type is ReplicaSizeType.Nodes:
- this_replica.size = size + "-1"
- elif size_type is ReplicaSizeType.Workers:
- this_replica.size = size
- elif size_type is ReplicaSizeType.Both:
- this_replica.size = f"{size}-{size}"
- else:
- raise RuntimeError(f"Unsupported size type: {size_type}")
- if this_replica.size == "1-1":
- this_replica.size = "1"
- self.replica = this_replica
- elif len(existing_replicas) == 1:
- self.new_replica = False
- self.replica = existing_replicas[0]
- else:
- raise RuntimeError("More than one replica exists")
- super().__init__(capabilities)
- def run(self, c: Composition, state: State) -> None:
- if self.new_replica:
- # Default cluster is not owned by materialize, thus can't have a replica
- # added if enable_rbac_checks is on.
- c.testdrive(
- dedent(
- f"""
- $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
- CREATE CLUSTER REPLICA quickstart.{self.replica.name} SIZE '{self.replica.size}'
- """
- ),
- mz_service=state.mz_service,
- )
- def provides(self) -> list[Capability]:
- return [self.replica] if self.new_replica else []
- class DropReplica(Action):
- """Drops a replica from the quickstart cluster."""
- replica: ReplicaExists | None
- @classmethod
- def requires(cls) -> set[type[Capability]]:
- return {MzIsRunning, ReplicaExists}
- def __init__(self, capabilities: Capabilities) -> None:
- existing_replicas = capabilities.get(ReplicaExists)
- if len(existing_replicas) > 1:
- self.replica = random.choice(existing_replicas)
- capabilities.remove_capability_instance(self.replica)
- else:
- self.replica = None
- super().__init__(capabilities)
- def run(self, c: Composition, state: State) -> None:
- if self.replica is not None:
- # Default cluster is not owned by materialize, thus can't have a replica
- # removed if enable_rbac_checks is on.
- c.testdrive(
- dedent(
- f"""
- $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
- DROP CLUSTER REPLICA IF EXISTS quickstart.{self.replica.name}
- """
- ),
- mz_service=state.mz_service,
- )
|