# kafka-upsert-sources.td
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # This testdrive file is exactly the same as upsert-kafka.td, but using the new syntax.
  11. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  12. ALTER SYSTEM SET max_clusters = 20
  13. $ set keyschema={
  14. "type": "record",
  15. "name": "Key",
  16. "fields": [
  17. {"name": "key", "type": "string"}
  18. ]
  19. }
  20. $ set schema={
  21. "type" : "record",
  22. "name" : "test",
  23. "fields" : [
  24. {"name":"f1", "type":"string"},
  25. {"name":"f2", "type":"long"}
  26. ]
  27. }
  28. $ kafka-create-topic topic=avroavro
  29. > CREATE CONNECTION kafka_conn
  30. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  31. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  32. URL '${testdrive.schema-registry-url}'
  33. );
  34. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  35. {"key": "fish"} {"f1": "fish", "f2": 1000}
  36. {"key": "bird1"} {"f1":"goose", "f2": 1}
  37. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  38. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  39. {"key": "bird1"}
  40. {"key": "birdmore"} {"f1":"geese", "f2": 56}
  41. {"key": "mammalmore"} {"f1": "moose", "f2": 42}
  42. {"key": "mammal1"}
  43. {"key": "mammalmore"} {"f1":"moose", "f2": 2}
  44. > CREATE CLUSTER avroavro_cluster SIZE '${arg.default-storage-size}';
  45. > CREATE SOURCE avroavro
  46. IN CLUSTER avroavro_cluster
  47. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  48. > CREATE TABLE avroavro_tbl FROM SOURCE avroavro (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  49. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  50. ENVELOPE UPSERT
  51. > SELECT * from avroavro_tbl
  52. key f1 f2
  53. ---------------------------
  54. fish fish 1000
  55. birdmore geese 56
  56. mammalmore moose 2
  57. $ kafka-create-topic topic=textavro
  58. $ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
  59. fish: {"f1": "fish", "f2": 1000}
  60. bìrd1: {"f1":"goose", "f2": 1}
  61. birdmore: {"f1":"geese", "f2": 2}
  62. mammal1: {"f1": "moose", "f2": 1}
  63. > CREATE CLUSTER bytesavro_cluster SIZE '${arg.default-storage-size}';
  64. > CREATE SOURCE bytesavro
  65. IN CLUSTER bytesavro_cluster
  66. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  67. > CREATE TABLE bytesavro_tbl FROM SOURCE bytesavro (REFERENCE "testdrive-textavro-${testdrive.seed}")
  68. KEY FORMAT BYTES
  69. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  70. ENVELOPE UPSERT
  71. > CREATE CLUSTER textavro_cluster SIZE '${arg.default-storage-size}';
  72. > CREATE SOURCE textavro
  73. IN CLUSTER textavro_cluster
  74. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  75. > CREATE TABLE textavro_tbl FROM SOURCE textavro (REFERENCE "testdrive-textavro-${testdrive.seed}")
  76. KEY FORMAT TEXT
  77. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  78. ENVELOPE UPSERT
  79. > select * from bytesavro_tbl
  80. key f1 f2
  81. ---------------------------
  82. fish fish 1000
  83. b\xc3\xacrd1 goose 1
  84. birdmore geese 2
  85. mammal1 moose 1
  86. $ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
  87. bìrd1:
  88. birdmore: {"f1":"geese", "f2": 56}
  89. mämmalmore: {"f1": "moose", "f2": 42}
  90. mammal1:
  91. > select * from textavro_tbl
  92. key f1 f2
  93. ---------------------------
  94. fish fish 1000
  95. birdmore geese 56
  96. mämmalmore moose 42
  97. $ kafka-create-topic topic=textbytes partitions=1
  98. $ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
  99. fish:fish
  100. bìrd1:goose
  101. bírdmore:geese
  102. mammal1:moose
  103. bìrd1:
  104. > CREATE CLUSTER texttext_cluster SIZE '${arg.default-storage-size}';
  105. > CREATE SOURCE texttext
  106. IN CLUSTER texttext_cluster
  107. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  108. > CREATE TABLE texttext_tbl FROM SOURCE texttext (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  109. KEY FORMAT TEXT VALUE FORMAT TEXT
  110. ENVELOPE UPSERT
  111. > CREATE CLUSTER textbytes_cluster SIZE '${arg.default-storage-size}';
  112. > CREATE SOURCE textbytes
  113. IN CLUSTER textbytes_cluster
  114. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  115. > CREATE TABLE textbytes_tbl FROM SOURCE textbytes (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  116. KEY FORMAT TEXT
  117. VALUE FORMAT BYTES
  118. ENVELOPE UPSERT
  119. > CREATE CLUSTER bytesbytes_cluster SIZE '${arg.default-storage-size}';
  120. > CREATE SOURCE bytesbytes
  121. IN CLUSTER bytesbytes_cluster
  122. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  123. > CREATE TABLE bytesbytes_tbl FROM SOURCE bytesbytes (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  124. KEY FORMAT BYTES
  125. VALUE FORMAT BYTES
  126. ENVELOPE UPSERT
  127. > CREATE CLUSTER bytestext_cluster SIZE '${arg.default-storage-size}';
  128. > CREATE SOURCE bytestext
  129. IN CLUSTER bytestext_cluster
  130. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  131. > CREATE TABLE bytestext_tbl FROM SOURCE bytestext (REFERENCE "testdrive-textbytes-${testdrive.seed}")
  132. KEY FORMAT BYTES
  133. VALUE FORMAT TEXT
  134. ENVELOPE UPSERT
  135. > select * from texttext_tbl
  136. key text
  137. -------------------
  138. fish fish
  139. bírdmore geese
  140. mammal1 moose
  141. $ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
  142. bírdmore:géese
  143. mammalmore:moose
  144. mammal1:
  145. mammal1:mouse
  146. > select * from textbytes_tbl
  147. key data
  148. ---------------------------
  149. fish fish
  150. bírdmore g\xc3\xa9ese
  151. mammal1 mouse
  152. mammalmore moose
  153. $ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
  154. mammalmore:herd
  155. > select * from bytesbytes_tbl
  156. key data
  157. ------------------------------
  158. fish fish
  159. b\xc3\xadrdmore g\xc3\xa9ese
  160. mammal1 mouse
  161. mammalmore herd
  162. $ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
  163. bìrd1:
  164. fish:
  165. > select * from bytestext_tbl
  166. key text
  167. -----------------------
  168. b\xc3\xadrdmore géese
  169. mammal1 mouse
  170. mammalmore herd
  171. $ file-append path=test.proto
  172. syntax = "proto3";
  173. message Test {
  174. string f = 1;
  175. }
  176. $ protobuf-compile-descriptors inputs=test.proto output=test.pb set-var=test-schema
  177. $ kafka-create-topic topic=textproto partitions=1
  178. > CREATE CLUSTER textproto_cluster SIZE '${arg.default-storage-size}';
  179. > CREATE SOURCE textproto
  180. IN CLUSTER textproto_cluster
  181. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  182. > CREATE TABLE textproto_tbl FROM SOURCE textproto (REFERENCE "testdrive-textproto-${testdrive.seed}")
  183. KEY FORMAT TEXT
  184. VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  185. ENVELOPE UPSERT
  186. > CREATE CLUSTER bytesproto_cluster SIZE '${arg.default-storage-size}';
  187. > CREATE SOURCE bytesproto
  188. IN CLUSTER bytesproto_cluster
  189. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  190. > CREATE TABLE bytesproto_tbl FROM SOURCE bytesproto (REFERENCE "testdrive-textproto-${testdrive.seed}")
  191. KEY FORMAT BYTES
  192. VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  193. ENVELOPE UPSERT
  194. $ kafka-ingest topic=textproto
  195. format=protobuf descriptor-file=test.pb message=Test
  196. key-format=bytes key-terminator=:
  197. fish:{"f": "one"}
  198. bìrd1:{"f": "two"}
  199. birdmore: {}
  200. > SELECT * FROM bytesproto_tbl
  201. fish one
  202. b\xc3\xacrd1 two
  203. birdmore ""
  204. > SELECT * FROM textproto_tbl
  205. fish one
  206. bìrd1 two
  207. birdmore ""
  208. $ kafka-ingest topic=textproto
  209. format=protobuf descriptor-file=test.pb message=Test
  210. key-format=bytes key-terminator=:
  211. mammal1: {"f": "three"}
  212. bìrd1:
  213. birdmore: {"f": "four"}
  214. mämmalmore: {"f": "five"}
  215. bìrd1: {"f": "six"}
  216. mammal1:
  217. mammalmore: {"f": "seven"}
  218. > SELECT * FROM bytesproto_tbl
  219. fish one
  220. birdmore four
  221. m\xc3\xa4mmalmore five
  222. b\xc3\xacrd1 six
  223. mammalmore seven
  224. > SELECT * FROM textproto_tbl
  225. fish one
  226. birdmore four
  227. mämmalmore five
  228. bìrd1 six
  229. mammalmore seven
  230. $ kafka-create-topic topic=nullkey partitions=1
  231. # A null key should result in an error decoding that row but not a panic
  232. $ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
  233. bird1:goose
  234. :geese
  235. mammal1:moose
  236. bird1:
  237. birdmore:geese
  238. mammalmore:moose
  239. mammal1:
  240. > CREATE CLUSTER nullkey_cluster SIZE '${arg.default-storage-size}';
  241. > CREATE SOURCE nullkey
  242. IN CLUSTER nullkey_cluster
  243. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nullkey-${testdrive.seed}')
  244. > CREATE TABLE nullkey_tbl FROM SOURCE nullkey (REFERENCE "testdrive-nullkey-${testdrive.seed}")
  245. KEY FORMAT TEXT
  246. VALUE FORMAT TEXT
  247. ENVELOPE UPSERT
  248. ! select * from nullkey_tbl
  249. contains: record with NULL key in UPSERT source
  250. ! select * from nullkey_tbl
  251. contains: to retract this error, produce a record upstream with a NULL key and NULL value
  252. # Ingest a null value for our null key, to retract it.
  253. $ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
  254. :
  255. # Now we should be able to query the source.
  256. > select * from nullkey_tbl
  257. key text
  258. -------------------
  259. birdmore geese
  260. mammalmore moose
  261. $ kafka-create-topic topic=realtimeavroavro partitions=1
  262. # test multi-field avro key
  263. $ set keyschema2={
  264. "type": "record",
  265. "name": "Key2",
  266. "fields": [
  267. {"name": "f3", "type": ["null", "string"]},
  268. {"name": "f4", "type": ["null", "string"]}
  269. ]
  270. }
  271. $ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
  272. {"f3": {"string": "fire"}, "f4": {"string": "yang"}} {"f1": "dog", "f2": 42}
  273. {"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 53}
  274. {"f3": {"string": "water"}, "f4": null} {"f1":"plesiosaur", "f2": 224}
  275. {"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "turtle", "f2": 34}
  276. {"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 54}
  277. {"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "snake", "f2": 68}
  278. {"f3": {"string": "water"}, "f4": null} {"f1": "crocodile", "f2": 7}
  279. {"f3": {"string": "earth"}, "f4":{"string": "dao"}}
  280. > CREATE CLUSTER realtimeavroavro_cluster SIZE '${arg.default-storage-size}';
  281. > CREATE SOURCE realtimeavroavro
  282. IN CLUSTER realtimeavroavro_cluster
  283. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  284. > CREATE TABLE realtimeavroavro_tbl (f3, f4, f1, f2) FROM SOURCE realtimeavroavro (REFERENCE "testdrive-realtimeavroavro-${testdrive.seed}")
  285. KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  286. VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  287. ENVELOPE UPSERT
  288. > CREATE MATERIALIZED VIEW realtimeavroavro_view as SELECT * from realtimeavroavro_tbl;
  289. > select f3, f4, f1, f2 from realtimeavroavro_view
  290. f3 f4 f1 f2
  291. -----------------------------------
  292. fire yang dog 42
  293. <null> yin sheep 54
  294. water <null> crocodile 7
  295. # Ensure that Upsert sources work with START OFFSET
  296. > CREATE CLUSTER realtimeavroavro_ff_cluster SIZE '${arg.default-storage-size}';
  297. > CREATE SOURCE realtimeavroavro_ff
  298. IN CLUSTER realtimeavroavro_ff_cluster
  299. FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  300. > CREATE TABLE realtimeavroavro_ff_tbl FROM SOURCE realtimeavroavro_ff (REFERENCE "testdrive-realtimeavroavro-${testdrive.seed}")
  301. KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  302. VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  303. ENVELOPE UPSERT
  304. > SELECT * FROM realtimeavroavro_ff_tbl
  305. f3 f4 f1 f2
  306. -----------------------------------
  307. <null> yin sheep 54
  308. water <null> crocodile 7
  309. # ensure that having deletion on a key that never existed does not break anything
  310. $ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
  311. {"f3": {"string": "fire"}, "f4": {"string": "yin"}}
  312. {"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "pigeon", "f2": 10}
  313. {"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "owl", "f2": 15}
  314. {"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "rhinoceros", "f2": 211}
  315. {"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "chicken", "f2": 47}
  316. {"f3": null, "f4":{"string": "yin"}}
  317. {"f3": null, "f4":{"string": "yin"}} {"f1":"dog", "f2": 243}
  318. {"f3": {"string": "water"}, "f4": null}
  319. > select * from realtimeavroavro_view
  320. f3 f4 f1 f2
  321. -----------------------------------------
  322. fire yang dog 42
  323. air qi chicken 47
  324. <null> yin dog 243
  325. earth dao rhinoceros 211
  326. $ kafka-create-topic topic=realtimefilteravro
  327. $ set keyschema3={
  328. "type": "record",
  329. "name": "Key3",
  330. "fields": [
  331. {"name": "k1", "type": ["null", "string"]},
  332. {"name": "k2", "type": ["null", "long"]}
  333. ]
  334. }
  335. $ set schema2={
  336. "type": "record",
  337. "name": "test2",
  338. "fields": [
  339. {"name": "f1", "type": ["null", "string"]},
  340. {"name": "f2", "type": ["null", "long"]}
  341. ]
  342. }
  343. $ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
  344. {"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": {"long": 5}}
  345. {"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "melon"}, "f2": {"long": 2}}
  346. {"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 7}}
  347. {"k1": {"string": "boulangerie"}, "k2": null} {"f1":{"string": "date"}, "f2": {"long": 10}}
  348. {"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "pear"}, "f2": {"long": 2}}
  349. {"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": null}
  350. {"k1": {"string": "boulangerie"}, "k2": null} {"f1":null, "f2": {"long": 10}}
  351. {"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 3}}
  352. > CREATE CLUSTER realtimefilteravro_cluster SIZE '${arg.default-storage-size}';
  353. > CREATE SOURCE realtimefilteravro
  354. IN CLUSTER realtimefilteravro_cluster
  355. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimefilteravro-${testdrive.seed}')
  356. > CREATE TABLE realtimefilteravro_tbl FROM SOURCE realtimefilteravro (REFERENCE "testdrive-realtimefilteravro-${testdrive.seed}")
  357. KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  358. VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  359. ENVELOPE UPSERT
  360. # filter on key only
  361. > CREATE MATERIALIZED VIEW filterforkey AS
  362. SELECT f1 FROM realtimefilteravro_tbl WHERE k1='épicerie';
  363. > SELECT * from filterforkey
  364. f1
  365. ----
  366. pear
  367. # filter on value only
  368. > CREATE MATERIALIZED VIEW filterforvalue AS
  369. SELECT f2 FROM realtimefilteravro_tbl WHERE f1='date';
  370. > SELECT * from filterforvalue
  371. f2
  372. -------
  373. <null>
  374. # filter with a predicate containing key + value
  375. > CREATE MATERIALIZED VIEW filterforkeyvalue AS
  376. SELECT f1, f2 FROM realtimefilteravro_tbl WHERE k2+f2=12;
  377. > SELECT * from filterforkeyvalue
  378. f1 f2
  379. -------
  380. pear 2
  381. # filter on both a predicate containing a key and a predicate containing a value
  382. > CREATE MATERIALIZED VIEW keyfiltervaluefilter AS
  383. SELECT k1, k2 FROM realtimefilteravro_tbl WHERE k2 > 5 AND f2 < 5
  384. > SELECT * from keyfiltervaluefilter
  385. k1 k2
  386. -----------
  387. épicerie 10
  388. # add records that match the filter
  389. # make sure that rows that differ on unneeded key columns are treated as separate
  390. $ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
  391. {"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 2}}
  392. {"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 5}}
  393. {"k1": {"string": "épicerie"}, "k2": {"long": 6}} {"f1": {"string": "pear"}, "f2": null}
  394. {"k1": {"string": "bureau"}, "k2": {"long": 6}} {"f1": {"string": "grape"}, "f2": {"long": 7}}
  395. > SELECT * from filterforkey
  396. f1
  397. ----
  398. pear
  399. pear
  400. > SELECT * from filterforvalue
  401. f2
  402. -------
  403. <null>
  404. 5
  405. > SELECT * from filterforkeyvalue
  406. f1 f2
  407. ---------
  408. pear 2
  409. <null> 2
  410. > SELECT * from keyfiltervaluefilter
  411. k1 k2
  412. -----------
  413. épicerie 10
  414. librairie 10
  415. # update records so that they don't match the filter
  416. $ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
  417. {"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 6}}
  418. {"k1": null, "k2": null} {"f1": {"string": "grape"}, "f2": {"long": 5}}
  419. > SELECT * from filterforvalue
  420. f2
  421. -------
  422. <null>
  423. > SELECT * from filterforkeyvalue
  424. f1 f2
  425. ---------
  426. pear 2
  427. > SELECT * from keyfiltervaluefilter
  428. k1 k2
  429. -----------
  430. épicerie 10
  431. # update records so that they do match the filter
  432. $ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
  433. {"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":{"string": "melon"}, "f2": {"long": 2}}
  434. {"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 12}}
  435. > SELECT * from filterforvalue
  436. f2
  437. -------
  438. <null>
  439. 12
  440. > SELECT * from filterforkeyvalue
  441. f1 f2
  442. ---------
  443. pear 2
  444. melon 2
  445. > SELECT * from keyfiltervaluefilter
  446. k1 k2
  447. -----------
  448. épicerie 10
  449. librairie 10
  450. # delete records
  451. $ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
  452. {"k1": {"string": "boucherie"}, "k2": {"long": 5}}
  453. {"k1": {"string": "épicerie"}, "k2": {"long": 10}}
  454. {"k1": {"string": "boulangerie"}, "k2": null}
  455. {"k1": null, "k2": {"long": 2}}
  456. > SELECT * from filterforkey
  457. f1
  458. ----
  459. pear
  460. > SELECT * from filterforvalue
  461. f2
  462. -------
  463. 12
  464. > SELECT * from filterforkeyvalue
  465. f1 f2
  466. ---------
  467. melon 2
  468. > SELECT * from keyfiltervaluefilter
  469. k1 k2
  470. -----------
  471. librairie 10
  472. > CREATE CLUSTER include_metadata_cluster SIZE '${arg.default-storage-size}';
  473. > CREATE SOURCE include_metadata
  474. IN CLUSTER include_metadata_cluster
  475. FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  476. > CREATE TABLE include_metadata_tbl FROM SOURCE include_metadata (REFERENCE "testdrive-realtimeavroavro-${testdrive.seed}")
  477. KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  478. VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  479. INCLUDE PARTITION, OFFSET
  480. ENVELOPE UPSERT
  481. > SELECT * FROM include_metadata_tbl
  482. f3 f4 f1 f2 partition offset
  483. -------------------------------------------------------
  484. <null> yin dog 243 0 14
  485. air qi chicken 47 0 12
  486. earth dao rhinoceros 211 0 11
  487. > CREATE CLUSTER include_metadata_ts_cluster SIZE '${arg.default-storage-size}';
  488. > CREATE SOURCE include_metadata_ts
  489. IN CLUSTER include_metadata_ts_cluster
  490. FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  491. > CREATE TABLE include_metadata_ts_tbl FROM SOURCE include_metadata_ts (REFERENCE "testdrive-realtimeavroavro-${testdrive.seed}")
  492. KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  493. VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  494. INCLUDE PARTITION, OFFSET, TIMESTAMP as ts
  495. ENVELOPE UPSERT
  496. > SELECT "offset" FROM include_metadata_ts_tbl WHERE ts > '2021-01-01'
  497. offset
  498. ------
  499. 14
  500. 12
  501. 11
  502. > SELECT "offset" FROM include_metadata_ts_tbl WHERE ts < '2021-01-01'
  503. offset
  504. ------
  505. #
  506. # JSON UPSERT source
  507. $ kafka-create-topic topic=format-json-bytes partitions=1
  508. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
  509. "object":{"a":"b","c":"d"}
  510. "array":[1,2,3]
  511. "int":1
  512. "float":1.23
  513. "str":"hello"
  514. > CREATE CLUSTER format_json_cluster SIZE '${arg.default-storage-size}';
  515. > CREATE SOURCE format_json
  516. IN CLUSTER format_json_cluster
  517. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-bytes-${testdrive.seed}');
  518. > CREATE TABLE format_json_tbl FROM SOURCE format_json (REFERENCE "testdrive-format-json-bytes-${testdrive.seed}")
  519. KEY FORMAT JSON
  520. VALUE FORMAT JSON
  521. ENVELOPE UPSERT;
  522. > SELECT * FROM format_json_tbl ORDER BY key
  523. "\"array\"" [1,2,3]
  524. "\"float\"" 1.23
  525. "\"int\"" 1
  526. "\"object\"" "{\"a\":\"b\",\"c\":\"d\"}"
  527. "\"str\"" "\"hello\""
  528. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
  529. "object":{"y":"z"}
  530. "array":[10,9,8]
  531. "int":99
  532. "float":3.14
  533. "str":"bye"
  534. > SELECT * FROM format_json_tbl ORDER BY key
  535. "\"array\"" [10,9,8]
  536. "\"float\"" 3.14
  537. "\"int\"" 99
  538. "\"object\"" "{\"y\":\"z\"}"
  539. "\"str\"" "\"bye\""
  540. #
  541. # Test Inline error handling with value decode failures for a JSON source
  542. #
  543. $ kafka-create-topic topic=inline-value-errors-1 partitions=1
  544. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
  545. val1:{"a":,"c":"d"}
  546. val2:[1,2,
  547. > CREATE CLUSTER inline_error_cluster SIZE '${arg.default-storage-size}';
  548. > CREATE SOURCE value_decode_error
  549. IN CLUSTER inline_error_cluster
  550. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}');
  551. # should error without the feature flag enabled
  552. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  553. ALTER SYSTEM SET enable_envelope_upsert_inline_errors = false
  554. ! CREATE TABLE value_decode_error_tbl FROM SOURCE value_decode_error (REFERENCE "testdrive-inline-value-errors-1-${testdrive.seed}")
  555. KEY FORMAT TEXT
  556. VALUE FORMAT JSON
  557. ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
  558. contains:The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT is not available
  559. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  560. ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true
  561. # This source table will inline errors and not propagate them
  562. > CREATE TABLE value_decode_error_tbl
  563. FROM SOURCE value_decode_error (REFERENCE "testdrive-inline-value-errors-1-${testdrive.seed}")
  564. KEY FORMAT TEXT
  565. VALUE FORMAT JSON
  566. ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
  567. # there is now an additional 'error' column that should store the inline decoding errors, if any
  568. # since this is a json value record the output relation just has a single jsonb column named 'data'
  569. # though for other types (e.g. avro) the value columns should be present and flattened.
  570. > SELECT key, data, error::text FROM value_decode_error_tbl ORDER BY key
  571. val1 <null> "(\"Failed to decode JSON: expected value at line 1 column 6 (original text: {\"\"a\"\":,\"\"c\"\":\"\"d\"\"}, original bytes: \"\"7b2261223a2c2263223a2264227d\"\")\")"
  572. val2 <null> "(\"Failed to decode JSON: EOF while parsing a value at line 1 column 5 (original text: [1,2,, original bytes: \"\"5b312c322c\"\")\")"
  573. # replace the bad values with good ones
  574. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
  575. val1:{"a": 1,"c":"d"}
  576. val2:[1,2,3]
  577. > SELECT key, data, error::text FROM value_decode_error_tbl ORDER BY key
  578. val1 "{\"a\":1,\"c\":\"d\"}" <null>
  579. val2 [1,2,3] <null>
  580. > CREATE SOURCE value_decode_error_error
  581. IN CLUSTER inline_error_cluster
  582. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}');
  583. ! CREATE TABLE value_decode_error_error_tbl
  584. FROM SOURCE value_decode_error_error (REFERENCE "testdrive-inline-value-errors-1-${testdrive.seed}")
  585. KEY FORMAT TEXT
  586. VALUE FORMAT JSON
  587. INCLUDE KEY AS error
  588. ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
  589. contains: column "error" specified more than once
  590. #
  591. # Test Inline error handling with value decode failures for an AVRO source
  592. # that also uses a custom error column name
  593. #
  594. $ kafka-create-topic topic=inline-value-errors-2 partitions=1
  595. $ kafka-ingest format=avro topic=inline-value-errors-2 key-format=avro key-schema=${keyschema} schema=${schema}
  596. {"key": "fish"} {"f1": "fish", "f2": 1000}
  597. {"key": "bird1"} {"f1":"goose", "f2": 1}
  598. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  599. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  600. {"key": "bird1"}
  601. > CREATE CLUSTER inline_error_cluster_avro SIZE '${arg.default-storage-size}';
  602. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  603. ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true
  604. # This source will inline errors and not propagate them
  605. > CREATE SOURCE value_decode_error_avro
  606. IN CLUSTER inline_error_cluster_avro
  607. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-2-${testdrive.seed}');
  608. > CREATE TABLE value_decode_error_avro_tbl FROM SOURCE value_decode_error_avro (REFERENCE "testdrive-inline-value-errors-2-${testdrive.seed}")
  609. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  610. ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE AS decode_error));
  611. # there is now an additional 'error' column that should store the inline decoding errors, if any
  612. > SELECT key, f1, f2, decode_error::text from value_decode_error_avro_tbl ORDER BY key
  613. birdmore geese 2 <null>
  614. fish fish 1000 <null>
  615. mammal1 moose 1 <null>
  616. # insert a bad record for the birdmore key
  617. $ kafka-ingest format=bytes key-format=avro key-schema=${keyschema} topic=inline-value-errors-2
  618. {"key": "birdmore"} "notvalidavro"
  619. > SELECT key, f1, f2, decode_error::text from value_decode_error_avro_tbl ORDER BY key
  620. birdmore <null> <null> "(\"avro deserialization error: wrong Confluent-style avro serialization magic: expected 0, got 32 (original text: \"\"notvalidavro\"\", original bytes: \"\"20226e6f7476616c69646176726f22\"\")\")"
  621. fish fish 1000 <null>
  622. mammal1 moose 1 <null>