framework.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from collections.abc import Sequence
  11. from datetime import datetime, timedelta
  12. from typing import TYPE_CHECKING, TypeVar, Union
  13. from materialize.mzcompose import get_default_system_parameters
  14. from materialize.mzcompose.composition import Composition
  15. if TYPE_CHECKING:
  16. from materialize.zippy.scenarios import Scenario
  17. class State:
  18. mz_service: str
  19. deploy_generation: int
  20. system_parameter_defaults: dict[str, str]
  21. def __init__(self, zero_downtime: bool):
  22. self.mz_service = "materialized"
  23. self.deploy_generation = 0
  24. self.system_parameter_defaults = get_default_system_parameters(
  25. zero_downtime=zero_downtime
  26. )
  27. class Capability:
  28. """Base class for a Zippy capability.
  29. A capability represents a condition that is true about a Zippy test context,
  30. like "a table with name 'foo' exists".
  31. """
  32. name: str
  33. @classmethod
  34. def format_str(cls) -> str:
  35. raise NotImplementedError()
  36. T = TypeVar("T", bound=Capability)
  37. ActionOrFactory = Union[type["Action"], "ActionFactory"]
  38. class Capabilities:
  39. """A set of `Capability`s."""
  40. _capabilities: Sequence[Capability]
  41. def __init__(self) -> None:
  42. self._capabilities = []
  43. def _extend(self, capabilities: Sequence[Capability]) -> None:
  44. """Add new capabilities."""
  45. new_capabilities = list(capabilities)
  46. self._capabilities = list(self._capabilities) + new_capabilities
  47. def remove_capability_instance(self, capability: Capability) -> None:
  48. """Remove a specific capability."""
  49. self._capabilities = [
  50. cap for cap in self._capabilities if not cap == capability
  51. ]
  52. def _remove(self, capabilities: set[type[T]]) -> None:
  53. """Remove all existing capabilities of the specified types."""
  54. self._capabilities = [
  55. cap for cap in self._capabilities if type(cap) not in capabilities
  56. ]
  57. def provides(self, capability: type[T]) -> bool:
  58. """Report whether any capability of the specified type exists."""
  59. return len(self.get(capability)) > 0
  60. def get(self, capability: type[T]) -> list[T]:
  61. """Get all capabilities of the specified type."""
  62. matches: list[T] = [
  63. # NOTE: unfortunately pyright can't handle this
  64. cap
  65. for cap in self._capabilities
  66. if type(cap) == capability # type: ignore
  67. ]
  68. return matches
  69. def get_capability_names(self, capability: type[T]) -> list[str]:
  70. return [t.name for t in self.get(capability)]
  71. def get_free_capability_name(
  72. self, capability: type[T], max_objects: int
  73. ) -> str | None:
  74. all_object_names = [
  75. capability.format_str().format(i) for i in range(0, max_objects)
  76. ]
  77. existing_object_names = self.get_capability_names(capability)
  78. remaining_object_names = set(all_object_names) - set(existing_object_names)
  79. return (
  80. random.choice(list(remaining_object_names))
  81. if len(remaining_object_names) > 0
  82. else None
  83. )
  84. class Action:
  85. """Base class for an action that a Zippy test can take."""
  86. current_seqno: int = 0
  87. def __init__(self, capabilities: Capabilities) -> None:
  88. """Construct a new action, possibly conditioning on the available
  89. capabilities."""
  90. Action.current_seqno = Action.current_seqno + 1
  91. self.seqno = Action.current_seqno
  92. pass
  93. @classmethod
  94. def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
  95. """Compute the capability classes that this action requires."""
  96. return set()
  97. @classmethod
  98. def incompatible_with(cls) -> set[type[Capability]]:
  99. """The capability classes that this action is not compatible with."""
  100. return set()
  101. def withholds(self) -> set[type[Capability]]:
  102. """Compute the capability classes that this action will make unavailable."""
  103. return set()
  104. def provides(self) -> list[Capability]:
  105. """Compute the capabilities that this action will make available."""
  106. return []
  107. def run(self, c: Composition, state: State) -> None:
  108. """Run this action on the provided composition."""
  109. raise NotImplementedError
  110. @classmethod
  111. def require_explicit_mention(cls) -> bool:
  112. """Only use if explicitly mentioned by name in a Scenario."""
  113. return False
  114. def __str__(self) -> str:
  115. return f"--- #{self.seqno}: {self.__class__.__name__}"
  116. class Mz0dtDeployBaseAction(Action):
  117. pass
  118. class ActionFactory:
  119. """Base class for Action Factories that return parameterized Actions to execute."""
  120. def new(self, capabilities: Capabilities) -> list[Action]:
  121. raise NotImplementedError
  122. @classmethod
  123. def requires(cls) -> set[type[Capability]] | list[set[type[Capability]]]:
  124. """Compute the capability classes that this Action Factory requires."""
  125. return set()
  126. @classmethod
  127. def incompatible_with(cls) -> set[type[Capability]]:
  128. """The capability classes that this action is not compatible with."""
  129. return set()
  130. class Test:
  131. """A Zippy test, consisting of a sequence of actions."""
  132. def __init__(
  133. self, scenario: "Scenario", actions: int, max_execution_time: timedelta
  134. ) -> None:
  135. """Generate a new Zippy test.
  136. Args:
  137. scenario: The Scenario to pick actions from.
  138. actions: The number of actions to generate.
  139. """
  140. self._scenario = scenario
  141. self._actions: list[Action] = []
  142. self._final_actions: list[Action] = []
  143. self._capabilities = Capabilities()
  144. self._actions_with_weight: dict[ActionOrFactory, float] = (
  145. self._scenario.actions_with_weight()
  146. )
  147. self._state = State(
  148. zero_downtime=any(
  149. [
  150. isinstance(action, Mz0dtDeployBaseAction)
  151. for action in self._actions_with_weight
  152. ]
  153. )
  154. )
  155. self._max_execution_time: timedelta = max_execution_time
  156. for action_or_factory in self._scenario.bootstrap():
  157. self._actions.extend(self.generate_actions(action_or_factory))
  158. while len(self._actions) < actions:
  159. action_or_factory = self._pick_action_or_factory()
  160. self._actions.extend(self.generate_actions(action_or_factory))
  161. for action_or_factory in self._scenario.finalization():
  162. self._final_actions.extend(self.generate_actions(action_or_factory))
  163. def generate_actions(self, action_def: ActionOrFactory) -> list[Action]:
  164. if isinstance(action_def, ActionFactory):
  165. actions = action_def.new(capabilities=self._capabilities)
  166. elif issubclass(action_def, Action):
  167. actions = [action_def(capabilities=self._capabilities)]
  168. else:
  169. raise RuntimeError(
  170. f"{type(action_def)} is not a subclass of {ActionFactory} or {Action}"
  171. )
  172. for action in actions:
  173. print("test:", action)
  174. self._capabilities._extend(action.provides())
  175. print(" - ", self._capabilities, action.provides())
  176. self._capabilities._remove(action.withholds())
  177. print(" - ", self._capabilities, action.withholds())
  178. return actions
  179. def run(self, c: Composition) -> None:
  180. """Run the Zippy test."""
  181. max_time = datetime.now() + self._max_execution_time
  182. for action in self._actions:
  183. print(action)
  184. action.run(c, self._state)
  185. if datetime.now() > max_time:
  186. print(
  187. f"--- Desired execution time of {self._max_execution_time} has been reached."
  188. )
  189. break
  190. for action in self._final_actions:
  191. print(action)
  192. action.run(c, self._state)
  193. def _pick_action_or_factory(self) -> ActionOrFactory:
  194. """Select the next Action to run in the Test"""
  195. actions_or_factories: list[ActionOrFactory] = []
  196. class_weights = []
  197. for action_or_factory in self._actions_with_weight.keys():
  198. alternatives = []
  199. # We do not drill down if it is an ActionFactory
  200. # If it is an Action, drill down for any children
  201. subclasses: list[ActionOrFactory] = (
  202. [action_or_factory]
  203. if isinstance(action_or_factory, ActionFactory)
  204. else self._all_subclasses(action_or_factory)
  205. )
  206. for subclass in subclasses:
  207. # Do not pick an Action whose requirements can not be satisfied
  208. if self._can_run(subclass):
  209. alternatives.append(subclass)
  210. for alternative in alternatives:
  211. actions_or_factories.append(alternative)
  212. weight = self._actions_with_weight[action_or_factory]
  213. class_weights.append(weight / len(alternatives))
  214. assert (
  215. len(actions_or_factories) > 0
  216. ), "No actions available to take. You may be stopping or deleting items without starting them again."
  217. return random.choices(actions_or_factories, weights=class_weights, k=1)[0]
  218. def _can_run(self, action: ActionOrFactory) -> bool:
  219. if any(
  220. self._capabilities.provides(dislike)
  221. for dislike in action.incompatible_with()
  222. ):
  223. return False
  224. requires = action.requires()
  225. if isinstance(requires, set):
  226. return all(self._capabilities.provides(req) for req in requires)
  227. else:
  228. for one_alternative in requires:
  229. if all(self._capabilities.provides(req) for req in one_alternative):
  230. return True
  231. return False
  232. def _all_subclasses(self, cls: type[Action]) -> list[ActionOrFactory]:
  233. """Return all Actions that are a subclass of the given cls."""
  234. children = [c for c in cls.__subclasses__() if not c.require_explicit_mention()]
  235. if len(children) == 0:
  236. return [cls]
  237. else:
  238. subclasses = []
  239. for c in children:
  240. subclasses.extend(self._all_subclasses(c))
  241. return subclasses