# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1

# This testdrive file is identical to upsert-kafka.td but uses the new syntax.

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

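# Each source in this file runs on a dedicated cluster, so raise the default
# cluster limit.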
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=avroavro

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

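# These connections are reused by every source created below.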
$ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}
{"key": "mammalmore"} {"f1":"moose", "f2": 2}

> CREATE CLUSTER avroavro_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE avroavro
  IN CLUSTER avroavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> SELECT * from avroavro
key         f1     f2
---------------------------
fish        fish   1000
birdmore    geese  56
mammalmore  moose  2

$ kafka-create-topic topic=textavro

$ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
fish: {"f1": "fish", "f2": 1000}
bìrd1: {"f1":"goose", "f2": 1}
birdmore: {"f1":"geese", "f2": 2}
mammal1: {"f1": "moose", "f2": 1}

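# Read the same topic twice, decoding the key as BYTES and as TEXT, to compare
# how non-ASCII keys are rendered.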
> CREATE CLUSTER bytesavro_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE bytesavro
  IN CLUSTER bytesavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE UPSERT

> CREATE CLUSTER textavro_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE textavro
  IN CLUSTER textavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textavro-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE UPSERT

> select * from bytesavro
key           f1     f2
---------------------------
fish          fish   1000
b\xc3\xacrd1  goose  1
birdmore      geese  2
mammal1       moose  1

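# With KEY FORMAT BYTES, the UTF-8 key "bìrd1" renders as escaped bytes.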
$ kafka-ingest format=avro topic=textavro key-format=bytes key-terminator=: schema=${schema}
bìrd1:
birdmore: {"f1":"geese", "f2": 56}
mämmalmore: {"f1": "moose", "f2": 42}
mammal1:

> select * from textavro
key         f1     f2
---------------------------
fish        fish   1000
birdmore    geese  56
mämmalmore  moose  42

$ kafka-create-topic topic=textbytes partitions=1

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
fish:fish
bìrd1:goose
bírdmore:geese
mammal1:moose
bìrd1:

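# The four sources below cover every combination of TEXT and BYTES key and
# value formats on the same topic.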
> CREATE CLUSTER texttext_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE texttext
  IN CLUSTER texttext_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT TEXT VALUE FORMAT TEXT
  ENVELOPE UPSERT

> CREATE CLUSTER textbytes_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE textbytes
  IN CLUSTER textbytes_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT BYTES
  ENVELOPE UPSERT

> CREATE CLUSTER bytesbytes_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE bytesbytes
  IN CLUSTER bytesbytes_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT

> CREATE CLUSTER bytestext_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE bytestext
  IN CLUSTER bytestext_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textbytes-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT TEXT
  ENVELOPE UPSERT

> select * from texttext
key       text
-------------------
fish      fish
bírdmore  geese
mammal1   moose

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
bírdmore:géese
mammalmore:moose
mammal1:
mammal1:mouse

> select * from textbytes
key         data
---------------------------
fish        fish
bírdmore    g\xc3\xa9ese
mammal1     mouse
mammalmore  moose

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
mammalmore:herd

> select * from bytesbytes
key              data
------------------------------
fish             fish
b\xc3\xadrdmore  g\xc3\xa9ese
mammal1          mouse
mammalmore       herd

$ kafka-ingest format=bytes topic=textbytes key-format=bytes key-terminator=:
bìrd1:
fish:

> select * from bytestext
key              text
-----------------------
b\xc3\xadrdmore  géese
mammal1          mouse
mammalmore       herd

$ file-append path=test.proto
syntax = "proto3";

message Test {
    string f = 1;
}

$ protobuf-compile-descriptors inputs=test.proto output=test.pb set-var=test-schema

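# protobuf-compile-descriptors compiles test.proto into the descriptor set
# test.pb and stores an inline-usable encoding of it in ${test-schema}.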
$ kafka-create-topic topic=textproto partitions=1

> CREATE CLUSTER textproto_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE textproto
  IN CLUSTER textproto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  ENVELOPE UPSERT

> CREATE CLUSTER bytesproto_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE bytesproto
  IN CLUSTER bytesproto_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-textproto-${testdrive.seed}')
  KEY FORMAT BYTES
  VALUE FORMAT PROTOBUF MESSAGE '.Test' USING SCHEMA '${test-schema}'
  ENVELOPE UPSERT

$ kafka-ingest topic=textproto
  format=protobuf descriptor-file=test.pb message=Test
  key-format=bytes key-terminator=:
fish:{"f": "one"}
bìrd1:{"f": "two"}
birdmore: {}

> SELECT * FROM bytesproto
fish          one
b\xc3\xacrd1  two
birdmore      ""

> SELECT * FROM textproto
fish      one
bìrd1     two
birdmore  ""

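# The empty message for birdmore decodes with proto3 defaults, so f is "".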
$ kafka-ingest topic=textproto
  format=protobuf descriptor-file=test.pb message=Test
  key-format=bytes key-terminator=:
mammal1: {"f": "three"}
bìrd1:
birdmore: {"f": "four"}
mämmalmore: {"f": "five"}
bìrd1: {"f": "six"}
mammal1:
mammalmore: {"f": "seven"}

> SELECT * FROM bytesproto
fish               one
birdmore           four
m\xc3\xa4mmalmore  five
b\xc3\xacrd1       six
mammalmore         seven

> SELECT * FROM textproto
fish        one
birdmore    four
mämmalmore  five
bìrd1       six
mammalmore  seven

$ kafka-create-topic topic=nullkey partitions=1

# A null key should produce a decoding error for that row, not a panic.
$ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
bird1:goose
:geese
mammal1:moose
bird1:
birdmore:geese
mammalmore:moose
mammal1:

> CREATE CLUSTER nullkey_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE nullkey
  IN CLUSTER nullkey_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nullkey-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT TEXT
  ENVELOPE UPSERT

! select * from nullkey
contains: record with NULL key in UPSERT source

! select * from nullkey
contains: to retract this error, produce a record upstream with a NULL key and NULL value

# Ingest a null value for our null key, to retract it.
$ kafka-ingest format=bytes topic=nullkey key-format=bytes key-terminator=:
:

# Now we should be able to query the source.
> select * from nullkey
key         text
-------------------
birdmore    geese
mammalmore  moose

$ kafka-create-topic topic=realtimeavroavro partitions=1

# test multi-field avro key
$ set keyschema2={
    "type": "record",
    "name": "Key2",
    "fields": [
        {"name": "f3", "type": ["null", "string"]},
        {"name": "f4", "type": ["null", "string"]}
    ]
  }

$ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
{"f3": {"string": "fire"}, "f4": {"string": "yang"}} {"f1": "dog", "f2": 42}
{"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 53}
{"f3": {"string": "water"}, "f4": null} {"f1":"plesiosaur", "f2": 224}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "turtle", "f2": 34}
{"f3": null, "f4": {"string": "yin"}} {"f1": "sheep", "f2": 54}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "snake", "f2": 68}
{"f3": {"string": "water"}, "f4": null} {"f1": "crocodile", "f2": 7}
{"f3": {"string": "earth"}, "f4":{"string": "dao"}}

> CREATE CLUSTER realtimeavroavro_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE realtimeavroavro (f3, f4, f1, f2)
  IN CLUSTER realtimeavroavro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW realtimeavroavro_view as SELECT * from realtimeavroavro;

> select f3, f4, f1, f2 from realtimeavroavro_view
f3      f4      f1         f2
-----------------------------------
fire    yang    dog        42
<null>  yin     sheep      54
water   <null>  crocodile  7

# Ensure that Upsert sources work with START OFFSET
> CREATE CLUSTER realtimeavroavro_ff_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE realtimeavroavro_ff
  IN CLUSTER realtimeavroavro_ff_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

> SELECT * FROM realtimeavroavro_ff
f3      f4      f1         f2
-----------------------------------
<null>  yin     sheep      54
water   <null>  crocodile  7

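# START OFFSET=[1] skips offset 0, so the fire/yang key never enters the
# upsert state of this source.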
# Ensure that a deletion of a key that never existed does not break anything.
$ kafka-ingest format=avro topic=realtimeavroavro key-format=avro key-schema=${keyschema2} schema=${schema}
{"f3": {"string": "fire"}, "f4": {"string": "yin"}}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "pigeon", "f2": 10}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "owl", "f2": 15}
{"f3": {"string": "earth"}, "f4": {"string": "dao"}} {"f1": "rhinoceros", "f2": 211}
{"f3": {"string": "air"}, "f4":{"string": "qi"}} {"f1": "chicken", "f2": 47}
{"f3": null, "f4":{"string": "yin"}}
{"f3": null, "f4":{"string": "yin"}} {"f1":"dog", "f2": 243}
{"f3": {"string": "water"}, "f4": null}

> select * from realtimeavroavro_view
f3      f4    f1          f2
-----------------------------------------
fire    yang  dog         42
air     qi    chicken     47
<null>  yin   dog         243
earth   dao   rhinoceros  211

$ kafka-create-topic topic=realtimefilteravro

$ set keyschema3={
    "type": "record",
    "name": "Key3",
    "fields": [
        {"name": "k1", "type": ["null", "string"]},
        {"name": "k2", "type": ["null", "long"]}
    ]
  }

$ set schema2={
    "type": "record",
    "name": "test2",
    "fields": [
        {"name": "f1", "type": ["null", "string"]},
        {"name": "f2", "type": ["null", "long"]}
    ]
  }

$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "melon"}, "f2": {"long": 2}}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 7}}
{"k1": {"string": "boulangerie"}, "k2": null} {"f1":{"string": "date"}, "f2": {"long": 10}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}} {"f1": {"string": "pear"}, "f2": {"long": 2}}
{"k1": null, "k2": {"long": 2}} {"f1": {"string": "date"}, "f2": null}
{"k1": {"string": "boulangerie"}, "k2": null} {"f1":null, "f2": {"long": 10}}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}} {"f1": {"string": "apple"}, "f2": {"long": 3}}

> CREATE CLUSTER realtimefilteravro_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE realtimefilteravro
  IN CLUSTER realtimefilteravro_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-realtimefilteravro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT

# filter on key only
> CREATE MATERIALIZED VIEW filterforkey AS
  SELECT f1 FROM realtimefilteravro WHERE k1='épicerie';

> SELECT * from filterforkey
f1
----
pear

# filter on value only
> CREATE MATERIALIZED VIEW filterforvalue AS
  SELECT f2 FROM realtimefilteravro WHERE f1='date';

> SELECT * from filterforvalue
f2
-------
<null>

# filter with a predicate containing key + value
> CREATE MATERIALIZED VIEW filterforkeyvalue AS
  SELECT f1, f2 FROM realtimefilteravro WHERE k2+f2=12;

> SELECT * from filterforkeyvalue
f1    f2
-------
pear  2

# filter on both a predicate containing a key and a predicate containing a value
> CREATE MATERIALIZED VIEW keyfiltervaluefilter AS
  SELECT k1, k2 FROM realtimefilteravro WHERE k2 > 5 AND f2 < 5

> SELECT * from keyfiltervaluefilter
k1        k2
-----------
épicerie  10

# Add records that match the filters, and make sure that rows differing only
# in key columns the views don't reference are still treated as separate keys.
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 2}}
{"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 6}} {"f1": {"string": "pear"}, "f2": null}
{"k1": {"string": "bureau"}, "k2": {"long": 6}} {"f1": {"string": "grape"}, "f2": {"long": 7}}

> SELECT * from filterforkey
f1
----
pear
pear

> SELECT * from filterforvalue
f2
-------
<null>
5

> SELECT * from filterforkeyvalue
f1      f2
---------
pear    2
<null>  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
épicerie   10
librairie  10

# update records so that they don't match the filter
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":null, "f2": {"long": 6}}
{"k1": null, "k2": null} {"f1": {"string": "grape"}, "f2": {"long": 5}}

> SELECT * from filterforvalue
f2
-------
<null>

> SELECT * from filterforkeyvalue
f1    f2
---------
pear  2

> SELECT * from keyfiltervaluefilter
k1        k2
-----------
épicerie  10

# update records so that they do match the filter
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "librairie"}, "k2": {"long": 10}} {"f1":{"string": "melon"}, "f2": {"long": 2}}
{"k1": null, "k2": null} {"f1": {"string": "date"}, "f2": {"long": 12}}

> SELECT * from filterforvalue
f2
-------
<null>
12

> SELECT * from filterforkeyvalue
f1     f2
---------
pear   2
melon  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
épicerie   10
librairie  10

# delete records
$ kafka-ingest format=avro topic=realtimefilteravro key-format=avro key-schema=${keyschema3} schema=${schema2}
{"k1": {"string": "boucherie"}, "k2": {"long": 5}}
{"k1": {"string": "épicerie"}, "k2": {"long": 10}}
{"k1": {"string": "boulangerie"}, "k2": null}
{"k1": null, "k2": {"long": 2}}

> SELECT * from filterforkey
f1
----
pear

> SELECT * from filterforvalue
f2
-------
12

> SELECT * from filterforkeyvalue
f1     f2
---------
melon  2

> SELECT * from keyfiltervaluefilter
k1         k2
-----------
librairie  10

> CREATE CLUSTER include_metadata_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE include_metadata
  IN CLUSTER include_metadata_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE PARTITION, OFFSET
  ENVELOPE UPSERT

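# The partition and offset columns reflect the record that produced each key's
# current value.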
> SELECT * FROM include_metadata
f3      f4   f1          f2   partition  offset
-------------------------------------------------------
<null>  yin  dog         243  0          14
air     qi   chicken     47   0          12
earth   dao  rhinoceros  211  0          11

> CREATE CLUSTER include_metadata_ts_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE include_metadata_ts
  IN CLUSTER include_metadata_ts_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-realtimeavroavro-${testdrive.seed}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE PARTITION, OFFSET, TIMESTAMP as ts
  ENVELOPE UPSERT

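# ts is the Kafka message timestamp; every record here was produced after
# 2021-01-01, so the second query below returns no rows.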
- > SELECT "offset" FROM include_metadata_ts WHERE ts > '2021-01-01'
- offset
- ------
- 14
- 12
- 11
- > SELECT "offset" FROM include_metadata_ts WHERE ts < '2021-01-01'
- offset
- ------
#
# JSON UPSERT source
#

$ kafka-create-topic topic=format-json-bytes partitions=1

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
"object":{"a":"b","c":"d"}
"array":[1,2,3]
"int":1
"float":1.23
"str":"hello"

> CREATE CLUSTER format_json_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE format_json
  IN CLUSTER format_json_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-bytes-${testdrive.seed}')
  KEY FORMAT JSON
  VALUE FORMAT JSON
  ENVELOPE UPSERT;

> SELECT * FROM format_json ORDER BY key
"\"array\""   [1,2,3]
"\"float\""   1.23
"\"int\""     1
"\"object\""  "{\"a\":\"b\",\"c\":\"d\"}"
"\"str\""     "\"hello\""

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json-bytes
"object":{"y":"z"}
"array":[10,9,8]
"int":99
"float":3.14
"str":"bye"

> SELECT * FROM format_json ORDER BY key
"\"array\""   [10,9,8]
"\"float\""   3.14
"\"int\""     99
"\"object\""  "{\"y\":\"z\"}"
"\"str\""     "\"bye\""

#
# Test Inline error handling with value decode failures for a JSON source
#

$ kafka-create-topic topic=inline-value-errors-1 partitions=1

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
val1:{"a":,"c":"d"}
val2:[1,2,

> CREATE CLUSTER inline_error_cluster SIZE '${arg.default-storage-size}';

# Creating the source should fail while the feature flag is disabled.
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = false

! CREATE SOURCE value_decode_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
contains:The VALUE DECODING ERRORS = INLINE option on ENVELOPE UPSERT is not available

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true

# This source will inline errors and not propagate them.
> CREATE SOURCE value_decode_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));

# There is now an additional 'error' column that stores the inline decoding
# errors, if any. Since the values are JSON, the output relation has a single
# jsonb column named 'data'; for other formats (e.g. Avro) the value columns
# would be present and flattened.
> SELECT key, data, error::text FROM value_decode_error ORDER BY key
val1  <null>  "(\"Failed to decode JSON: expected value at line 1 column 6 (original text: {\"\"a\"\":,\"\"c\"\":\"\"d\"\"}, original bytes: \"\"7b2261223a2c2263223a2264227d\"\")\")"
val2  <null>  "(\"Failed to decode JSON: EOF while parsing a value at line 1 column 5 (original text: [1,2,, original bytes: \"\"5b312c322c\"\")\")"

# replace the bad values with good ones
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=inline-value-errors-1
val1:{"a": 1,"c":"d"}
val2:[1,2,3]

> SELECT key, data, error::text FROM value_decode_error ORDER BY key
val1  "{\"a\":1,\"c\":\"d\"}"  <null>
val2  [1,2,3]                  <null>

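# The INLINE error column is named 'error' by default, so it collides with the
# INCLUDE KEY AS error alias below.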
! CREATE SOURCE value_decode_error_error
  IN CLUSTER inline_error_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-1-${testdrive.seed}')
  KEY FORMAT TEXT
  VALUE FORMAT JSON
  INCLUDE KEY AS error
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE));
contains: column "error" specified more than once

#
# Test Inline error handling with value decode failures for an AVRO source
# that also uses a custom error column name
#

$ kafka-create-topic topic=inline-value-errors-2 partitions=1

$ kafka-ingest format=avro topic=inline-value-errors-2 key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}

> CREATE CLUSTER inline_error_cluster_avro SIZE '${arg.default-storage-size}';

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_envelope_upsert_inline_errors = true

# This source will inline errors and not propagate them.
> CREATE SOURCE value_decode_error_avro
  IN CLUSTER inline_error_cluster_avro
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-inline-value-errors-2-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT (VALUE DECODING ERRORS = (INLINE AS decode_error));

# There is now an additional 'decode_error' column that stores the inline
# decoding errors, if any; the Avro value columns remain flattened.
> SELECT key, f1, f2, decode_error::text from value_decode_error_avro ORDER BY key
birdmore  geese  2     <null>
fish      fish   1000  <null>
mammal1   moose  1     <null>

# insert a bad record for the birdmore key
$ kafka-ingest format=bytes key-format=avro key-schema=${keyschema} topic=inline-value-errors-2
{"key": "birdmore"} "notvalidavro"

> SELECT key, f1, f2, decode_error::text from value_decode_error_avro ORDER BY key
birdmore  <null>  <null>  "(\"avro deserialization error: wrong Confluent-style avro serialization magic: expected 0, got 32 (original text: \"\"notvalidavro\"\", original bytes: \"\"20226e6f7476616c69646176726f22\"\")\")"
fish      fish    1000    <null>
mammal1   moose   1       <null>
|