# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

$ set conflictkeyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "name": "row",
    "type": "record",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "b", "type": "long"}
    ]
  }

$ kafka-create-topic topic=avro-data partitions=1

$ kafka-ingest format=avro key-format=avro topic=avro-data key-schema=${conflictkeyschema} schema=${schema} timestamp=1
{"id": 1} {"id": 2, "b": 3}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

# INCLUDE KEY with a bare FORMAT (no KEY FORMAT) must be rejected.
! CREATE SOURCE missing_key_format
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY
contains:INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, got bare FORMAT

! CREATE SOURCE missing_key_format
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS key_col
contains:INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, got bare FORMAT

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# "Bare" format works when the key format is in a registry
> CREATE CLUSTER bareformatconfluent_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bareformatconfluent
  IN CLUSTER bareformatconfluent_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-avro-data-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE KEY AS named
  ENVELOPE UPSERT

> SELECT * from bareformatconfluent
named id b
------------------------
1 2 3

# Key and value schemas that share a column name conflict unless renamed.
! CREATE SOURCE avro_data_conflict
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY
contains: column "id" specified more than once

> CREATE CLUSTER avro_data_explicit_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_explicit (key_id, id, b)
  IN CLUSTER avro_data_explicit_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY

> SELECT key_id, id, b FROM avro_data_explicit
key_id id b
------------
1 2 3

> CREATE CLUSTER include_partition_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE include_partition
  IN CLUSTER include_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE PARTITION

> SELECT * FROM include_partition
id b partition
--------------
2 3 0

> CREATE CLUSTER avro_data_as_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_as
  IN CLUSTER avro_data_as_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS renamed_id

> SELECT * FROM avro_data_as
renamed_id id b
------------
1 2 3

> CREATE CLUSTER avro_avro_data_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_avro_data (key_id, id, b)
  IN CLUSTER avro_avro_data_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY

> CREATE CLUSTER avro_data_upsert_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_data_upsert
  IN CLUSTER avro_data_upsert_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS renamed
  ENVELOPE UPSERT

> SELECT * FROM avro_data_upsert
renamed id b
------------
1 2 3

$ set multikeyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "geo", "type": "string"}
    ]
  }

$ set noconflictschema={
    "name": "row",
    "type": "record",
    "fields": [
        {"name": "a", "type": "long"}
    ]
  }

$ kafka-create-topic topic=avro-data-record

$ kafka-ingest format=avro key-format=avro topic=avro-data-record key-schema=${multikeyschema} schema=${noconflictschema} timestamp=1
{"id": 1, "geo": "nyc"} {"a": 99}

$ kafka-ingest format=avro topic=avro-data-record schema=${noconflictschema} timestamp=2 omit-key=true
{"a": 88}

> CREATE CLUSTER avro_key_record_flattened_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_key_record_flattened
  IN CLUSTER avro_key_record_flattened_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-record-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${multikeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${noconflictschema}'
  INCLUDE KEY
  ENVELOPE NONE

> SELECT * FROM avro_key_record_flattened ORDER BY a ASC
id geo a
----------------
<null> <null> 88
1 nyc 99

> CREATE CLUSTER avro_key_record_renamed_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avro_key_record_renamed
  IN CLUSTER avro_key_record_renamed_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-record-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${multikeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${noconflictschema}'
  INCLUDE KEY AS named
  ENVELOPE NONE

> SELECT (named).id as named_id, (named).geo as named_geo, a FROM avro_key_record_renamed ORDER BY a ASC
named_id named_geo a
---------------------
<null> <null> 88
1 nyc 99

$ kafka-create-topic topic=avro-dbz partitions=1

# Debezium envelopes already carry the key; INCLUDE KEY is rejected.
! CREATE SOURCE avro_debezium
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-dbz-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${noconflictschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS named
  ENVELOPE debezium
contains:Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys.

# formats: TEXT and REGEX

$ kafka-create-topic topic=textsrc partitions=1

$ kafka-ingest topic=textsrc format=bytes key-format=bytes key-terminator=:
one,1:horse,apple
two,2:bee,honey
:cow,grass

> CREATE CLUSTER textsrc_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE textsrc
  IN CLUSTER textsrc_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  INCLUDE KEY

> SELECT * FROM textsrc
key text
-------------------
one,1 horse,apple
two,2 bee,honey
<null> cow,grass

> CREATE CLUSTER regexvalue_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexvalue
  IN CLUSTER regexvalue_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY

> SELECT * FROM regexvalue
key animal food
--------------------
one,1 horse apple
two,2 bee honey
<null> cow grass

> CREATE CLUSTER regexboth_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexboth
  IN CLUSTER regexboth_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')
  KEY FORMAT REGEX '(?P<id_name>[^,]+),(?P<id>\w+)'
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY

> SELECT * FROM regexboth
id_name id animal food
---------------------------
one 1 horse apple
two 2 bee honey
<null> <null> cow grass

> CREATE CLUSTER regexbothnest_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE regexbothnest
  IN CLUSTER regexbothnest_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textsrc-${testdrive.seed}')
  KEY FORMAT REGEX '(?P<id_name>[^,]+),(?P<id>\w+)'
  VALUE FORMAT REGEX '(?P<animal>[^,]+),(?P<food>\w+)'
  INCLUDE KEY AS nest

> SELECT (nest).id_name, (nest).id, animal FROM regexbothnest
id_name id animal
--------------------
<null> <null> cow
one 1 horse
two 2 bee

$ file-append path=test.proto
syntax = "proto3";
message Key {
    string id = 1;
}
message KeyComplex {
    int32 id1 = 1;
    int32 id2 = 2;
}
message Value {
    int32 measurement = 1;
}

$ protobuf-compile-descriptors inputs=test.proto output=test.proto set-var=test-schema

$ kafka-create-topic topic=proto partitions=1

$ kafka-ingest topic=proto
  key-format=protobuf key-descriptor-file=test.proto key-message=Key
  format=protobuf descriptor-file=test.proto message=Value
{"id": "a"} {"measurement": 10}

$ kafka-ingest topic=proto format=protobuf descriptor-file=test.proto message=Value omit-key=true
{"measurement": 11}

> CREATE CLUSTER input_proto_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input_proto
  IN CLUSTER input_proto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-proto-${testdrive.seed}')
  KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  INCLUDE KEY

> SELECT * FROM input_proto
id measurement
-------------------
a 10
<null> 11

$ kafka-create-topic topic=proto-structured partitions=1

$ kafka-ingest topic=proto-structured
  key-format=protobuf key-descriptor-file=test.proto key-message=KeyComplex
  format=protobuf descriptor-file=test.proto message=Value
{"id1": 1, "id2": 2} {"measurement": 10}

> CREATE CLUSTER input_proto_structured_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE input_proto_structured
  IN CLUSTER input_proto_structured_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-proto-structured-${testdrive.seed}')
  KEY FORMAT PROTOBUF MESSAGE '.KeyComplex' USING SCHEMA '${test-schema}'
  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  INCLUDE KEY AS key

> SELECT key::text, (key).id1, (key).id2, measurement FROM input_proto_structured
key id1 id2 measurement
----------------------------
(1,2) 1 2 10

#
# Regression test for database-issues#6319
#

# For UPSERT sources with INCLUDE KEY, we expect the queries to
# take advantage of the uniqueness property of the key column.
?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM bareformatconfluent;
Explained Query:
  Project (#0)
    ReadStorage materialize.public.bareformatconfluent

Source materialize.public.bareformatconfluent

Target cluster: quickstart

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT named FROM bareformatconfluent;
Explained Query:
  Project (#0)
    ReadStorage materialize.public.bareformatconfluent

Source materialize.public.bareformatconfluent

Target cluster: quickstart

> CREATE DEFAULT INDEX ON bareformatconfluent;

?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM bareformatconfluent;
Explained Query (fast path):
  Project (#0)
    ReadIndex on=materialize.public.bareformatconfluent bareformatconfluent_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.bareformatconfluent_primary_idx (*** full scan ***)

Target cluster: quickstart

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT named FROM bareformatconfluent;
Explained Query (fast path):
  Project (#0)
    ReadIndex on=materialize.public.bareformatconfluent bareformatconfluent_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.bareformatconfluent_primary_idx (*** full scan ***)

Target cluster: quickstart

> CREATE CLUSTER envelope_none_with_key_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE envelope_none_with_key
  IN CLUSTER envelope_none_with_key_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-avro-data-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${conflictkeyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE KEY AS named
  ENVELOPE NONE

# For ENVELOPE NONE with INCLUDE KEY, uniqueness is not guaranteed,
# so we expect that query plans will contain an explicit Distinct on
?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM envelope_none_with_key;
Explained Query:
  Distinct project=[#0] monotonic
    Project (#0)
      ReadStorage materialize.public.envelope_none_with_key

Source materialize.public.envelope_none_with_key

Target cluster: quickstart

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT named FROM envelope_none_with_key;
Explained Query:
  Distinct project=[#0] monotonic
    Project (#0)
      ReadStorage materialize.public.envelope_none_with_key

Source materialize.public.envelope_none_with_key

Target cluster: quickstart

> CREATE DEFAULT INDEX ON envelope_none_with_key;

?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT named FROM envelope_none_with_key;
Explained Query:
  Distinct project=[#0] monotonic
    Project (#0)
      ReadIndex on=envelope_none_with_key envelope_none_with_key_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.envelope_none_with_key_primary_idx (*** full scan ***)

Target cluster: quickstart

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT named FROM envelope_none_with_key;
Explained Query:
  Distinct project=[#0] monotonic
    Project (#0)
      ReadIndex on=envelope_none_with_key envelope_none_with_key_primary_idx=[*** full scan ***]

Used Indexes:
  - materialize.public.envelope_none_with_key_primary_idx (*** full scan ***)

Target cluster: quickstart