123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from random import Random
- from typing import TYPE_CHECKING
- from materialize import buildkite
- from materialize.buildkite import BuildkiteEnvVar
- from materialize.checks.actions import Testdrive
- from materialize.checks.executors import Executor
- from materialize.checks.features import Features
- from materialize.mz_version import MzVersion
- if TYPE_CHECKING:
- from materialize.checks.actions import Action
- TESTDRIVE_NOP = "$ nop"
- class Check:
- # Has to be set for the class already, not just in the constructor, so that
- # we can change the value for the entire class in the decorator
- enabled: bool = True
- externally_idempotent: bool = True
- def __init__(
- self, base_version: MzVersion, rng: Random | None, features: Features | None
- ) -> None:
- self.base_version = base_version
- self.rng = rng
- self.features = features
- def _can_run(self, e: Executor) -> bool:
- return True
- def _kafka_broker(self) -> str:
- return "BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT"
- def _unsafe_schema(self) -> str:
- """
- :return: the schema containing unsafe functions, such as `mz_sleep`.
- """
- return "mz_unsafe"
- def _default_cluster(self) -> str:
- """
- :return: name of the cluster created in all environments.
- """
- return "quickstart"
- def initialize(self) -> Testdrive:
- return Testdrive(TESTDRIVE_NOP)
- def manipulate(self) -> list[Testdrive]:
- raise NotImplementedError
- def validate(self) -> Testdrive:
- """Note that the validation method may be invoked multiple times (depending on the scenario)."""
- raise NotImplementedError
- def start_initialize(self, e: Executor, a: "Action") -> None:
- if self._can_run(e) and self.enabled:
- self.current_version = e.current_mz_version
- self._initialize = self.initialize()
- self._initialize.execute(e, a.mz_service)
- def join_initialize(self, e: Executor) -> None:
- if self._can_run(e) and self.enabled:
- self._initialize.join(e)
- def start_manipulate(self, e: Executor, a: "Action") -> None:
- if self._can_run(e) and self.enabled:
- self.current_version = e.current_mz_version
- self._manipulate = self.manipulate()
- assert (
- len(self._manipulate) == 2
- ), f"manipulate() should return a list with exactly 2 elements, but actually returns {len(self._manipulate)} elements"
- assert a.phase is not None
- self._manipulate[a.phase].execute(e, a.mz_service)
- def join_manipulate(self, e: Executor, a: "Action") -> None:
- if self._can_run(e) and self.enabled:
- assert a.phase is not None
- self._manipulate[a.phase].join(e)
- def start_validate(self, e: Executor, a: "Action") -> None:
- if self._can_run(e) and self.enabled:
- self.current_version = e.current_mz_version
- self._validate = self.validate()
- self._validate.execute(e, a.mz_service)
- def join_validate(self, e: Executor) -> None:
- if self._can_run(e) and self.enabled:
- self._validate.join(e)
- def is_running_as_cloudtest(self) -> bool:
- return buildkite.get_var(BuildkiteEnvVar.BUILDKITE_STEP_KEY, "") in {
- "cloudtest-upgrade",
- "testdrive-in-cloudtest",
- }
- def disabled(ignore_reason: str):
- def decorator(cls):
- cls.enabled = False
- return cls
- return decorator
- def externally_idempotent(externally_idempotent: bool = True):
- def decorator(cls):
- cls.externally_idempotent = externally_idempotent
- return cls
- return decorator
|