123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from __future__ import annotations
- from enum import Enum
- from random import Random
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
- from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
- from materialize.checks.executors import Executor
- from materialize.checks.features import Features
- from materialize.mz_version import MzVersion
def schema() -> str:
    """Return ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` with its common leading whitespace stripped.

    The result is prepended to the testdrive scripts of the checks in this
    module that ingest into or read from Kafka topics.
    """
    raw_schema = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(raw_schema)
class SshChange(Enum):
    """Which SSH-tunnel change an ``AlterConnectionSshChangeBase`` check performs."""

    # Kafka connection is created without a tunnel; ALTER CONNECTION adds one.
    ADD_SSH = 1
    # Kafka connection is created with a tunnel; ALTER CONNECTION removes it.
    DROP_SSH = 2
    # Kafka connection uses a tunnel before and after; the tunnel's HOST is altered.
    CHANGE_SSH_HOST = 3
# Templates with a literal "{i}" placeholder. They are deliberately NOT
# f-strings: the placeholder is substituted later with the numeric index of a
# check (e.g. via ``str.replace("{i}", str(index))``).
SHOW_CONNECTION_SSH_TUNNEL = """materialize.public.ssh_tunnel_{i} "CREATE CONNECTION \\"materialize\\".\\"public\\".\\"ssh_tunnel_{i}\\" TO SSH TUNNEL (HOST = 'ssh-bastion-host', PORT = 22, USER = 'mz')" """

# Clause appended to a Kafka BROKER option to route it through the SSH tunnel.
WITH_SSH_SUFFIX = "USING SSH TUNNEL ssh_tunnel_{i}"
class AlterConnectionSshChangeBase(Check):
    """Common scenario for checks that ALTER how a Kafka connection uses an SSH tunnel.

    The concrete change is selected by ``ssh_change``; ``index`` is embedded in
    every object and topic name so that several instances can coexist.

    The scripts reference the connections ``ssh_tunnel_<index>`` and
    ``csr_conn`` but do not create them here.
    """

    def __init__(
        self,
        ssh_change: SshChange,
        index: int,
        base_version: MzVersion,
        rng: Random | None,
        features: Features | None,
    ):
        """Store the scenario parameters after initializing the base ``Check``.

        Args:
            ssh_change: Which SSH change this check applies.
            index: Number interpolated into all object/topic names.
            base_version: Forwarded unchanged to ``Check.__init__``.
            rng: Forwarded unchanged to ``Check.__init__``.
            features: Forwarded unchanged to ``Check.__init__``.
        """
        super().__init__(base_version, rng, features)
        self.ssh_change = ssh_change
        self.index = index

    def _ssh_clause(self, *applicable: SshChange) -> str:
        """Return the ``USING SSH TUNNEL`` clause if this check's change is in ``applicable``, else ``""``."""
        if self.ssh_change in applicable:
            return WITH_SSH_SUFFIX.replace("{i}", str(self.index))
        return ""

    def _initial_ssh_clause(self) -> str:
        """SSH clause used when a Kafka connection is first created."""
        return self._ssh_clause(SshChange.DROP_SSH, SshChange.CHANGE_SSH_HOST)

    def _altered_ssh_clause(self) -> str:
        """SSH clause used when a Kafka connection is altered."""
        return self._ssh_clause(SshChange.ADD_SSH, SshChange.CHANGE_SSH_HOST)

    def initialize(self) -> Testdrive:
        """Create connection/source/table "a" plus a table, materialized view and sink.

        Also asserts the starting state: whether the Kafka connection already
        uses an SSH tunnel and that the tunnel points at ``ssh-bastion-host``.
        """
        i = self.index
        initial_ssh = self._initial_ssh_clause()
        # Only the ADD_SSH scenario starts out without a tunnel.
        if self.ssh_change == SshChange.ADD_SSH:
            starts_with_ssh = "false"
        else:
            starts_with_ssh = "true"

        preamble = schema()
        script = dedent(
            f"""
            $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
            ALTER SYSTEM SET enable_connection_validation_syntax = true

            $ kafka-create-topic topic=alter-connection-{i}a

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            one

            > CREATE CONNECTION kafka_conn_alter_connection_{i}a
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {initial_ssh});

            > CREATE SOURCE alter_connection_source_{i}a_src
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-{i}a-${{testdrive.seed}}');

            > CREATE TABLE alter_connection_source_{i}a FROM SOURCE alter_connection_source_{i}a_src (REFERENCE "testdrive-alter-connection-{i}a-${{testdrive.seed}}")
              FORMAT TEXT
              ENVELOPE NONE;

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {starts_with_ssh}

            > SELECT count(regexp_match(create_sql, 'ssh-bastion-host')) > 0 FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            true

            > CREATE TABLE alter_connection_table_{i} (f1 INTEGER, PRIMARY KEY (f1));

            > INSERT INTO alter_connection_table_{i} VALUES (1);

            > CREATE MATERIALIZED VIEW mv_alter_connection_{i} AS SELECT f1 FROM alter_connection_table_{i};

            > CREATE SINK alter_connection_sink_{i} FROM mv_alter_connection_{i}
              INTO KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;

            $ kafka-verify-topic sink=materialize.public.alter_connection_sink_{i} await-value-schema=true
            """
        )
        return Testdrive(preamble + script)

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation phases.

        Phase one alters connection "a", creates connection/source/table "b"
        and, for ``CHANGE_SSH_HOST`` only, alters the tunnel's HOST. Phase two
        alters connection "b". Both phases ingest further Kafka records and
        insert one more row into the table feeding the sink.
        """
        i = self.index
        initial_ssh = self._initial_ssh_clause()
        altered_ssh = self._altered_ssh_clause()

        # Only the host-change scenario touches the tunnel itself; the other
        # scenarios emit a testdrive no-op in its place.
        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            tunnel_host_step = f"> ALTER CONNECTION ssh_tunnel_{i} SET (HOST = 'other_ssh_bastion') WITH (VALIDATE = true);"
        else:
            tunnel_host_step = "$ nop"

        first_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            two

            > ALTER CONNECTION kafka_conn_alter_connection_{i}a SET (BROKER '${{testdrive.kafka-addr}}' {altered_ssh});

            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            three

            $ kafka-create-topic topic=alter-connection-{i}b

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            ten

            > CREATE CONNECTION kafka_conn_alter_connection_{i}b
              TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {initial_ssh});

            > CREATE SOURCE alter_connection_source_{i}b_src
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}b (TOPIC 'testdrive-alter-connection-{i}b-${{testdrive.seed}}');

            > CREATE TABLE alter_connection_source_{i}b FROM SOURCE alter_connection_source_{i}b_src (REFERENCE "testdrive-alter-connection-{i}b-${{testdrive.seed}}")
              FORMAT TEXT
              ENVELOPE NONE;

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            twenty

            {tunnel_host_step}

            > INSERT INTO alter_connection_table_{i} VALUES (2);
            """

        second_phase = f"""
            $ kafka-ingest topic=alter-connection-{i}a format=bytes
            four

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            thirty

            > ALTER CONNECTION kafka_conn_alter_connection_{i}b SET (BROKER '${{testdrive.kafka-addr}}' {altered_ssh});

            $ kafka-ingest topic=alter-connection-{i}b format=bytes
            fourty

            > INSERT INTO alter_connection_table_{i} VALUES (3);
            """

        phases: list[Testdrive] = []
        for phase_script in (first_phase, second_phase):
            phases.append(Testdrive(schema() + dedent(phase_script)))
        return phases

    def validate(self) -> Testdrive:
        """Assert the final connection definitions and that all data arrived.

        Checks the tunnel host, whether connection "a" uses the tunnel, the
        contents of both sources and of the table, and reads the sink's topic
        back through a temporary source that is dropped again at the end.
        """
        i = self.index

        # regexp_match returns an array, which testdrive prints in braces.
        if self.ssh_change == SshChange.CHANGE_SSH_HOST:
            expected_host = "{other_ssh_bastion}"
        else:
            expected_host = "{ssh-bastion-host}"

        if self.ssh_change in (SshChange.ADD_SSH, SshChange.CHANGE_SSH_HOST):
            ends_with_ssh = "true"
        else:
            ends_with_ssh = "false"

        script = dedent(
            f"""
            > SELECT regexp_match(create_sql, '(ssh-bastion-host|other_ssh_bastion)') FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
            {expected_host}

            > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
            {ends_with_ssh}

            > SELECT * FROM alter_connection_source_{i}a;
            one
            two
            three
            four

            > SELECT * FROM alter_connection_source_{i}b;
            ten
            twenty
            thirty
            fourty

            > CREATE SOURCE alter_connection_sink_source_{i}_src
              FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}');

            > CREATE TABLE alter_connection_sink_source_{i} FROM SOURCE alter_connection_sink_source_{i}_src (REFERENCE "testdrive-alter-connection-sink-{i}-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE;

            # Table has expected data
            > SELECT * FROM alter_connection_table_{i};
            1
            2
            3

            # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
            # Sink Kafka topic has expected data
            # $ kafka-verify-data format=avro sink=materialize.public.alter_connection_sink_{i} sort-messages=true
            # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
            # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}

            # Source based on sink topic has ingested data; data must be text-formatted because records are not
            # supported in testdrive
            > SELECT before::text, after::text FROM alter_connection_sink_source_{i};
            <null> (1)
            <null> (2)
            <null> (3)

            > DROP SOURCE IF EXISTS alter_connection_sink_source_{i}_src CASCADE
            """
        )
        return Testdrive(script)
@externally_idempotent(False)
class AlterConnectionToSsh(AlterConnectionSshChangeBase):
    """``ADD_SSH`` scenario (index 1): connections are created without a tunnel and altered to use one."""

    def __init__(
        self, base_version: MzVersion, rng: Random | None, features: Features | None
    ):
        super().__init__(
            ssh_change=SshChange.ADD_SSH,
            index=1,
            base_version=base_version,
            rng=rng,
            features=features,
        )
@externally_idempotent(False)
class AlterConnectionToNonSsh(AlterConnectionSshChangeBase):
    """``DROP_SSH`` scenario (index 2): connections are created with a tunnel and altered to drop it."""

    def __init__(
        self, base_version: MzVersion, rng: Random | None, features: Features | None
    ):
        super().__init__(
            ssh_change=SshChange.DROP_SSH,
            index=2,
            base_version=base_version,
            rng=rng,
            features=features,
        )
@externally_idempotent(False)
class AlterConnectionHost(AlterConnectionSshChangeBase):
    """``CHANGE_SSH_HOST`` scenario (index 3): connections keep their tunnel while the tunnel's HOST is altered."""

    def __init__(
        self, base_version: MzVersion, rng: Random | None, features: Features | None
    ):
        super().__init__(
            ssh_change=SshChange.CHANGE_SSH_HOST,
            index=3,
            base_version=base_version,
            rng=rng,
            features=features,
        )
class AlterConnectionDependencyOrder(Check):
    """
    Ensure that ALTER-ing a CONNECTION to reference one with a greater ID does not panic.

    The Kafka connection is created first and the SSH tunnel connection
    second; the Kafka connection is then altered to go through that tunnel.
    All statements use ``VALIDATE = false``.
    """

    def __init__(
        self,
        base_version: MzVersion,
        rng: Random | None,
        features: Features | None,
    ):
        super().__init__(base_version, rng, features)

    def _can_run(self, e: Executor) -> bool:
        """Run only when the base version is at least v0.144.0-dev; ``e`` is not consulted."""
        minimum_version = MzVersion.parse_mz("v0.144.0-dev")
        return self.base_version >= minimum_version

    def initialize(self) -> Testdrive:
        """Create the Kafka connection first, then the SSH tunnel connection."""
        script = """
            > CREATE CONNECTION my_kafka_alter_conn TO KAFKA (BROKER 'localhost:32816') WITH (VALIDATE = false);

            > CREATE CONNECTION other_ssh TO SSH TUNNEL (host 'foo', user 'bar', port 42) WITH (VALIDATE = false);
            """
        return Testdrive(dedent(script))

    def manipulate(self) -> list[Testdrive]:
        """Phase one points the Kafka connection at the tunnel; phase two creates another Kafka connection."""
        alter_to_tunnel = """
            > ALTER CONNECTION my_kafka_alter_conn SET (BROKER 'localhost:32816' USING SSH TUNNEL other_ssh) WITH (VALIDATE = false);
            """
        create_another = """
            > CREATE CONNECTION another_kafka_conn TO KAFKA (BROKER 'localhost:32816') WITH (VALIDATE = false);
            """
        return [
            Testdrive(dedent(alter_to_tunnel)),
            Testdrive(dedent(create_another)),
        ]

    def validate(self) -> Testdrive:
        """Assert that ``my_kafka_alter_conn`` is the only connection whose ``create_sql`` references ``other_ssh``'s id."""
        # Not an f-string: ${other_ssh_id} is a testdrive variable filled in by
        # the preceding set-from-sql command.
        script = """
            $ set-from-sql var=other_ssh_id
            SELECT id FROM mz_connections WHERE name = 'other_ssh';

            > SELECT name FROM mz_connections WHERE create_sql LIKE '%[${other_ssh_id} AS %';
            my_kafka_alter_conn
            """
        return Testdrive(dedent(script))
|