# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. # Test Avro UPSERT sinks with null defaults
  12. # sinking directly from an UPSERT source with multi-part key
  13. $ set upsert-keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "key1", "type": "string"},
  18. {"name": "key2", "type": ["null", "long"]}
  19. ]
  20. }
  21. $ set upsert-schema={
  22. "type" : "record",
  23. "name" : "test",
  24. "fields" : [
  25. {"name":"f1", "type":["null", "string"]},
  26. {"name":"f2", "type":["long", "null"]},
  27. {"name":"f3", "type":["long", "string"]}
  28. ]
  29. }
  30. $ kafka-create-topic topic=upsert-avro
  31. $ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
  32. {"key1": "fish", "key2": {"long": 2}} {"f1": {"string": "fish"}, "f2": {"long": 1000}, "f3": {"long": 1}}
  33. {"key1": "fisch", "key2": {"long": 42}} {"f1": null, "f2": {"long": 1000}, "f3": {"string": "hello"}}
  34. > CREATE CONNECTION kafka_conn
  35. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  36. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  37. URL '${testdrive.schema-registry-url}'
  38. );
  39. > CREATE CLUSTER upsert_input_cluster SIZE '${arg.default-storage-size}';
  40. > CREATE SOURCE upsert_input
  41. IN CLUSTER upsert_input_cluster
  42. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-avro-${testdrive.seed}')
  43. > CREATE TABLE upsert_input_tbl FROM SOURCE upsert_input (REFERENCE "testdrive-upsert-avro-${testdrive.seed}")
  44. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  45. ENVELOPE UPSERT
  46. # we split avro unions into separate columns
  47. > SELECT * FROM upsert_input_tbl;
  48. fisch 42 <null> 1000 <null> hello
  49. fish 2 fish 1000 1 <null>
  50. # Checking all combination of NULL DEFAULTS with and without values
  51. > CREATE CLUSTER upsert_input_sink1_cluster SIZE '${arg.default-storage-size}';
  52. > CREATE SINK upsert_input_sink1
  53. IN CLUSTER upsert_input_sink1_cluster
  54. FROM upsert_input_tbl
  55. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink1-${testdrive.seed}')
  56. KEY (key1, key2)
  57. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  58. (
  59. NULL DEFAULTS = TRUE
  60. )
  61. ENVELOPE UPSERT
  62. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink1-${testdrive.seed}-value
  63. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"],"default":null},{"name":"f1","type":["null","string"],"default":null},{"name":"f2","type":["null","long"],"default":null},{"name":"f31","type":["null","long"],"default":null},{"name":"f32","type":["null","string"],"default":null}]}
  64. > CREATE CLUSTER upsert_input_sink2_cluster SIZE '${arg.default-storage-size}';
  65. > CREATE SINK upsert_input_sink2
  66. IN CLUSTER upsert_input_sink2_cluster
  67. FROM upsert_input_tbl
  68. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink2-${testdrive.seed}')
  69. KEY (key1, key2)
  70. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  71. (
  72. NULL DEFAULTS
  73. )
  74. ENVELOPE UPSERT
  75. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink2-${testdrive.seed}-value
  76. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"],"default":null},{"name":"f1","type":["null","string"],"default":null},{"name":"f2","type":["null","long"],"default":null},{"name":"f31","type":["null","long"],"default":null},{"name":"f32","type":["null","string"],"default":null}]}
  77. > CREATE CLUSTER upsert_input_sink3_cluster SIZE '${arg.default-storage-size}';
  78. > CREATE SINK upsert_input_sink3
  79. IN CLUSTER upsert_input_sink3_cluster
  80. FROM upsert_input_tbl
  81. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink3-${testdrive.seed}')
  82. KEY (key1, key2)
  83. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  84. (
  85. NULL DEFAULTS = FALSE
  86. )
  87. ENVELOPE UPSERT
  88. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink3-${testdrive.seed}-value
  89. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"]},{"name":"f1","type":["null","string"]},{"name":"f2","type":["null","long"]},{"name":"f31","type":["null","long"]},{"name":"f32","type":["null","string"]}]}
  90. > CREATE CLUSTER upsert_input_sink4_cluster SIZE '${arg.default-storage-size}';
  91. > CREATE SINK upsert_input_sink4
  92. IN CLUSTER upsert_input_sink4_cluster
  93. FROM upsert_input_tbl
  94. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink4-${testdrive.seed}')
  95. KEY (key1, key2)
  96. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  97. ENVELOPE UPSERT
  98. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink4-${testdrive.seed}-value
  99. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"]},{"name":"f1","type":["null","string"]},{"name":"f2","type":["null","long"]},{"name":"f31","type":["null","long"]},{"name":"f32","type":["null","string"]}]}
  100. # Different types of columns
  101. > CREATE TYPE point AS (x integer, y integer);
  102. > CREATE TYPE custom_map AS MAP (KEY TYPE = text, VALUE TYPE = bool)
  103. > CREATE TABLE t (c1 point, c2 text NOT NULL, c3 custom_map, c4 point list);
  104. > INSERT INTO t SELECT ROW(1, 1)::point AS c1, 'text' AS c2, '{a=>true}'::custom_map as c3, LIST[ROW(1, 1)::point] as c4;
  105. > CREATE MATERIALIZED VIEW v AS SELECT * from t;
  106. > CREATE CLUSTER sink1_cluster SIZE '${arg.default-storage-size}';
  107. > CREATE SINK sink1
  108. IN CLUSTER sink1_cluster
  109. FROM v
  110. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  111. KEY (c2) NOT ENFORCED
  112. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  113. (
  114. NULL DEFAULTS
  115. )
  116. ENVELOPE UPSERT
  117. $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-value
  118. {"type":"record","name":"envelope","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"],"default":null},{"name":"y","type":["null","int"],"default":null}]}],"default":null},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"default":null},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"],"default":null},{"name":"y","type":["null","int"],"default":null}]}]}],"default":null}]}
  119. > CREATE CLUSTER sink2_cluster SIZE '${arg.default-storage-size}';
  120. > CREATE SINK sink2
  121. IN CLUSTER sink2_cluster
  122. FROM v
  123. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink2-${testdrive.seed}')
  124. KEY (c2) NOT ENFORCED
  125. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  126. (
  127. NULL DEFAULTS = FALSE
  128. )
  129. ENVELOPE UPSERT
  130. $ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value
  131. {"type":"record","name":"envelope","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}]},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}]}]}
  132. # errors
  133. ! CREATE SINK bad_sink
  134. IN CLUSTER ${arg.single-replica-cluster}
  135. FROM v
  136. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  137. KEY (c2) NOT ENFORCED
  138. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  139. (
  140. NULL DEFAULTS = "some_value"
  141. )
  142. ENVELOPE UPSERT
  143. contains: invalid NULL DEFAULTS option value: cannot use value as boolean
  144. ! CREATE SINK bad_sink
  145. IN CLUSTER ${arg.single-replica-cluster}
  146. FROM v
  147. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  148. KEY (c2) NOT ENFORCED
  149. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  150. (
  151. NULL DEFAULTS = ""
  152. )
  153. ENVELOPE UPSERT
  154. contains: Expected option value, found identifier ""
  155. ! CREATE SINK bad_sink
  156. IN CLUSTER ${arg.single-replica-cluster}
  157. FROM v
  158. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  159. KEY (c2) NOT ENFORCED
  160. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  161. (
  162. NULL DEFAULTS = NULL
  163. )
  164. ENVELOPE UPSERT
  165. contains: invalid NULL DEFAULTS option value: cannot use value as boolean
  166. ! CREATE SINK bad_sink
  167. IN CLUSTER ${arg.single-replica-cluster}
  168. FROM v
  169. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  170. KEY (c2) NOT ENFORCED
  171. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  172. (
  173. NULL DEFAULTS,
  174. NULL DEFAULTS = TRUE
  175. )
  176. ENVELOPE UPSERT
  177. contains: NULL DEFAULTS specified more than once