123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
- from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
- def schemas() -> str:
- return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
- @externally_idempotent(False)
- class SshPg(Check):
- """
- Testing Postgres CDC source with SSH tunnel
- """
- def initialize(self) -> Testdrive:
- return Testdrive(
- schemas()
- + dedent(
- """
- > CREATE SECRET pgpass AS 'postgres'
- > CREATE CONNECTION pg_ssh1 TO POSTGRES (
- HOST postgres,
- DATABASE postgres,
- USER postgres,
- PASSWORD SECRET pgpass,
- SSL MODE require,
- SSH TUNNEL ssh_tunnel_0);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- ALTER USER postgres WITH replication;
- CREATE TABLE t_ssh1 (f1 INTEGER);
- ALTER TABLE t_ssh1 REPLICA IDENTITY FULL;
- CREATE TABLE t_ssh2 (f1 INTEGER);
- ALTER TABLE t_ssh2 REPLICA IDENTITY FULL;
- CREATE TABLE t_ssh3 (f1 INTEGER);
- ALTER TABLE t_ssh3 REPLICA IDENTITY FULL;
- CREATE PUBLICATION mz_source_ssh FOR ALL TABLES;
- INSERT INTO t_ssh1 VALUES (1), (2), (3), (4), (5);
- > CREATE SOURCE mz_source_ssh1
- FROM POSTGRES CONNECTION pg_ssh1
- (PUBLICATION 'mz_source_ssh')
- > CREATE TABLE t_ssh1 FROM SOURCE mz_source_ssh1 (REFERENCE t_ssh1);
- """
- )
- )
- def manipulate(self) -> list[Testdrive]:
- return [
- Testdrive(schemas() + dedent(s))
- for s in [
- """
- > CREATE CONNECTION pg_ssh2 TO POSTGRES (
- HOST postgres,
- DATABASE postgres,
- USER postgres,
- PASSWORD SECRET pgpass,
- SSL MODE require,
- SSH TUNNEL ssh_tunnel_0);
- > CREATE SOURCE mz_source_ssh2
- FROM POSTGRES CONNECTION pg_ssh2
- (PUBLICATION 'mz_source_ssh');
- > CREATE TABLE t_ssh2 FROM SOURCE mz_source_ssh2 (REFERENCE t_ssh2);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO t_ssh1 VALUES (6), (7), (8), (9), (10);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO t_ssh2 VALUES (6), (7), (8), (9), (10);
- """,
- """
- > CREATE CONNECTION pg_ssh3 TO POSTGRES (
- HOST postgres,
- DATABASE postgres,
- USER postgres,
- PASSWORD SECRET pgpass,
- SSL MODE require,
- SSH TUNNEL ssh_tunnel_0);
- > CREATE SOURCE mz_source_ssh3
- FROM POSTGRES CONNECTION pg_ssh3
- (PUBLICATION 'mz_source_ssh');
- > CREATE TABLE t_ssh3 FROM SOURCE mz_source_ssh3 (REFERENCE t_ssh3);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO t_ssh1 VALUES (11), (12), (13), (14), (15);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO t_ssh2 VALUES (11), (12), (13), (14), (15);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO t_ssh3 VALUES (11), (12), (13), (14), (15);
- """,
- ]
- ]
- def validate(self) -> Testdrive:
- return Testdrive(
- dedent(
- """
- > SELECT COUNT(*) FROM t_ssh1;
- 15
- > SELECT COUNT(*) FROM t_ssh2;
- 10
- > SELECT COUNT(*) FROM t_ssh3;
- 5
- """
- )
- )
- @externally_idempotent(False)
- class SshKafka(Check):
- """
- Testing Kafka source with SSH tunnel
- """
- def initialize(self) -> Testdrive:
- return Testdrive(
- schemas()
- + dedent(
- """
- $ kafka-create-topic topic=ssh1
- $ kafka-create-topic topic=ssh2
- $ kafka-create-topic topic=ssh3
- $ kafka-ingest topic=ssh1 format=bytes
- one
- > CREATE CONNECTION kafka_conn_ssh1
- TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
- > CREATE SOURCE ssh1_src
- FROM KAFKA CONNECTION kafka_conn_ssh1 (TOPIC 'testdrive-ssh1-${testdrive.seed}');
- > CREATE TABLE ssh1 FROM SOURCE ssh1_src (REFERENCE "testdrive-ssh1-${testdrive.seed}")
- FORMAT TEXT
- ENVELOPE NONE;
- """
- )
- )
- def manipulate(self) -> list[Testdrive]:
- return [
- Testdrive(schemas() + dedent(s))
- for s in [
- """
- > CREATE CONNECTION kafka_conn_ssh2
- TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
- > CREATE SOURCE ssh2_src
- FROM KAFKA CONNECTION kafka_conn_ssh2 (TOPIC 'testdrive-ssh2-${testdrive.seed}');
- > CREATE TABLE ssh2 FROM SOURCE ssh2_src (REFERENCE "testdrive-ssh2-${testdrive.seed}")
- FORMAT TEXT
- ENVELOPE NONE;
- $ kafka-ingest topic=ssh1 format=bytes
- two
- $ kafka-ingest topic=ssh2 format=bytes
- two
- """,
- """
- > CREATE CONNECTION kafka_conn_ssh3
- TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
- > CREATE SOURCE ssh3_src
- FROM KAFKA CONNECTION kafka_conn_ssh3 (TOPIC 'testdrive-ssh3-${testdrive.seed}');
- > CREATE TABLE ssh3 FROM SOURCE ssh3_src (REFERENCE "testdrive-ssh3-${testdrive.seed}")
- FORMAT TEXT
- ENVELOPE NONE;
- $ kafka-ingest topic=ssh1 format=bytes
- three
- $ kafka-ingest topic=ssh2 format=bytes
- three
- $ kafka-ingest topic=ssh3 format=bytes
- three
- """,
- ]
- ]
- def validate(self) -> Testdrive:
- return Testdrive(
- dedent(
- """
- > SELECT * FROM ssh1;
- one
- two
- three
- > SELECT * FROM ssh2;
- two
- three
- > SELECT * FROM ssh3;
- three
- """
- )
- )
|