kafka-sinks.td 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default default-replica-size=1
  11. $ set-arg-default single-replica-cluster=quickstart
  12. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  13. ALTER SYSTEM SET max_clusters = 20
  14. $ kafka-create-topic topic=test partitions=1
  15. $ kafka-ingest topic=test format=bytes
  16. jack,jill
  17. goofus,gallant
  18. > CREATE CONNECTION kafka_conn
  19. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  20. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  21. URL '${testdrive.schema-registry-url}'
  22. );
  23. > CREATE CLUSTER src_cluster SIZE '${arg.default-storage-size}';
  24. > CREATE SOURCE src
  25. IN CLUSTER src_cluster
  26. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
  27. > CREATE TABLE src_tbl (a, b) FROM SOURCE src (REFERENCE "testdrive-test-${testdrive.seed}")
  28. FORMAT CSV WITH 2 COLUMNS
  29. INCLUDE OFFSET
  30. > CREATE CLUSTER src_materialized_cluster SIZE '${arg.default-storage-size}';
  31. > CREATE SOURCE src_materialized
  32. IN CLUSTER src_materialized_cluster
  33. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
  34. > CREATE TABLE src_materialized_tbl (a, b) FROM SOURCE src_materialized (REFERENCE "testdrive-test-${testdrive.seed}")
  35. FORMAT CSV WITH 2 COLUMNS
  36. INCLUDE OFFSET
  37. > CREATE VIEW v1 AS
  38. SELECT a || b AS c FROM src_tbl
  39. > CREATE VIEW v2 AS
  40. SELECT a || b AS c FROM src_materialized_tbl
  41. > CREATE MATERIALIZED VIEW v3 AS
  42. SELECT a || b AS c FROM src_tbl
  43. # We should refuse to create a sink with invalid WITH options: an unknown option must fail with an "Expected one of ..." error, and VERSION, although listed as an accepted keyword, must be rejected for CREATE SINK.
  44. ! CREATE SINK invalid_with_option
  45. IN CLUSTER ${arg.single-replica-cluster}
  46. FROM src_tbl
  47. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  48. WITH (badoption=true)
  49. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  50. contains:Expected one of PARTITION or SNAPSHOT or VERSION
  51. ! CREATE SINK invalid_with_option
  52. IN CLUSTER ${arg.single-replica-cluster}
  53. FROM src_tbl
  54. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  55. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  56. WITH (VERSION=1)
  57. contains:CREATE SINK...WITH (VERSION..) is not allowed
  58. > SHOW SINKS
  59. name type cluster comment
  60. -------------------------------------------
  61. # # We should refuse to create a sink with an invalid schema registry URL.
  62. #
  63. # # Invalid in that the address is not well formed
  64. # ! CREATE SINK bad_schema_registry
  65. # IN CLUSTER ${arg.single-replica-cluster}
  66. # FROM v3
  67. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  68. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.kafka-addr}'
  69. # contains:cannot construct a CCSR client with a cannot-be-a-base URL
  70. #
  71. # # Invalid in that the address points to an invalid host
  72. # ! CREATE SINK bad_schema_registry
  73. # IN CLUSTER ${arg.single-replica-cluster}
  74. # FROM v3
  75. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  76. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://no-such-host'
  77. # contains:unable to publish value schema to registry in kafka sink
  78. #
  79. # # Invalid in that the address is not for a schema registry
  80. # ! CREATE SINK bad_schema_registry
  81. # IN CLUSTER ${arg.single-replica-cluster}
  82. # FROM v3
  83. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  84. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://materialized:6875'
  85. # contains:unable to publish value schema to registry in kafka sink
  86. #
  87. # ! CREATE SINK bad_view
  88. # IN CLUSTER ${arg.single-replica-cluster}
  89. # FROM v1
  90. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  91. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  92. # contains:v1 is a view, which cannot be exported as a sink
  93. #
  94. # # ...Even if that view is based on a materialized source
  95. # ! CREATE SINK bad_view2
  96. # IN CLUSTER ${arg.single-replica-cluster}
  97. # FROM v2
  98. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  99. # WITH (retention_ms=1000000)
  100. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  101. # contains:v2 is a view, which cannot be exported as a sink
  102. #
  103. # > SHOW SINKS
  104. # name type size cluster
  105. # ---------------------------------------
  106. # N.B. it is important to test sinks that depend on sources directly vs. sinks
  107. # that depend on views, as the code paths are different. Here snk1 and snk2 read from tables created from sources, while snk3 reads from the materialized view v3.
  108. > CREATE CLUSTER snk1_cluster SIZE '${arg.default-storage-size}';
  109. > CREATE SINK snk1
  110. IN CLUSTER snk1_cluster
  111. FROM src_tbl
  112. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  113. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  114. ENVELOPE DEBEZIUM
  115. > CREATE CLUSTER snk2_cluster SIZE '${arg.default-storage-size}';
  116. > CREATE SINK snk2
  117. IN CLUSTER snk2_cluster
  118. FROM src_materialized_tbl
  119. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk2-${testdrive.seed}')
  120. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  121. ENVELOPE DEBEZIUM
  122. > CREATE CLUSTER snk3_cluster SIZE '${arg.default-storage-size}';
  123. > CREATE SINK snk3
  124. IN CLUSTER snk3_cluster
  125. FROM v3
  126. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk3-${testdrive.seed}')
  127. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  128. ENVELOPE DEBEZIUM
  129. > SHOW SINKS
  130. name type cluster comment
  131. -----------------------------------------------
  132. snk1 kafka snk1_cluster ""
  133. snk2 kafka snk2_cluster ""
  134. snk3 kafka snk3_cluster ""
  135. $ kafka-verify-data format=avro sink=materialize.public.snk1 sort-messages=true
  136. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  137. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  138. $ kafka-verify-data format=avro sink=materialize.public.snk2 sort-messages=true
  139. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  140. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  141. $ kafka-verify-data format=avro sink=materialize.public.snk3 sort-messages=true
  142. {"before": null, "after": {"row":{"c": "goofusgallant"}}}
  143. {"before": null, "after": {"row":{"c": "jackjill"}}}
  144. # Test Avro serialization of unsigned values; uint2/uint4/uint8 are expected below as 2-, 4- and 8-byte arrays with the most significant byte first. The topic follows the same seeded naming as the other sinks so repeated runs against one broker do not share it.
  145. > CREATE MATERIALIZED VIEW unsigned (a, b, c, d, e, f) AS
  146. VALUES ('1'::uint2, '2'::uint2, '3'::uint4, '4'::uint4, '5'::uint8, '6'::uint8)
  147. > CREATE CLUSTER snk_unsigned_cluster SIZE '${arg.default-storage-size}';
  148. > CREATE SINK snk_unsigned
  149. IN CLUSTER snk_unsigned_cluster
  150. FROM unsigned
  151. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk-unsigned-${testdrive.seed}')
  152. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  153. ENVELOPE DEBEZIUM
  154. $ kafka-verify-data format=avro sink=materialize.public.snk_unsigned sort-messages=true
  155. {"before": null, "after": {"row":{"a": [0, 1], "b": [0, 2], "c": [0, 0, 0, 3], "d": [0, 0, 0, 4], "e": [0, 0, 0, 0, 0, 0, 0, 5], "f": [0, 0, 0, 0, 0, 0, 0, 6]}}}
  156. # Test the case where we have non +/- 1 multiplicities: v4 maps every row of src_tbl to the same value, so the sink is expected to emit one identical record per input row.
  157. > CREATE MATERIALIZED VIEW v4 AS
  158. SELECT true AS c FROM src_tbl
  159. > CREATE CLUSTER snk4_cluster SIZE '${arg.default-storage-size}';
  160. > CREATE SINK snk4
  161. IN CLUSTER snk4_cluster
  162. FROM v4
  163. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk4-${testdrive.seed}')
  164. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  165. ENVELOPE DEBEZIUM
  166. $ kafka-verify-data format=avro sink=materialize.public.snk4
  167. {"before": null, "after": {"row":{"c": true}}}
  168. {"before": null, "after": {"row":{"c": true}}}
  169. # Test WITH (SNAPSHOT): snk5 (SNAPSHOT = false) must emit only the row ingested after the sink was created, while snk6 (SNAPSHOT = true) must also emit the two pre-existing rows.
  170. #
  171. # N.B. It's important that we've verified above that a sink exporting
  172. # src_materialized has processed the row. This means the data has a definite
  173. # timestamp. Without that, WITH (SNAPSHOT = false) could correctly either include or
  174. # exclude the old rows.
  175. > CREATE CLUSTER snk5_cluster SIZE '${arg.default-storage-size}';
  176. > CREATE SINK snk5
  177. IN CLUSTER snk5_cluster
  178. FROM src_materialized_tbl
  179. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk5-${testdrive.seed}')
  180. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  181. ENVELOPE DEBEZIUM
  182. WITH (SNAPSHOT = false)
  183. > CREATE CLUSTER snk6_cluster SIZE '${arg.default-storage-size}';
  184. > CREATE SINK snk6
  185. IN CLUSTER snk6_cluster
  186. FROM src_materialized_tbl
  187. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk6-${testdrive.seed}')
  188. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  189. ENVELOPE DEBEZIUM
  190. WITH (SNAPSHOT = true)
  191. $ kafka-ingest topic=test format=bytes
  192. extra,row
  193. $ kafka-verify-data format=avro sink=materialize.public.snk5
  194. {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
  195. $ kafka-verify-data format=avro sink=materialize.public.snk6 sort-messages=true
  196. {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
  197. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  198. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  199. # Test that we are correctly handling SNAPSHOT on views with empty upper
  200. # frontier. foo is built from constant VALUES; only snk8 (WITH (SNAPSHOT)) has its output verified below, no output is asserted for snk7 (SNAPSHOT = false).
  201. > CREATE MATERIALIZED VIEW foo AS VALUES (1), (2), (3);
  202. > CREATE CLUSTER snk7_cluster SIZE '${arg.default-storage-size}';
  203. > CREATE SINK snk7
  204. IN CLUSTER snk7_cluster
  205. FROM foo
  206. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk7-${testdrive.seed}')
  207. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  208. ENVELOPE DEBEZIUM
  209. WITH (SNAPSHOT = false)
  210. > CREATE CLUSTER snk8_cluster SIZE '${arg.default-storage-size}';
  211. > CREATE SINK snk8
  212. IN CLUSTER snk8_cluster
  213. FROM foo
  214. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk8-${testdrive.seed}')
  215. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  216. ENVELOPE DEBEZIUM
  217. WITH (SNAPSHOT)
  218. $ kafka-verify-data format=avro sink=materialize.public.snk8 sort-messages=true
  219. {"before": null, "after": {"row":{"column1": 1}}}
  220. {"before": null, "after": {"row":{"column1": 2}}}
  221. {"before": null, "after": {"row":{"column1": 3}}}
  222. # Test sinking into an already existing topic with a non-default partition count. The sink's TOPIC must name the topic created by kafka-create-topic (testdrive-snk9-<seed>); otherwise the pre-created 4-partition topic is never exercised.
  223. $ kafka-create-topic topic=snk9 partitions=4
  224. > CREATE CLUSTER snk9_cluster SIZE '${arg.default-storage-size}';
  225. > CREATE SINK snk9
  226. IN CLUSTER snk9_cluster
  227. FROM foo
  228. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk9-${testdrive.seed}')
  229. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  230. ENVELOPE DEBEZIUM
  231. > SET cluster TO ${arg.single-replica-cluster}
  232. # create sink without specifying CLUSTER; the final SHOW SINKS expects it to be listed under the session cluster set just above
  233. > CREATE SINK default_cluster_sink
  234. FROM src_tbl
  235. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  236. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  237. ENVELOPE DEBEZIUM
  238. > SET cluster TO default
  239. # Linked clusters are totally deprecated: WITH (SIZE = ...) on CREATE SINK must be rejected as an unknown option.
  240. ! CREATE SINK sink_with_size FROM src_tbl
  241. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  242. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  243. ENVELOPE DEBEZIUM
  244. WITH (SIZE = '2')
  245. contains:Expected one of PARTITION or SNAPSHOT or VERSION, found SIZE
  246. # Create a sink with SNAPSHOT set in its own cluster, then a second sink in a cluster (c) created with an explicit size of '4'.
  247. > CREATE CLUSTER sink_with_options_cluster SIZE '${arg.default-storage-size}';
  248. > CREATE SINK sink_with_options
  249. IN CLUSTER sink_with_options_cluster
  250. FROM src_tbl
  251. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  252. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  253. ENVELOPE DEBEZIUM
  254. WITH (SNAPSHOT = false)
  255. > CREATE CLUSTER c SIZE '4'
  256. > CREATE SINK cluster_c_sink
  257. IN CLUSTER c
  258. FROM src_tbl
  259. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  260. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  261. ENVELOPE DEBEZIUM
  262. # All sinks are unlinked: every row in mz_sinks must have a NULL size.
  263. > SELECT bool_and(size IS NULL) FROM mz_sinks;
  264. true
  265. # Check SHOW SINKS: every sink created above must be listed with the cluster it was created in.
  266. > SHOW SINKS
  267. name type cluster comment
  268. -------------------------------------------------------------------
  269. cluster_c_sink kafka c ""
  270. default_cluster_sink kafka ${arg.single-replica-cluster} ""
  271. sink_with_options kafka sink_with_options_cluster ""
  272. snk1 kafka snk1_cluster ""
  273. snk2 kafka snk2_cluster ""
  274. snk3 kafka snk3_cluster ""
  275. snk4 kafka snk4_cluster ""
  276. snk5 kafka snk5_cluster ""
  277. snk6 kafka snk6_cluster ""
  278. snk7 kafka snk7_cluster ""
  279. snk8 kafka snk8_cluster ""
  280. snk9 kafka snk9_cluster ""
  281. snk_unsigned kafka snk_unsigned_cluster ""