123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
- from materialize.mzcompose.services.mysql import MySql
class TableFromSourceBase(Check):
    """Shared parent for the checks that create tables from a source.

    It only contributes `generic_setup`, whose output the subclasses prepend
    to their own `initialize` scripts.
    """

    def generic_setup(self) -> str:
        """Return the testdrive snippet that switches the feature flag on.

        The snippet connects as `mz_system` to the internal SQL address and
        sets `enable_create_table_from_source` to true.

        Returns:
            The dedented testdrive script fragment as a string.
        """
        enable_feature_flag = """
            $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_create_table_from_source = true
            """
        return dedent(enable_feature_flag)
@externally_idempotent(False)
class TableFromPgSource(TableFromSourceBase):
    """Check `CREATE TABLE ... FROM SOURCE` against a Postgres source.

    A second source using the older `FOR TABLES` syntax is created over the
    same publication, and `validate` expects it to return the same rows as
    the new-style tables.

    NOTE(review): presumably marked `externally_idempotent(False)` because
    the upstream Postgres setup (`CREATE TYPE`, `CREATE TABLE`) cannot be
    re-run against the same database -- confirm against the decorator's
    definition.
    """

    # Appended to every Materialize and Postgres object name created here.
    suffix = "tbl_from_pg_source"

    # In the testdrive scripts below, continuation lines of a multi-line SQL
    # statement are indented so that testdrive folds them into the statement
    # instead of reading them as expected result rows.

    def initialize(self) -> Testdrive:
        """Create the upstream Postgres tables and both Materialize sources.

        Returns:
            A testdrive script that enables the feature flag, creates the
            secret and connection, populates `pg_table_1` and `pg_table_2`
            upstream, and creates one source without subsources plus one
            old-syntax source exposing `pg_table_1_old_syntax`.
        """
        return Testdrive(
            self.generic_setup()
            + dedent(
                f"""
                > CREATE SECRET pgpass_{self.suffix} AS 'postgres'

                > CREATE CONNECTION pg_conn_{self.suffix} TO POSTGRES (
                    HOST postgres,
                    DATABASE postgres,
                    USER postgres,
                    PASSWORD SECRET pgpass_{self.suffix}
                  )

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                DROP SCHEMA IF EXISTS public_{self.suffix} CASCADE;
                CREATE SCHEMA public_{self.suffix};
                DROP PUBLICATION IF EXISTS mz_source_{self.suffix};
                CREATE PUBLICATION mz_source_{self.suffix} FOR ALL TABLES;
                CREATE TYPE an_enum AS ENUM ('x1', 'x2');
                CREATE TABLE pg_table_1 (a INTEGER, b INTEGER, c an_enum);
                INSERT INTO pg_table_1 VALUES (1, 1234, NULL), (2, 0, 'x1');
                ALTER TABLE pg_table_1 REPLICA IDENTITY FULL;
                CREATE TABLE pg_table_2 (a INTEGER);
                INSERT INTO pg_table_2 VALUES (1000), (2000);
                ALTER TABLE pg_table_2 REPLICA IDENTITY FULL;

                > CREATE SOURCE pg_source_{self.suffix} FROM POSTGRES CONNECTION pg_conn_{self.suffix} (PUBLICATION 'mz_source_{self.suffix}');

                > CREATE SOURCE old_pg_source_{self.suffix} FROM POSTGRES CONNECTION pg_conn_{self.suffix}
                  (PUBLICATION 'mz_source_{self.suffix}', TEXT COLUMNS = (pg_table_1.c))
                  FOR TABLES (pg_table_1 AS pg_table_1_old_syntax);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Create tables from the source in two phases, inserting upstream.

        Phase one creates `pg_table_1` and inserts one upstream row. Phase
        two creates `pg_table_1b` (a second table over the same upstream
        table) and `pg_table_2`, then inserts one more row into each
        upstream table.

        Returns:
            One testdrive script per phase, in execution order.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                f"""
                > CREATE TABLE pg_table_1 FROM SOURCE pg_source_{self.suffix}
                  (REFERENCE "pg_table_1")
                  WITH (TEXT COLUMNS = (c));

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO pg_table_1 VALUES (3, 2345, 'x2');
                """,
                f"""
                > CREATE TABLE pg_table_1b FROM SOURCE pg_source_{self.suffix}
                  (REFERENCE "pg_table_1")
                  WITH (TEXT COLUMNS = (c));

                > CREATE TABLE pg_table_2 FROM SOURCE pg_source_{self.suffix} (REFERENCE "pg_table_2");

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO pg_table_1 VALUES (4, 3456, 'x2');
                INSERT INTO pg_table_2 VALUES (3000);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Assert that every table holds all upstream rows.

        Returns:
            A testdrive script selecting from the three new-style tables and
            from the old-syntax table, each with its expected rows.
        """
        return Testdrive(
            dedent(
                """
                > SELECT * FROM pg_table_1;
                1 1234 <null>
                2 0 x1
                3 2345 x2
                4 3456 x2

                > SELECT * FROM pg_table_1b;
                1 1234 <null>
                2 0 x1
                3 2345 x2
                4 3456 x2

                > SELECT * FROM pg_table_2;
                1000
                2000
                3000

                > SELECT * FROM pg_table_1_old_syntax;
                1 1234 <null>
                2 0 x1
                3 2345 x2
                4 3456 x2
                """
            )
        )
@externally_idempotent(False)
class TableFromMySqlSource(TableFromSourceBase):
    """Check `CREATE TABLE ... FROM SOURCE` against a MySQL source.

    A second source using the older `FOR TABLES` syntax is created over the
    same connection, and `validate` expects it to keep returning all rows.

    NOTE(review): presumably marked `externally_idempotent(False)` because
    of the state it creates in the upstream MySQL server -- confirm against
    the decorator's definition.
    """

    # Appended to every Materialize object and MySQL database name created here.
    suffix = "tbl_from_mysql_source"

    # In the testdrive scripts below, continuation lines of a multi-line SQL
    # statement are indented so that testdrive folds them into the statement
    # instead of reading them as expected result rows.

    def initialize(self) -> Testdrive:
        """Create the upstream MySQL tables and both Materialize sources.

        The connection secret is built from `MySql.DEFAULT_ROOT_PASSWORD`,
        the same value `$ mysql-connect` uses for the `root` user, so the two
        credentials cannot drift apart.

        Returns:
            A testdrive script that enables the feature flag, creates the
            secret and connection, populates `mysql_source_table_1` and
            `mysql_source_table_2` upstream, and creates one source without
            subsources plus one old-syntax source exposing
            `mysql_table_1_old_syntax`.
        """
        return Testdrive(
            self.generic_setup()
            + dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                > CREATE SECRET mysqlpass_{self.suffix} AS '{MySql.DEFAULT_ROOT_PASSWORD}';

                > CREATE CONNECTION mysql_conn_{self.suffix} TO MYSQL (
                    HOST mysql,
                    USER root,
                    PASSWORD SECRET mysqlpass_{self.suffix}
                  )

                $ mysql-execute name=mysql
                DROP DATABASE IF EXISTS public_{self.suffix};
                CREATE DATABASE public_{self.suffix};
                USE public_{self.suffix};
                CREATE TABLE mysql_source_table_1 (a INTEGER, b INTEGER, y YEAR);
                INSERT INTO mysql_source_table_1 VALUES (1, 1234, 2024), (2, 0, 2001);
                CREATE TABLE mysql_source_table_2 (a INTEGER);
                INSERT INTO mysql_source_table_2 VALUES (1000), (2000);

                > CREATE SOURCE mysql_source_{self.suffix} FROM MYSQL CONNECTION mysql_conn_{self.suffix};

                > CREATE SOURCE old_mysql_source_{self.suffix} FROM MYSQL CONNECTION mysql_conn_{self.suffix}
                  (TEXT COLUMNS = (public_{self.suffix}.mysql_source_table_1.y))
                  FOR TABLES (public_{self.suffix}.mysql_source_table_1 AS mysql_table_1_old_syntax);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Create tables from the source in two phases, inserting upstream.

        Phase one creates `mysql_table_1` with `y` as a text column and
        inserts one upstream row. Phase two creates `mysql_table_1b` over the
        same upstream table with `y` ignored, creates `mysql_table_2`, and
        inserts one more row into each upstream table.

        Returns:
            One testdrive script per phase, in execution order.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                f"""
                > CREATE TABLE mysql_table_1 FROM SOURCE mysql_source_{self.suffix}
                  (REFERENCE "public_{self.suffix}"."mysql_source_table_1")
                  WITH (TEXT COLUMNS = (y));

                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public_{self.suffix};
                INSERT INTO mysql_source_table_1 VALUES (3, 2345, 2000);
                """,
                f"""
                > CREATE TABLE mysql_table_1b FROM SOURCE mysql_source_{self.suffix}
                  (REFERENCE "public_{self.suffix}"."mysql_source_table_1")
                  WITH (IGNORE COLUMNS = (y));

                > CREATE TABLE mysql_table_2 FROM SOURCE mysql_source_{self.suffix} (REFERENCE "public_{self.suffix}"."mysql_source_table_2");

                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}

                $ mysql-execute name=mysql
                USE public_{self.suffix};
                INSERT INTO mysql_source_table_1 VALUES (4, 3456, NULL);
                INSERT INTO mysql_source_table_2 VALUES (3000);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Assert that every table holds all upstream rows.

        `mysql_table_1b` was created with `y` ignored, so only two columns
        are expected for it.

        Returns:
            A testdrive script selecting from the three new-style tables and
            from the old-syntax table, each with its expected rows.
        """
        return Testdrive(
            dedent(
                """
                > SELECT * FROM mysql_table_1;
                1 1234 2024
                2 0 2001
                3 2345 2000
                4 3456 <null>

                > SELECT * FROM mysql_table_1b;
                1 1234
                2 0
                3 2345
                4 3456

                > SELECT * FROM mysql_table_2;
                1000
                2000
                3000

                # old source syntax still working
                > SELECT * FROM mysql_table_1_old_syntax;
                1 1234 2024
                2 0 2001
                3 2345 2000
                4 3456 <null>
                """
            )
        )
class TableFromLoadGenSource(TableFromSourceBase):
    """Check `CREATE TABLE ... FROM SOURCE` against an auction load generator."""

    # Appended to the name of the load generator source created here.
    suffix = "tbl_from_lg_source"

    def initialize(self) -> Testdrive:
        """Enable the feature flag and create the auction load generator source.

        Returns:
            A testdrive script made of the generic setup followed by the
            `CREATE SOURCE` statement.
        """
        create_source = dedent(
            f"""
            > CREATE SOURCE auction_house_{self.suffix} FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
            """
        )
        return Testdrive(self.generic_setup() + create_source)

    def manipulate(self) -> list[Testdrive]:
        """Create two tables over the load generator's `bids` output.

        `bids_1` references it as `"auction"."bids"`, `bids_2` as the bare
        name `"bids"`.

        Returns:
            One testdrive script per phase, in execution order.
        """
        first_phase = f"""
            > CREATE TABLE bids_1 FROM SOURCE auction_house_{self.suffix} (REFERENCE "auction"."bids");
            """
        second_phase = f"""
            > CREATE TABLE bids_2 FROM SOURCE auction_house_{self.suffix} (REFERENCE "bids");
            """
        return [Testdrive(dedent(first_phase)), Testdrive(dedent(second_phase))]

    def validate(self) -> Testdrive:
        """Assert that both tables contain 255 rows.

        Returns:
            A testdrive script counting the rows of `bids_1` and `bids_2`.
        """
        expected_counts = """
            > SELECT count(*) FROM bids_1;
            255
            > SELECT count(*) FROM bids_2;
            255
            """
        return Testdrive(dedent(expected_counts))
|