# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_materialize = true

# Test behavior that is specific to Kafka Avro Sinks with ENVELOPE DEBEZIUM

# Test a basic sink with multiple rows.

> CREATE MATERIALIZED VIEW data (a, b) AS VALUES (1, 1), (2, 1), (3, 1), (1, 2)

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE CLUSTER data_sink_cluster SIZE '${arg.default-storage-size}';
> CREATE SINK data_sink
  IN CLUSTER data_sink_cluster
  FROM data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

# sort-messages=true because the order of updates within a timestamp
# is not guaranteed.
$ kafka-verify-data format=avro sink=materialize.public.data_sink sort-messages=true
{"before": null, "after": {"row": {"a": 1, "b": 1}}}
{"before": null, "after": {"row": {"a": 1, "b": 2}}}
{"before": null, "after": {"row": {"a": 2, "b": 1}}}
{"before": null, "after": {"row": {"a": 3, "b": 1}}}
  32. # More complex sinks, with multiple keys and/or a consistency topic. We test
  33. # all the possible combinations of user-specified sink key and
  34. # natural (primary) relation key.
  35. $ set schema=[
  36. {
  37. "type": "array",
  38. "items": {
  39. "type": "record",
  40. "name": "update",
  41. "namespace": "com.materialize.cdc",
  42. "fields": [
  43. {
  44. "name": "data",
  45. "type": {
  46. "type": "record",
  47. "name": "data",
  48. "fields": [
  49. {"name": "a", "type": "long"},
  50. {"name": "b", "type": "long"}
  51. ]
  52. }
  53. },
  54. {
  55. "name": "time",
  56. "type": "long"
  57. },
  58. {
  59. "name": "diff",
  60. "type": "long"
  61. }
  62. ]
  63. }
  64. },
  65. {
  66. "type": "record",
  67. "name": "progress",
  68. "namespace": "com.materialize.cdc",
  69. "fields": [
  70. {
  71. "name": "lower",
  72. "type": {
  73. "type": "array",
  74. "items": "long"
  75. }
  76. },
  77. {
  78. "name": "upper",
  79. "type": {
  80. "type": "array",
  81. "items": "long"
  82. }
  83. },
  84. {
  85. "name": "counts",
  86. "type": {
  87. "type": "array",
  88. "items": {
  89. "type": "record",
  90. "name": "counts",
  91. "fields": [
  92. {
  93. "name": "time",
  94. "type": "long"
  95. },
  96. {
  97. "name": "count",
  98. "type": "long"
  99. }
  100. ]
  101. }
  102. }
  103. }
  104. ]
  105. }
  106. ]
  107. $ kafka-create-topic topic=input
  108. # first create all the sinks, then ingest data, to ensure that
  109. # input is processed in consistency batches and not all at once
  110. > CREATE CLUSTER input_cluster SIZE '${arg.default-storage-size}';
  111. > CREATE SOURCE input
  112. IN CLUSTER input_cluster
  113. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-${testdrive.seed}')
  114. > CREATE TABLE input_tbl FROM SOURCE input (REFERENCE "testdrive-input-${testdrive.seed}")
  115. FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE
  116. > CREATE CLUSTER non_keyed_sink_cluster SIZE '${arg.default-storage-size}';
  117. > CREATE SINK non_keyed_sink
  118. IN CLUSTER non_keyed_sink_cluster
  119. FROM input_tbl
  120. INTO KAFKA CONNECTION kafka_conn (TOPIC 'non-keyed-sink-${testdrive.seed}')
  121. FORMAT AVRO
  122. USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  123. ENVELOPE DEBEZIUM
  124. > CREATE VIEW max_view AS SELECT a, MAX(b) as b FROM input_tbl GROUP BY a
  125. # the sinked relation has the natural primary key (a)
  126. > CREATE CLUSTER non_keyed_sink_of_keyed_relation_cluster SIZE '${arg.default-storage-size}';
  127. > CREATE SINK non_keyed_sink_of_keyed_relation
  128. IN CLUSTER non_keyed_sink_of_keyed_relation_cluster
  129. FROM input_tbl
  130. INTO KAFKA CONNECTION kafka_conn (TOPIC 'non-keyed-sink-of-keyed-relation-${testdrive.seed}')
  131. FORMAT AVRO
  132. USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  133. ENVELOPE DEBEZIUM
  134. > CREATE CLUSTER keyed_sink_cluster SIZE '${arg.default-storage-size}';
  135. > CREATE SINK keyed_sink
  136. IN CLUSTER keyed_sink_cluster
  137. FROM input_tbl
  138. INTO KAFKA CONNECTION kafka_conn (TOPIC 'keyed-sink-${testdrive.seed}') KEY (a)
  139. FORMAT AVRO
  140. USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  141. ENVELOPE DEBEZIUM
  142. > CREATE CLUSTER keyed_sink_of_keyed_relation_cluster SIZE '${arg.default-storage-size}';
  143. > CREATE SINK keyed_sink_of_keyed_relation
  144. IN CLUSTER keyed_sink_of_keyed_relation_cluster
  145. FROM input_tbl
  146. INTO KAFKA CONNECTION kafka_conn (TOPIC 'keyed-sink-of-keyed-relation-${testdrive.seed}') KEY (b)
  147. FORMAT AVRO
  148. USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  149. ENVELOPE DEBEZIUM
  150. > CREATE CLUSTER multi_keyed_sink_cluster SIZE '${arg.default-storage-size}';
  151. > CREATE SINK multi_keyed_sink
  152. IN CLUSTER multi_keyed_sink_cluster
  153. FROM input_tbl
  154. INTO KAFKA CONNECTION kafka_conn (TOPIC 'multi-keyed-sink-${testdrive.seed}') KEY (b, a)
  155. FORMAT AVRO
  156. USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  157. ENVELOPE DEBEZIUM
  158. $ kafka-ingest format=avro topic=input schema=${schema}
  159. {"array":[{"data":{"a":1,"b":1},"time":1,"diff":1}]}
  160. {"array":[{"data":{"a":2,"b":2},"time":1,"diff":1}]}
  161. {"array":[{"data":{"a":3,"b":1},"time":2,"diff":1}]}
  162. {"array":[{"data":{"a":4,"b":2},"time":2,"diff":1}]}
  163. {"array":[{"data":{"a":1,"b":7},"time":3,"diff":1}]}
  164. {"com.materialize.cdc.progress":{"lower":[0],"upper":[4],"counts":[{"time":1,"count":2},{"time":2,"count":2},{"time":3,"count":1}]}}
  165. > SELECT * FROM input_tbl;
  166. a b
  167. ------
  168. 1 1
  169. 2 2
  170. 3 1
  171. 4 2
  172. 1 7
  173. # Compare sorted messages within each transaction. We know that messages of one
  174. # transaction appear together as one "bundle" in the output. But there is no
  175. # guarantee on the order within a transaction.
  176. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink sort-messages=true
  177. 1 {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  178. 1 {"before": null, "after": {"row": {"a": 2, "b": 2}}}
  179. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink sort-messages=true
  180. 2 {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  181. 2 {"before": null, "after": {"row": {"a": 4, "b": 2}}}
  182. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink sort-messages=true
  183. 3 {"before": null, "after": {"row": {"a": 1, "b": 7}}}
  184. # Again, compare split by transaction. See comment just above.
  185. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink_of_keyed_relation sort-messages=true
  186. 1 {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  187. 1 {"before": null, "after": {"row": {"a": 2, "b": 2}}}
  188. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink_of_keyed_relation sort-messages=true
  189. 2 {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  190. 2 {"before": null, "after": {"row": {"a": 4, "b": 2}}}
  191. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.non_keyed_sink_of_keyed_relation sort-messages=true
  192. 3 {"before": null, "after": {"row": {"a": 1, "b": 7}}}
  193. # Again, compare split by transaction. See comment just above.
  194. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink sort-messages=true
  195. 1 {"a": 1} {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  196. 1 {"a": 2} {"before": null, "after": {"row": {"a": 2, "b": 2}}}
  197. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink sort-messages=true
  198. 2 {"a": 3} {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  199. 2 {"a": 4} {"before": null, "after": {"row": {"a": 4, "b": 2}}}
  200. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink sort-messages=true
  201. 3 {"a": 1} {"before": null, "after": {"row": {"a": 1, "b": 7}}}
  202. # Again, compare split by transaction. See comment just above.
  203. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink_of_keyed_relation sort-messages=true
  204. 1 {"b": 1} {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  205. 1 {"b": 2} {"before": null, "after": {"row": {"a": 2, "b": 2}}}
  206. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink_of_keyed_relation sort-messages=true
  207. 2 {"b": 1} {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  208. 2 {"b": 2} {"before": null, "after": {"row": {"a": 4, "b": 2}}}
  209. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.keyed_sink_of_keyed_relation sort-messages=true
  210. 3 {"b": 7} {"before": null, "after": {"row": {"a": 1, "b": 7}}}
  211. # Again, compare split by transaction. See comment just above.
  212. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.multi_keyed_sink sort-messages=true
  213. 1 {"b": 1, "a": 1} {"before": null, "after": {"row": {"a": 1, "b": 1}}}
  214. 1 {"b": 2, "a": 2} {"before": null, "after": {"row": {"a": 2, "b": 2}}}
  215. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.multi_keyed_sink sort-messages=true
  216. 2 {"b": 1, "a": 3} {"before": null, "after": {"row": {"a": 3, "b": 1}}}
  217. 2 {"b": 2, "a": 4} {"before": null, "after": {"row": {"a": 4, "b": 2}}}
  218. $ kafka-verify-data headers=materialize-timestamp format=avro sink=materialize.public.multi_keyed_sink sort-messages=true
  219. 3 {"b": 7, "a": 1} {"before": null, "after": {"row": {"a": 1, "b": 7}}}