123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- $ set-arg-default default-storage-size=1
- $ set-arg-default single-replica-cluster=quickstart
- # Test Avro Sinks in general. This tests things that are not specific to an
- # envelope. Mostly that we can correctly encode various data types and how we
- # determine field names. This uses ENVELOPE DEBEZIUM implicitly but the tested
- # behavior is not specific to DEBEZIUM sinks.
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER SYSTEM SET max_clusters = 20
- # Test that we invent field names for unnamed columns.
- > CREATE CONNECTION kafka_conn
- TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
- > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
- URL '${testdrive.schema-registry-url}'
- );
- # Test interval type.
- > CREATE MATERIALIZED VIEW interval_data (interval) AS VALUES
- (INTERVAL '0s'),
- (INTERVAL '1month 1day 1us'),
- (INTERVAL '-1month -1day -1us'),
- (INTERVAL '-178000000 years'),
- (INTERVAL '178000000 years')
- > CREATE CLUSTER interval_data_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK interval_data_sink
- IN CLUSTER interval_data_sink_cluster
- FROM interval_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-interval-data-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ kafka-verify-data format=avro sink=materialize.public.interval_data_sink sort-messages=true
- {"before": null, "after": {"row": {"interval": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
- {"before": null, "after": {"row": {"interval": [0, 198, 80, 127, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
- {"before": null, "after": {"row": {"interval": [0, 58, 175, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}}}
- {"before": null, "after": {"row": {"interval": [1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]}}}
- {"before": null, "after": {"row": {"interval": [255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255]}}}
- # See database-issues#2957
- #> CREATE MATERIALIZED VIEW unnamed_cols AS SELECT 1, 2 AS b, 3;
- #
- #> CREATE SINK unnamed_cols_sink
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM unnamed_cols
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-unnamed-cols-sink-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- #
- #$ kafka-verify-data format=avro sink=materialize.public.unnamed_cols_sink
- #{"before": null, "after": {"row": {"column1": 1, "b": 2, "column3": 3}}}
- # Test that invented field names do not clash with named columns.
- # See database-issues#2957
- #> CREATE MATERIALIZED VIEW clashing_cols AS SELECT 1, 2 AS column1, 3 as b, 4 as b2, 5 as b3;
- #
- #> CREATE SINK clashing_cols_sink
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM clashing_cols
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-clashing-cols-sink-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- #
- #$ kafka-verify-data format=avro sink=materialize.public.clashing_cols_sink
- #{"before": null, "after": {"row": {"column1": 1, "column1_1": 2, "b": 3, "b2": 4, "b3": 5}}}
- # Test date/time types.
- > CREATE MATERIALIZED VIEW datetime_data (date, ts, ts_tz) AS VALUES
- (DATE '2000-01-01', TIMESTAMP '2000-01-01 10:10:10.111', TIMESTAMPTZ '2000-01-01 10:10:10.111+02'),
- (DATE '2000-02-01', TIMESTAMP '2000-02-01 10:10:10.111', TIMESTAMPTZ '2000-02-01 10:10:10.111+02'),
- (('0001-01-01'::DATE - '1721389days'::INTERVAL)::DATE, ('0001-01-01'::DATE - '1721389days'::INTERVAL)::TIMESTAMP, ('0001-01-01'::DATE - '1721389days'::INTERVAL)::TIMESTAMPTZ),
- (('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::DATE, ('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::TIMESTAMP, ('0001-01-01'::DATE + '262141years 11months 30days'::INTERVAL)::TIMESTAMPTZ)
- > CREATE CLUSTER datetime_data_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK datetime_data_sink
- IN CLUSTER datetime_data_sink_cluster
- FROM datetime_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-datetime-data-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ kafka-verify-data format=avro sink=materialize.public.datetime_data_sink sort-messages=true
- {"before": null, "after": {"row": {"date": -2440551, "ts": -210863606400000000, "ts_tz": -210863606400000000}}}
- {"before": null, "after": {"row": {"date": 10957, "ts": 946721410111000, "ts_tz": 946714210111000}}}
- {"before": null, "after": {"row": {"date": 10988, "ts": 949399810111000, "ts_tz": 949392610111000}}}
- {"before": null, "after": {"row": {"date": 95026236, "ts": 8210266790400000000, "ts_tz": 8210266790400000000}}}
- > CREATE MATERIALIZED VIEW time_data (time) AS VALUES (TIME '01:02:03'), (TIME '01:02:04'), (TIME '00:00:00'), (TIME '23:59:59')
- > CREATE CLUSTER time_data_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK time_data_sink
- IN CLUSTER time_data_sink_cluster
- FROM time_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-time-data-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ kafka-verify-data format=avro sink=materialize.public.time_data_sink sort-messages=true
- {"before": null, "after": {"row": {"time": 0}}}
- {"before": null, "after": {"row": {"time": 3723000000}}}
- {"before": null, "after": {"row": {"time": 3724000000}}}
- {"before": null, "after": {"row": {"time": 86399000000}}}
- # Test jsonb
- > CREATE MATERIALIZED VIEW json_data (a, b) AS VALUES ('{"a":1, "b":2}'::jsonb, 2)
- # Sinks with JSON columns should not crash - see https://github.com/MaterializeInc/database-issues/issues/1477
- > CREATE CLUSTER json_data_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK json_data_sink
- IN CLUSTER json_data_sink_cluster
- FROM json_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-json-data-sink-${testdrive.seed}')
- FORMAT AVRO
- USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- # Test map
- > CREATE MATERIALIZED VIEW map_data (map) AS SELECT '{a => 1, b => 2}'::map[text=>int];
- > CREATE CLUSTER map_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK map_sink
- IN CLUSTER map_sink_cluster
- FROM map_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-map-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ kafka-verify-data format=avro sink=materialize.public.map_sink sort-messages=true
- {"before": null, "after": {"row": {"map": {"a": {"int": 1}, "b": {"int": 2}}}}}
- > CREATE MATERIALIZED VIEW list_data (list) AS SELECT LIST[1, 2];
- > CREATE CLUSTER list_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK list_sink
- IN CLUSTER list_sink_cluster
- FROM list_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-list-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ kafka-verify-data format=avro sink=materialize.public.list_sink sort-messages=true
- {"before": null, "after": {"row": {"list": [{"int": 1}, {"int": 2}]}}}
- # Test optional namespace for auto-generated value schema
- > CREATE MATERIALIZED VIEW namespace_value_data (namespace) AS SELECT 1;
- > CREATE CLUSTER namespace_value_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK namespace_value_sink
- IN CLUSTER namespace_value_sink_cluster
- FROM namespace_value_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-namespace-value-sink-${testdrive.seed}')
- FORMAT AVRO USING
- CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'abc.def.ghi')
- ENVELOPE DEBEZIUM
- $ schema-registry-verify schema-type=avro subject=testdrive-namespace-value-sink-${testdrive.seed}-value
- {"type":"record","name":"ghi","namespace":"abc.def","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"namespace","type":"int"}]}]},{"name":"after","type":["null","row"]}]}
- $ kafka-verify-data format=avro sink=materialize.public.namespace_value_sink sort-messages=true
- {"before": null, "after": {"row": {"namespace": 1}}}
- # Test optional namespaces for autogenerated key and value schemas
- > CREATE MATERIALIZED VIEW namespace_key_value_data (a, b) AS SELECT * FROM (VALUES (1, 2));
- > CREATE CLUSTER namespace_key_value_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK namespace_key_value_sink
- IN CLUSTER namespace_key_value_sink_cluster
- FROM namespace_key_value_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-namespace-key-value-sink-${testdrive.seed}')
- KEY (b)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.foo', AVRO VALUE FULLNAME = 'some.neat.class.bar')
- ENVELOPE DEBEZIUM
- $ schema-registry-verify schema-type=avro subject=testdrive-namespace-key-value-sink-${testdrive.seed}-key
- {"type":"record","name":"foo","namespace":"some.neat.class","fields":[{"name":"b","type":"int"}]}
- $ schema-registry-verify schema-type=avro subject=testdrive-namespace-key-value-sink-${testdrive.seed}-value
- {"type":"record","name":"bar","namespace":"some.neat.class","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}
- $ kafka-verify-data format=avro sink=materialize.public.namespace_key_value_sink sort-messages=true
- {"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}
- > SELECT key_format, value_format, format FROM mz_sinks WHERE name = 'namespace_key_value_sink'
- avro avro avro
- # Test setting the compatibility level for both topics
- > CREATE MATERIALIZED VIEW compat_level_data (a, b) AS SELECT * FROM (VALUES (1, 2));
- > CREATE CLUSTER compat_level_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK compat_level_sink
- IN CLUSTER compat_level_sink_cluster
- FROM compat_level_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-compat-level-sink-${testdrive.seed}')
- KEY (b)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
- KEY COMPATIBILITY LEVEL 'FULL_TRANSITIVE',
- VALUE COMPATIBILITY LEVEL 'FORWARD_TRANSITIVE'
- )
- ENVELOPE DEBEZIUM
- $ schema-registry-verify schema-type=avro subject=testdrive-compat-level-sink-${testdrive.seed}-key compatibility-level=FULL_TRANSITIVE
- {"type":"record","name":"row","fields":[{"name":"b","type":"int"}]}
- $ schema-registry-verify schema-type=avro subject=testdrive-compat-level-sink-${testdrive.seed}-value compatibility-level=FORWARD_TRANSITIVE
- {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}
- $ kafka-verify-data format=avro sink=materialize.public.compat_level_sink sort-messages=true
- {"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}
- # Test a sink with an Avro value format and Json key format
- > CREATE MATERIALIZED VIEW mixed_format_data (a, b) AS SELECT * FROM (VALUES (1, 2));
- > CREATE CLUSTER mixed_format_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK mixed_format_sink
- IN CLUSTER mixed_format_sink_cluster
- FROM mixed_format_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-mixed-format-sink-${testdrive.seed}')
- KEY (b)
- KEY FORMAT JSON
- VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (
- VALUE COMPATIBILITY LEVEL 'FORWARD_TRANSITIVE'
- )
- ENVELOPE DEBEZIUM
- $ schema-registry-verify schema-type=avro subject=testdrive-mixed-format-sink-${testdrive.seed}-value compatibility-level=FORWARD_TRANSITIVE
- {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]}]},{"name":"after","type":["null","row"]}]}
- $ kafka-verify-data key-format=json value-format=avro sink=materialize.public.mixed_format_sink sort-messages=true
- {"b": 2} {"before": null, "after": {"row": {"a": 1, "b": 2}}}
- > SELECT key_format, value_format, format FROM mz_sinks WHERE name = 'mixed_format_sink'
- json avro key-json-value-avro
- # Test a sink over a view whose column names are not directly usable as
- # Avro schema names.
- > CREATE MATERIALIZED VIEW tricky_names AS SELECT 1 AS "tricky-name", 1 AS "tricky!name", 1 AS "tricky@name", 1 AS "666";
- > CREATE CLUSTER tricky_sink_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK tricky_sink
- IN CLUSTER tricky_sink_cluster
- FROM tricky_names
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-tricky-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- $ schema-registry-verify schema-type=avro subject=testdrive-tricky-sink-${testdrive.seed}-value
- {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"tricky_name","type":"int"},{"name":"tricky_name1","type":"int"},{"name":"tricky_name2","type":"int"},{"name":"_666","type":"int"}]}]},{"name":"after","type":["null","row"]}]}
- # Bad Sinks
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM namespace_key_value_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bad-sink-${testdrive.seed}')
- KEY (b)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'bad-name', AVRO VALUE FULLNAME = 'goodname')
- ENVELOPE DEBEZIUM
- contains:Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: bad-name
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM namespace_key_value_data
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-bad-sink-${testdrive.seed}')
- KEY (b)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'goodname', AVRO VALUE FULLNAME = 'bad-name')
- ENVELOPE DEBEZIUM
- contains:Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: bad-name
- > CREATE MATERIALIZED VIEW input (a, b) AS SELECT * FROM (VALUES (1, 2))
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM input
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a, a)
- FORMAT AVRO
- USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- contains:duplicate column referenced in KEY: a
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM input
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'some.neat.class.foo', AVRO KEY FULLNAME = 'some.neat.class.bar')
- ENVELOPE DEBEZIUM
- contains:Cannot specify AVRO KEY FULLNAME without a corresponding KEY field
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM input
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.bar')
- ENVELOPE DEBEZIUM
- contains:Cannot specify AVRO KEY FULLNAME without a corresponding KEY field
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM input
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO KEY FULLNAME = 'some.neat.class.bar')
- ENVELOPE DEBEZIUM
- contains:Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names
- ! CREATE SINK bad_sink
- IN CLUSTER ${arg.single-replica-cluster}
- FROM input
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-input-sink-${testdrive.seed}') KEY (a)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn (AVRO VALUE FULLNAME = 'some.neat.class.bar')
- ENVELOPE DEBEZIUM
- contains:Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names
|