debezium_actions.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from textwrap import dedent
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.zippy.balancerd_capabilities import BalancerdIsRunning
  13. from materialize.zippy.debezium_capabilities import (
  14. DebeziumRunning,
  15. DebeziumSourceExists,
  16. PostgresTableExists,
  17. )
  18. from materialize.zippy.framework import Action, Capabilities, Capability, State
  19. from materialize.zippy.kafka_capabilities import KafkaRunning
  20. from materialize.zippy.mz_capabilities import MzIsRunning
  21. from materialize.zippy.replica_capabilities import source_capable_clusters
  22. from materialize.zippy.storaged_capabilities import StoragedRunning
  class DebeziumStart(Action):
      """Start a Debezium instance.

      Running the action brings up the ``debezium`` service of the
      composition; afterwards the action reports ``DebeziumRunning`` as the
      capability it provides.
      """

      def provides(self) -> list[Capability]:
          """Return the single capability this action makes available."""
          running = DebeziumRunning()
          return [running]

      def run(self, c: Composition, state: State) -> None:
          """Bring up the ``debezium`` service through the composition.

          ``state`` is accepted for interface compatibility and is not read.
          """
          service = "debezium"
          c.up(service)
  class DebeziumStop(Action):
      """Stop the Debezium instance.

      The action can only be chosen while ``DebeziumRunning`` is present, and
      it withholds that same capability once it has been selected.
      """

      @classmethod
      def requires(cls) -> set[type[Capability]]:
          """Return the capability types that must exist before this action."""
          needed: set[type[Capability]] = {DebeziumRunning}
          return needed

      def withholds(self) -> set[type[Capability]]:
          """Return the capability types this action takes away."""
          removed: set[type[Capability]] = {DebeziumRunning}
          return removed

      def run(self, c: Composition, state: State) -> None:
          """Kill the ``debezium`` service through the composition.

          ``state`` is accepted for interface compatibility and is not read.
          """
          service = "debezium"
          c.kill(service)
  class CreateDebeziumSource(Action):
      """Creates a Debezium source in Materialized.

      On construction the action picks one existing Postgres table at random
      and derives the source name ``debezium_source_<table name>`` from it.
      If no ``DebeziumSourceExists`` capability with that name is present,
      ``run`` registers a Debezium Postgres connector for the table and
      creates the matching Kafka source and table in Materialize; otherwise
      the existing capability is reused and ``run`` does nothing.
      """

      @classmethod
      def requires(cls) -> set[type[Capability]]:
          """Return the capability types that must exist before this action."""
          return {
              BalancerdIsRunning,
              MzIsRunning,
              StoragedRunning,
              KafkaRunning,
              PostgresTableExists,
          }

      def __init__(self, capabilities: Capabilities) -> None:
          """Choose the Postgres table and decide whether a source is needed.

          Args:
              capabilities: the currently available capabilities; queried for
                  ``PostgresTableExists`` and ``DebeziumSourceExists`` entries
                  and passed on to ``source_capable_clusters``.

          Raises:
              RuntimeError: if more than one existing ``DebeziumSourceExists``
                  capability carries the derived source name.
          """
          # To avoid conflicts, we make sure the postgres table and the
          # debezium source have matching names.
          postgres_table = random.choice(capabilities.get(PostgresTableExists))
          # The cluster is drawn unconditionally, right after the table, so
          # the sequence of random draws does not depend on which branch
          # below is taken.
          cluster_name = random.choice(source_capable_clusters(capabilities))

          debezium_source_name = f"debezium_source_{postgres_table.name}"
          this_debezium_source = DebeziumSourceExists(name=debezium_source_name)

          existing_debezium_sources = [
              source
              for source in capabilities.get(DebeziumSourceExists)
              if source.name == this_debezium_source.name
          ]

          if len(existing_debezium_sources) > 1:
              raise RuntimeError("More than one Debezium source exists")

          if existing_debezium_sources:
              # Reuse the capability that is already there, together with the
              # table it was created for. ``cluster_name`` is intentionally
              # not set here: it is only read by ``run`` for new sources.
              self.new_debezium_source = False
              self.debezium_source = existing_debezium_sources[0]
              assert self.debezium_source.postgres_table is not None
              self.postgres_table = self.debezium_source.postgres_table
          else:
              self.new_debezium_source = True
              self.debezium_source = this_debezium_source
              self.postgres_table = postgres_table
              self.debezium_source.postgres_table = self.postgres_table
              self.cluster_name = cluster_name

          super().__init__(capabilities)

      def run(self, c: Composition, state: State) -> None:
          """Create the connector, source and table when the source is new.

          Sends one testdrive script through ``c.testdrive``. The script
          POSTs the connector definition to
          ``http://debezium:8083/connectors``, issues
          ``schema-registry-wait`` for the table's topic, and then runs the
          ``CREATE CONNECTION`` / ``CREATE SOURCE`` / ``CREATE TABLE``
          statements. Does nothing when the source already existed.
          ``state`` is not read.
          """
          if not self.new_debezium_source:
              return

          # Continuation lines of the multi-line ``>`` statements start with
          # two spaces so that testdrive folds them into the statement above
          # instead of treating each one as a separate command.
          c.testdrive(
              dedent(
                  f"""
                  $ http-request method=POST url=http://debezium:8083/connectors content-type=application/json
                  {{
                  "name": "{self.debezium_source.name}",
                  "config": {{
                  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                  "database.hostname": "postgres",
                  "database.port": "5432",
                  "database.user": "postgres",
                  "database.password": "postgres",
                  "database.dbname" : "postgres",
                  "database.server.name": "postgres",
                  "schema.include.list": "public",
                  "table.include.list": "public.{self.postgres_table.name}",
                  "plugin.name": "pgoutput",
                  "publication.name": "dbz_publication_{self.debezium_source.name}",
                  "publication.autocreate.mode": "filtered",
                  "slot.name" : "slot_{self.postgres_table.name}",
                  "database.history.kafka.bootstrap.servers": "kafka:9092",
                  "database.history.kafka.topic": "schema-changes.history",
                  "truncate.handling.mode": "include",
                  "decimal.handling.mode": "precise",
                  "topic.prefix": "postgres"
                  }}
                  }}
                  $ schema-registry-wait topic=postgres.public.{self.postgres_table.name}
                  > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
                  > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
                  > CREATE SOURCE {self.debezium_source.name}
                    IN CLUSTER {self.cluster_name}
                    FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.{self.postgres_table.name}')
                  > CREATE TABLE {self.debezium_source.get_name_for_query()} FROM SOURCE {self.debezium_source.name} (REFERENCE "postgres.public.{self.postgres_table.name}")
                    FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                    ENVELOPE DEBEZIUM
                  """
              )
          )

      def provides(self) -> list[Capability]:
          """Return the new source capability, or nothing if it was reused."""
          return [self.debezium_source] if self.new_debezium_source else []