mysql_actions.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.mzcompose.services.mysql import MySql
  13. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  14. from materialize.zippy.framework import Action, Capabilities, Capability, State
  15. from materialize.zippy.mysql_capabilities import MySqlRunning, MySqlTableExists
  16. from materialize.zippy.mz_capabilities import MzIsRunning
  17. class MySqlStart(Action):
  18. """Start a MySQL instance."""
  19. def provides(self) -> list[Capability]:
  20. return [MySqlRunning()]
  21. def run(self, c: Composition, state: State) -> None:
  22. c.up("mysql")
  23. c.testdrive(
  24. dedent(
  25. f"""
  26. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  27. $ mysql-execute name=mysql
  28. DROP DATABASE IF EXISTS public;
  29. CREATE DATABASE public;
  30. USE public;
  31. """
  32. ),
  33. mz_service=state.mz_service,
  34. )
  35. class MySqlStop(Action):
  36. """Stop the MySQL instance."""
  37. @classmethod
  38. def requires(cls) -> set[type[Capability]]:
  39. return {MySqlRunning}
  40. def withholds(self) -> set[type[Capability]]:
  41. return {MySqlRunning}
  42. def run(self, c: Composition, state: State) -> None:
  43. c.kill("mysql")
  44. class MySqlRestart(Action):
  45. """Restart the MySql instance."""
  46. def run(self, c: Composition, state: State) -> None:
  47. c.kill("mysql")
  48. c.up("mysql")
  49. class CreateMySqlTable(Action):
  50. """Creates a table on the MySql instance. 50% of the tables have a PK."""
  51. @classmethod
  52. def requires(cls) -> set[type[Capability]]:
  53. return {BalancerdIsRunning, MzIsRunning, MySqlRunning}
  54. def __init__(self, capabilities: Capabilities) -> None:
  55. this_mysql_table = MySqlTableExists(name="table" + str(random.randint(1, 10)))
  56. existing_mysql_tables = [
  57. t
  58. for t in capabilities.get(MySqlTableExists)
  59. if t.name == this_mysql_table.name
  60. ]
  61. if len(existing_mysql_tables) == 0:
  62. self.new_mysql_table = True
  63. # A PK is now required for Debezium
  64. this_mysql_table.has_pk = True
  65. self.mysql_table = this_mysql_table
  66. elif len(existing_mysql_tables) == 1:
  67. self.new_mysql_table = False
  68. self.mysql_table = existing_mysql_tables[0]
  69. else:
  70. raise RuntimeError("More than one table exists")
  71. super().__init__(capabilities)
  72. def run(self, c: Composition, state: State) -> None:
  73. if self.new_mysql_table:
  74. primary_key = "PRIMARY KEY" if self.mysql_table.has_pk else ""
  75. c.testdrive(
  76. dedent(
  77. f"""
  78. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  79. $ mysql-execute name=mysql
  80. USE public;
  81. CREATE TABLE {self.mysql_table.name} (f1 INTEGER {primary_key});
  82. INSERT INTO {self.mysql_table.name} VALUES ({self.mysql_table.watermarks.max});
  83. """
  84. ),
  85. mz_service=state.mz_service,
  86. )
  87. def provides(self) -> list[Capability]:
  88. return [self.mysql_table] if self.new_mysql_table else []
  89. class MySqlDML(Action):
  90. """Performs an INSERT, DELETE or UPDATE against a MySQL table."""
  91. # We use smaller batches in Pg then in Mz because Pg will fill up much faster
  92. MAX_BATCH_SIZE = 10000
  93. @classmethod
  94. def requires(cls) -> set[type[Capability]]:
  95. return {BalancerdIsRunning, MzIsRunning, MySqlRunning, MySqlTableExists}
  96. def __init__(self, capabilities: Capabilities) -> None:
  97. self.mysql_table = random.choice(capabilities.get(MySqlTableExists))
  98. self.delta = random.randint(1, MySqlDML.MAX_BATCH_SIZE)
  99. super().__init__(capabilities)
  100. def __str__(self) -> str:
  101. return f"{Action.__str__(self)} {self.mysql_table.name}"
  102. class MySqlInsert(MySqlDML):
  103. """Inserts rows into a MySQL table."""
  104. def run(self, c: Composition, state: State) -> None:
  105. prev_max = self.mysql_table.watermarks.max
  106. self.mysql_table.watermarks.max = prev_max + self.delta
  107. c.testdrive(
  108. dedent(
  109. f"""
  110. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  111. $ mysql-execute name=mysql
  112. USE public;
  113. SET @i:={prev_max};
  114. INSERT INTO {self.mysql_table.name} SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.mysql_table.watermarks.max - prev_max};
  115. """
  116. ),
  117. mz_service=state.mz_service,
  118. )
  119. class MySqlShiftForward(MySqlDML):
  120. """Update all rows from a MySQL table by incrementing their values by a constant (tables without a PK only)"""
  121. def run(self, c: Composition, state: State) -> None:
  122. if not self.mysql_table.has_pk:
  123. self.mysql_table.watermarks.shift(self.delta)
  124. c.testdrive(
  125. dedent(
  126. f"""
  127. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  128. $ mysql-execute name=mysql
  129. USE public;
  130. UPDATE {self.mysql_table.name} SET f1 = f1 + {self.delta};
  131. """
  132. ),
  133. mz_service=state.mz_service,
  134. )
  135. class MySqlShiftBackward(MySqlDML):
  136. """Update all rows from a MySQL table by decrementing their values by a constant (tables without a PK only)"""
  137. def run(self, c: Composition, state: State) -> None:
  138. if not self.mysql_table.has_pk:
  139. self.mysql_table.watermarks.shift(-self.delta)
  140. c.testdrive(
  141. dedent(
  142. f"""
  143. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  144. $ mysql-execute name=mysql
  145. USE public;
  146. UPDATE {self.mysql_table.name} SET f1 = f1 - {self.delta};
  147. """
  148. ),
  149. mz_service=state.mz_service,
  150. )
  151. class MySqlDeleteFromHead(MySqlDML):
  152. """Delete the largest values from a MySQL table"""
  153. def run(self, c: Composition, state: State) -> None:
  154. self.mysql_table.watermarks.max = max(
  155. self.mysql_table.watermarks.max - self.delta,
  156. self.mysql_table.watermarks.min,
  157. )
  158. c.testdrive(
  159. dedent(
  160. f"""
  161. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  162. $ mysql-execute name=mysql
  163. USE public;
  164. DELETE FROM {self.mysql_table.name} WHERE f1 > {self.mysql_table.watermarks.max};
  165. """
  166. ),
  167. mz_service=state.mz_service,
  168. )
  169. class MySqlDeleteFromTail(MySqlDML):
  170. """Delete the smallest values from a MySQL table"""
  171. def run(self, c: Composition, state: State) -> None:
  172. self.mysql_table.watermarks.min = min(
  173. self.mysql_table.watermarks.min + self.delta,
  174. self.mysql_table.watermarks.max,
  175. )
  176. c.testdrive(
  177. dedent(
  178. f"""
  179. $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
  180. $ mysql-execute name=mysql
  181. USE public;
  182. DELETE FROM {self.mysql_table.name} WHERE f1 < {self.mysql_table.watermarks.min};
  183. """
  184. ),
  185. mz_service=state.mz_service,
  186. )