debezium.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. @externally_idempotent(False)
  13. class DebeziumPostgres(Check):
  14. def initialize(self) -> Testdrive:
  15. return Testdrive(
  16. dedent(
  17. """
  18. $ postgres-execute connection=postgres://postgres:postgres@postgres
  19. CREATE TABLE debezium_table (f1 TEXT, f2 INTEGER, f3 INTEGER, f4 TEXT, PRIMARY KEY (f1, f2));
  20. ALTER TABLE debezium_table REPLICA IDENTITY FULL;
  21. INSERT INTO debezium_table SELECT 'A', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  22. $ http-request method=POST url=http://debezium:8083/connectors content-type=application/json
  23. {
  24. "name": "psql-connector",
  25. "config": {
  26. "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  27. "database.hostname": "postgres",
  28. "database.port": "5432",
  29. "database.user": "postgres",
  30. "database.password": "postgres",
  31. "database.dbname" : "postgres",
  32. "database.server.name": "postgres",
  33. "schema.include.list": "public",
  34. "table.include.list": "public.debezium_table",
  35. "plugin.name": "pgoutput",
  36. "publication.autocreate.mode": "filtered",
  37. "slot.name" : "tester",
  38. "database.history.kafka.bootstrap.servers": "kafka:9092",
  39. "database.history.kafka.topic": "schema-changes.history",
  40. "truncate.handling.mode": "include",
  41. "decimal.handling.mode": "precise",
  42. "topic.prefix": "postgres"
  43. }
  44. }
  45. $ schema-registry-wait topic=postgres.public.debezium_table
  46. $ kafka-wait-topic topic=postgres.public.debezium_table
  47. # UPSERT is required due to https://github.com/MaterializeInc/database-issues/issues/4064
  48. > CREATE SOURCE debezium_source1
  49. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.debezium_table');
  50. > CREATE TABLE debezium_source1_tbl FROM SOURCE debezium_source1 (REFERENCE "postgres.public.debezium_table")
  51. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  52. ENVELOPE DEBEZIUM;
  53. $ postgres-execute connection=postgres://postgres:postgres@postgres
  54. INSERT INTO debezium_table SELECT 'B', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  55. > CREATE MATERIALIZED VIEW debezium_view1 AS SELECT f1, f3, SUM(LENGTH(f4)) FROM debezium_source1_tbl GROUP BY f1, f3;
  56. > SELECT * FROM debezium_view1;
  57. A 1 16000
  58. B 1 16000
  59. """
  60. )
  61. )
  62. def manipulate(self) -> list[Testdrive]:
  63. return [
  64. Testdrive(dedent(s))
  65. for s in [
  66. """
  67. $ postgres-execute connection=postgres://postgres:postgres@postgres
  68. BEGIN;
  69. INSERT INTO debezium_table SELECT 'C', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  70. UPDATE debezium_table SET f3 = f3 + 1;
  71. COMMIT;
  72. > CREATE SOURCE debezium_source2
  73. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.debezium_table');
  74. > CREATE TABLE debezium_source2_tbl FROM SOURCE debezium_source2 (REFERENCE "postgres.public.debezium_table")
  75. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  76. ENVELOPE DEBEZIUM;
  77. $ postgres-execute connection=postgres://postgres:postgres@postgres
  78. BEGIN;
  79. INSERT INTO debezium_table SELECT 'D', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  80. UPDATE debezium_table SET f3 = f3 + 1;
  81. COMMIT;
  82. > CREATE MATERIALIZED VIEW debezium_view2 AS SELECT f1, f3, SUM(LENGTH(f4)) FROM debezium_source2_tbl GROUP BY f1, f3;
  83. """,
  84. """
  85. $ postgres-execute connection=postgres://postgres:postgres@postgres
  86. BEGIN;
  87. INSERT INTO debezium_table SELECT 'E', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  88. UPDATE debezium_table SET f3 = f3 + 1;
  89. COMMIT;
  90. > CREATE SOURCE debezium_source3
  91. FROM KAFKA CONNECTION kafka_conn (TOPIC 'postgres.public.debezium_table');
  92. > CREATE TABLE debezium_source3_tbl FROM SOURCE debezium_source3 (REFERENCE "postgres.public.debezium_table")
  93. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  94. ENVELOPE DEBEZIUM;
  95. $ postgres-execute connection=postgres://postgres:postgres@postgres
  96. BEGIN;
  97. INSERT INTO debezium_table SELECT 'F', generate_series, 1, REPEAT('X', 16) FROM generate_series(1,1000);
  98. UPDATE debezium_table SET f3 = f3 + 1;
  99. COMMIT;
  100. > CREATE MATERIALIZED VIEW debezium_view3 AS SELECT f1, f3, SUM(LENGTH(f4)) FROM debezium_source3_tbl GROUP BY f1, f3;
  101. """,
  102. ]
  103. ]
  104. def validate(self) -> Testdrive:
  105. return Testdrive(
  106. dedent(
  107. """
  108. > SELECT * FROM debezium_view1;
  109. A 5 16000
  110. B 5 16000
  111. C 5 16000
  112. D 4 16000
  113. E 3 16000
  114. F 2 16000
  115. > SELECT * FROM debezium_view2;
  116. A 5 16000
  117. B 5 16000
  118. C 5 16000
  119. D 4 16000
  120. E 3 16000
  121. F 2 16000
  122. > SELECT * FROM debezium_view3;
  123. A 5 16000
  124. B 5 16000
  125. C 5 16000
  126. D 4 16000
  127. E 3 16000
  128. F 2 16000
  129. """
  130. + (
  131. r"""
  132. $ set-regex match="FORMAT .*? ENVELOPE DEBEZIUM " replacement=""
  133. >[version>=14000] SHOW CREATE SOURCE debezium_source1;
  134. materialize.public.debezium_source1 "CREATE SOURCE materialize.public.debezium_source1\nIN CLUSTER quickstart\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'postgres.public.debezium_table')\nEXPOSE PROGRESS AS materialize.public.debezium_source1_progress;"
  135. >[version>=14000] SHOW CREATE SOURCE debezium_source2;
  136. materialize.public.debezium_source2 "CREATE SOURCE materialize.public.debezium_source2\nIN CLUSTER quickstart\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'postgres.public.debezium_table')\nEXPOSE PROGRESS AS materialize.public.debezium_source2_progress;"
  137. >[version>=14000] SHOW CREATE SOURCE debezium_source3;
  138. materialize.public.debezium_source3 "CREATE SOURCE materialize.public.debezium_source3\nIN CLUSTER quickstart\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'postgres.public.debezium_table')\nEXPOSE PROGRESS AS materialize.public.debezium_source3_progress;"
  139. >[version<14000] SHOW CREATE SOURCE debezium_source1;
  140. materialize.public.debezium_source1 "CREATE SOURCE \"materialize\".\"public\".\"debezium_source1\" IN CLUSTER \"quickstart\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'postgres.public.debezium_table') EXPOSE PROGRESS AS \"materialize\".\"public\".\"debezium_source1_progress\""
  141. >[version<14000] SHOW CREATE SOURCE debezium_source2;
  142. materialize.public.debezium_source2 "CREATE SOURCE \"materialize\".\"public\".\"debezium_source2\" IN CLUSTER \"quickstart\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'postgres.public.debezium_table') EXPOSE PROGRESS AS \"materialize\".\"public\".\"debezium_source2_progress\""
  143. >[version<14000] SHOW CREATE SOURCE debezium_source3;
  144. materialize.public.debezium_source3 "CREATE SOURCE \"materialize\".\"public\".\"debezium_source3\" IN CLUSTER \"quickstart\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'postgres.public.debezium_table') EXPOSE PROGRESS AS \"materialize\".\"public\".\"debezium_source3_progress\""
  145. """
  146. if not self.is_running_as_cloudtest()
  147. else ""
  148. )
  149. )
  150. )