pg_cdc_actions.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.framework import Action, Capabilities, Capability, State
  14. from materialize.zippy.mz_capabilities import MzIsRunning
  15. from materialize.zippy.pg_cdc_capabilities import PostgresCdcTableExists
  16. from materialize.zippy.postgres_capabilities import PostgresRunning, PostgresTableExists
  17. from materialize.zippy.replica_capabilities import source_capable_clusters
  18. from materialize.zippy.storaged_capabilities import StoragedRunning
  19. class CreatePostgresCdcTable(Action):
  20. """Creates a Postgres CDC source in Materialized."""
  21. @classmethod
  22. def requires(cls) -> set[type[Capability]]:
  23. return {
  24. BalancerdIsRunning,
  25. MzIsRunning,
  26. StoragedRunning,
  27. PostgresRunning,
  28. PostgresTableExists,
  29. }
  30. def __init__(self, capabilities: Capabilities) -> None:
  31. postgres_table = random.choice(capabilities.get(PostgresTableExists))
  32. postgres_pg_cdc_name = f"postgres_{postgres_table.name}"
  33. this_postgres_cdc_table = PostgresCdcTableExists(name=postgres_pg_cdc_name)
  34. cluster_name = random.choice(source_capable_clusters(capabilities))
  35. existing_postgres_cdc_tables = [
  36. s
  37. for s in capabilities.get(PostgresCdcTableExists)
  38. if s.name == this_postgres_cdc_table.name
  39. ]
  40. if len(existing_postgres_cdc_tables) == 0:
  41. self.new_postgres_cdc_table = True
  42. self.postgres_cdc_table = this_postgres_cdc_table
  43. self.postgres_cdc_table.postgres_table = postgres_table
  44. self.cluster_name = cluster_name
  45. elif len(existing_postgres_cdc_tables) == 1:
  46. self.new_postgres_cdc_table = False
  47. self.postgres_cdc_table = existing_postgres_cdc_tables[0]
  48. else:
  49. raise RuntimeError("More than one CDC table exists")
  50. super().__init__(capabilities)
  51. def run(self, c: Composition, state: State) -> None:
  52. if self.new_postgres_cdc_table:
  53. assert self.postgres_cdc_table is not None
  54. assert self.postgres_cdc_table.postgres_table is not None
  55. name = self.postgres_cdc_table.name
  56. c.testdrive(
  57. dedent(
  58. f"""
  59. $ postgres-execute connection=postgres://postgres:postgres@postgres
  60. CREATE PUBLICATION {name}_publication FOR TABLE {self.postgres_cdc_table.postgres_table.name};
  61. > CREATE SECRET {name}_password AS 'postgres';
  62. > CREATE CONNECTION {name}_connection TO POSTGRES (
  63. HOST postgres,
  64. DATABASE postgres,
  65. USER postgres,
  66. PASSWORD SECRET {name}_password
  67. );
  68. > CREATE SOURCE {name}_source
  69. IN CLUSTER {self.cluster_name}
  70. FROM POSTGRES CONNECTION {name}_connection (PUBLICATION '{name}_publication');
  71. > CREATE TABLE {name} FROM SOURCE {name}_source (REFERENCE {self.postgres_cdc_table.postgres_table.name});
  72. """
  73. ),
  74. mz_service=state.mz_service,
  75. )
  76. def provides(self) -> list[Capability]:
  77. return [self.postgres_cdc_table] if self.new_postgres_cdc_table else []