# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=headers_src

# [103, 117, 115, 51] = "gus3"
$ kafka-ingest format=avro topic=headers_src key-format=avro key-schema=${keyschema} schema=${schema} headers={"gus": "gusfive", "gus2": [103, 117, 115, 51]}
{"key": "fish"} {"f1": "fishval", "f2": 1000}

$ kafka-ingest format=avro topic=headers_src key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish2"} {"f1": "fishval", "f2": 1000}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER headers_src_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE headers_src
  IN CLUSTER headers_src_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-headers_src-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADERS
  ENVELOPE UPSERT

# empty case + has headers case
> SELECT key, f1, f2, list_length(headers), headers::text from headers_src
key    f1       f2    list_length  headers
----------------------------------------------
fish   fishval  1000  2  "{\"(gus,\\\"\\\\\\\\x67757366697665\\\")\",\"(gus2,\\\"\\\\\\\\x67757333\\\")\"}"
fish2  fishval  1000  0  "{}"
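# The headers column is a list of (key text, value bytea) records; the text
# cast above renders the byte values as escaped \x hex.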
# unpacking works
> SELECT key, f1, f2, headers[1].value as gus from headers_src
key    f1       f2    gus
-------------------------------------------
fish   fishval  1000  gusfive
fish2  fishval  1000  <null>

# map_build lets you get the headers as a map
> SELECT key, f1, f2, map_build(headers)->'gus' as gus, map_build(headers)->'gus2' AS gus2 from headers_src;
key    f1       f2    gus      gus2
-------------------------------------------
fish   fishval  1000  gusfive  gus3
fish2  fishval  1000  <null>   <null>

# selecting by key works
> SELECT key, f1, f2, thekey, value FROM (SELECT i.key, i.f1, i.f2, unnest(headers).key as thekey, unnest(headers).value as value from headers_src as I) i WHERE thekey = 'gus'
key   f1       f2    thekey  value
-------------------------------------------
fish  fishval  1000  gus     gusfive
# The entire set of headers is overwritten on each new message, even when the
# value AND the remaining header haven't changed
$ kafka-ingest format=avro topic=headers_src key-format=avro key-schema=${keyschema} schema=${schema} headers={"gus":"gusfive"}
{"key": "fish"} {"f1": "fishval", "f2": 1000}

# empty case + has headers case
> SELECT key, f1, f2, list_length(headers) from headers_src
key    f1       f2    list_length
-------------------------------------------
fish   fishval  1000  1
fish2  fishval  1000  0

# Headers with the same key are both preserved
$ kafka-ingest format=avro topic=headers_src key-format=avro key-schema=${keyschema} schema=${schema} headers=[{"gus": "a"}, {"gus": "b"}]
{"key": "fish"} {"f1": "fishval", "f2": 1000}

> SELECT key, f1, f2, headers[1].value as gus1, headers[2].value as gus2 from headers_src
key    f1       f2    gus1    gus2
-------------------------------------------
fish   fishval  1000  a       b
fish2  fishval  1000  <null>  <null>
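# The list positions follow the order in which the duplicate headers appear on
# the message.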
# Works with other includes
$ kafka-create-topic topic=headers_also partitions=1

$ kafka-ingest format=avro topic=headers_also key-format=avro key-schema=${keyschema} schema=${schema} headers={"gus":"gusfive"}
{"key": "fish"} {"f1": "fishval", "f2": 1000}

> CREATE CLUSTER headers_also_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE headers_also
  IN CLUSTER headers_also_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-headers_also-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADERS, PARTITION
  ENVELOPE UPSERT

> SELECT key, f1, f2, list_length(headers), partition from headers_also
key   f1       f2    list_length  partition
-----------------------------------------------
fish  fishval  1000  1            0

# esoteric ingestions: header values that contain '='
$ kafka-ingest format=avro topic=headers_also key-format=avro key-schema=${keyschema} schema=${schema} headers={"gus": "gus=five"}
{"key": "fish"} {"f1": "fishval", "f2": 1000}

> SELECT key, f1, f2, headers[1].value as gus, partition from headers_also
key   f1       f2    gus       partition
-----------------------------------------------
fish  fishval  1000  gus=five  0

# null header value
$ kafka-ingest format=avro topic=headers_also key-format=avro key-schema=${keyschema} schema=${schema} headers={"gus": null}
{"key": "fish"} {"f1": "fishval", "f2": 1000}

> SELECT key, f1, f2, headers[1].value as gus, partition from headers_also
key   f1       f2    gus     partition
-----------------------------------------------
fish  fishval  1000  <null>  0

# conflicting naming: the value schema already has a field named "headers"
$ set schemaheaders={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"headers", "type":"string"}
    ]
  }

$ kafka-create-topic topic=headers_conflict

! CREATE SOURCE headers_conflict
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-headers_conflict-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schemaheaders}'
  INCLUDE HEADERS
  ENVELOPE UPSERT
contains: column "headers" specified more than once
# No meaningful way to get the data out in td because of the ambiguous name
# and the unusual type
# > SELECT * from headers_conflict
$ kafka-ingest format=avro topic=headers_conflict key-format=avro key-schema=${keyschema} schema=${schemaheaders} headers={"gus": "gusfive"}
{"key": "fish"} {"headers": "value"}

# using AS to resolve the conflict
> CREATE CLUSTER headers_conflict2_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE headers_conflict2
  IN CLUSTER headers_conflict2_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-headers_conflict-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schemaheaders}'
  INCLUDE HEADERS AS kafka_headers
  ENVELOPE UPSERT

> SELECT key, headers, kafka_headers[1].value as gus from headers_conflict2
key   headers  gus
------------------------
fish  value    gusfive

# test extracting individual headers with INCLUDE HEADER
$ kafka-create-topic topic=individual_headers

$ kafka-ingest format=avro topic=individual_headers key-format=avro key-schema=${keyschema} schema=${schema} headers={"header1": "message_1_header_1"}
{"key": "message_1"} {"f1": "fishval", "f2": 1000}

$ kafka-ingest format=avro topic=individual_headers key-format=avro key-schema=${keyschema} schema=${schema} headers={"header1": "message_2_header_1", "header2": "message_2_header_2"}
{"key": "message_2"} {"f1": "fishval", "f2": 1000}

> CREATE CLUSTER individual_headers_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE individual_headers
  IN CLUSTER individual_headers_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-individual_headers-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADER 'header1' AS header1
  ENVELOPE UPSERT

> SELECT key, header1 from individual_headers
key        header1
-------------------------------
message_1  message_1_header_1
message_2  message_2_header_1

# test exposing a header as a byte array
> CREATE CLUSTER individual_headers_bytes_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE individual_headers_bytes
  IN CLUSTER individual_headers_bytes_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-individual_headers-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADER 'header1' AS header1 BYTES
  ENVELOPE UPSERT

> SELECT key, header1::text from individual_headers_bytes
key        header1
---------------------------------------------------
message_1  \x6d6573736167655f315f6865616465725f31
message_2  \x6d6573736167655f325f6865616465725f31
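# These are the UTF-8 bytes of "message_1_header_1" and "message_2_header_1";
# with BYTES the header is exposed as a bytea column instead of text.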
# When there are multiple headers with identical keys, verify that the last header is exposed in the row
$ kafka-create-topic topic=duplicate_individual_headers

$ kafka-ingest format=avro topic=duplicate_individual_headers key-format=avro key-schema=${keyschema} schema=${schema} headers=[{"duplicates": "message_3_header_3_first"}, {"duplicates": "message_3_header_3_second"}, {"duplicates": "message_3_header_3_third"}]
{"key": "message_3"} {"f1": "fishval", "f2": 1000}

> CREATE CLUSTER duplicate_individual_headers_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE duplicate_individual_headers
  IN CLUSTER duplicate_individual_headers_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-duplicate_individual_headers-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADERS, HEADER 'duplicates' AS duplicates
  ENVELOPE UPSERT

> SELECT key, duplicates, headers::text from duplicate_individual_headers
key        duplicates                headers
-------------------------------------------------
message_3  message_3_header_3_third  "{\"(duplicates,\\\"\\\\\\\\x6d6573736167655f335f6865616465725f335f6669727374\\\")\",\"(duplicates,\\\"\\\\\\\\x6d6573736167655f335f6865616465725f335f7365636f6e64\\\")\",\"(duplicates,\\\"\\\\\\\\x6d6573736167655f335f6865616465725f335f7468697264\\\")\"}"
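# The named 'duplicates' column carries only the last header value, while the
# full headers list still contains all three duplicate entries.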
# We can control the header map more granularly with `map_agg`.
> SELECT
    key,
    map_agg((headers).key, convert_from((headers).value, 'utf-8') ORDER BY (headers).value ASC)::TEXT AS headers_map
  FROM (
    SELECT
      key, unnest(headers) AS headers
    FROM duplicate_individual_headers
  )
  GROUP BY key;
key        headers_map
---------------------
message_3  "{duplicates=>message_3_header_3_third}"

# Reverse the order in which values are aggregated.
> SELECT
    key,
    map_agg((headers).key, convert_from((headers).value, 'utf-8') ORDER BY (headers).value DESC)::TEXT AS headers_map
  FROM (
    SELECT
      key, unnest(headers) AS headers
    FROM duplicate_individual_headers
  )
  GROUP BY key;
key        headers_map
---------------------
message_3  "{duplicates=>message_3_header_3_first}"
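# For duplicate keys, map_agg keeps the value aggregated last, so the ORDER BY
# direction above determines which duplicate ends up in the map.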
# Verify that the source is bricked when there are headers that cannot be decoded as valid UTF-8
$ kafka-create-topic topic=ill_formed_header

$ kafka-ingest format=avro topic=ill_formed_header key-format=avro key-schema=${keyschema} schema=${schema} headers={"header1": "message_1_header_1"}
{"key": "message_1"} {"f1": "fishval", "f2": 1000}

$ kafka-ingest format=avro topic=ill_formed_header key-format=avro key-schema=${keyschema} schema=${schema} headers={"header1": "message_2_header_1"}
{"key": "message_2"} {"f1": "fishval", "f2": 1000}

> CREATE CLUSTER ill_formed_header_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE ill_formed_header
  IN CLUSTER ill_formed_header_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-ill_formed_header-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADERS, HEADER 'header1' AS header1
  ENVELOPE UPSERT

> SELECT key, header1 from ill_formed_header
key        header1
------------------------------------------
message_1  message_1_header_1
message_2  message_2_header_1

$ kafka-ingest format=avro topic=ill_formed_header key-format=avro key-schema=${keyschema} schema=${schema} headers={"header1": [195, 40]}
{"key": "message_1"} {"f1": "fishval", "f2": 1000}
! SELECT key, header1 from ill_formed_header
contains:Found ill-formed byte sequence in header 'header1' that cannot be decoded as valid utf-8 (original bytes: [c3, 28])

# Verify that the source is bricked when messages have missing headers
> CREATE CLUSTER missing_headers_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE missing_headers
  IN CLUSTER missing_headers_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-individual_headers-${testdrive.seed}')
  KEY FORMAT AVRO USING SCHEMA '${keyschema}'
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  INCLUDE HEADER 'header2' AS header2
  ENVELOPE UPSERT

! SELECT key, header2 from missing_headers
contains:A header with key 'header2' was not found in the message headers