123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
- @externally_idempotent(False)
- class KafkaProtocols(Check):
- def initialize(self) -> Testdrive:
- return Testdrive(
- dedent(
- """
- $ set-from-file ca-crt=/share/secrets/ca.crt
- $ set-from-file kafka-crt=/share/secrets/materialized-kafka.crt
- $ set-from-file kafka-key=/share/secrets/materialized-kafka.key
- $ set-from-file kafka1-crt=/share/secrets/materialized-kafka1.crt
- $ set-from-file kafka1-key=/share/secrets/materialized-kafka1.key
- > CREATE SECRET kafka_key AS '${kafka-key}'
- > CREATE SECRET kafka1_key AS '${kafka1-key}'
- > CREATE SECRET garbage_key AS 'garbage'
- $ kafka-create-topic topic=kafka-protocols-1
- $ kafka-ingest topic=kafka-protocols-1 format=bytes
- one
- $ kafka-create-topic topic=kafka-protocols-2
- $ kafka-ingest topic=kafka-protocols-2 format=bytes
- one
- $ kafka-create-topic topic=kafka-protocols-3
- $ kafka-ingest topic=kafka-protocols-3 format=bytes
- one
- > CREATE SECRET kafka_protocols_password AS 'sekurity'
- > CREATE CONNECTION kafka_plaintext TO KAFKA (
- BROKER 'kafka:9093' USING SSH TUNNEL ssh_tunnel_0,
- SSL CERTIFICATE AUTHORITY '${ca-crt}'
- )
- > CREATE CONNECTION kafka_ssl TO KAFKA (
- BROKER 'kafka:9094',
- SSL CERTIFICATE '${kafka-crt}',
- SSL KEY SECRET kafka_key,
- SSL CERTIFICATE AUTHORITY '${ca-crt}'
- )
- > CREATE CONNECTION kafka_scram_sha_512 TO KAFKA (
- BROKER 'kafka:9095' USING SSH TUNNEL ssh_tunnel_0,
- SASL MECHANISMS 'SCRAM-SHA-512',
- SASL USERNAME 'materialize',
- SASL PASSWORD SECRET kafka_protocols_password,
- SECURITY PROTOCOL SASL_PLAINTEXT
- )
- > CREATE CONNECTION kafka_ssl_scram_sha_512 TO KAFKA (
- BROKER 'kafka:9096',
- SASL MECHANISMS 'SCRAM-SHA-512',
- SASL USERNAME 'materialize',
- SASL PASSWORD SECRET kafka_protocols_password,
- SSL CERTIFICATE AUTHORITY '${ca-crt}'
- )
- > CREATE CONNECTION kafka_sasl TO KAFKA (
- BROKER 'kafka:9097' USING SSH TUNNEL ssh_tunnel_0,
- SASL MECHANISMS 'PLAIN',
- SASL USERNAME 'materialize',
- SASL PASSWORD SECRET kafka_protocols_password,
- SSL CERTIFICATE '${kafka-crt}',
- SSL KEY SECRET kafka_key,
- SSL CERTIFICATE AUTHORITY '${ca-crt}'
- )
- > CREATE SOURCE kafka_plaintext_1_src FROM KAFKA CONNECTION kafka_plaintext (
- TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
- )
- > CREATE TABLE kafka_plaintext_1 FROM SOURCE kafka_plaintext_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_1_src FROM KAFKA CONNECTION kafka_ssl (
- TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_1 FROM SOURCE kafka_ssl_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_scram_sha_512_1_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
- )
- > CREATE TABLE kafka_scram_sha_512_1 FROM SOURCE kafka_scram_sha_512_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_scram_sha_512_1_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_scram_sha_512_1 FROM SOURCE kafka_ssl_scram_sha_512_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_sasl_1_src FROM KAFKA CONNECTION kafka_sasl (
- TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
- )
- > CREATE TABLE kafka_sasl_1 FROM SOURCE kafka_sasl_1_src (REFERENCE "testdrive-kafka-protocols-1-${testdrive.seed}")
- FORMAT TEXT
- """
- )
- )
- def manipulate(self) -> list[Testdrive]:
- return [
- Testdrive(dedent(s))
- for s in [
- """
- > CREATE SOURCE kafka_plaintext_2_src FROM KAFKA CONNECTION kafka_plaintext (
- TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
- )
- > CREATE TABLE kafka_plaintext_2 FROM SOURCE kafka_plaintext_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_2_src FROM KAFKA CONNECTION kafka_ssl (
- TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_2 FROM SOURCE kafka_ssl_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_scram_sha_512_2_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
- )
- > CREATE TABLE kafka_scram_sha_512_2 FROM SOURCE kafka_scram_sha_512_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_scram_sha_512_2_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_scram_sha_512_2 FROM SOURCE kafka_ssl_scram_sha_512_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_sasl_2_src FROM KAFKA CONNECTION kafka_sasl (
- TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
- )
- > CREATE TABLE kafka_sasl_2 FROM SOURCE kafka_sasl_2_src (REFERENCE "testdrive-kafka-protocols-2-${testdrive.seed}")
- FORMAT TEXT
- $ kafka-ingest topic=kafka-protocols-1 format=bytes
- two
- $ kafka-ingest topic=kafka-protocols-2 format=bytes
- two
- $ kafka-ingest topic=kafka-protocols-3 format=bytes
- two
- """,
- """
- > CREATE SOURCE kafka_plaintext_3_src FROM KAFKA CONNECTION kafka_plaintext (
- TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
- )
- > CREATE TABLE kafka_plaintext_3 FROM SOURCE kafka_plaintext_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_3_src FROM KAFKA CONNECTION kafka_ssl (
- TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_3 FROM SOURCE kafka_ssl_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_scram_sha_512_3_src FROM KAFKA CONNECTION kafka_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
- )
- > CREATE TABLE kafka_scram_sha_512_3 FROM SOURCE kafka_scram_sha_512_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_ssl_scram_sha_512_3_src FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
- TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
- )
- > CREATE TABLE kafka_ssl_scram_sha_512_3 FROM SOURCE kafka_ssl_scram_sha_512_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
- FORMAT TEXT
- > CREATE SOURCE kafka_sasl_3_src FROM KAFKA CONNECTION kafka_sasl (
- TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
- )
- > CREATE TABLE kafka_sasl_3 FROM SOURCE kafka_sasl_3_src (REFERENCE "testdrive-kafka-protocols-3-${testdrive.seed}")
- FORMAT TEXT
- $ kafka-ingest topic=kafka-protocols-1 format=bytes
- three
- $ kafka-ingest topic=kafka-protocols-2 format=bytes
- three
- $ kafka-ingest topic=kafka-protocols-3 format=bytes
- three
- """,
- ]
- ]
- def validate(self) -> Testdrive:
- return Testdrive(
- dedent(
- """
- > SELECT * FROM kafka_plaintext_1
- one
- two
- three
- > SELECT * FROM kafka_ssl_1
- one
- two
- three
- > SELECT * FROM kafka_scram_sha_512_1
- one
- two
- three
- > SELECT * FROM kafka_ssl_scram_sha_512_1
- one
- two
- three
- > SELECT * FROM kafka_sasl_1
- one
- two
- three
- > SELECT * FROM kafka_plaintext_2
- one
- two
- three
- > SELECT * FROM kafka_ssl_2
- one
- two
- three
- > SELECT * FROM kafka_scram_sha_512_2
- one
- two
- three
- > SELECT * FROM kafka_ssl_scram_sha_512_2
- one
- two
- three
- > SELECT * FROM kafka_sasl_2
- one
- two
- three
- > SELECT * FROM kafka_plaintext_3
- one
- two
- three
- > SELECT * FROM kafka_ssl_3
- one
- two
- three
- > SELECT * FROM kafka_scram_sha_512_3
- one
- two
- three
- > SELECT * FROM kafka_ssl_scram_sha_512_3
- one
- two
- three
- > SELECT * FROM kafka_sasl_3
- one
- two
- three
- """
- )
- )
|