kafka-avro-sinks-doc-comments.td 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  # Defaults for the testdrive arguments used further down:
  # arg.default-storage-size sizes the per-sink clusters, and
  # arg.single-replica-cluster names the cluster used by the EXPLAIN
  # statements and most of the error cases.
  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. # Test doc comments on Avro Kafka sinks (UPSERT and DEBEZIUM envelopes).
  # Fixture: record type `point`, map type `custom_map`, and table `t` that uses
  # both. One column, "c3_map[text=>text]", is written as a quoted identifier.
  12. > CREATE TYPE point AS (x integer, y integer);
  13. > CREATE TYPE custom_map AS MAP (KEY TYPE = text, VALUE TYPE = bool)
  14. > CREATE TABLE t (c1 point, c2 text NOT NULL, "c3_map[text=>text]" custom_map, c4 point list);
  15. > INSERT INTO t SELECT ROW(1, 1)::point AS c1, 'text' AS c2, '{a=>true}'::custom_map as "c3_map[text=>text]", LIST[ROW(1, 1)::point] as c4;
  # Catalog comments. The texts contain backslashes, an escaped single quote and
  # non-ASCII characters, so the expected outputs below also exercise escaping.
  # No comment is set on t.c1, t.c2 or point.y at this point.
  16. > COMMENT ON TABLE t IS 'comment on table t with a \\ \';
  17. > COMMENT ON COLUMN t."c3_map[text=>text]" IS 'comment on column t.c3_map with a ''';
  18. > COMMENT ON COLUMN t.c4 IS 'comment on column t.c4 with an äöü';
  19. > COMMENT ON TYPE point IS 'comment on type point';
  20. > COMMENT ON COLUMN point.x IS 'comment on column point.x';
  # Kafka and schema-registry connections used by the sinks and EXPLAIN
  # statements below; both addresses come from testdrive variables.
  21. > CREATE CONNECTION kafka_conn
  22. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  23. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  24. URL '${testdrive.schema-registry-url}'
  25. );
  # sink1: UPSERT sink with explicit DOC ON options. It uses the unqualified form
  # (DOC ON ...) as well as the KEY- and VALUE-specific forms, on both columns and
  # types. No option is given for point.x, for t."c3_map[text=>text]", or for the
  # value side of type t.
  26. > CREATE CLUSTER sink1_cluster SIZE '${arg.default-storage-size}';
  27. > CREATE SINK sink1
  28. IN CLUSTER sink1_cluster
  29. FROM t
  30. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  31. KEY (c2) NOT ENFORCED
  32. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  33. (
  34. DOC ON COLUMN t.c1 = 'doc on t.c1',
  35. VALUE DOC ON COLUMN t.c2 = 'value doc on t.c2',
  36. KEY DOC ON COLUMN t.c2 = 'key doc on t.c2',
  37. DOC ON COLUMN t.c4 = 'doc on t.c4',
  38. KEY DOC ON TYPE point = 'key doc on point',
  39. VALUE DOC ON TYPE point = 'value doc on point',
  40. KEY DOC ON TYPE t = 'key doc on t',
  41. VALUE DOC ON COLUMN point.y = 'value doc on point.y'
  42. )
  43. ENVELOPE UPSERT;
  # Expected stored definition: the explicit options with fully qualified names,
  # followed by unqualified DOC ON options carrying the catalog comments for
  # point, point.x, t and t."c3_map[text=>text]".
  44. > SHOW CREATE SINK sink1;
  45. name create_sql
  46. -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  47. materialize.public.sink1 "CREATE SINK materialize.public.sink1 IN CLUSTER sink1_cluster FROM materialize.public.t INTO KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'testdrive-sink1-${testdrive.seed}') KEY (c2) NOT ENFORCED FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION materialize.public.csr_conn (DOC ON COLUMN materialize.public.t.c1 = 'doc on t.c1', VALUE DOC ON COLUMN materialize.public.t.c2 = 'value doc on t.c2', KEY DOC ON COLUMN materialize.public.t.c2 = 'key doc on t.c2', DOC ON COLUMN materialize.public.t.c4 = 'doc on t.c4', KEY DOC ON TYPE materialize.public.point = 'key doc on point', VALUE DOC ON TYPE materialize.public.point = 'value doc on point', KEY DOC ON TYPE materialize.public.t = 'key doc on t', VALUE DOC ON COLUMN materialize.public.point.y = 'value doc on point.y', DOC ON TYPE materialize.public.point = 'comment on type point', DOC ON COLUMN materialize.public.point.x = 'comment on column point.x', DOC ON TYPE materialize.public.t = 'comment on table t with a \\\\ \\', DOC ON COLUMN materialize.public.t.\"c3_map[text=>text]\" = 'comment on column t.c3_map with a ''') ENVELOPE UPSERT;"
  # Published schemas for sink1.
  # Value schema: the explicit options appear on c1, c2, c4, the point records and
  # point.y. The envelope record and point.x carry the catalog comments, and the
  # map column has no doc.
  # Key schema: uses the KEY-specific docs for t and t.c2.
  # NOTE(review): no set-regex is visible in this file, so unset-regex presumably
  # has nothing to clear here; confirm whether it is still needed.
  48. $ unset-regex
  49. $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-value
  50. {"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"],"doc":"value doc on point.y"}]}],"doc":"doc on t.c1"},{"name":"c2","type":"string","doc":"value doc on t.c2"},{"name":"c3_map_text__text_","type":["null",{"type":"map","values":["null","boolean"]}]},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"value doc on point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"],"doc":"value doc on point.y"}]}]}],"doc":"doc on t.c4"}]}
  51. $ schema-registry-verify schema-type=avro subject=testdrive-sink1-${testdrive.seed}-key
  52. {"type":"record","name":"row","doc":"key doc on t","fields":[{"name":"c2","type":"string","doc":"key doc on t.c2"}]}
  # sink2: the same UPSERT sink with no DOC ON options. Every doc in the expected
  # value schema is a catalog comment (on t, point, point.x and t.c4); fields
  # without a comment have no "doc" attribute. Only the value schema is checked.
  53. > CREATE CLUSTER sink2_cluster SIZE '${arg.default-storage-size}';
  54. > CREATE SINK sink2
  55. IN CLUSTER sink2_cluster
  56. FROM t
  57. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink2-${testdrive.seed}')
  58. KEY (c2) NOT ENFORCED
  59. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  60. ENVELOPE UPSERT;
  61. $ schema-registry-verify schema-type=avro subject=testdrive-sink2-${testdrive.seed}-value
  62. {"type":"record","name":"envelope","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string"},{"name":"c3_map_text__text_","type":["null",{"type":"map","values":["null","boolean"]}]},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}
  # sink3: DEBEZIUM envelope with a single unqualified DOC ON COLUMN t.c2.
  # In the expected value schema the outer envelope record has no doc; the table
  # comment documents the `row` record under `before`, and `after` refers to that
  # record by name. The unqualified c2 doc shows up in both value and key schemas.
  63. > CREATE CLUSTER sink3_cluster SIZE '${arg.default-storage-size}';
  64. > CREATE SINK sink3
  65. IN CLUSTER sink3_cluster
  66. FROM t
  67. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  68. KEY (c2) NOT ENFORCED
  69. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  70. (
  71. DOC ON COLUMN t.c2 = 'doc on t.c2'
  72. )
  73. ENVELOPE DEBEZIUM;
  74. $ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-value
  75. {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c1","type":["null",{"type":"record","name":"record0","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"]}]}]},{"name":"c2","type":"string","doc":"doc on t.c2"},{"name":"c3_map_text__text_","type":["null",{"type":"map","values":["null","boolean"]}]},{"name":"c4","type":["null",{"type":"array","items":["null",{"type":"record","name":"record1","namespace":"com.materialize.sink","doc":"comment on type point","fields":[{"name":"x","type":["null","int"],"doc":"comment on column point.x"},{"name":"y","type":["null","int"]}]}]}],"doc":"comment on column t.c4 with an äöü"}]}]},{"name":"after","type":["null","row"]}]}
  76. $ schema-registry-verify schema-type=avro subject=testdrive-sink3-${testdrive.seed}-key
  77. {"type":"record","name":"row","doc":"comment on table t with a \\\\ \\","fields":[{"name":"c2","type":"string","doc":"doc on t.c2"}]}
  78. # Explain schema. Note that we intentionally use a sink name that already exists
  79. # to ensure that `EXPLAIN SCHEMA` doesn't complain if the specified sink name is
  80. # already in use.
  # With NULL DEFAULTS = TRUE, every nullable (["null", ...]) field in the expected
  # schema carries "default": null; c2, declared NOT NULL, has a plain string type
  # and no default. No DOC ON options are given, so docs are the catalog comments.
  # NOTE(review): the expected JSON has a single space after each \n at every
  # nesting depth, which looks whitespace-collapsed; confirm against the real
  # EXPLAIN output before relying on this line.
  81. > EXPLAIN VALUE SCHEMA FOR CREATE SINK sink1
  82. IN CLUSTER ${arg.single-replica-cluster}
  83. FROM t
  84. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  85. KEY (c2) NOT ENFORCED
  86. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  87. (
  88. NULL DEFAULTS = TRUE
  89. )
  90. ENVELOPE DEBEZIUM;
  91. "{\n \"type\": \"record\",\n \"name\": \"envelope\",\n \"fields\": [\n {\n \"name\": \"before\",\n \"type\": [\n \"null\",\n {\n \"type\": \"record\",\n \"name\": \"row\",\n \"doc\": \"comment on table t with a \\\\\\\\ \\\\\",\n \"fields\": [\n {\n \"name\": \"c1\",\n \"type\": [\n \"null\",\n {\n \"type\": \"record\",\n \"name\": \"record0\",\n \"namespace\": \"com.materialize.sink\",\n \"doc\": \"comment on type point\",\n \"fields\": [\n {\n \"name\": \"x\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null,\n \"doc\": \"comment on column point.x\"\n },\n {\n \"name\": \"y\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null\n }\n ]\n }\n ],\n \"default\": null\n },\n {\n \"name\": \"c2\",\n \"type\": \"string\"\n },\n {\n \"name\": \"c3_map_text__text_\",\n \"type\": [\n \"null\",\n {\n \"type\": \"map\",\n \"values\": [\n \"null\",\n \"boolean\"\n ]\n }\n ],\n \"default\": null\n },\n {\n \"name\": \"c4\",\n \"type\": [\n \"null\",\n {\n \"type\": \"array\",\n \"items\": [\n \"null\",\n {\n \"type\": \"record\",\n \"name\": \"record1\",\n \"namespace\": \"com.materialize.sink\",\n \"doc\": \"comment on type point\",\n \"fields\": [\n {\n \"name\": \"x\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null,\n \"doc\": \"comment on column point.x\"\n },\n {\n \"name\": \"y\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null\n }\n ]\n }\n ]\n }\n ],\n \"default\": null,\n \"doc\": \"comment on column t.c4 with an äöü\"\n }\n ]\n }\n ],\n \"default\": null\n },\n {\n \"name\": \"after\",\n \"type\": [\n \"null\",\n \"row\"\n ],\n \"default\": null\n }\n ]\n}"
  # Key schema for a DEBEZIUM sink over t, requested with the explicit AS JSON form
  # and without any format options. The record doc is the table comment; c2 has
  # no doc because no comment or DOC ON option exists for it at this point.
  92. > EXPLAIN KEY SCHEMA AS JSON FOR CREATE SINK sink1
  93. IN CLUSTER ${arg.single-replica-cluster}
  94. FROM t
  95. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  96. KEY (c2) NOT ENFORCED
  97. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  98. ENVELOPE DEBEZIUM;
  99. "{\n \"type\": \"record\",\n \"name\": \"row\",\n \"doc\": \"comment on table t with a \\\\\\\\ \\\\\",\n \"fields\": [\n {\n \"name\": \"c2\",\n \"type\": \"string\"\n }\n ]\n}"
  100. # Updated comment is reflected right away in explain schema
  # The new comment on t.c2 becomes the doc of field c2 in the expected schema.
  # VALUE DOC ON TYPE t replaces the table comment as the doc of the value record;
  # KEY DOC ON TYPE t has no visible effect on the value schema.
  101. > COMMENT ON COLUMN t.c2 IS 'comment on t.c2';
  102. > EXPLAIN VALUE SCHEMA FOR CREATE SINK sink1
  103. IN CLUSTER ${arg.single-replica-cluster}
  104. FROM t
  105. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  106. KEY (c2) NOT ENFORCED
  107. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  108. (
  109. NULL DEFAULTS = TRUE,
  110. VALUE DOC ON TYPE t = 'body of the upsert',
  111. KEY DOC ON TYPE t = 'key of the upsert'
  112. )
  113. ENVELOPE UPSERT;
  114. "{\n \"type\": \"record\",\n \"name\": \"envelope\",\n \"doc\": \"body of the upsert\",\n \"fields\": [\n {\n \"name\": \"c1\",\n \"type\": [\n \"null\",\n {\n \"type\": \"record\",\n \"name\": \"record0\",\n \"namespace\": \"com.materialize.sink\",\n \"doc\": \"comment on type point\",\n \"fields\": [\n {\n \"name\": \"x\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null,\n \"doc\": \"comment on column point.x\"\n },\n {\n \"name\": \"y\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null\n }\n ]\n }\n ],\n \"default\": null\n },\n {\n \"name\": \"c2\",\n \"type\": \"string\",\n \"doc\": \"comment on t.c2\"\n },\n {\n \"name\": \"c3_map_text__text_\",\n \"type\": [\n \"null\",\n {\n \"type\": \"map\",\n \"values\": [\n \"null\",\n \"boolean\"\n ]\n }\n ],\n \"default\": null\n },\n {\n \"name\": \"c4\",\n \"type\": [\n \"null\",\n {\n \"type\": \"array\",\n \"items\": [\n \"null\",\n {\n \"type\": \"record\",\n \"name\": \"record1\",\n \"namespace\": \"com.materialize.sink\",\n \"doc\": \"comment on type point\",\n \"fields\": [\n {\n \"name\": \"x\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null,\n \"doc\": \"comment on column point.x\"\n },\n {\n \"name\": \"y\",\n \"type\": [\n \"null\",\n \"int\"\n ],\n \"default\": null\n }\n ]\n }\n ]\n }\n ],\n \"default\": null,\n \"doc\": \"comment on column t.c4 with an äöü\"\n }\n ]\n}"
  # Key schema for the same UPSERT statement and options: KEY DOC ON TYPE t is the
  # doc of the key record, and c2 carries the catalog comment set on t.c2.
  115. > EXPLAIN KEY SCHEMA FOR CREATE SINK sink1
  116. IN CLUSTER ${arg.single-replica-cluster}
  117. FROM t
  118. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  119. KEY (c2) NOT ENFORCED
  120. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  121. (
  122. NULL DEFAULTS = TRUE,
  123. VALUE DOC ON TYPE t = 'body of the upsert',
  124. KEY DOC ON TYPE t = 'key of the upsert'
  125. )
  126. ENVELOPE UPSERT;
  127. "{\n \"type\": \"record\",\n \"name\": \"row\",\n \"doc\": \"key of the upsert\",\n \"fields\": [\n {\n \"name\": \"c2\",\n \"type\": \"string\",\n \"doc\": \"comment on t.c2\"\n }\n ]\n}"
  128. # Works without sink name
  # Same statement as the named EXPLAIN KEY SCHEMA above with the sink name left
  # out; the expected key schema is identical.
  129. > EXPLAIN KEY SCHEMA FOR CREATE SINK
  130. IN CLUSTER ${arg.single-replica-cluster}
  131. FROM t
  132. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink3-${testdrive.seed}')
  133. KEY (c2) NOT ENFORCED
  134. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  135. (
  136. NULL DEFAULTS = TRUE,
  137. VALUE DOC ON TYPE t = 'body of the upsert',
  138. KEY DOC ON TYPE t = 'key of the upsert'
  139. )
  140. ENVELOPE UPSERT;
  141. "{\n \"type\": \"record\",\n \"name\": \"row\",\n \"doc\": \"key of the upsert\",\n \"fields\": [\n {\n \"name\": \"c2\",\n \"type\": \"string\",\n \"doc\": \"comment on t.c2\"\n }\n ]\n}"
  142. # Error cases: invalid DOC ON options must make CREATE SINK fail.
  # DOC ON COLUMN with no column reference before the equals sign.
  143. ! CREATE SINK bad_sink
  144. IN CLUSTER ${arg.single-replica-cluster}
  145. FROM t
  146. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  147. KEY (c2) NOT ENFORCED
  148. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  149. (
  150. DOC ON COLUMN = 'comments'
  151. )
  152. ENVELOPE UPSERT
  153. contains: Expected identifier, found equals sign
  # DOC ON COLUMN naming a column that table t does not have.
  154. ! CREATE SINK bad_sink
  155. IN CLUSTER ${arg.single-replica-cluster}
  156. FROM t
  157. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  158. KEY (c2) NOT ENFORCED
  159. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  160. (
  161. DOC ON COLUMN t.bad_column = 'comments'
  162. )
  163. ENVELOPE UPSERT
  164. contains: column "t.bad_column" does not exist
  165. # Fails if name is not provided: unlike the EXPLAIN ... SCHEMA statement
  # above, an actual CREATE SINK without a sink name is rejected.
  166. ! CREATE SINK FROM t
  167. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  168. KEY (c2) NOT ENFORCED
  169. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  170. ENVELOPE UPSERT
  171. contains: unspecified name for sink
  # DOC ON TYPE naming an object that is not in the catalog.
  172. ! CREATE SINK bad_sink
  173. IN CLUSTER ${arg.single-replica-cluster}
  174. FROM t
  175. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  176. KEY (c2) NOT ENFORCED
  177. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  178. (
  179. DOC ON TYPE bad_table_name = 'comments'
  180. )
  181. ENVELOPE UPSERT
  182. contains: unknown catalog item 'bad_table_name'
  # DOC ON TYPE with a valid target but no `= '<text>'` value.
  183. ! CREATE SINK bad_sink
  184. IN CLUSTER ${arg.single-replica-cluster}
  185. FROM t
  186. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  187. KEY (c2) NOT ENFORCED
  188. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  189. (
  190. DOC ON TYPE t
  191. )
  192. ENVELOPE UPSERT
  193. contains: option value: cannot be empty
  # DOC ON COLUMN given only an object name (t), with no column part.
  194. ! CREATE SINK bad_sink
  195. IN CLUSTER ${arg.single-replica-cluster}
  196. FROM t
  197. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  198. KEY (c2) NOT ENFORCED
  199. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  200. (
  201. DOC ON COLUMN t = 'comments'
  202. )
  203. ENVELOPE UPSERT
  204. contains: need to specify an object and a column
  # DOC ON COLUMN with NULL instead of a string literal as the doc text.
  205. ! CREATE SINK bad_sink
  206. IN CLUSTER ${arg.single-replica-cluster}
  207. FROM t
  208. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  209. KEY (c2) NOT ENFORCED
  210. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  211. (
  212. DOC ON COLUMN t.c1 = NULL
  213. )
  214. ENVELOPE UPSERT;
  215. contains: cannot use value as string