kafka-sinks.td 13 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default default-replica-size=1
  11. $ set-arg-default single-replica-cluster=quickstart
  12. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  13. ALTER SYSTEM SET max_clusters = 20
  14. $ kafka-create-topic topic=test partitions=1
  15. $ kafka-ingest topic=test format=bytes
  16. jack,jill
  17. goofus,gallant
  18. > CREATE CONNECTION kafka_conn
  19. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  20. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  21. URL '${testdrive.schema-registry-url}'
  22. );
  23. > CREATE CLUSTER src_cluster SIZE '${arg.default-storage-size}';
  24. > CREATE SOURCE src (a, b)
  25. IN CLUSTER src_cluster
  26. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
  27. FORMAT CSV WITH 2 COLUMNS
  28. INCLUDE OFFSET
  29. > CREATE CLUSTER src_materialized_cluster SIZE '${arg.default-storage-size}';
  30. > CREATE SOURCE src_materialized (a, b)
  31. IN CLUSTER src_materialized_cluster
  32. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
  33. FORMAT CSV WITH 2 COLUMNS
  34. INCLUDE OFFSET
  35. > CREATE VIEW v1 AS
  36. SELECT a || b AS c FROM src
  37. > CREATE VIEW v2 AS
  38. SELECT a || b AS c FROM src_materialized
  39. > CREATE MATERIALIZED VIEW v3 AS
  40. SELECT a || b AS c FROM src
  41. # We should refuse to create a sink with invalid WITH options
  42. ! CREATE SINK invalid_with_option
  43. IN CLUSTER ${arg.single-replica-cluster}
  44. FROM src
  45. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  46. WITH (badoption=true)
  47. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  48. contains:Expected one of PARTITION or SNAPSHOT or VERSION
  49. ! CREATE SINK invalid_with_option
  50. IN CLUSTER ${arg.single-replica-cluster}
  51. FROM src
  52. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  53. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  54. WITH (VERSION=1)
  55. contains:CREATE SINK...WITH (VERSION..) is not allowed
  56. > SHOW SINKS
  57. name type cluster comment
  58. -------------------------------------------
  59. # # We should refuse to create a sink with an invalid schema registry URL.
  60. #
  61. # # Invalid in that the address is not well formed
  62. # ! CREATE SINK bad_schema_registry
  63. # IN CLUSTER ${arg.single-replica-cluster}
  64. # FROM v3
  65. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  66. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.kafka-addr}'
  67. # contains:cannot construct a CCSR client with a cannot-be-a-base URL
  68. #
  69. # # Invalid in that the address points to an invalid host
  70. # ! CREATE SINK bad_schema_registry
  71. # IN CLUSTER ${arg.single-replica-cluster}
  72. # FROM v3
  73. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  74. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://no-such-host'
  75. # contains:unable to publish value schema to registry in kafka sink
  76. #
  77. # # Invalid in that the address is not for a schema registry
  78. # ! CREATE SINK bad_schema_registry
  79. # IN CLUSTER ${arg.single-replica-cluster}
  80. # FROM v3
  81. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  82. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://materialized:6875'
  83. # contains:unable to publish value schema to registry in kafka sink
  84. #
  85. # ! CREATE SINK bad_view
  86. # IN CLUSTER ${arg.single-replica-cluster}
  87. # FROM v1
  88. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  89. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  90. # contains:v1 is a view, which cannot be exported as a sink
  91. #
  92. # # ...Even if that view is based on a materialized source
  93. # ! CREATE SINK bad_view2
  94. # IN CLUSTER ${arg.single-replica-cluster}
  95. # FROM v2
  96. # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  97. # WITH (retention_ms=1000000)
  98. # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  99. # contains:v2 is a view, which cannot be exported as a sink
  100. #
  101. # > SHOW SINKS
  102. # name type size cluster
  103. # ---------------------------------------
  104. # N.B. it is important to test sinks that depend on sources directly vs. sinks
  105. # that depend on views, as the code paths are different.
  106. > CREATE CLUSTER snk1_cluster SIZE '${arg.default-storage-size}';
  107. > CREATE SINK snk1
  108. IN CLUSTER snk1_cluster
  109. FROM src
  110. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  111. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  112. ENVELOPE DEBEZIUM
  113. > CREATE CLUSTER snk2_cluster SIZE '${arg.default-storage-size}';
  114. > CREATE SINK snk2
  115. IN CLUSTER snk2_cluster
  116. FROM src_materialized
  117. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk2-${testdrive.seed}')
  118. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  119. ENVELOPE DEBEZIUM
  120. > CREATE CLUSTER snk3_cluster SIZE '${arg.default-storage-size}';
  121. > CREATE SINK snk3
  122. IN CLUSTER snk3_cluster
  123. FROM v3
  124. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk3-${testdrive.seed}')
  125. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  126. ENVELOPE DEBEZIUM
  127. > SHOW SINKS
  128. name type cluster comment
  129. -----------------------------------------------
  130. snk1 kafka snk1_cluster ""
  131. snk2 kafka snk2_cluster ""
  132. snk3 kafka snk3_cluster ""
  133. $ kafka-verify-data format=avro sink=materialize.public.snk1 sort-messages=true
  134. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  135. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  136. $ kafka-verify-data format=avro sink=materialize.public.snk2 sort-messages=true
  137. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  138. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  139. $ kafka-verify-data format=avro sink=materialize.public.snk3 sort-messages=true
  140. {"before": null, "after": {"row":{"c": "goofusgallant"}}}
  141. {"before": null, "after": {"row":{"c": "jackjill"}}}
  142. # Test Avro serialization of unsigned values (expected below as fixed-width byte arrays). The sink topic carries the testdrive seed like every other sink topic in this file.
  143. > CREATE MATERIALIZED VIEW unsigned (a, b, c, d, e, f) AS
  144. VALUES ('1'::uint2, '2'::uint2, '3'::uint4, '4'::uint4, '5'::uint8, '6'::uint8)
  145. > CREATE CLUSTER snk_unsigned_cluster SIZE '${arg.default-storage-size}';
  146. > CREATE SINK snk_unsigned
  147. IN CLUSTER snk_unsigned_cluster
  148. FROM unsigned
  149. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk-unsigned-${testdrive.seed}')
  150. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  151. ENVELOPE DEBEZIUM
  152. $ kafka-verify-data format=avro sink=materialize.public.snk_unsigned sort-messages=true
  153. {"before": null, "after": {"row":{"a": [0, 1], "b": [0, 2], "c": [0, 0, 0, 3], "d": [0, 0, 0, 4], "e": [0, 0, 0, 0, 0, 0, 0, 5], "f": [0, 0, 0, 0, 0, 0, 0, 6]}}}
  154. # Test the case where we have non +/- 1 multiplicities: v4 maps every row of src to the same value, so identical rows reach the sink more than once.
  155. > CREATE MATERIALIZED VIEW v4 AS
  156. SELECT true AS c FROM src
  157. > CREATE CLUSTER snk4_cluster SIZE '${arg.default-storage-size}';
  158. > CREATE SINK snk4
  159. IN CLUSTER snk4_cluster
  160. FROM v4
  161. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk4-${testdrive.seed}')
  162. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  163. ENVELOPE DEBEZIUM
  164. $ kafka-verify-data format=avro sink=materialize.public.snk4
  165. {"before": null, "after": {"row":{"c": true}}}
  166. {"before": null, "after": {"row":{"c": true}}}
  167. # Test WITH (SNAPSHOT).
  168. #
  169. # N.B. It's important that we've verified above that a sink exporting
  170. # src_materialized has processed the row. This means the data has a definite
  171. # timestamp. Without that, WITH (SNAPSHOT = false) could correctly either include or
  172. # exclude the old rows.
  173. > CREATE CLUSTER snk5_cluster SIZE '${arg.default-storage-size}';
  174. > CREATE SINK snk5
  175. IN CLUSTER snk5_cluster
  176. FROM src_materialized
  177. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk5-${testdrive.seed}')
  178. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  179. ENVELOPE DEBEZIUM
  180. WITH (SNAPSHOT = false)
  181. > CREATE CLUSTER snk6_cluster SIZE '${arg.default-storage-size}';
  182. > CREATE SINK snk6
  183. IN CLUSTER snk6_cluster
  184. FROM src_materialized
  185. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk6-${testdrive.seed}')
  186. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  187. ENVELOPE DEBEZIUM
  188. WITH (SNAPSHOT = true)
  189. $ kafka-ingest topic=test format=bytes
  190. extra,row
  191. $ kafka-verify-data format=avro sink=materialize.public.snk5
  192. {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
  193. $ kafka-verify-data format=avro sink=materialize.public.snk6 sort-messages=true
  194. {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
  195. {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
  196. {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
  197. # Test that we are correctly handling SNAPSHOT on views with empty upper
  198. # frontier
  199. > CREATE MATERIALIZED VIEW foo AS VALUES (1), (2), (3);
  200. > CREATE CLUSTER snk7_cluster SIZE '${arg.default-storage-size}';
  201. > CREATE SINK snk7
  202. IN CLUSTER snk7_cluster
  203. FROM foo
  204. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk7-${testdrive.seed}')
  205. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  206. ENVELOPE DEBEZIUM
  207. WITH (SNAPSHOT = false)
  208. > CREATE CLUSTER snk8_cluster SIZE '${arg.default-storage-size}';
  209. > CREATE SINK snk8
  210. IN CLUSTER snk8_cluster
  211. FROM foo
  212. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk8-${testdrive.seed}')
  213. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  214. ENVELOPE DEBEZIUM
  215. WITH (SNAPSHOT)
  216. $ kafka-verify-data format=avro sink=materialize.public.snk8 sort-messages=true
  217. {"before": null, "after": {"row":{"column1": 1}}}
  218. {"before": null, "after": {"row":{"column1": 2}}}
  219. {"before": null, "after": {"row":{"column1": 3}}}
  220. # Test a sink into an already existing topic with a non-default partition count: the sink must target the topic pre-created by kafka-create-topic (testdrive-snk9-<seed>).
  221. $ kafka-create-topic topic=snk9 partitions=4
  222. > CREATE CLUSTER snk9_cluster SIZE '${arg.default-storage-size}';
  223. > CREATE SINK snk9
  224. IN CLUSTER snk9_cluster
  225. FROM foo
  226. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk9-${testdrive.seed}')
  227. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  228. ENVELOPE DEBEZIUM
  229. > SET cluster TO ${arg.single-replica-cluster}
  230. # create sink without specifying CLUSTER
  231. > CREATE SINK default_cluster_sink
  232. FROM src
  233. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  234. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  235. ENVELOPE DEBEZIUM
  236. > SET cluster TO default
  237. # Linked clusters are fully deprecated, so specifying SIZE on a sink must be rejected.
  238. ! CREATE SINK sink_with_size FROM src
  239. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  240. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  241. ENVELOPE DEBEZIUM
  242. WITH (SIZE = '2')
  243. contains:Expected one of PARTITION or SNAPSHOT or VERSION, found SIZE
  244. # create sink with SNAPSHOT set
  245. > CREATE CLUSTER sink_with_options_cluster SIZE '${arg.default-storage-size}';
  246. > CREATE SINK sink_with_options
  247. IN CLUSTER sink_with_options_cluster
  248. FROM src
  249. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  250. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  251. ENVELOPE DEBEZIUM
  252. WITH (SNAPSHOT = false)
  253. > CREATE CLUSTER c SIZE '4'
  254. > CREATE SINK cluster_c_sink
  255. IN CLUSTER c
  256. FROM src
  257. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
  258. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  259. ENVELOPE DEBEZIUM
  260. # All sinks are unlinked
  261. > SELECT bool_and(size IS NULL) FROM mz_sinks;
  262. true
  263. # Check SHOW SINKS
  264. > SHOW SINKS
  265. name type cluster comment
  266. -------------------------------------------------------------------
  267. cluster_c_sink kafka c ""
  268. default_cluster_sink kafka ${arg.single-replica-cluster} ""
  269. sink_with_options kafka sink_with_options_cluster ""
  270. snk1 kafka snk1_cluster ""
  271. snk2 kafka snk2_cluster ""
  272. snk3 kafka snk3_cluster ""
  273. snk4 kafka snk4_cluster ""
  274. snk5 kafka snk5_cluster ""
  275. snk6 kafka snk6_cluster ""
  276. snk7 kafka snk7_cluster ""
  277. snk8 kafka snk8_cluster ""
  278. snk9 kafka snk9_cluster ""
  279. snk_unsigned kafka snk_unsigned_cluster ""