# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

#
# Test basic connection functionality
#

$ skip-consistency-checks reason="workflow uses SSH keys which we currently can't check"
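
# Enable CREATE CONNECTION ... WITH (VALIDATE = ...), which the SASL mechanism
# tests below use to skip connection validation.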
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_connection_validation_syntax = true

###
# Test core functionality by creating, introspecting and dropping a connection
###
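
# Create a topic with two CSV rows for the sources created below to read.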
$ kafka-create-topic topic=connection_test partitions=1

$ kafka-ingest format=bytes topic=connection_test
1,2
2,3
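
# A dummy secret for the SASL and SSL options exercised below.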
> CREATE SECRET s AS '...';

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc')
contains:KAFKA connections do not support ACCESS KEY ID values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc', PORT = 1)
contains:KAFKA connections do not support ACCESS KEY ID, PORT values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL NOTHINGREAL)
contains: unknown security protocol: NOTHINGREAL

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SASL MECHANISMS 'PLAIN', SASL USERNAME 'materialize', SASL PASSWORD SECRET s, SSL KEY SECRET s)
contains: option SSL KEY not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, SASL USERNAME 'materialize')
contains: option SASL USERNAME not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, PROGRESS TOPIC REPLICATION FACTOR -4)
contains:PROGRESS TOPIC REPLICATION FACTOR must be greater than 0
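
# With a valid option set, the connection is created and appears in the catalog.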
> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> SELECT name, type from mz_connections WHERE id LIKE 'u%'
name      type
------------------------------
testconn  kafka

> SHOW CONNECTIONS
testconn kafka ""
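
# The default sink progress topic is derived from the environment and
# connection IDs: _materialize-progress-<environment id>-<connection id>.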
> SELECT
    brokers,
    sink_progress_topic = '_materialize-progress-' || mz_environment_id() || '-' || id
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'testconn'
{${testdrive.kafka-addr}} true

> DROP CONNECTION testconn
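
# PROGRESS TOPIC and PROGRESS TOPIC REPLICATION FACTOR override the default
# progress topic name and replication factor.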
> CREATE CONNECTION progress_override TO KAFKA (
    BROKER '${testdrive.kafka-addr}',
    PROGRESS TOPIC 'override_topic',
    PROGRESS TOPIC REPLICATION FACTOR 1,
    SECURITY PROTOCOL PLAINTEXT
  )

> SELECT
    brokers, sink_progress_topic
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'progress_override'
{${testdrive.kafka-addr}} override_topic

###
# Test that connections work in creating a source
###

> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE connection_source (first, second)
  IN CLUSTER connection_source_cluster
  FROM KAFKA CONNECTION testconn (TOPIC 'testdrive-connection_test-${testdrive.seed}')
  FORMAT CSV WITH 2 COLUMNS

> SELECT * FROM connection_source
first second
------------
1 2
2 3

# Confirm we cannot drop the connection while a source depends upon it
! DROP CONNECTION testconn;
contains:depended upon by source "connection_source"

# Confirm the drop works if we add CASCADE
> DROP CONNECTION testconn CASCADE;

# Validate that the cascading drop actually happened
! SELECT * FROM connection_source
contains:unknown catalog item 'connection_source'

###
# Test schema registry connection create and drop
###

# Set up a Kafka topic with a schema; the key schema must be a subset of the
# fields in the rows.
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
      {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "id", "type": "long"},
              {"name": "creature", "type": "string"}
            ]
          },
          "null"
        ]
      },
      {
        "name": "after",
        "type": ["row", "null"]
      }
    ]
  }
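
# Ingest three Debezium-style updates for id=1; ENVELOPE DEBEZIUM should leave
# only the latest value ("lizard") in the source.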
$ kafka-create-topic topic=csr_test partitions=1

$ kafka-ingest format=avro topic=csr_test key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}}

> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

! CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}',
    SESSION TOKEN = 'abc'
  );
contains:CONFLUENT SCHEMA REGISTRY connections do not support SESSION TOKEN values

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
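
# A schema registry connection cannot be used where a Kafka connection is expected.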
! CREATE SOURCE csr_source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION csr_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
contains:is not a KAFKA CONNECTION

> CREATE CLUSTER csr_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE csr_source
  IN CLUSTER csr_source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from csr_source
id creature
-----------
1 lizard
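
# A second Kafka connection can be paired with the same schema registry connection.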
> CREATE CONNECTION broker_connection TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER two_connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE two_connection_source
  IN CLUSTER two_connection_source_cluster
  FROM KAFKA CONNECTION broker_connection (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from two_connection_source
id creature
-----------
1 lizard

! DROP CONNECTION csr_conn
contains:depended upon by source "csr_source"

> DROP CONNECTION csr_conn CASCADE
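
# Referencing a connection that does not exist fails at planning time.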
! CREATE SOURCE should_fail
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION does_not_exist (TOPIC 'testdrive-error_topic-${testdrive.seed}')
  FORMAT TEXT
contains: unknown catalog item 'does_not_exist'

! CREATE SOURCE should_fail
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn
  (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION does_not_exist
  ENVELOPE DEBEZIUM
contains: unknown catalog item 'does_not_exist'

# Test protobuf CSR connection.
# Duplicated from protobuf-import.td: once a topic has been read, the source
# can only be recreated by forcing offsets, which is itself a different test case.

$ set empty-schema
syntax = "proto3";

$ set importee-schema
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message Importee1 {
  bool b = 1;
}
message Importee2 {
  google.protobuf.Timestamp ts = 3;
}

$ set importer-schema
syntax = "proto3";
import "empty.proto";
import "importee.proto";
message Importer {
  Importee1 importee1 = 1;
  Importee2 importee2 = 2;
}

$ file-append path=empty.proto
\${empty-schema}

$ file-append path=importee.proto
\${importee-schema}

$ file-append path=importer.proto
\${importer-schema}

$ protobuf-compile-descriptors inputs=empty.proto,importee.proto,importer.proto output=import.pb set-var=import-schema

$ kafka-create-topic topic=import-csr partitions=1

# The Confluent toolchain publishes schemas even for well-known types, so we
# have to do the same.
# See: https://github.com/protocolbuffers/protobuf/blob/61e0395c89fe520ae7569aea6838313195e05ec5/src/google/protobuf/timestamp.proto
$ schema-registry-publish subject=google/protobuf/timestamp.proto schema-type=protobuf
syntax = "proto3";
package google.protobuf;
message Timestamp {
  int64 seconds = 1;
  int32 nanos = 2;
}

$ schema-registry-publish subject=empty.proto schema-type=protobuf
\${empty-schema}

$ schema-registry-publish subject=importee.proto schema-type=protobuf references=google/protobuf/timestamp.proto
\${importee-schema}

$ schema-registry-publish subject=testdrive-import-csr-${testdrive.seed}-value schema-type=protobuf references=empty.proto,importee.proto
\${importer-schema}

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION proto_csr TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  )

> CREATE CLUSTER import_connection_csr_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE import_connection_csr
  IN CLUSTER import_connection_csr_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-csr-${testdrive.seed}')
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION proto_csr

> SELECT importee1::text, importee2::text FROM import_connection_csr
importee1 importee2
--------------------------------
(f) "(\"(1234,5678)\")"

# SSH

! CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1,
    REGION = 'abc'
  );
contains:SSH TUNNEL connections do not support REGION values

> CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1
  );

> SELECT name, public_key_1 LIKE 'ssh-ed25519%' public_key_1
  FROM mz_ssh_tunnel_connections
  JOIN mz_connections USING (id)
name public_key_1
-----------
ssh_conn true

# Test invalid connection parameter combinations

## Kafka

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SASL PASSWORD = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SASL PASSWORD: must provide a secret value

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SSL KEY = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SSL KEY: must provide a secret value

! CREATE CONNECTION duplicate_option TO KAFKA (
    BROKER '',
    BROKER '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:BROKER specified more than once

! CREATE CONNECTION no_broker TO KAFKA (
    SECURITY PROTOCOL PLAINTEXT
  );
contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION multiple_broker_options TO KAFKA (
    BROKER 'kafka:9092',
    BROKERS ['kafka:9092', 'kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:can only set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION ssl_underspecified TO KAFKA (
    BROKER 'kafka:9092',
    SSL CERTIFICATE = ''
  );
contains:SSL KEY must be specified with SSL CERTIFICATE

! CREATE CONNECTION sasl_underspecified TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'PLAIN'
  );
contains:SASL USERNAME must be specified
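
# SASL mechanism names are accepted case-insensitively, as string literals or
# as bare identifiers. VALIDATE = FALSE skips connection validation, so these
# statements succeed without working SASL credentials.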
> CREATE CONNECTION kafka_sasl_lowercase_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'plain',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'pLaIN',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_uppercase_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = PLAIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = pLaIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKER 'kafka:9092, kafka:9093',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKERS ['kafka:9092, kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string

## CSR

! CREATE CONNECTION missing_url TO CONFLUENT SCHEMA REGISTRY (
    USERNAME 'foo'
  );
contains:must specify URL

! CREATE CONNECTION missing_cert TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL KEY = SECRET s
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION missing_key TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL CERTIFICATE = ''
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}/FOO/BAR/BAZ'
  );
contains: URL must have an empty path

## SSH

! CREATE CONNECTION missing_user TO SSH TUNNEL (
    USER 'foo'
  );
contains: HOST option is required

## AWS PrivateLink

! CREATE CONNECTION conn1 TO KAFKA (BROKER '${testdrive.kafka-addr}' USING AWS PRIVATELINK foo (PORT 9093), SECURITY PROTOCOL PLAINTEXT);
contains: unknown catalog item 'foo'

! CREATE CONNECTION conn1 TO CONFLUENT SCHEMA REGISTRY (AWS PRIVATELINK foo, PORT 8080)
contains: unknown catalog item 'foo'

! CREATE CONNECTION pgconn TO POSTGRES (AWS PRIVATELINK foo, PORT 1234)
contains: unknown catalog item 'foo'

# Error in mzcompose: AWS PrivateLink connections are not supported
# Error in cloudtest/K8s: creating AWS PrivateLink Connection would violate max_aws_privatelink_connections limit
! CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
  )
contains: AWS PrivateLink