123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- from collections.abc import Sequence
- from datetime import datetime, timedelta
- from typing import TYPE_CHECKING, TypeVar, Union
- from materialize.mzcompose import get_default_system_parameters
- from materialize.mzcompose.composition import Composition
- if TYPE_CHECKING:
- from materialize.zippy.scenarios import Scenario
class State:
    """Mutable state handed to every action's `run` during a Zippy test."""

    # Initialised to "materialized" by the constructor.
    mz_service: str
    # Initialised to 0 by the constructor.
    deploy_generation: int
    # Result of `get_default_system_parameters` for the requested mode.
    system_parameter_defaults: dict[str, str]

    def __init__(self, zero_downtime: bool):
        """Set up the initial state.

        Args:
            zero_downtime: Forwarded to `get_default_system_parameters` to
                select the system parameter defaults.
        """
        defaults = get_default_system_parameters(zero_downtime=zero_downtime)
        self.system_parameter_defaults = defaults
        self.deploy_generation = 0
        self.mz_service = "materialized"
class Capability:
    """Base class for a Zippy capability.

    A capability is a fact that currently holds in the Zippy test context,
    for example "a table with name 'foo' exists".
    """

    name: str

    @classmethod
    def format_str(cls) -> str:
        """Return the name template for capabilities of this type.

        `Capabilities.get_free_capability_name` fills the template with an
        index via `str.format`. The base class has no template, so
        subclasses must override this.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError
# Type variable bound to Capability; used by the `Capabilities` accessors so
# that a query for a given capability type is typed as returning that type.
T = TypeVar("T", bound=Capability)
# What a scenario can schedule: either an Action class (instantiated by
# `Test.generate_actions`) or an ActionFactory instance (asked for new actions).
ActionOrFactory = Union[type["Action"], "ActionFactory"]
class Capabilities:
    """A set of `Capability`s."""

    _capabilities: Sequence[Capability]

    def __init__(self) -> None:
        """Start with no capabilities."""
        self._capabilities = []

    def _extend(self, capabilities: Sequence[Capability]) -> None:
        """Add new capabilities."""
        # Rebind rather than mutate so a previously handed-out list is untouched.
        self._capabilities = [*self._capabilities, *capabilities]

    def remove_capability_instance(self, capability: Capability) -> None:
        """Remove every capability that compares equal to `capability`."""
        self._capabilities = [
            cap for cap in self._capabilities if not cap == capability
        ]

    def _remove(self, capabilities: set[type[T]]) -> None:
        """Remove all existing capabilities of the specified types."""
        self._capabilities = [
            cap for cap in self._capabilities if type(cap) not in capabilities
        ]

    def provides(self, capability: type[T]) -> bool:
        """Report whether any capability of the specified type exists."""
        return len(self.get(capability)) > 0

    def get(self, capability: type[T]) -> list[T]:
        """Get all capabilities of exactly the specified type.

        Subclasses of `capability` are deliberately not matched.
        """
        matches: list[T] = [
            # NOTE: unfortunately pyright can't handle this
            cap
            for cap in self._capabilities
            if type(cap) == capability  # type: ignore
        ]
        return matches

    def get_capability_names(self, capability: type[T]) -> list[str]:
        """Return the names of all capabilities of the specified type."""
        return [cap.name for cap in self.get(capability)]

    def get_free_capability_name(
        self, capability: type[T], max_objects: int
    ) -> str | None:
        """Pick a random unused name for a capability of the given type.

        Candidate names are `capability.format_str()` formatted with each
        index in `range(max_objects)`.

        Args:
            capability: The capability type to find a name for.
            max_objects: Number of candidate indices to consider.

        Returns:
            A uniformly chosen candidate name that no existing capability of
            this type uses, or None when all candidates are taken.
        """
        template = capability.format_str()
        taken = set(self.get_capability_names(capability))
        # dict.fromkeys de-duplicates while keeping index order. Iterating a
        # set of strings instead would make the choice depend on per-process
        # hash randomization, so a fixed random seed would not reproduce it.
        candidates = dict.fromkeys(template.format(i) for i in range(max_objects))
        free_names = [name for name in candidates if name not in taken]
        return random.choice(free_names) if free_names else None
class Action:
    """Base class for an action that a Zippy test can take."""

    # Class-wide counter; each constructed action takes the next value.
    current_seqno: int = 0

    def __init__(self, capabilities: Capabilities) -> None:
        """Construct a new action, possibly conditioning on the available
        capabilities.

        The base implementation ignores `capabilities` and only assigns the
        action its sequence number.
        """
        Action.current_seqno += 1
        self.seqno = Action.current_seqno

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this action requires.

        Either one set (all members required) or a list of alternative sets
        (any one fully satisfied set suffices). Defaults to no requirements.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this action is not compatible with."""
        return set()

    def withholds(self) -> set[type[Capability]]:
        """Compute the capability classes that this action will make unavailable."""
        return set()

    def provides(self) -> list[Capability]:
        """Compute the capabilities that this action will make available."""
        return []

    def run(self, c: Composition, state: State) -> None:
        """Run this action on the provided composition.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError()

    @classmethod
    def require_explicit_mention(cls) -> bool:
        """Only use if explicitly mentioned by name in a Scenario."""
        return False

    def __str__(self) -> str:
        """Render the action as a log header with its sequence number and class name."""
        class_name = type(self).__name__
        return f"--- #{self.seqno}: {class_name}"
class Mz0dtDeployBaseAction(Action):
    """Marker base class with no behavior of its own.

    `Test.__init__` looks for this class among a scenario's weighted actions
    when computing the `zero_downtime` flag passed to `State`.
    """
class ActionFactory:
    """Base class for Action Factories that return parameterized Actions to execute."""

    def new(self, capabilities: Capabilities) -> list[Action]:
        """Create the actions to execute given the current capabilities.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError()

    @classmethod
    def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
        """Compute the capability classes that this Action Factory requires.

        Defaults to an empty set, i.e. no requirements.
        """
        return set()

    @classmethod
    def incompatible_with(cls) -> set[type[Capability]]:
        """The capability classes that this action is not compatible with.

        Defaults to an empty set, i.e. compatible with everything.
        """
        return set()
class Test:
    """A Zippy test, consisting of a sequence of actions."""

    def __init__(
        self, scenario: "Scenario", actions: int, max_execution_time: timedelta
    ) -> None:
        """Generate a new Zippy test.

        Args:
            scenario: The Scenario to pick actions from.
            actions: The number of actions to generate.
            max_execution_time: Time budget for the main action sequence;
                `run` stops starting further main actions once it is exceeded.
        """
        self._scenario = scenario
        self._actions: list[Action] = []
        self._final_actions: list[Action] = []
        self._capabilities = Capabilities()
        self._actions_with_weight: dict[ActionOrFactory, float] = (
            self._scenario.actions_with_weight()
        )
        # The keys are Action *classes* or ActionFactory instances, so an
        # isinstance() check against Mz0dtDeployBaseAction could never match.
        # Use issubclass() on the class keys instead.
        self._state = State(
            zero_downtime=any(
                isinstance(action, type)
                and issubclass(action, Mz0dtDeployBaseAction)
                for action in self._actions_with_weight
            )
        )
        self._max_execution_time: timedelta = max_execution_time

        for action_or_factory in self._scenario.bootstrap():
            self._actions.extend(self.generate_actions(action_or_factory))

        # A factory may yield several actions at once, so the final count can
        # exceed the requested number.
        while len(self._actions) < actions:
            action_or_factory = self._pick_action_or_factory()
            self._actions.extend(self.generate_actions(action_or_factory))

        for action_or_factory in self._scenario.finalization():
            self._final_actions.extend(self.generate_actions(action_or_factory))

    def generate_actions(self, action_def: ActionOrFactory) -> list[Action]:
        """Instantiate the action(s) for `action_def` and update capabilities.

        Args:
            action_def: An ActionFactory instance or an Action class.

        Returns:
            The newly created actions, in execution order.

        Raises:
            RuntimeError: If `action_def` is neither of the above.
        """
        if isinstance(action_def, ActionFactory):
            actions = action_def.new(capabilities=self._capabilities)
        elif issubclass(action_def, Action):
            actions = [action_def(capabilities=self._capabilities)]
        else:
            raise RuntimeError(
                f"{type(action_def)} is not a subclass of {ActionFactory} or {Action}"
            )

        # Apply each action's effect to the tracked capabilities right away so
        # that later picks see the state this action will leave behind.
        for action in actions:
            print("test:", action)
            self._capabilities._extend(action.provides())
            print(" - ", self._capabilities, action.provides())
            self._capabilities._remove(action.withholds())
            print(" - ", self._capabilities, action.withholds())

        return actions

    def run(self, c: Composition) -> None:
        """Run the Zippy test.

        Main actions run in order until they are exhausted or the execution
        time budget is exceeded; the finalization actions always run afterwards.
        """
        max_time = datetime.now() + self._max_execution_time

        for action in self._actions:
            print(action)
            action.run(c, self._state)
            # The budget is checked only after an action completes.
            if datetime.now() > max_time:
                print(
                    f"--- Desired execution time of {self._max_execution_time} has been reached."
                )
                break

        for action in self._final_actions:
            print(action)
            action.run(c, self._state)

    def _pick_action_or_factory(self) -> ActionOrFactory:
        """Select the next Action to run in the Test"""
        actions_or_factories: list[ActionOrFactory] = []
        class_weights: list[float] = []

        for action_or_factory, weight in self._actions_with_weight.items():
            # We do not drill down if it is an ActionFactory
            # If it is an Action, drill down for any children
            subclasses: list[ActionOrFactory] = (
                [action_or_factory]
                if isinstance(action_or_factory, ActionFactory)
                else self._all_subclasses(action_or_factory)
            )

            # Do not pick an Action whose requirements can not be satisfied
            alternatives = [
                subclass for subclass in subclasses if self._can_run(subclass)
            ]

            # Split the entry's weight evenly across its runnable alternatives.
            # Nothing is added (and no division happens) when there are none.
            for alternative in alternatives:
                actions_or_factories.append(alternative)
                class_weights.append(weight / len(alternatives))

        assert (
            len(actions_or_factories) > 0
        ), "No actions available to take. You may be stopping or deleting items without starting them again."

        return random.choices(actions_or_factories, weights=class_weights, k=1)[0]

    def _can_run(self, action: ActionOrFactory) -> bool:
        """Report whether `action` is runnable under the current capabilities.

        An action is runnable when no capability it is incompatible with is
        present and its requirements are met. Requirements are either one set
        (all required) or a list of alternative sets (any one suffices).
        """
        if any(
            self._capabilities.provides(dislike)
            for dislike in action.incompatible_with()
        ):
            return False

        requires = action.requires()

        if isinstance(requires, set):
            return all(self._capabilities.provides(req) for req in requires)

        return any(
            all(self._capabilities.provides(req) for req in one_alternative)
            for one_alternative in requires
        )

    def _all_subclasses(self, cls: type[Action]) -> list[ActionOrFactory]:
        """Return all Actions that are a subclass of the given cls.

        Only leaves of the class hierarchy are returned. Subclasses that
        require explicit mention are ignored, so a class whose children are
        all explicit-only counts as a leaf itself.
        """
        children = [c for c in cls.__subclasses__() if not c.require_explicit_mention()]
        if not children:
            return [cls]

        subclasses: list[ActionOrFactory] = []
        for child in children:
            subclasses.extend(self._all_subclasses(child))
        return subclasses
|