# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

# Test Avro sinks in general. This tests behavior that is not specific to any
# one envelope: mostly that we correctly encode various data types and that we
# determine field names as expected. The sinks here use ENVELOPE DEBEZIUM, but
# the tested behavior is not specific to Debezium sinks.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

# Test that we invent field names for unnamed columns.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Test interval type.

> CREATE MATERIALIZED VIEW interval_data (interval) AS VALUES
  (INTERVAL '0s'),
  (INTERVAL '1month 1day 1us'),
  (INTERVAL '-1month -1day -1us'),
  (INTERVAL '-178000000 years'),
  (INTERVAL '178000000 years')

> CREATE CLUSTER interval_data_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK interval_data_sink
  IN CLUSTER interval_data_sink_cluster
  FROM interval_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-interval-data-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
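
# Note: the 16-byte arrays checked below appear to be the raw interval
# representation: months (int32), days (int32), and microseconds (int64), each
# little-endian. E.g. INTERVAL '1month 1day 1us' encodes as
# [1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0], and 178000000 years is
# 2136000000 months = 0x7f50c600, i.e. bytes [0, 198, 80, 127].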

$ kafka-verify-data format=avro sink=materialize.public.interval_data_sink sort-messages=true
{"before": null, "after": {"row": {"interval": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
{"before": null, "after": {"row": {"interval": [0, 198, 80, 127, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
{"before": null, "after": {"row": {"interval": [0, 58, 175, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
{"before": null, "after": {"row": {"interval": [1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]}}}
{"before": null, "after": {"row": {"interval": [255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255]}}}

# See database-issues#2957
#> CREATE MATERIALIZED VIEW unnamed_cols AS SELECT 1, 2 AS b, 3;
#
#> CREATE SINK unnamed_cols_sink
# IN CLUSTER ${arg.single-replica-cluster}
# FROM unnamed_cols
# INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-cols-sink-${testdrive.seed}')
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#
#$ kafka-verify-data format=avro sink=materialize.public.unnamed_cols_sink
#{"before": null, "after": {"row": {"column1": 1, "b": 2, "column3": 3}}}

# Test that invented field names do not clash with named columns.
# See database-issues#2957
#> CREATE MATERIALIZED VIEW clashing_cols AS SELECT 1, 2 AS column1, 3 as b, 4 as b2, 5 as b3;
#
#> CREATE SINK clashing_cols_sink
# IN CLUSTER ${arg.single-replica-cluster}
# FROM clashing_cols
# INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-clashing-cols-sink-${testdrive.seed}')
# FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#
#$ kafka-verify-data format=avro sink=materialize.public.clashing_cols_sink
#{"before": null, "after": {"row": {"column1": 1, "column1_1": 2, "b": 3, "b2": 4, "b3": 5}}}

# Test date/time types.

> CREATE MATERIALIZED VIEW datetime_data (date, ts, ts_tz) AS VALUES
  (DATE '2000-01-01', TIMESTAMP '2000-01-01 10:10:10.111', TIMESTAMPTZ '2000-01-01 10:10:10.111+02'),
  (DATE '2000-02-01', TIMESTAMP '2000-02-01 10:10:10.111', TIMESTAMPTZ '2000-02-01 10:10:10.111+02'),
  (('0001-01-01'::DATE - '1721389days'::INTERVAL)::DATE, ('0001-01-01'::DATE - '1721389days'::INTERVAL)::TIMESTAMP, ('0001-01-01'::DATE - '1721389days'::INTERVAL)::TIMESTAMPTZ),
  (('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::DATE, ('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::TIMESTAMP, ('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::TIMESTAMPTZ)

> CREATE CLUSTER datetime_data_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK datetime_data_sink
  IN CLUSTER datetime_data_sink_cluster
  FROM datetime_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-datetime-data-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
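
# Note: the expected values below suggest dates are encoded as days since the
# Unix epoch (2000-01-01 -> 10957) and timestamps as microseconds since the
# Unix epoch, with the timestamptz input shifted to UTC
# (2000-01-01 10:10:10.111+02 -> 946714210111000).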

$ kafka-verify-data format=avro sink=materialize.public.datetime_data_sink sort-messages=true
{"before": null, "after": {"row": {"date": -2440551, "ts": -210863606400000000, "ts_tz": -210863606400000000}}}
{"before": null, "after": {"row": {"date": 10957, "ts": 946721410111000, "ts_tz": 946714210111000}}}
{"before": null, "after": {"row": {"date": 10988, "ts": 949399810111000, "ts_tz": 949392610111000}}}
{"before": null, "after": {"row": {"date": 95026236, "ts": 8210266790400000000, "ts_tz": 8210266790400000000}}}

> CREATE MATERIALIZED VIEW time_data (time) AS VALUES (TIME '01:02:03'), (TIME '01:02:04'), (TIME '00:00:00'), (TIME '23:59:59')

> CREATE CLUSTER time_data_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK time_data_sink
  IN CLUSTER time_data_sink_cluster
  FROM time_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-time-data-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
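
# Note: the expected values below suggest times are encoded as microseconds
# since midnight, e.g. TIME '01:02:03' -> 3723 s -> 3723000000.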

$ kafka-verify-data format=avro sink=materialize.public.time_data_sink sort-messages=true
{"before": null, "after": {"row": {"time": 0}}}
{"before": null, "after": {"row": {"time": 3723000000}}}
{"before": null, "after": {"row": {"time": 3724000000}}}
{"before": null, "after": {"row": {"time": 86399000000}}}

# Test jsonb

> CREATE MATERIALIZED VIEW json_data (a, b) AS VALUES ('{"a":1, "b":2}'::jsonb, 2)

# Sinks with JSON columns should not crash - see https://github.com/MaterializeInc/database-issues/issues/1477

> CREATE CLUSTER json_data_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK json_data_sink
  IN CLUSTER json_data_sink_cluster
  FROM json_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-json-data-sink-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

# Test map

> CREATE MATERIALIZED VIEW map_data (map) AS SELECT '{a => 1, b => 2}'::map[text=>int];

> CREATE CLUSTER map_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK map_sink
  IN CLUSTER map_sink_cluster
  FROM map_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-map-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
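
# Note: the map values (and the list elements in the test further below) show
# up wrapped in a union, e.g. {"int": 1}, presumably because the generated
# Avro element type is nullable.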

$ kafka-verify-data format=avro sink=materialize.public.map_sink sort-messages=true
{"before": null, "after": {"row": {"map": {"a": {"int": 1}, "b": {"int": 2}}}}}

> CREATE MATERIALIZED VIEW list_data (list) AS SELECT LIST[1, 2];

> CREATE CLUSTER list_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK list_sink
  IN CLUSTER list_sink_cluster
  FROM list_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-list-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-data format=avro sink=materialize.public.list_sink sort-messages=true
{"before": null, "after": {"row": {"list": [{"int": 1}, {"int": 2}]}}}

# Test optional namespace for auto-generated value schema

> CREATE MATERIALIZED VIEW namespace_value_data (namespace) AS SELECT 1;

> CREATE CLUSTER namespace_value_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK namespace_value_sink
  IN CLUSTER namespace_value_sink_cluster
  FROM namespace_value_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-namespace-value-sink-${testdrive.seed}')
  FORMAT AVRO USING
  CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'abc.def.ghi')
  ENVELOPE DEBEZIUM
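
# The AVRO VALUE FULLNAME 'abc.def.ghi' should be split into namespace
# "abc.def" and record name "ghi" in the registered value schema, as verified
# below.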

$ schema-registry-verify schema-type=avro subject=testdrive-namespace-value-sink-${testdrive.seed}-value
{"type":"record","name":"ghi","namespace":"abc.def","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"namespace","type":"int"}]}]},{"name":"after","type":["null","row"]}]}

$ kafka-verify-data format=avro sink=materialize.public.namespace_value_sink sort-messages=true
{"before": null, "after": {"row": {"namespace": 1}}}

# Test optional namespaces for autogenerated key and value schemas

> CREATE MATERIALIZED VIEW namespace_key_value_data (a, b) AS SELECT * FROM (VALUES (1, 2));

> CREATE CLUSTER namespace_key_value_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK namespace_key_value_sink
  IN CLUSTER namespace_key_value_sink_cluster
  FROM namespace_key_value_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-namespace-key-value-sink-${testdrive.seed}')
  KEY (b)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.foo', AVRO VALUE FULLNAME = 'some.neat.class.bar')
  ENVELOPE DEBEZIUM

$ schema-registry-verify schema-type=avro subject=testdrive-namespace-key-value-sink-${testdrive.seed}-key
{"type":"record","name":"foo","namespace":"some.neat.class","fields":[{"name":"b","type":"int"}]}

$ schema-registry-verify schema-type=avro subject=testdrive-namespace-key-value-sink-${testdrive.seed}-value
{"type":"record","name":"bar","namespace":"some.neat.class","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}

$ kafka-verify-data format=avro sink=materialize.public.namespace_key_value_sink sort-messages=true
{"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}

> SELECT key_format, value_format, format FROM mz_sinks WHERE name = 'namespace_key_value_sink'
avro avro avro

# Test setting the compatibility level for both the key and value subjects

> CREATE MATERIALIZED VIEW compat_level_data (a, b) AS SELECT * FROM (VALUES (1, 2));

> CREATE CLUSTER compat_level_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK compat_level_sink
  IN CLUSTER compat_level_sink_cluster
  FROM compat_level_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-compat-level-sink-${testdrive.seed}')
  KEY (b)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
    KEY COMPATIBILITY LEVEL 'FULL_TRANSITIVE',
    VALUE COMPATIBILITY LEVEL 'FORWARD_TRANSITIVE'
  )
  ENVELOPE DEBEZIUM
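
# The KEY/VALUE COMPATIBILITY LEVEL options above should be applied to the key
# and value subjects in the schema registry; the compatibility-level arguments
# below check that.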

$ schema-registry-verify schema-type=avro subject=testdrive-compat-level-sink-${testdrive.seed}-key compatibility-level=FULL_TRANSITIVE
{"type":"record","name":"row","fields":[{"name":"b","type":"int"}]}

$ schema-registry-verify schema-type=avro subject=testdrive-compat-level-sink-${testdrive.seed}-value compatibility-level=FORWARD_TRANSITIVE
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}

$ kafka-verify-data format=avro sink=materialize.public.compat_level_sink sort-messages=true
{"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}

# Test a sink with an Avro value format and a JSON key format

> CREATE MATERIALIZED VIEW mixed_format_data (a, b) AS SELECT * FROM (VALUES (1, 2));

> CREATE CLUSTER mixed_format_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK mixed_format_sink
  IN CLUSTER mixed_format_sink_cluster
  FROM mixed_format_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-mixed-format-sink-${testdrive.seed}')
  KEY (b)
  KEY FORMAT JSON
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
    VALUE COMPATIBILITY LEVEL 'FORWARD_TRANSITIVE'
  )
  ENVELOPE DEBEZIUM

$ schema-registry-verify schema-type=avro subject=testdrive-mixed-format-sink-${testdrive.seed}-value compatibility-level=FORWARD_TRANSITIVE
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}

$ kafka-verify-data key-format=json value-format=avro sink=materialize.public.mixed_format_sink sort-messages=true
{"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}

> SELECT key_format, value_format, format FROM mz_sinks WHERE name = 'mixed_format_sink'
json avro key-json-value-avro

# Test a sink over a view whose column names are not directly usable as
# Avro schema names.

> CREATE MATERIALIZED VIEW tricky_names AS SELECT 1 AS "tricky-name", 1 AS "tricky!name", 1 AS "tricky@name", 1 AS "666";

> CREATE CLUSTER tricky_sink_cluster SIZE '${arg.default-storage-size}';

> CREATE SINK tricky_sink
  IN CLUSTER tricky_sink_cluster
  FROM tricky_names
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-tricky-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
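
# In the generated value schema below, characters that are not valid in Avro
# names are replaced with underscores ("tricky-name" -> "tricky_name"),
# resulting clashes get numeric suffixes ("tricky_name1", "tricky_name2"), and
# a name starting with a digit gets a leading underscore ("666" -> "_666").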

$ schema-registry-verify schema-type=avro subject=testdrive-tricky-sink-${testdrive.seed}-value
{"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"tricky_name","type":"int"},{"name":"tricky_name1","type":"int"},{"name":"tricky_name2","type":"int"},{"name":"_666","type":"int"}]}]},{"name":"after","type":["null","row"]}]}

# Bad Sinks

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM namespace_key_value_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bad-sink-${testdrive.seed}')
  KEY (b)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'bad-name', AVRO VALUE FULLNAME = 'goodname')
  ENVELOPE DEBEZIUM
contains:Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: bad-name

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM namespace_key_value_data
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bad-sink-${testdrive.seed}')
  KEY (b)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'goodname', AVRO VALUE FULLNAME = 'bad-name')
  ENVELOPE DEBEZIUM
contains:Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: bad-name

> CREATE MATERIALIZED VIEW input (a, b) AS SELECT * FROM (VALUES (1, 2))

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a, a)
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
contains:duplicate column referenced in KEY: a

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'some.neat.class.foo', AVRO KEY FULLNAME = 'some.neat.class.bar')
  ENVELOPE DEBEZIUM
contains:Cannot specify AVRO KEY FULLNAME without a corresponding KEY field

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.bar')
  ENVELOPE DEBEZIUM
contains:Cannot specify AVRO KEY FULLNAME without a corresponding KEY field

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.bar')
  ENVELOPE DEBEZIUM
contains:Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names

! CREATE SINK bad_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM input
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'some.neat.class.bar')
  ENVELOPE DEBEZIUM
contains:Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names