- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Enable connection-validation feature flags via the internal (mz_system) SQL port.
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER SYSTEM SET enable_default_connection_validation = true
- ALTER SYSTEM SET enable_connection_validation_syntax = true
- # ==> Set up. <==
- # Load the CA certificate that signed the schema registry's TLS certificate.
- $ set-from-file ca-crt=/share/secrets/ca.crt
- $ set-sql-timeout duration=60s
- # 'password' is the registry's valid basic-auth credential (the registry
- # returns 401 for 'password_wrong' in the negative tests below).
- > CREATE SECRET password AS 'sekurity'
- > CREATE SECRET password_wrong AS 'wrong'
- # Plaintext Kafka connection shared by all sources and sinks in this file.
- > CREATE CONNECTION kafka to KAFKA (
- BROKER 'kafka:9092',
- SECURITY PROTOCOL PLAINTEXT
- )
- # Avro schema for the test topic: a single record with one long field "a".
- $ set schema={
- "name": "row",
- "type": "record",
- "fields": [
- {"name": "a", "type": "long"}
- ]
- }
- $ kafka-create-topic topic=avro-data
- # Seed the topic with one row so decode paths have data to verify against.
- $ kafka-ingest topic=avro-data format=avro schema=${schema} timestamp=1
- {"a": 1}
- # ==> Test invalid configurations. <==
- # No CA supplied: TLS verification of the registry's certificate must fail.
- ! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
- URL 'https://ssl-basic.schema-registry.local:8082'
- )
- contains:certificate verify failed
- # CA supplied but no credentials: the registry requires basic auth -> 401.
- ! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
- URL 'https://ssl-basic.schema-registry.local:8082',
- SSL CERTIFICATE AUTHORITY = '${ca-crt}'
- )
- contains:server error 401: Unauthorized
- # Correct username, wrong password: still 401.
- ! CREATE CONNECTION schema_registry_invalid TO CONFLUENT SCHEMA REGISTRY (
- URL 'https://ssl-basic.schema-registry.local:8082',
- USERNAME 'materialize',
- PASSWORD SECRET password_wrong,
- SSL CERTIFICATE AUTHORITY = '${ca-crt}'
- )
- contains:server error 401: Unauthorized
- # ==> Test without an SSH tunnel. <==
- # Fully valid configuration: CA plus correct basic-auth credentials.
- > CREATE CONNECTION schema_registry TO CONFLUENT SCHEMA REGISTRY (
- URL 'https://ssl-basic.schema-registry.local:8082',
- USERNAME 'materialize',
- PASSWORD SECRET password,
- SSL CERTIFICATE AUTHORITY = '${ca-crt}'
- )
- > CREATE SOURCE avro_data FROM KAFKA CONNECTION kafka (
- TOPIC 'testdrive-avro-data-${testdrive.seed}'
- )
- # Decoding the ingested row via the registry proves the CSR connection works
- # end to end (TLS handshake, auth, and schema fetch).
- > CREATE TABLE avro_data_tbl FROM SOURCE avro_data (REFERENCE "testdrive-avro-data-${testdrive.seed}")
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
- > SELECT * FROM avro_data_tbl
- a
- ----
- 1
- # ==> Test with an SSH tunnel. <==
- # Same registry and credentials as above, but reached through the SSH bastion
- # declared in the (non-reset) testdrive_no_reset_connections database.
- > CREATE CONNECTION schema_registry_ssh TO CONFLUENT SCHEMA REGISTRY (
- URL 'https://ssl-basic.schema-registry.local:8082',
- USERNAME 'materialize',
- PASSWORD SECRET password,
- SSL CERTIFICATE AUTHORITY = '${ca-crt}',
- SSH TUNNEL testdrive_no_reset_connections.public.ssh
- )
- > CREATE SOURCE avro_data_ssh FROM KAFKA CONNECTION kafka (
- TOPIC 'testdrive-avro-data-${testdrive.seed}'
- )
- # Decode via the SSH-tunneled CSR connection. Previously this referenced the
- # plain 'schema_registry' connection, so the tunnel was never exercised and
- # this section could pass even with a broken SSH tunnel.
- > CREATE TABLE avro_data_ssh_tbl FROM SOURCE avro_data_ssh (REFERENCE "testdrive-avro-data-${testdrive.seed}")
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry_ssh
- > SELECT * FROM avro_data_ssh_tbl
- a
- ----
- 1
- # ALTER CONNECTION
- ## Sink Kafka connection
- ### Create a new connection in use only by the sink so that we may break it.
- > CREATE CONNECTION kafka_backup TO KAFKA (
- BROKER 'kafka:9092',
- SECURITY PROTOCOL PLAINTEXT
- );
- #### Create a backup sink
- > CREATE SINK snk_backup FROM avro_data_tbl
- INTO KAFKA CONNECTION kafka_backup (TOPIC 'snk_backup')
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
- ENVELOPE DEBEZIUM
- # Confirm the sink emits the row that already exists in avro_data_tbl.
- $ kafka-verify-data format=avro sink=materialize.public.snk_backup sort-messages=true
- {"before": null, "after": {"row":{"a": 1}}}
- ### Break sink broker connection and produce data
- # Point the connection at a port with no broker; VALIDATE = false lets the
- # bad address be stored without a connectivity check.
- > ALTER CONNECTION kafka_backup SET (BROKER 'kafka:9093') WITH (VALIDATE = false);
- $ kafka-ingest topic=avro-data format=avro schema=${schema} timestamp=2
- {"a": 2}
- # The source still ingests; only the sink's broker connection is broken.
- > SELECT * FROM avro_data_tbl
- 1
- 2
- # The sink must eventually report a 'stalled' status while its broker is unreachable.
- > SELECT count(status) > 0 FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON sink_id = id WHERE name = 'snk_backup' AND status = 'stalled';
- true
- # Restore the working broker address; the sink should recover to 'running'...
- > ALTER CONNECTION kafka_backup SET (BROKER 'kafka:9092');
- > SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'snk_backup';
- running
- # ...and emit the row that was ingested while it was broken.
- $ kafka-verify-data format=avro sink=materialize.public.snk_backup sort-messages=true
- {"before": null, "after": {"row":{"a": 2}}}
- ## CSR connection
- # URL must parse as an absolute URL.
- ! ALTER CONNECTION schema_registry SET (URL = 'abc') WITH (VALIDATE = true);
- contains:invalid ALTER CONNECTION: parsing schema registry url: relative URL without a base
- # URL is mandatory and cannot be RESET away.
- ! ALTER CONNECTION schema_registry RESET (URL);
- contains:invalid ALTER CONNECTION: invalid CONNECTION: must specify URL
- # SSL KEY only accepts a secret reference, not an inline string.
- ! ALTER CONNECTION schema_registry SET (SSL KEY = 'x') WITH (VALIDATE = true);
- contains:invalid SSL KEY: must provide a secret value
- > CREATE SECRET IF NOT EXISTS invalid_secret AS 'x'
- # Client-certificate auth requires SSL KEY and SSL CERTIFICATE together;
- # setting either one alone is rejected.
- ! ALTER CONNECTION schema_registry SET (SSL KEY = SECRET invalid_secret) WITH (VALIDATE = true);
- contains:requires both SSL KEY and SSL CERTIFICATE
- ! ALTER CONNECTION schema_registry SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
- contains:requires both SSL KEY and SSL CERTIFICATE
- # Both present but not valid PEM material: fails when decoding the contents.
- ! ALTER CONNECTION schema_registry SET (SSL KEY = SECRET invalid_secret), SET (SSL CERTIFICATE = 'x') WITH (VALIDATE = true);
- contains:No supported data to decode
- ! ALTER CONNECTION schema_registry SET (SSL CERTIFICATE AUTHORITY = 'x') WITH (VALIDATE = true);
- contains:CERTIFICATE
- # RESET of the (never successfully set) client key/certificate succeeds.
- > ALTER CONNECTION schema_registry RESET (SSL KEY);
- > ALTER CONNECTION schema_registry RESET (SSL CERTIFICATE);
- # Dropping the CA makes validation fail against the self-signed chain.
- ! ALTER CONNECTION schema_registry RESET (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = true);
- contains:self-signed certificate in certificate chain
- ! ALTER CONNECTION schema_registry RESET (SSL KEY), RESET (SSL CERTIFICATE), RESET (SSL CERTIFICATE AUTHORITY);
- contains:self-signed certificate in certificate chain
- # Removing either basic-auth credential triggers a 401 during validation.
- ! ALTER CONNECTION schema_registry RESET (USERNAME);
- contains:Unauthorized
- ! ALTER CONNECTION schema_registry RESET (PASSWORD);
- contains:Unauthorized
- ! ALTER CONNECTION schema_registry RESET (USERNAME), RESET (PASSWORD);
- contains:Unauthorized
- # Break CSR connection
- # These won't necessarily stall running sources which might not reach out to the CSR.
- # Remove the CA without validating, so subsequent schema fetches will fail.
- > ALTER CONNECTION schema_registry DROP (SSL CERTIFICATE AUTHORITY) WITH (VALIDATE = false);
- $ kafka-create-topic topic=data partitions=1
- > CREATE SOURCE kafka_broken_csr_connector_source_broken
- FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data-${testdrive.seed}')
- # A new table that must contact the registry now fails to fetch its schema.
- ! CREATE TABLE kafka_broken_csr_connector_source_broken_tbl FROM SOURCE kafka_broken_csr_connector_source_broken (REFERENCE "testdrive-data-${testdrive.seed}")
- FORMAT AVRO
- USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
- contains:failed to fetch schema subject
- # Restore the CA (validated) and confirm the connection works again end to end.
- > ALTER CONNECTION schema_registry SET (SSL CERTIFICATE AUTHORITY = '${ca-crt}') WITH (VALIDATE = true);
- > CREATE SOURCE kafka_broken_csr_connector_source_fixed FROM KAFKA CONNECTION kafka (
- TOPIC 'testdrive-avro-data-${testdrive.seed}'
- )
- > CREATE TABLE kafka_broken_csr_connector_source_fixed_tbl FROM SOURCE kafka_broken_csr_connector_source_fixed (REFERENCE "testdrive-avro-data-${testdrive.seed}")
- FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
- # Both previously ingested rows decode successfully.
- > SELECT * FROM kafka_broken_csr_connector_source_fixed_tbl
- 1
- 2