123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Test that Materialize can create a sink using a Kafka user with restricted
- # access to consumer groups and transactional IDs.
- # ==> Set up. <==
- # Create the `data` topic and ingest a single record (`banana`) into it.
- $ kafka-create-topic topic=data
- $ kafka-ingest topic=data format=bytes
- banana
- # Secret holding the SASL password referenced by both connections below.
- > CREATE SECRET kafka_password AS 'sekurity'
- # Connection for the `materialize_lockdown` user that does not set a
- # `PROGRESS TOPIC` option.
- > CREATE CONNECTION kafka_bad_progress_topic TO KAFKA (
- BROKER 'kafka:9095',
- SASL MECHANISMS = 'PLAIN',
- SASL USERNAME = 'materialize_lockdown',
- SASL PASSWORD = SECRET kafka_password,
- SECURITY PROTOCOL SASL_PLAINTEXT
- );
- # Same broker and credentials, but with an explicit `PROGRESS TOPIC` of
- # `lockdown-progress`.
- > CREATE CONNECTION kafka_good_progress_topic TO KAFKA (
- BROKER 'kafka:9095',
- SASL MECHANISMS = 'PLAIN',
- SASL USERNAME = 'materialize_lockdown',
- SASL PASSWORD = SECRET kafka_password,
- SECURITY PROTOCOL SASL_PLAINTEXT,
- PROGRESS TOPIC = 'lockdown-progress'
- );
- # Table and materialized view that the sinks below read from.
- > CREATE TABLE t (column1 integer)
- > INSERT INTO t VALUES (1), (2)
- > CREATE MATERIALIZED VIEW mv AS SELECT DISTINCT column1 FROM t
- # ==> Test. <==
- # The default group ID prefix is not usable by the `materialize_lockdown`
- # user.
- # No `GROUP ID PREFIX` option is given for this source.
- > CREATE SOURCE broken
- FROM KAFKA CONNECTION kafka_good_progress_topic (
- TOPIC 'testdrive-data-${testdrive.seed}'
- )
- > CREATE TABLE broken_tbl FROM SOURCE broken (REFERENCE "testdrive-data-${testdrive.seed}")
- FORMAT TEXT
- # The source's status history must contain a group authorization error
- # raised while polling the consumer.
- > SELECT EXISTS (
- SELECT 1
- FROM mz_sources
- JOIN mz_internal.mz_source_status_history ON mz_sources.id = mz_source_status_history.source_id
- WHERE name = 'broken'
- AND error ILIKE '%error when polling consumer for source%Group authorization failed%'
- )
- true
- # CASCADE so that the dependent `broken_tbl` is dropped as well.
- > DROP SOURCE broken CASCADE
- # The `lockdown-` group ID prefix *is* usable by the `materialize_lockdown`
- # user. Ensure that offsets are committed.
- # Same topic as the `broken` source, but with an explicit `lockdown-` group ID
- # prefix.
- > CREATE SOURCE working_source
- FROM KAFKA CONNECTION kafka_good_progress_topic (
- TOPIC 'testdrive-data-${testdrive.seed}',
- GROUP ID PREFIX 'lockdown-'
- )
- > CREATE TABLE working_source_tbl FROM SOURCE working_source (REFERENCE "testdrive-data-${testdrive.seed}")
- FORMAT TEXT
- # The record ingested into the `data` topic should be readable through the
- # table.
- > SELECT * FROM working_source_tbl
- banana
- # Capture the connection and source IDs; they are interpolated into the
- # consumer group ID that is checked below.
- $ set-from-sql var=conn-id
- SELECT id FROM mz_connections WHERE name = 'kafka_good_progress_topic'
- $ set-from-sql var=source-id
- SELECT id FROM mz_sources WHERE name = 'working_source'
- # Verify the commit for partition 0 of `data` under the `lockdown-`-prefixed
- # group. NOTE(review): the expected `1` is presumably the committed offset --
- # confirm against the `kafka-verify-commit` action's documentation.
- $ kafka-verify-commit topic=data partition=0 consumer-group-id=lockdown-materialize-${testdrive.materialize-environment-id}-${conn-id}-${source-id}
- 1
- # A sink which uses a bad transactional ID should fail.
- # No `TRANSACTIONAL ID PREFIX` option is given for this sink.
- > CREATE SINK broken1 FROM mv
- INTO KAFKA CONNECTION kafka_bad_progress_topic (
- TOPIC 'testdrive-broken-${testdrive.seed}'
- )
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # The sink's status history must contain a transactional ID authorization
- # error.
- > SELECT EXISTS (
- SELECT 1
- FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE name = 'broken1'
- AND error ILIKE '%Transactional Id authorization failed%'
- )
- true
- > DROP SINK broken1
- # A sink which uses a good transactional ID but a bad progress topic should
- # fail.
- # Uses the `kafka_bad_progress_topic` connection together with the `lockdown`
- # transactional ID prefix.
- > CREATE SINK broken2 FROM mv
- INTO KAFKA CONNECTION kafka_bad_progress_topic (
- TOPIC 'testdrive-broken-${testdrive.seed}',
- TRANSACTIONAL ID PREFIX 'lockdown'
- )
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # The sink's status history must contain a topic authorization error raised
- # while registering the progress topic.
- > SELECT EXISTS (
- SELECT 1
- FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE name = 'broken2'
- AND error ILIKE '%kafka: error registering kafka progress topic for sink%Topic authorization failed%'
- )
- true
- > DROP SINK broken2
- # A sink which uses a good transactional ID and progress topic but a bad data
- # topic should fail.
- # Uses the `kafka_good_progress_topic` connection; the data topic is
- # `testdrive-broken-${testdrive.seed}`.
- > CREATE SINK broken3 FROM mv
- INTO KAFKA CONNECTION kafka_good_progress_topic (
- TOPIC 'testdrive-broken-${testdrive.seed}',
- TRANSACTIONAL ID PREFIX 'lockdown'
- )
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # The sink's status history must contain a topic authorization error raised
- # while creating that data topic.
- > SELECT EXISTS (
- SELECT 1
- FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE name = 'broken3'
- AND error ILIKE '%Error creating topic testdrive-broken-${testdrive.seed}%Topic authorization failed%'
- )
- true
- > DROP SINK broken3
- # A sink which uses a good transactional ID, progress topic, and data topic
- # but a bad group ID prefix will fail, but only after restart when the progress
- # topic contains entries.
- # Dedicated cluster so that the sink can be restarted by resizing it.
- > CREATE CLUSTER c (SIZE = '1')
- # No `PROGRESS GROUP ID PREFIX` option is given for this sink.
- > CREATE SINK broken4 IN CLUSTER c FROM mv
- INTO KAFKA CONNECTION kafka_good_progress_topic (
- TOPIC 'lockdown-data1',
- TRANSACTIONAL ID PREFIX 'lockdown'
- )
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # Before the restart, the sink is expected to emit both rows of `mv`.
- $ kafka-verify-data format=json key=false sink=materialize.public.broken4 sort-messages=true
- {"column1": 1}
- {"column1": 2}
- # Resize the cluster on which the sink is running to force the sink to restart.
- > ALTER CLUSTER c SET (SIZE = '2')
- # After the restart, the sink's status history must contain a group
- # authorization error raised while fetching a progress message.
- > SELECT EXISTS (
- SELECT 1
- FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE name = 'broken4'
- AND error ILIKE '%failed to fetch progress message%Group authorization failed%'
- )
- true
- > DROP SINK broken4
- # A sink which uses a good transactional ID, progress topic, data topic, and
- # group ID prefix should work.
- # Unlike `broken4`, this sink also sets `PROGRESS GROUP ID PREFIX`.
- > CREATE SINK working IN CLUSTER c FROM mv
- INTO KAFKA CONNECTION kafka_good_progress_topic (
- TOPIC 'lockdown-data2',
- TRANSACTIONAL ID PREFIX 'lockdown',
- PROGRESS GROUP ID PREFIX 'lockdown'
- )
- KEY (column1) FORMAT JSON ENVELOPE UPSERT
- # Validate that the sink is actually emitting data. Success of the `CREATE SINK`
- # command itself is not sufficient validation.
- $ kafka-verify-data format=json key=false sink=materialize.public.working sort-messages=true
- {"column1": 1}
- {"column1": 2}
- # Resize the cluster on which the sink is running to force the sink to restart.
- > ALTER CLUSTER c SET (SIZE = '1')
- # Ensure that the sink is emitting new messages.
- > INSERT INTO t VALUES (3)
- $ kafka-verify-data format=json key=false sink=materialize.public.working sort-messages=true
- {"column1": 3}
- # Ensure that the sink never entered the `stalled` status.
- # The only distinct statuses expected in the history are `starting`,
- # `running`, and `paused`.
- > SELECT DISTINCT status FROM mz_sinks
- JOIN mz_internal.mz_sink_status_history ON mz_sinks.id = mz_sink_status_history.sink_id
- WHERE mz_sinks.name = 'working'
- starting
- running
- paused
|