# kafka-avro-sinks-defaults.td
#
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. # Test Avro UPSERT sinks with null defaults
  12. # sinking directly from an UPSERT source with multi-part key
  13. $ set upsert-keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "key1", "type": "string"},
  18. {"name": "key2", "type": ["null", "long"]}
  19. ]
  20. }
  21. $ set upsert-schema={
  22. "type" : "record",
  23. "name" : "test",
  24. "fields" : [
  25. {"name":"f1", "type":["null", "string"]},
  26. {"name":"f2", "type":["long", "null"]},
  27. {"name":"f3", "type":["long", "string"]}
  28. ]
  29. }
  30. $ kafka-create-topic topic=upsert-avro
  31. $ kafka-ingest format=avro topic=upsert-avro key-format=avro key-schema=${upsert-keyschema} schema=${upsert-schema}
  32. {"key1": "fish", "key2": {"long": 2}} {"f1": {"string": "fish"}, "f2": {"long": 1000}, "f3": {"long": 1}}
  33. {"key1": "fisch", "key2": {"long": 42}} {"f1": null, "f2": {"long": 1000}, "f3": {"string": "hello"}}
  34. > CREATE CONNECTION kafka_conn
  35. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  36. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  37. URL '${testdrive.schema-registry-url}'
  38. );
  39. > CREATE CLUSTER upsert_input_cluster SIZE '${arg.default-storage-size}';
  40. > CREATE SOURCE upsert_input
  41. IN CLUSTER upsert_input_cluster
  42. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-avro-${testdrive.seed}')
  43. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  44. ENVELOPE UPSERT
  45. # we split avro unions into separate columns
  46. > SELECT * FROM upsert_input;
  47. fisch 42 <null> 1000 <null> hello
  48. fish 2 fish 1000 1 <null>
  49. # Checking all combination of NULL DEFAULTS with and without values
  50. > CREATE CLUSTER upsert_input_sink1_cluster SIZE '${arg.default-storage-size}';
  51. > CREATE SINK upsert_input_sink1
  52. IN CLUSTER upsert_input_sink1_cluster
  53. FROM upsert_input
  54. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink1-${testdrive.seed}')
  55. KEY (key1, key2)
  56. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  57. (
  58. NULL DEFAULTS = TRUE
  59. )
  60. ENVELOPE UPSERT
  61. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink1-${testdrive.seed}-value
  62. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"],"default":null},{"name":"f1","type":["null","string"],"default":null},{"name":"f2","type":["null","long"],"default":null},{"name":"f31","type":["null","long"],"default":null},{"name":"f32","type":["null","string"],"default":null}]}
  63. > CREATE CLUSTER upsert_input_sink2_cluster SIZE '${arg.default-storage-size}';
  64. > CREATE SINK upsert_input_sink2
  65. IN CLUSTER upsert_input_sink2_cluster
  66. FROM upsert_input
  67. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink2-${testdrive.seed}')
  68. KEY (key1, key2)
  69. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  70. (
  71. NULL DEFAULTS
  72. )
  73. ENVELOPE UPSERT
  74. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink2-${testdrive.seed}-value
  75. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"],"default":null},{"name":"f1","type":["null","string"],"default":null},{"name":"f2","type":["null","long"],"default":null},{"name":"f31","type":["null","long"],"default":null},{"name":"f32","type":["null","string"],"default":null}]}
  76. > CREATE CLUSTER upsert_input_sink3_cluster SIZE '${arg.default-storage-size}';
  77. > CREATE SINK upsert_input_sink3
  78. IN CLUSTER upsert_input_sink3_cluster
  79. FROM upsert_input
  80. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink3-${testdrive.seed}')
  81. KEY (key1, key2)
  82. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  83. (
  84. NULL DEFAULTS = FALSE
  85. )
  86. ENVELOPE UPSERT
  87. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink3-${testdrive.seed}-value
  88. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"]},{"name":"f1","type":["null","string"]},{"name":"f2","type":["null","long"]},{"name":"f31","type":["null","long"]},{"name":"f32","type":["null","string"]}]}
  89. > CREATE CLUSTER upsert_input_sink4_cluster SIZE '${arg.default-storage-size}';
  90. > CREATE SINK upsert_input_sink4
  91. IN CLUSTER upsert_input_sink4_cluster
  92. FROM upsert_input
  93. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-input-sink4-${testdrive.seed}')
  94. KEY (key1, key2)
  95. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  96. ENVELOPE UPSERT
  97. $ schema-registry-verify schema-type=avro subject=testdrive-upsert-input-sink4-${testdrive.seed}-value
  98. {"type":"record","name":"envelope","fields":[{"name":"key1","type":"string"},{"name":"key2","type":["null","long"]},{"name":"f1","type":["null","string"]},{"name":"f2","type":["null","long"]},{"name":"f31","type":["null","long"]},{"name":"f32","type":["null","string"]}]}
  99. # Different types of columns
  100. > CREATE TYPE point AS (x integer, y integer);
  101. > CREATE TYPE custom_map AS MAP (KEY TYPE = text, VALUE TYPE = bool)
  102. > CREATE TABLE t (c1 point, c2 text NOT NULL, c3 custom_map, c4 point list);
  103. > INSERT INTO t SELECT ROW(1, 1)::point AS c1, 'text' AS c2, '{a=>true}'::custom_map as c3, LIST[ROW(1, 1)::point] as c4;
  104. > CREATE MATERIALIZED VIEW v AS SELECT * from t;
  105. > CREATE CLUSTER sink1_cluster SIZE '${arg.default-storage-size}';
  106. > CREATE SINK sink1
  107. IN CLUSTER sink1_cluster
  108. FROM v
  109. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  110. KEY (c2) NOT ENFORCED
  111. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  112. (
  113. NULL DEFAULTS
  114. )
  115. ENVELOPE UPSERT
  116. $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-value
  117. {"type":"record","name":"envelope","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"],"default":null},{"name":"y","type":["null","int"],"default":null}]}],"default":null},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}],"default":null},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"],"default":null},{"name":"y","type":["null","int"],"default":null}]}]}],"default":null}]}
  118. > CREATE CLUSTER sink2_cluster SIZE '${arg.default-storage-size}';
  119. > CREATE SINK sink2
  120. IN CLUSTER sink2_cluster
  121. FROM v
  122. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink2-${testdrive.seed}')
  123. KEY (c2) NOT ENFORCED
  124. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  125. (
  126. NULL DEFAULTS = FALSE
  127. )
  128. ENVELOPE UPSERT
  129. $ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value
  130. {"type":"record","name":"envelope","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3","type":["null",{"type":"map","values":["null","boolean"]}]},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","fields":[{"name":"x","type":["null","int"]},{"name":"y","type":["null","int"]}]}]}]}]}
  131. # errors
  132. ! CREATE SINK bad_sink
  133. IN CLUSTER ${arg.single-replica-cluster}
  134. FROM v
  135. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  136. KEY (c2) NOT ENFORCED
  137. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  138. (
  139. NULL DEFAULTS = "some_value"
  140. )
  141. ENVELOPE UPSERT
  142. contains: invalid NULL DEFAULTS option value: cannot use value as boolean
  143. ! CREATE SINK bad_sink
  144. IN CLUSTER ${arg.single-replica-cluster}
  145. FROM v
  146. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  147. KEY (c2) NOT ENFORCED
  148. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  149. (
  150. NULL DEFAULTS = ""
  151. )
  152. ENVELOPE UPSERT
  153. contains: Expected option value, found identifier ""
  154. ! CREATE SINK bad_sink
  155. IN CLUSTER ${arg.single-replica-cluster}
  156. FROM v
  157. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  158. KEY (c2) NOT ENFORCED
  159. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  160. (
  161. NULL DEFAULTS = NULL
  162. )
  163. ENVELOPE UPSERT
  164. contains: invalid NULL DEFAULTS option value: cannot use value as boolean
  165. ! CREATE SINK bad_sink
  166. IN CLUSTER ${arg.single-replica-cluster}
  167. FROM v
  168. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  169. KEY (c2) NOT ENFORCED
  170. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  171. (
  172. NULL DEFAULTS,
  173. NULL DEFAULTS = TRUE
  174. )
  175. ENVELOPE UPSERT
  176. contains: NULL DEFAULTS specified more than once