# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1

# This testdrive file is exactly the same as upsert-kafka.td, but using the new syntax.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=avroavro

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );
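
# Under ENVELOPE UPSERT, the latest value for each key wins, and a record with
# a key but no value is a deletion of that key.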
$ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}

> CREATE CLUSTER avroavro_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE avroavro
  IN CLUSTER avroavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> SELECT * from avroavro
key         f1     f2
---------------------------
fish        fish   1000
birdmore    geese  56
mammalmore  moose  2
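
# Exercise upsert with non-Avro keys: the same topic is read once with
# KEY FORMAT BYTES and once with KEY FORMAT TEXT, with Avro values decoded
# from an inline schema.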
$ kafka-create-topic topic=textavro

$ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
fish: {"f1": "fish", "f2": 1000}
bìrd1: {"f1":"goose", "f2": 1}
birdmore: {"f1":"geese", "f2": 2}
mammal1: {"f1": "moose", "f2": 1}

> CREATE CLUSTER bytesavro_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bytesavro
  IN CLUSTER bytesavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE UPSERT

> CREATE CLUSTER textavro_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE textavro
  IN CLUSTER textavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE UPSERT

> select * from bytesavro
key           f1     f2
---------------------------
fish          fish   1000
b\xc3\xacrd1  goose  1
birdmore      geese  2
mammal1       moose  1

$ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
bìrd1:
birdmore: {"f1":"geese", "f2": 56}
mämmalmore: {"f1": "moose", "f2": 42}
mammal1:

> select * from textavro
key         f1     f2
---------------------------
fish        fish   1000
birdmore    geese  56
mämmalmore  moose  42
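
# Exercise all four combinations of TEXT and BYTES key and value formats
# against a single topic.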
$ kafka-create-topic topic=textbytes partitions=1

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
fish:fish
bìrd1:goose
bírdmore:geese
mammal1:moose
bìrd1:

> CREATE CLUSTER texttext_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE texttext
  IN CLUSTER texttext_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT TEXT VALUE FORMAT TEXT
  ENVELOPE UPSERT

> CREATE CLUSTER textbytes_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE textbytes
  IN CLUSTER textbytes_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT BYTES
  ENVELOPE UPSERT

> CREATE CLUSTER bytesbytes_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bytesbytes
  IN CLUSTER bytesbytes_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT

> CREATE CLUSTER bytestext_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bytestext
  IN CLUSTER bytestext_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT TEXT
  ENVELOPE UPSERT

> select * from texttext
key       text
-------------------
fish      fish
bírdmore  geese
mammal1   moose

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
bírdmore:géese
mammalmore:moose
mammal1:
mammal1:mouse

> select * from textbytes
key         data
---------------------------
fish        fish
bírdmore    g\xc3\xa9ese
mammal1     mouse
mammalmore  moose

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
mammalmore:herd

> select * from bytesbytes
key              data
------------------------------
fish             fish
b\xc3\xadrdmore  g\xc3\xa9ese
mammal1          mouse
mammalmore       herd

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
bìrd1:
fish:

> select * from bytestext
key              text
-----------------------
b\xc3\xadrdmore  géese
mammal1          mouse
mammalmore       herd
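
# Upsert also works with Protobuf-encoded values: compile a descriptor set
# from an inline .proto definition and use it to decode the values.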
$ file-append path=test.proto
syntax = "proto3";
message Test {
  string f = 1;
}

$ protobuf-compile-descriptors inputs=test.proto output=test.pb set-var=test-schema

$ kafka-create-topic topic=textproto partitions=1

> CREATE CLUSTER textproto_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE textproto
  IN CLUSTER textproto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  ENVELOPE UPSERT

> CREATE CLUSTER bytesproto_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE bytesproto
  IN CLUSTER bytesproto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  ENVELOPE UPSERT

$ kafka-ingest topic=textproto
  format=protobuf descriptor-file=test.pb message=Test
  key-format=bytes key-terminator=:
fish:{"f": "one"}
bìrd1:{"f": "two"}
birdmore: {}

> SELECT * FROM bytesproto
fish one
b\xc3\xacrd1 two
birdmore ""

> SELECT * FROM textproto
fish one
bìrd1 two
birdmore ""

$ kafka-ingest topic=textproto
  format=protobuf descriptor-file=test.pb message=Test
  key-format=bytes key-terminator=:
mammal1: {"f": "three"}
bìrd1:
birdmore: {"f": "four"}
mämmalmore: {"f": "five"}
bìrd1: {"f": "six"}
mammal1:
mammalmore: {"f": "seven"}

> SELECT * FROM bytesproto
fish one
birdmore four
m\xc3\xa4mmalmore five
b\xc3\xacrd1 six
mammalmore seven

> SELECT * FROM textproto
fish one
birdmore four
mämmalmore five
bìrd1 six
mammalmore seven

$ kafka-create-topic topic=nullkey partitions=1

# A null key should result in an error decoding that row, but not a panic.
$ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
bird1:goose
:geese
mammal1:moose
bird1:
birdmore:geese
mammalmore:moose
mammal1:

> CREATE CLUSTER nullkey_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE nullkey
  IN CLUSTER nullkey_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nullkey-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  ENVELOPE UPSERT
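
# Until the NULL-keyed record is retracted, every read of the source returns
# this error.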
! select * from nullkey
contains: record with NULL key in UPSERT source

! select * from nullkey
contains: to retract this error, produce a record upstream with a NULL key and NULL value

# Ingest a null value for our null key, to retract it.
$ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
:

# Now we should be able to query the source.
> select * from nullkey
key         text
-------------------
birdmore    geese
mammalmore  moose

$ kafka-create-topic topic=realtimeavroavro partitions=1

# Test a multi-field Avro key.
$ set keyschema2={
    "type": "record",
    "name": "Key2",
    "fields": [
        {"name": "f3", "type": ["null", "string"]},
        {"name": "f4", "type": ["null", "string"]}
    ]
  }

$ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
{"f3": {"string": "fire"}, "f4": {"string": "yang"}} {"f1": "dog", "f2": 42}
{"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 53}
{"f3": {"string": "water"}, "f4": null} {"f1":"plesiosaur", "f2": 224}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "turtle", "f2": 34}
{"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 54}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "snake", "f2": 68}
{"f3": {"string": "water"}, "f4": null} {"f1": "crocodile", "f2": 7}
{"f3": {"string": "earth"}, "f4":{"string": "dao"}}

> CREATE CLUSTER realtimeavroavro_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE realtimeavroavro (f3, f4, f1, f2)
  IN CLUSTER realtimeavroavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW realtimeavroavro_view as SELECT * from realtimeavroavro;

> select f3, f4, f1, f2 from realtimeavroavro_view
f3      f4      f1         f2
-----------------------------------
fire    yang    dog        42
<null>  yin     sheep      54
water   <null>  crocodile  7

# Ensure that upsert sources work with START OFFSET.
> CREATE CLUSTER realtimeavroavro_ff_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE realtimeavroavro_ff
  IN CLUSTER realtimeavroavro_ff_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> SELECT * FROM realtimeavroavro_ff
f3      f4      f1         f2
-----------------------------------
<null>  yin     sheep      54
water   <null>  crocodile  7

# Ensure that a deletion for a key that never existed does not break anything.
$ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
{"f3": {"string": "fire"}, "f4": {"string": "yin"}}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "pigeon", "f2": 10}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "owl", "f2": 15}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "rhinoceros", "f2": 211}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "chicken", "f2": 47}
{"f3": null, "f4":{"string": "yin"}}
{"f3": null, "f4":{"string": "yin"}} {"f1":"dog", "f2": 243}
{"f3": {"string": "water"}, "f4": null}

> select * from realtimeavroavro_view
f3      f4    f1          f2
-----------------------------------------
fire    yang  dog         42
air     qi    chicken     47
<null>  yin   dog         243
earth   dao   rhinoceros  211

$ kafka-create-topic topic=realtimefilteravro

$ set keyschema3={
    "type": "record",
    "name": "Key3",
    "fields": [
        {"name": "k1", "type": ["null", "string"]},
        {"name": "k2", "type": ["null", "long"]}
    ]
  }

$ set schema2={
    "type": "record",
    "name": "test2",
    "fields": [
        {"name": "f1", "type": ["null", "string"]},
        {"name": "f2", "type": ["null", "long"]}
    ]
  }

$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "melon"}, "f2": {"long": 2}}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 7}}
{"k1": {"string": "boulangerie"}, "k2": null} {"f1":{"string": "date"}, "f2": {"long": 10}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "pear"}, "f2": {"long": 2}}
{"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": null}
{"k1": {"string": "boulangerie"}, "k2": null} {"f1":null, "f2": {"long": 10}}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 3}}

> CREATE CLUSTER realtimefilteravro_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE realtimefilteravro
  IN CLUSTER realtimefilteravro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimefilteravro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT
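
# The following materialized views exercise filters over key columns only,
# value columns only, and predicates that combine both.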
# Filter on key only.
> CREATE MATERIALIZED VIEW filterforkey AS
  SELECT f1 FROM realtimefilteravro WHERE k1='épicerie';

> SELECT * from filterforkey
f1
----
pear

# Filter on value only.
> CREATE MATERIALIZED VIEW filterforvalue AS
  SELECT f2 FROM realtimefilteravro WHERE f1='date';

> SELECT * from filterforvalue
f2
-------
<null>

# Filter with a predicate containing key + value.
> CREATE MATERIALIZED VIEW filterforkeyvalue AS
  SELECT f1, f2 FROM realtimefilteravro WHERE k2+f2=12;

> SELECT * from filterforkeyvalue
f1    f2
-------
pear  2

# Filter on both a predicate containing a key and a predicate containing a value.
> CREATE MATERIALIZED VIEW keyfiltervaluefilter AS
  SELECT k1, k2 FROM realtimefilteravro WHERE k2 > 5 AND f2 < 5

> SELECT * from keyfiltervaluefilter
k1        k2
-----------
épicerie  10

# Add records that match the filters, and make sure that rows that differ
# only in key columns the views do not select are still treated as separate.
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 2}}
{"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 6}} {"f1": {"string": "pear"}, "f2": null}
{"k1": {"string": "bureau"}, "k2": {"long": 6}} {"f1": {"string": "grape"}, "f2": {"long": 7}}

> SELECT * from filterforkey
f1
----
pear
pear

> SELECT * from filterforvalue
f2
-------
<null>
5

> SELECT * from filterforkeyvalue
f1      f2
---------
pear    2
<null>  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
épicerie   10
librairie  10

# Update records so that they no longer match the filters.
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 6}}
{"k1": null, "k2": null} {"f1": {"string": "grape"}, "f2": {"long": 5}}

> SELECT * from filterforvalue
f2
-------
<null>

> SELECT * from filterforkeyvalue
f1    f2
---------
pear  2

> SELECT * from keyfiltervaluefilter
k1        k2
-----------
épicerie  10

# Update records so that they match the filters again.
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":{"string": "melon"}, "f2": {"long": 2}}
{"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 12}}

> SELECT * from filterforvalue
f2
-------
<null>
12

> SELECT * from filterforkeyvalue
f1     f2
---------
pear   2
melon  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
épicerie   10
librairie  10

# Delete records.
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}}
{"k1": {"string": "boulangerie"}, "k2": null}
{"k1": null, "k2": {"long": 2}}

> SELECT * from filterforkey
f1
----
pear

> SELECT * from filterforvalue
f2
-------
12

> SELECT * from filterforkeyvalue
f1     f2
---------
melon  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
librairie  10
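
# INCLUDE PARTITION, OFFSET (and optionally TIMESTAMP) expose Kafka metadata
# as extra columns on the source.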
> CREATE CLUSTER include_metadata_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE include_metadata
  IN CLUSTER include_metadata_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE PARTITION, OFFSET
  ENVELOPE UPSERT

> SELECT * FROM include_metadata
f3      f4   f1          f2   partition  offset
-------------------------------------------------------
<null>  yin  dog         243  0          14
air     qi   chicken     47   0          12
earth   dao  rhinoceros  211  0          11

> CREATE CLUSTER include_metadata_ts_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE include_metadata_ts
  IN CLUSTER include_metadata_ts_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE PARTITION, OFFSET, TIMESTAMP as ts
  ENVELOPE UPSERT

> SELECT "offset" FROM include_metadata_ts WHERE ts > '2021-01-01'
offset
------
14
12
11

> SELECT "offset" FROM include_metadata_ts WHERE ts < '2021-01-01'
offset
------

#
# JSON UPSERT source
#

$ kafka-create-topic topic=format-json-bytes partitions=1

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
"object":{"a":"b","c":"d"}
"array":[1,2,3]
"int":1
"float":1.23
"str":"hello"

> CREATE CLUSTER format_json_cluster SIZE '${arg.default-storage-size}';
> CREATE SOURCE format_json
  IN CLUSTER format_json_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-bytes-${testdrive.seed}')
  KEY FORMAT JSON
  VALUE FORMAT JSON
  ENVELOPE UPSERT;
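
# Both key and value are decoded as jsonb, so JSON string keys render with
# their quotes embedded in the text output.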
> SELECT * FROM format_json ORDER BY key
"\"array\"" [1,2,3]
"\"float\"" 1.23
"\"int\"" 1
"\"object\"" "{\"a\":\"b\",\"c\":\"d\"}"
"\"str\"" "\"hello\""

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
"object":{"y":"z"}
"array":[10,9,8]
"int":99
"float":3.14
"str":"bye"

> SELECT * FROM format_json ORDER BY key
"\"array\"" [10,9,8]
"\"float\"" 3.14
"\"int\"" 99
"\"object\"" "{\"y\":\"z\"}"
"\"str\"" "\"bye\""

#
# Test inline error handling with value decode failures for a JSON source
#

$ kafka-create-topic topic=inline-value-errors-1 partitions=1

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
val1:{"a":,"c":"d"}
val2:[1,2,

> CREATE CLUSTER inline_error_cluster SIZE '${arg.default-storage-size}';

# Should error when the feature flag is not enabled.
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = false

! CREATE SOURCE value_decode_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
contains:The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT is not available

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true

# This source will inline errors rather than propagate them.
> CREATE SOURCE value_decode_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));

# There is now an additional 'error' column that stores the inline decoding
# errors, if any. Since this source has JSON values, the output relation has
# a single jsonb column named 'data'; for other formats (e.g. Avro) the value
# columns are present and flattened.
> SELECT key, data, error::text FROM value_decode_error ORDER BY key
val1 <null> "(\"Failed to decode JSON: expected value at line 1 column 6 (original text: {\"\"a\"\":,\"\"c\"\":\"\"d\"\"}, original bytes: \"\"7b2261223a2c2263223a2264227d\"\")\")"
val2 <null> "(\"Failed to decode JSON: EOF while parsing a value at line 1 column 5 (original text: [1,2,, original bytes: \"\"5b312c322c\"\")\")"

# Replace the bad values with good ones.
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
val1:{"a": 1,"c":"d"}
val2:[1,2,3]

> SELECT key, data, error::text FROM value_decode_error ORDER BY key
val1 "{\"a\":1,\"c\":\"d\"}" <null>
val2 [1,2,3] <null>
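
# The inline error column is named 'error' by default, so it must not collide
# with any other column, such as one introduced by INCLUDE KEY AS.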
! CREATE SOURCE value_decode_error_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  INCLUDE KEY AS error
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
contains: column "error" specified more than once

#
# Test inline error handling with value decode failures for an Avro source
# that also uses a custom error column name
#

$ kafka-create-topic topic=inline-value-errors-2 partitions=1

$ kafka-ingest format=avro topic=inline-value-errors-2 key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}

> CREATE CLUSTER inline_error_cluster_avro SIZE '${arg.default-storage-size}';

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true

# This source will inline errors rather than propagate them.
> CREATE SOURCE value_decode_error_avro
  IN CLUSTER inline_error_cluster_avro
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-2-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE AS decode_error));

# There is now an additional 'decode_error' column that stores the inline
# decoding errors, if any.
> SELECT key, f1, f2, decode_error::text from value_decode_error_avro ORDER BY key
birdmore geese 2 <null>
fish fish 1000 <null>
mammal1 moose 1 <null>

# Ingest a bad record for the birdmore key.
$ kafka-ingest format=bytes key-format=avro key-schema=${keyschema} topic=inline-value-errors-2
{"key": "birdmore"} "notvalidavro"

> SELECT key, f1, f2, decode_error::text from value_decode_error_avro ORDER BY key
birdmore <null> <null> "(\"avro deserialization error: wrong Confluent-style avro serialization magic: expected 0, got 32 (original text: \"\"notvalidavro\"\", original bytes: \"\"20226e6f7476616c69646176726f22\"\")\")"
fish fish 1000 <null>
mammal1 moose 1 <null>