postgres_actions.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.framework import Action, Capabilities, Capability, State
  14. from materialize.zippy.mz_capabilities import MzIsRunning
  15. from materialize.zippy.postgres_capabilities import PostgresRunning, PostgresTableExists
  16. class PostgresStart(Action):
  17. """Start a PostgresInstance instance."""
  18. def provides(self) -> list[Capability]:
  19. return [PostgresRunning()]
  20. def run(self, c: Composition, state: State) -> None:
  21. c.up("postgres")
  22. class PostgresStop(Action):
  23. """Stop the Postgres instance."""
  24. @classmethod
  25. def requires(cls) -> set[type[Capability]]:
  26. return {PostgresRunning}
  27. def withholds(self) -> set[type[Capability]]:
  28. return {PostgresRunning}
  29. def run(self, c: Composition, state: State) -> None:
  30. c.kill("postgres")
  31. class PostgresRestart(Action):
  32. """Restart the Postgres instance."""
  33. def run(self, c: Composition, state: State) -> None:
  34. c.kill("postgres")
  35. c.up("postgres")
  36. class CreatePostgresTable(Action):
  37. """Creates a table on the Postgres instance. 50% of the tables have a PK."""
  38. @classmethod
  39. def requires(cls) -> set[type[Capability]]:
  40. return {BalancerdIsRunning, MzIsRunning, PostgresRunning}
  41. def __init__(self, capabilities: Capabilities) -> None:
  42. this_postgres_table = PostgresTableExists(
  43. name="table" + str(random.randint(1, 10))
  44. )
  45. existing_postgres_tables = [
  46. t
  47. for t in capabilities.get(PostgresTableExists)
  48. if t.name == this_postgres_table.name
  49. ]
  50. if len(existing_postgres_tables) == 0:
  51. self.new_postgres_table = True
  52. # A PK is now required for Debezium
  53. this_postgres_table.has_pk = True
  54. self.postgres_table = this_postgres_table
  55. elif len(existing_postgres_tables) == 1:
  56. self.new_postgres_table = False
  57. self.postgres_table = existing_postgres_tables[0]
  58. else:
  59. raise RuntimeError("More than one table exists")
  60. super().__init__(capabilities)
  61. def run(self, c: Composition, state: State) -> None:
  62. if self.new_postgres_table:
  63. primary_key = "PRIMARY KEY" if self.postgres_table.has_pk else ""
  64. c.testdrive(
  65. dedent(
  66. f"""
  67. $ postgres-execute connection=postgres://postgres:postgres@postgres
  68. CREATE TABLE {self.postgres_table.name} (f1 INTEGER {primary_key});
  69. ALTER TABLE {self.postgres_table.name} REPLICA IDENTITY FULL;
  70. INSERT INTO {self.postgres_table.name} VALUES ({self.postgres_table.watermarks.max});
  71. """
  72. ),
  73. mz_service=state.mz_service,
  74. )
  75. def provides(self) -> list[Capability]:
  76. return [self.postgres_table] if self.new_postgres_table else []
  77. class PostgresDML(Action):
  78. """Performs an INSERT, DELETE or UPDATE against a Postgres table."""
  79. # We use smaller batches in Pg then in Mz because Pg will fill up much faster
  80. MAX_BATCH_SIZE = 10000
  81. @classmethod
  82. def requires(cls) -> set[type[Capability]]:
  83. return {BalancerdIsRunning, MzIsRunning, PostgresRunning, PostgresTableExists}
  84. def __init__(self, capabilities: Capabilities) -> None:
  85. self.postgres_table = random.choice(capabilities.get(PostgresTableExists))
  86. self.delta = random.randint(1, PostgresDML.MAX_BATCH_SIZE)
  87. super().__init__(capabilities)
  88. def __str__(self) -> str:
  89. return f"{Action.__str__(self)} {self.postgres_table.name}"
  90. class PostgresInsert(PostgresDML):
  91. """Inserts rows into a Postgres table."""
  92. def run(self, c: Composition, state: State) -> None:
  93. prev_max = self.postgres_table.watermarks.max
  94. self.postgres_table.watermarks.max = prev_max + self.delta
  95. c.testdrive(
  96. dedent(
  97. f"""
  98. $ postgres-execute connection=postgres://postgres:postgres@postgres
  99. INSERT INTO {self.postgres_table.name} SELECT * FROM generate_series({prev_max + 1}, {self.postgres_table.watermarks.max});
  100. """
  101. ),
  102. mz_service=state.mz_service,
  103. )
  104. class PostgresShiftForward(PostgresDML):
  105. """Update all rows from a Postgres table by incrementing their values by a constant (tables without a PK only)"""
  106. def run(self, c: Composition, state: State) -> None:
  107. if not self.postgres_table.has_pk:
  108. self.postgres_table.watermarks.shift(self.delta)
  109. c.testdrive(
  110. dedent(
  111. f"""
  112. $ postgres-execute connection=postgres://postgres:postgres@postgres
  113. UPDATE {self.postgres_table.name} SET f1 = f1 + {self.delta};
  114. """
  115. ),
  116. mz_service=state.mz_service,
  117. )
  118. class PostgresShiftBackward(PostgresDML):
  119. """Update all rows from a Postgres table by decrementing their values by a constant (tables without a PK only)"""
  120. def run(self, c: Composition, state: State) -> None:
  121. if not self.postgres_table.has_pk:
  122. self.postgres_table.watermarks.shift(-self.delta)
  123. c.testdrive(
  124. dedent(
  125. f"""
  126. $ postgres-execute connection=postgres://postgres:postgres@postgres
  127. UPDATE {self.postgres_table.name} SET f1 = f1 - {self.delta};
  128. """
  129. ),
  130. mz_service=state.mz_service,
  131. )
  132. class PostgresDeleteFromHead(PostgresDML):
  133. """Delete the largest values from a Postgres table"""
  134. def run(self, c: Composition, state: State) -> None:
  135. self.postgres_table.watermarks.max = max(
  136. self.postgres_table.watermarks.max - self.delta,
  137. self.postgres_table.watermarks.min,
  138. )
  139. c.testdrive(
  140. dedent(
  141. f"""
  142. $ postgres-execute connection=postgres://postgres:postgres@postgres
  143. DELETE FROM {self.postgres_table.name} WHERE f1 > {self.postgres_table.watermarks.max};
  144. """
  145. ),
  146. mz_service=state.mz_service,
  147. )
  148. class PostgresDeleteFromTail(PostgresDML):
  149. """Delete the smallest values from a Postgres table"""
  150. def run(self, c: Composition, state: State) -> None:
  151. self.postgres_table.watermarks.min = min(
  152. self.postgres_table.watermarks.min + self.delta,
  153. self.postgres_table.watermarks.max,
  154. )
  155. c.testdrive(
  156. dedent(
  157. f"""
  158. $ postgres-execute connection=postgres://postgres:postgres@postgres
  159. DELETE FROM {self.postgres_table.name} WHERE f1 < {self.postgres_table.watermarks.min};
  160. """
  161. ),
  162. mz_service=state.mz_service,
  163. )