123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Test that sinks can be created even in the absence of the `DescribeConfigs`
- # cluster permission.
- # ==> Set up. <==
- # Secret holding the SASL password referenced by the Kafka connection below.
- > CREATE SECRET kafka_password AS 'sekurity'
- # Connect to the broker over SASL_PLAINTEXT with the PLAIN mechanism,
- # authenticating as the `materialize_no_describe_configs` user. Presumably the
- # broker-side ACLs deny this user the `DescribeConfigs` cluster permission;
- # that configuration lives outside this file -- TODO confirm.
- > CREATE CONNECTION kafka TO KAFKA (
- BROKER 'kafka:9095',
- SASL MECHANISMS = 'PLAIN',
- SASL USERNAME = 'materialize_no_describe_configs',
- SASL PASSWORD = SECRET kafka_password,
- SECURITY PROTOCOL SASL_PLAINTEXT
- );
- # Source data for the sinks: a table with two rows and a materialized view
- # over its distinct `column1` values. The sinks in this file read from `mv`
- # and key on `column1`.
- > CREATE TABLE t (column1 integer)
- > INSERT INTO t VALUES (1), (2)
- > CREATE MATERIALIZED VIEW mv AS SELECT DISTINCT column1 FROM t
- # ==> Test. <==
- # Creating the sink when the topic does not yet exist should work, because
- # Materialize will gracefully continue even if it cannot discover the default
- # number of partitions and replication factor to use.
- > CREATE SINK nonexisting FROM mv
- INTO KAFKA CONNECTION kafka (TOPIC 'testdrive-nonexisting-${testdrive.seed}')
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
- # command itself is not sufficient validation. One JSON message is expected
- # for each of the two values inserted into `t` during setup.
- $ kafka-verify-data format=json key=false sink=materialize.public.nonexisting sort-messages=true
- {"column1": 1}
- {"column1": 2}
- # Ensure that the sink never entered the `stalled` status. Previously, a bug in
- # the topic creation code path caused the sink to stall once before becoming
- # healthy when access to `DescribeConfigs` was denied. The status history for
- # this sink must therefore contain exactly the two statuses listed below.
- > SELECT DISTINCT status FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE mz_sinks.name = 'nonexisting'
- starting
- running
- # Creating the sink after creating the topic outside of Materialize should
- # succeed, because Materialize no longer needs to run DescribeConfigs to
- # determine the replication factor/number of partitions to use.
- #
- # `kafka-create-topic` expands the given name to `testdrive-<name>-<seed>`, so
- # the name used here must be `preexisting` for the created topic to be the
- # one the sink below writes to. With any other name the sink's topic would
- # not exist yet and this case would merely repeat the `nonexisting` one.
- $ kafka-create-topic topic=preexisting
- > CREATE SINK preexisting FROM mv
- INTO KAFKA CONNECTION kafka (TOPIC 'testdrive-preexisting-${testdrive.seed}')
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
- # command itself is not sufficient validation. One JSON message is expected
- # for each of the two values inserted into `t` during setup.
- $ kafka-verify-data format=json key=false sink=materialize.public.preexisting sort-messages=true
- {"column1": 1}
- {"column1": 2}
- # Ensure that the sink never entered the `stalled` status, even though the
- # topic already existed when the sink was created. Previously, a bug in the
- # topic setup code path caused a sink to stall once before becoming healthy
- # when access to `DescribeConfigs` was denied. The status history for this
- # sink must therefore contain exactly the two statuses listed below.
- > SELECT DISTINCT status FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE mz_sinks.name = 'preexisting'
- starting
- running
|