# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20
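
# The key schema's "id" field deliberately collides with the value schema's
# "id" field, so the tests below can exercise how INCLUDE KEY handles
# duplicate column names.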
$ set conflictkeyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "name": "row",
    "type": "record",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "b", "type": "long"}
    ]
  }

$ kafka-create-topic topic=avro-data partitions=1

$ kafka-ingest format=avro key-format=avro topic=avro-data key-schema=${conflictkeyschema} schema=${schema} timestamp=1
{"id": 1} {"id": 2, "b": 3}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE missing_key_format
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

! CREATE TABLE missing_key_format_tbl FROM SOURCE missing_key_format (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY
contains:INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, got bare FORMAT

! CREATE TABLE missing_key_format_tbl FROM SOURCE missing_key_format (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS key_col
contains:INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, got bare FORMAT

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# "Bare" format works when the key format is in a registry
> CREATE CLUSTER bareformatconfluent_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bareformatconfluent
  IN CLUSTER bareformatconfluent_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE bareformatconfluent_tbl FROM SOURCE bareformatconfluent (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE KEY AS named
  ENVELOPE UPSERT

> SELECT * from bareformatconfluent_tbl
named id b
------------------------
1 2 3

# > CREATE SOURCE avro_data_conflict
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
# ! CREATE TABLE avro_data_conflict_tbl
#   FROM SOURCE avro_data_conflict (REFERENCE "testdrive-avro-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
#   VALUE FORMAT AVRO USING SCHEMA '${schema}'
#   INCLUDE KEY
# contains: column "id" specified more than once
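
# Aliasing every output column (key_id, id, b) sidesteps the "id" name
# collision between the key and value schemas.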
> CREATE CLUSTER avro_data_explicit_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_explicit
  IN CLUSTER avro_data_explicit_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE avro_data_explicit_tbl (key_id, id, b) FROM SOURCE avro_data_explicit (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY

> SELECT key_id, id, b FROM avro_data_explicit_tbl
key_id id b
------------
1 2 3
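
# INCLUDE PARTITION appends the Kafka partition number as an extra column.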
> CREATE CLUSTER include_partition_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE include_partition
  IN CLUSTER include_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE include_partition_tbl FROM SOURCE include_partition (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE PARTITION

> SELECT * FROM include_partition_tbl
id b partition
--------------
2 3 0
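
# INCLUDE KEY AS <name> renames the key column, which also avoids the "id"
# collision without having to alias the value columns.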
> CREATE CLUSTER avro_data_as_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_as
  IN CLUSTER avro_data_as_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE avro_data_as_tbl FROM SOURCE avro_data_as (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS renamed_id

> SELECT * FROM avro_data_as_tbl
renamed_id id b
------------
1 2 3

> CREATE CLUSTER avro_avro_data_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_avro_data
  IN CLUSTER avro_avro_data_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE avro_avro_data_tbl (key_id, id, b) FROM SOURCE avro_avro_data (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY

> CREATE CLUSTER avro_data_upsert_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_upsert
  IN CLUSTER avro_data_upsert_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE avro_data_upsert_tbl FROM SOURCE avro_data_upsert (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS renamed
  ENVELOPE UPSERT

> SELECT * FROM avro_data_upsert_tbl
renamed id b
------------
1 2 3
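
# Multi-field keys: with a bare INCLUDE KEY each key field becomes its own
# column, while INCLUDE KEY AS <name> keeps them nested in a single record
# column (compare avro_key_record_flattened and avro_key_record_renamed below).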
$ set multikeyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "geo", "type": "string"}
    ]
  }

$ set noconflictschema={
    "name": "row",
    "type": "record",
    "fields": [
        {"name": "a", "type": "long"}
    ]
  }

$ kafka-create-topic topic=avro-data-record

$ kafka-ingest format=avro key-format=avro topic=avro-data-record key-schema=${multikeyschema} schema=${noconflictschema} timestamp=1
{"id": 1, "geo": "nyc"} {"a": 99}

$ kafka-ingest format=avro topic=avro-data-record schema=${noconflictschema} timestamp=2 omit-key=true
{"a": 88}
> CREATE CLUSTER avro_key_record_flattened_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_key_record_flattened
  IN CLUSTER avro_key_record_flattened_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-record-${testdrive.seed}')

> CREATE TABLE avro_key_record_flattened_tbl FROM SOURCE avro_key_record_flattened (REFERENCE "testdrive-avro-data-record-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${multikeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${noconflictschema}'
  INCLUDE KEY
  ENVELOPE NONE

> SELECT * FROM avro_key_record_flattened_tbl ORDER BY a ASC
id geo a
----------------
<null> <null> 88
1 nyc 99

> CREATE CLUSTER avro_key_record_renamed_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_key_record_renamed
  IN CLUSTER avro_key_record_renamed_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-record-${testdrive.seed}')

> CREATE TABLE avro_key_record_renamed_tbl FROM SOURCE avro_key_record_renamed (REFERENCE "testdrive-avro-data-record-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${multikeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${noconflictschema}'
  INCLUDE KEY AS named
  ENVELOPE NONE

> SELECT (named).id as named_id, (named).geo as named_geo, a FROM avro_key_record_renamed_tbl ORDER BY a ASC
named_id named_geo a
---------------------
<null> <null> 88
1 nyc 99
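
# INCLUDE KEY is rejected for ENVELOPE DEBEZIUM: Debezium values already
# carry the key fields, so including the key again would be redundant.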
$ kafka-create-topic topic=avro-dbz partitions=1

> CREATE SOURCE avro_debezium
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-dbz-${testdrive.seed}')

! CREATE TABLE avro_debezium_tbl FROM SOURCE avro_debezium (REFERENCE "testdrive-avro-dbz-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${noconflictschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS named
  ENVELOPE debezium
contains:Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys.

# formats: TEXT and REGEX

$ kafka-create-topic topic=textsrc partitions=1

$ kafka-ingest topic=textsrc format=bytes key-format=bytes key-terminator=:
one,1:horse,apple
two,2:bee,honey
:cow,grass
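
# The third record has nothing before the ":" key terminator, so its key
# column comes through as NULL, as the SELECTs below assert.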
> CREATE CLUSTER textsrc_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE textsrc
  IN CLUSTER textsrc_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')

> CREATE TABLE textsrc_tbl FROM SOURCE textsrc (REFERENCE "testdrive-textsrc-${testdrive.seed}")
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY

> SELECT * FROM textsrc_tbl
key text
-------------------
one,1 horse,apple
two,2 bee,honey
<null> cow,grass

> CREATE CLUSTER regexvalue_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexvalue
  IN CLUSTER regexvalue_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')

> CREATE TABLE regexvalue_tbl
  FROM SOURCE regexvalue (REFERENCE "testdrive-textsrc-${testdrive.seed}")
  KEY FORMAT TEXT
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY

> SELECT * FROM regexvalue_tbl
key animal food
--------------------
one,1 horse apple
two,2 bee honey
<null> cow grass
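
# When both the key and the value use REGEX formats, each named capture
# group becomes its own column.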
> CREATE CLUSTER regexboth_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexboth
  IN CLUSTER regexboth_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')

> CREATE TABLE regexboth_tbl
  FROM SOURCE regexboth (REFERENCE "testdrive-textsrc-${testdrive.seed}")
  KEY FORMAT REGEX '(?P<id_name>[^,]+),(?P<id>\w+)'
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY

> SELECT * FROM regexboth_tbl
id_name id animal food
---------------------------
one 1 horse apple
two 2 bee honey
<null> <null> cow grass
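
# With INCLUDE KEY AS, the key's capture groups stay nested under a single
# record column instead of being flattened.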
> CREATE CLUSTER regexbothnest_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexbothnest
  IN CLUSTER regexbothnest_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')

> CREATE TABLE regexbothnest_tbl
  FROM SOURCE regexbothnest (REFERENCE "testdrive-textsrc-${testdrive.seed}")
  KEY FORMAT REGEX '(?P<id_name>[^,]+),(?P<id>\w+)'
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY AS nest

> SELECT (nest).id_name, (nest).id, animal FROM regexbothnest_tbl
id_name id animal
--------------------
<null> <null> cow
one 1 horse
two 2 bee
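
# formats: PROTOBUF. A single-field key message (Key) flattens into one
# column; a multi-field key message (KeyComplex) can be kept nested with
# INCLUDE KEY AS, mirroring the Avro record tests above.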
$ file-append path=test.proto
syntax = "proto3";
message Key {
    string id = 1;
}
message KeyComplex {
    int32 id1 = 1;
    int32 id2 = 2;
}
message Value {
    int32 measurement = 1;
}

$ protobuf-compile-descriptors inputs=test.proto output=test.proto set-var=test-schema

$ kafka-create-topic topic=proto partitions=1

$ kafka-ingest topic=proto
  key-format=protobuf key-descriptor-file=test.proto key-message=Key
  format=protobuf descriptor-file=test.proto message=Value
{"id": "a"} {"measurement": 10}

$ kafka-ingest topic=proto format=protobuf descriptor-file=test.proto message=Value omit-key=true
{"measurement": 11}

> CREATE CLUSTER input_proto_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input_proto
  IN CLUSTER input_proto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-proto-${testdrive.seed}')

> CREATE TABLE input_proto_tbl FROM SOURCE input_proto (REFERENCE "testdrive-proto-${testdrive.seed}")
  KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  INCLUDE KEY

> SELECT * FROM input_proto_tbl
id measurement
-------------------
a 10
<null> 11
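
# The nested key record can be cast to text or have its fields projected
# individually, as the SELECT below demonstrates.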
$ kafka-create-topic topic=proto-structured partitions=1

$ kafka-ingest topic=proto-structured
  key-format=protobuf key-descriptor-file=test.proto key-message=KeyComplex
  format=protobuf descriptor-file=test.proto message=Value
{"id1": 1, "id2": 2} {"measurement": 10}

> CREATE CLUSTER input_proto_structured_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input_proto_structured
  IN CLUSTER input_proto_structured_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-proto-structured-${testdrive.seed}')

> CREATE TABLE input_proto_structured_tbl FROM SOURCE input_proto_structured (REFERENCE "testdrive-proto-structured-${testdrive.seed}")
  KEY FORMAT PROTOBUF MESSAGE '.KeyComplex' USING SCHEMA '${test-schema}'
  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  INCLUDE KEY AS key

> SELECT key::text, (key).id1, (key).id2, measurement FROM input_proto_structured_tbl
key id1 id2 measurement
----------------------------
(1,2) 1 2 10

#
# Regression test for database-issues#6319
#
# For UPSERT sources with INCLUDE KEY, we expect queries to take advantage
# of the uniqueness property of the key column.

? EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM bareformatconfluent_tbl;
Explained Query:
  Project (#0)
    ReadStorage materialize.public.bareformatconfluent_tbl

Source materialize.public.bareformatconfluent_tbl

Target cluster: quickstart
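
# With a default index, the plan becomes fast-path but still needs no
# explicit Distinct: the key column is already unique.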
> CREATE DEFAULT INDEX ON bareformatconfluent_tbl;

? EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM bareformatconfluent_tbl;
Explained Query (fast path):
  Project (#0)
    ReadIndex on=materialize.public.bareformatconfluent_tbl bareformatconfluent_tbl_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.bareformatconfluent_tbl_primary_idx (*** full scan ***)

Target cluster: quickstart

> CREATE CLUSTER envelope_none_with_key_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE envelope_none_with_key
  IN CLUSTER envelope_none_with_key_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE envelope_none_with_key_tbl FROM SOURCE envelope_none_with_key (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS named
  ENVELOPE NONE

# For ENVELOPE NONE with INCLUDE KEY, uniqueness is not guaranteed,
# so we expect query plans to contain an explicit Distinct.
? EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM envelope_none_with_key_tbl;
Explained Query:
  Distinct project=[#0]
    Project (#0)
      ReadStorage materialize.public.envelope_none_with_key_tbl

Source materialize.public.envelope_none_with_key_tbl

Target cluster: quickstart
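
# Even with a default index, the Distinct remains: an index does not make
# the key column unique under ENVELOPE NONE.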
> CREATE DEFAULT INDEX ON envelope_none_with_key_tbl;

? EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM envelope_none_with_key_tbl;
Explained Query:
  Distinct project=[#0]
    Project (#0)
      ReadIndex on=envelope_none_with_key_tbl envelope_none_with_key_tbl_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.envelope_none_with_key_tbl_primary_idx (*** full scan ***)

Target cluster: quickstart