table_actions.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.framework import (
  14. Action,
  15. ActionFactory,
  16. Capabilities,
  17. Capability,
  18. State,
  19. )
  20. from materialize.zippy.mz_capabilities import MzIsRunning
  21. from materialize.zippy.table_capabilities import TableExists
  22. MAX_ROWS_PER_ACTION = 10000
  23. class CreateTableParameterized(ActionFactory):
  24. def __init__(
  25. self, max_tables: int = 10, max_rows_per_action: int = MAX_ROWS_PER_ACTION
  26. ) -> None:
  27. self.max_tables = max_tables
  28. self.max_rows_per_action = max_rows_per_action
  29. @classmethod
  30. def requires(cls) -> set[type[Capability]]:
  31. return {BalancerdIsRunning, MzIsRunning}
  32. def new(self, capabilities: Capabilities) -> list[Action]:
  33. new_table_name = capabilities.get_free_capability_name(
  34. TableExists, self.max_tables
  35. )
  36. if new_table_name:
  37. return [
  38. CreateTable(
  39. capabilities=capabilities,
  40. table=TableExists(
  41. name=new_table_name,
  42. has_index=random.choice([True, False]),
  43. max_rows_per_action=self.max_rows_per_action,
  44. ),
  45. )
  46. ]
  47. else:
  48. return []
  49. class CreateTable(Action):
  50. """Creates a table on the Mz instance. 50% of the tables have a default index."""
  51. @classmethod
  52. def requires(cls) -> set[type[Capability]]:
  53. return {BalancerdIsRunning, MzIsRunning}
  54. def __init__(self, table: TableExists, capabilities: Capabilities) -> None:
  55. assert (
  56. table is not None
  57. ), "CreateTable Action can not be referenced directly, it is produced by CreateTableParameterized factory"
  58. self.table = table
  59. super().__init__(capabilities)
  60. def run(self, c: Composition, state: State) -> None:
  61. index = (
  62. f"> CREATE DEFAULT INDEX ON {self.table.name}"
  63. if self.table.has_index
  64. else ""
  65. )
  66. c.testdrive(
  67. dedent(
  68. f"""
  69. > CREATE TABLE {self.table.name} (f1 INTEGER);
  70. {index}
  71. > INSERT INTO {self.table.name} VALUES ({self.table.watermarks.max});
  72. """
  73. ),
  74. mz_service=state.mz_service,
  75. )
  76. def provides(self) -> list[Capability]:
  77. return [self.table]
  78. class ValidateTable(Action):
  79. """Validates that a single table contains data that is consistent with the expected min/max watermark."""
  80. @classmethod
  81. def requires(cls) -> set[type[Capability]]:
  82. return {BalancerdIsRunning, MzIsRunning, TableExists}
  83. def __init__(
  84. self, capabilities: Capabilities, table: TableExists | None = None
  85. ) -> None:
  86. if table is not None:
  87. self.table = table
  88. else:
  89. self.table = random.choice(capabilities.get(TableExists))
  90. self.select_limit = random.choices([True, False], weights=[0.2, 0.8], k=1)[0]
  91. super().__init__(capabilities)
  92. def run(self, c: Composition, state: State) -> None:
  93. # Validating via SELECT ... LIMIT is expensive as it requires creating a temporary table
  94. # Therefore, only use it in 20% of validations.
  95. if self.select_limit:
  96. c.testdrive(
  97. dedent(
  98. f"""
  99. > CREATE TEMPORARY TABLE {self.table.name}_select_limit (f1 INTEGER);
  100. > INSERT INTO {self.table.name}_select_limit SELECT * FROM {self.table.name} LIMIT 999999999;
  101. > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name}_select_limit;
  102. {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
  103. > DROP TABLE {self.table.name}_select_limit
  104. """
  105. ),
  106. mz_service=state.mz_service,
  107. )
  108. else:
  109. c.testdrive(
  110. dedent(
  111. f"""
  112. > SELECT MIN(f1), MAX(f1), COUNT(f1), COUNT(DISTINCT f1) FROM {self.table.name};
  113. {self.table.watermarks.min} {self.table.watermarks.max} {(self.table.watermarks.max-self.table.watermarks.min)+1} {(self.table.watermarks.max-self.table.watermarks.min)+1}
  114. """
  115. ),
  116. mz_service=state.mz_service,
  117. )
  118. class DML(Action):
  119. """Performs an INSERT, DELETE or UPDATE against a table."""
  120. @classmethod
  121. def requires(cls) -> set[type[Capability]]:
  122. return {BalancerdIsRunning, MzIsRunning, TableExists}
  123. def __init__(self, capabilities: Capabilities) -> None:
  124. self.table = random.choice(capabilities.get(TableExists))
  125. self.delta = random.randint(1, self.table.max_rows_per_action)
  126. super().__init__(capabilities)
  127. def __str__(self) -> str:
  128. return f"{Action.__str__(self)} {self.table.name}"
  129. class Insert(DML):
  130. """Inserts rows into a table."""
  131. def run(self, c: Composition, state: State) -> None:
  132. prev_max = self.table.watermarks.max
  133. self.table.watermarks.max = prev_max + self.delta
  134. c.testdrive(
  135. f"> INSERT INTO {self.table.name} SELECT * FROM generate_series({prev_max + 1}, {self.table.watermarks.max});",
  136. mz_service=state.mz_service,
  137. )
  138. class ShiftForward(DML):
  139. """Update all rows from a table by incrementing their values by a constant."""
  140. def run(self, c: Composition, state: State) -> None:
  141. self.table.watermarks.shift(self.delta)
  142. c.testdrive(
  143. f"> UPDATE {self.table.name} SET f1 = f1 + {self.delta};",
  144. mz_service=state.mz_service,
  145. )
  146. class ShiftBackward(DML):
  147. """Update all rows from a table by decrementing their values by a constant."""
  148. def run(self, c: Composition, state: State) -> None:
  149. self.table.watermarks.shift(-self.delta)
  150. c.testdrive(
  151. f"> UPDATE {self.table.name} SET f1 = f1 - {self.delta};",
  152. mz_service=state.mz_service,
  153. )
  154. class DeleteFromHead(DML):
  155. """Delete the largest values from a table"""
  156. def run(self, c: Composition, state: State) -> None:
  157. self.table.watermarks.max = max(
  158. self.table.watermarks.max - self.delta, self.table.watermarks.min
  159. )
  160. c.testdrive(
  161. f"> DELETE FROM {self.table.name} WHERE f1 > {self.table.watermarks.max};",
  162. mz_service=state.mz_service,
  163. )
  164. class DeleteFromTail(DML):
  165. """Delete the smallest values from a table"""
  166. def run(self, c: Composition, state: State) -> None:
  167. self.table.watermarks.min = min(
  168. self.table.watermarks.min + self.delta, self.table.watermarks.max
  169. )
  170. c.testdrive(
  171. f"> DELETE FROM {self.table.name} WHERE f1 < {self.table.watermarks.min};",
  172. mz_service=state.mz_service,
  173. )