- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Configure the default sizes used by the clusters below, and raise the
- # cluster limit: this test creates one dedicated cluster per source/sink.
- $ set-arg-default default-storage-size=1
- $ set-arg-default default-replica-size=1
- $ set-arg-default single-replica-cluster=quickstart
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER SYSTEM SET max_clusters = 20
- # Seed a Kafka topic with two CSV rows; the sources below read from it.
- $ kafka-create-topic topic=test partitions=1
- $ kafka-ingest topic=test format=bytes
- jack,jill
- goofus,gallant
- # Connections shared by every source and sink in this file.
- > CREATE CONNECTION kafka_conn
- TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
- > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
- URL '${testdrive.schema-registry-url}'
- );
- # Two identical CSV sources over the seeded topic, each on its own cluster.
- # Sinks below export a source directly (src), a second source
- # (src_materialized), and views -- the code paths differ (see N.B. below).
- > CREATE CLUSTER src_cluster SIZE '${arg.default-storage-size}';
- > CREATE SOURCE src (a, b)
- IN CLUSTER src_cluster
- FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
- FORMAT CSV WITH 2 COLUMNS
- INCLUDE OFFSET
- > CREATE CLUSTER src_materialized_cluster SIZE '${arg.default-storage-size}';
- > CREATE SOURCE src_materialized (a, b)
- IN CLUSTER src_materialized_cluster
- FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
- FORMAT CSV WITH 2 COLUMNS
- INCLUDE OFFSET
- # Unmaterialized views over each source, plus a materialized view over src.
- > CREATE VIEW v1 AS
- SELECT a || b AS c FROM src
- > CREATE VIEW v2 AS
- SELECT a || b AS c FROM src_materialized
- > CREATE MATERIALIZED VIEW v3 AS
- SELECT a || b AS c FROM src
- # We should refuse to create a sink with invalid WITH options
- ! CREATE SINK invalid_with_option
- IN CLUSTER ${arg.single-replica-cluster}
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- WITH (badoption=true)
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- contains:Expected one of PARTITION or SNAPSHOT or VERSION
- # VERSION is a recognized sink option, but users may not set it explicitly.
- ! CREATE SINK invalid_with_option
- IN CLUSTER ${arg.single-replica-cluster}
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- WITH (VERSION=1)
- contains:CREATE SINK...WITH (VERSION..) is not allowed
- # No sinks should exist after the failed attempts above.
- > SHOW SINKS
- name type cluster comment
- -------------------------------------------
- # NOTE(review): the following cases (bad schema-registry URLs, sinking from
- # unmaterialized views) are commented out -- confirm whether they should be
- # re-enabled or deleted rather than kept as dead test code.
- # # We should refuse to create a sink with an invalid schema registry URL.
- #
- # # Invalid in that the address is not well formed
- # ! CREATE SINK bad_schema_registry
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM v3
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.kafka-addr}'
- # contains:cannot construct a CCSR client with a cannot-be-a-base URL
- #
- # # Invalid in that the address points to an invalid host
- # ! CREATE SINK bad_schema_registry
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM v3
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://no-such-host'
- # contains:unable to publish value schema to registry in kafka sink
- #
- # # Invalid in that the address is not for a schema registry
- # ! CREATE SINK bad_schema_registry
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM v3
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://materialized:6875'
- # contains:unable to publish value schema to registry in kafka sink
- #
- # ! CREATE SINK bad_view
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM v1
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- # contains:v1 is a view, which cannot be exported as a sink
- #
- # # ...Even if that view is based on a materialized source
- # ! CREATE SINK bad_view2
- # IN CLUSTER ${arg.single-replica-cluster}
- # FROM v2
- # INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- # WITH (retention_ms=1000000)
- # FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- # contains:v2 is a view, which cannot be exported as a sink
- #
- # > SHOW SINKS
- # name type size cluster
- # ---------------------------------------
- # N.B. it is important to test sinks that depend on sources directly vs. sinks
- # that depend on views, as the code paths are different.
- > CREATE CLUSTER snk1_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk1
- IN CLUSTER snk1_cluster
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- > CREATE CLUSTER snk2_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk2
- IN CLUSTER snk2_cluster
- FROM src_materialized
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk2-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- > CREATE CLUSTER snk3_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk3
- IN CLUSTER snk3_cluster
- FROM v3
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk3-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- > SHOW SINKS
- name type cluster comment
- -----------------------------------------------
- snk1 kafka snk1_cluster ""
- snk2 kafka snk2_cluster ""
- snk3 kafka snk3_cluster ""
- # Each source-backed sink emits the two ingested rows (with their offsets);
- # the view-backed sink emits the concatenated column instead.
- $ kafka-verify-data format=avro sink=materialize.public.snk1 sort-messages=true
- {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
- {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
- $ kafka-verify-data format=avro sink=materialize.public.snk2 sort-messages=true
- {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
- {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
- $ kafka-verify-data format=avro sink=materialize.public.snk3 sort-messages=true
- {"before": null, "after": {"row":{"c": "goofusgallant"}}}
- {"before": null, "after": {"row":{"c": "jackjill"}}}
- # Test Avro serialization of unsigned values.
- > CREATE MATERIALIZED VIEW unsigned (a, b, c, d, e, f) AS
- VALUES ('1'::uint2, '2'::uint2, '3'::uint4, '4'::uint4, '5'::uint8, '6'::uint8)
- > CREATE CLUSTER snk_unsigned_cluster SIZE '${arg.default-storage-size}';
- # NOTE(review): the topic name 'snk2' does not follow the
- # testdrive-...-${testdrive.seed} pattern used by every other sink in this
- # file -- confirm this is intentional.
- > CREATE SINK snk_unsigned
- IN CLUSTER snk_unsigned_cluster
- FROM unsigned
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'snk2')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- # Unsigned values appear in the Avro output as fixed-width byte arrays.
- $ kafka-verify-data format=avro sink=materialize.public.snk_unsigned sort-messages=true
- {"before": null, "after": {"row":{"a": [0, 1], "b": [0, 2], "c": [0, 0, 0, 3], "d": [0, 0, 0, 4], "e": [0, 0, 0, 0, 0, 0, 0, 5], "f": [0, 0, 0, 0, 0, 0, 0, 6]}}}
- # Test the case where we have non +/- 1 multiplicities
- > CREATE MATERIALIZED VIEW v4 AS
- SELECT true AS c FROM src
- > CREATE CLUSTER snk4_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk4
- IN CLUSTER snk4_cluster
- FROM v4
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk4-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- # v4 maps both source rows to the single value `true`, so that record has
- # multiplicity 2 and must be emitted twice.
- $ kafka-verify-data format=avro sink=materialize.public.snk4
- {"before": null, "after": {"row":{"c": true}}}
- {"before": null, "after": {"row":{"c": true}}}
- # Test WITH (SNAPSHOT).
- #
- # N.B. It's important that we've verified above that a sink exporting
- # src_materialized has processed the row. This means the data has a definite
- # timestamp. Without that, WITH (SNAPSHOT = false) could correctly either
- # include or exclude the old rows.
- > CREATE CLUSTER snk5_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk5
- IN CLUSTER snk5_cluster
- FROM src_materialized
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk5-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SNAPSHOT = false)
- > CREATE CLUSTER snk6_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk6
- IN CLUSTER snk6_cluster
- FROM src_materialized
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk6-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SNAPSHOT = true)
- # Ingest a third row after both sinks exist.
- $ kafka-ingest topic=test format=bytes
- extra,row
- # snk5 (no snapshot) sees only the new row; snk6 (snapshot) sees all three.
- $ kafka-verify-data format=avro sink=materialize.public.snk5
- {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
- $ kafka-verify-data format=avro sink=materialize.public.snk6 sort-messages=true
- {"before": null, "after": {"row":{"a": "extra", "b": "row", "offset": [0, 0, 0, 0, 0, 0, 0, 2]}}}
- {"before": null, "after": {"row":{"a": "goofus", "b": "gallant", "offset": [0, 0, 0, 0, 0, 0, 0, 1]}}}
- {"before": null, "after": {"row":{"a": "jack", "b": "jill", "offset": [0, 0, 0, 0, 0, 0, 0, 0]}}}
- # Test that we are correctly handling SNAPSHOT on views with empty upper
- # frontier
- > CREATE MATERIALIZED VIEW foo AS VALUES (1), (2), (3);
- > CREATE CLUSTER snk7_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk7
- IN CLUSTER snk7_cluster
- FROM foo
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk7-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SNAPSHOT = false)
- > CREATE CLUSTER snk8_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk8
- IN CLUSTER snk8_cluster
- FROM foo
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk8-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SNAPSHOT)
- # Only snk8's output is verified here: it requested a snapshot, so it must
- # emit the constant view's three rows (snk7 opted out of the snapshot).
- $ kafka-verify-data format=avro sink=materialize.public.snk8 sort-messages=true
- {"before": null, "after": {"row":{"column1": 1}}}
- {"before": null, "after": {"row":{"column1": 2}}}
- {"before": null, "after": {"row":{"column1": 3}}}
- # test already existing topic with non-default partition count
- # N.B. the sink must target the pre-created `snk9` topic; previously it
- # pointed at 'testdrive-snk14-${testdrive.seed}', which is never created, so
- # the sink auto-created a default-partition topic and this case was never
- # actually exercised.
- $ kafka-create-topic topic=snk9 partitions=4
- > CREATE CLUSTER snk9_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK snk9
- IN CLUSTER snk9_cluster
- FROM foo
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk9-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- > SET cluster TO ${arg.single-replica-cluster}
- # create sink without specifying CLUSTER; it lands on the session's active
- # cluster set just above
- > CREATE SINK default_cluster_sink
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- > SET cluster TO default
- # linked clusters totally deprecated: SIZE is no longer a valid sink option
- ! CREATE SINK sink_with_size FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SIZE = '2')
- contains:Expected one of PARTITION or SNAPSHOT or VERSION, found SIZE
- # create sink with SNAPSHOT set
- > CREATE CLUSTER sink_with_options_cluster SIZE '${arg.default-storage-size}';
- > CREATE SINK sink_with_options
- IN CLUSTER sink_with_options_cluster
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- WITH (SNAPSHOT = false)
- # create sink in an explicitly named cluster
- > CREATE CLUSTER c SIZE '4'
- > CREATE SINK cluster_c_sink
- IN CLUSTER c
- FROM src
- INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk1-${testdrive.seed}')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
- ENVELOPE DEBEZIUM
- # All sinks are unlinked: no sink in mz_sinks carries a linked-cluster size
- > SELECT bool_and(size IS NULL) FROM mz_sinks;
- true
- # Check SHOW SINKS: every sink created above, with its cluster, and no
- # leftovers from the expected-failure cases
- > SHOW SINKS
- name type cluster comment
- -------------------------------------------------------------------
- cluster_c_sink kafka c ""
- default_cluster_sink kafka ${arg.single-replica-cluster} ""
- sink_with_options kafka sink_with_options_cluster ""
- snk1 kafka snk1_cluster ""
- snk2 kafka snk2_cluster ""
- snk3 kafka snk3_cluster ""
- snk4 kafka snk4_cluster ""
- snk5 kafka snk5_cluster ""
- snk6 kafka snk6_cluster ""
- snk7 kafka snk7_cluster ""
- snk8 kafka snk8_cluster ""
- snk9 kafka snk9_cluster ""
- snk_unsigned kafka snk_unsigned_cluster ""