# Copyright Materialize, Inc. and contributors. All rights reserved. # # Use of this software is governed by the Business Source License # included in the LICENSE file at the root of this repository. # # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. import re from random import Random from textwrap import dedent from typing import Any from materialize.checks.actions import Testdrive from materialize.checks.checks import Check, externally_idempotent from materialize.checks.executors import Executor from materialize.checks.features import Features from materialize.mz_version import MzVersion from materialize.mzcompose.services.mysql import MySql class MySqlCdcBase: base_version: MzVersion current_version: MzVersion wait: bool suffix: str repeats: int expects: int def __init__(self, wait: bool, **kwargs: Any) -> None: self.wait = wait self.repeats = 1024 if wait else 16384 self.expects = 97350 if wait else 1633350 self.suffix = f"_{str(wait).lower()}" super().__init__(**kwargs) # forward unused args to Check def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql # create the database if it does not exist yet but do not drop it CREATE DATABASE IF NOT EXISTS public; USE public; CREATE USER mysql1{self.suffix} IDENTIFIED BY 'mysql'; GRANT REPLICATION SLAVE ON *.* TO mysql1{self.suffix}; GRANT ALL ON public.* TO mysql1{self.suffix}; DROP TABLE IF EXISTS mysql_source_table{self.suffix}; # uniqueness constraint not possible for length of 1024 characters upwards (max key length is 3072 bytes) CREATE TABLE mysql_source_table{self.suffix} (f1 VARCHAR(32), f2 INTEGER, f3 TEXT NOT NULL, f4 JSON, PRIMARY KEY(f1, f2)); SET @i:=0; CREATE TABLE sequence{self.suffix} (i INT); INSERT INTO sequence{self.suffix} SELECT (@i:=@i+1) FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT 100; INSERT INTO mysql_source_table{self.suffix} SELECT 'A', i, REPEAT('A', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; > CREATE SECRET mysqlpass1{self.suffix} AS 'mysql'; > CREATE CONNECTION mysql1{self.suffix} TO MYSQL ( HOST 'mysql', USER mysql1{self.suffix}, PASSWORD SECRET mysqlpass1{self.suffix} ) """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" > CREATE SOURCE mysql_source1{self.suffix} FROM MYSQL CONNECTION mysql1{self.suffix}; > CREATE TABLE mysql_source_tableA{self.suffix} FROM SOURCE mysql_source1{self.suffix} (REFERENCE public.mysql_source_table{self.suffix}); > CREATE DEFAULT INDEX ON mysql_source_tableA{self.suffix}; $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'B', i, REPEAT('B', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET mysqlpass2{self.suffix} AS 'mysql'; > CREATE CONNECTION mysql2{self.suffix} TO MYSQL ( HOST 'mysql', USER mysql1{self.suffix}, PASSWORD SECRET mysqlpass2{self.suffix} ) $ mysql-execute name=mysql SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'C', i, REPEAT('C', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; """ + ( f""" # Wait until MySQL snapshot is complete > SELECT COUNT(*) > 0 FROM mysql_source_tableA{self.suffix} true """ if self.wait else "" ), f""" $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT USAGE ON CONNECTION mysql2{self.suffix} TO materialize $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'D', i, REPEAT('D', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SOURCE mysql_source2{self.suffix} FROM MYSQL CONNECTION mysql2{self.suffix}; > CREATE TABLE mysql_source_tableB{self.suffix} FROM SOURCE mysql_source2{self.suffix} (REFERENCE public.mysql_source_table{self.suffix}); $ mysql-execute name=mysql SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'E', i, REPEAT('E', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; $ mysql-execute name=mysql SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'F', i, REPEAT('F', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET mysqlpass3{self.suffix} AS 'mysql'; > CREATE CONNECTION mysql3{self.suffix} TO MYSQL ( HOST 'mysql', USER mysql1{self.suffix}, PASSWORD SECRET mysqlpass3{self.suffix} ) > CREATE SOURCE mysql_source3{self.suffix} FROM MYSQL CONNECTION mysql3{self.suffix}; > CREATE TABLE mysql_source_tableC{self.suffix} FROM SOURCE mysql_source3{self.suffix} (REFERENCE public.mysql_source_table{self.suffix}); $ mysql-execute name=mysql SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'G', i, REPEAT('G', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; $ mysql-execute name=mysql SET @i:=0; INSERT INTO mysql_source_table{self.suffix} SELECT 'H', i, REPEAT('X', {self.repeats} - i), NULL FROM sequence{self.suffix} WHERE i <= 100; UPDATE mysql_source_table{self.suffix} SET f2 = f2 + 100; """ + ( f""" # Wait until MySQL snapshot is complete > SELECT COUNT(*) > 0 FROM mysql_source_tableB{self.suffix} true > SELECT COUNT(*) > 0 FROM mysql_source_tableC{self.suffix} true """ if self.wait else "" ), ] ] def validate(self) -> Testdrive: sql = dedent( f""" $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT SELECT ON mysql_source_tableA{self.suffix} TO materialize GRANT SELECT ON mysql_source_tableB{self.suffix} TO materialize GRANT SELECT ON mysql_source_tableC{self.suffix} TO materialize > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM mysql_source_tableA{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM mysql_source_tableB{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM mysql_source_tableC{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} # TODO: Figure out the quoting here -- it returns "f4" when done using the SQL shell # (Might have changed again with https://github.com/MaterializeInc/materialize/pull/31933) # > SELECT regexp_match(create_sql, 'TEXT COLUMNS = \\((.*?)\\)')[1] FROM (SHOW CREATE SOURCE mysql_source_tableA{self.suffix}); # "\"f4\"" # Confirm that the primary key information has been propagated from MySQL > SELECT key FROM (SHOW INDEXES ON mysql_source_tableA{self.suffix}); {{f1,f2}} ?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT f1, f2 FROM mysql_source_tableA{self.suffix}; Explained Query (fast path): Project (#0, #1) ReadIndex on=materialize.public.mysql_source_tablea{self.suffix} mysql_source_tablea{self.suffix}_primary_idx=[*** full scan ***] Used Indexes: - materialize.public.mysql_source_tablea{self.suffix}_primary_idx (*** full scan ***) Target cluster: quickstart ?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT f1, f2 FROM mysql_source_tableA{self.suffix}; Explained Query (fast path): Project (#0, #1) ReadIndex on=materialize.public.mysql_source_tablea{self.suffix} mysql_source_tablea{self.suffix}_primary_idx=[*** full scan ***] Used Indexes: - materialize.public.mysql_source_tablea{self.suffix}_primary_idx (*** full scan ***) Target cluster: quickstart """ ) return Testdrive(sql) @externally_idempotent(False) class MySqlCdc(MySqlCdcBase, Check): def __init__( self, base_version: MzVersion, rng: Random | None, features: Features | None ) -> None: super().__init__( wait=True, base_version=base_version, rng=rng, features=features ) @externally_idempotent(False) class MySqlCdcNoWait(MySqlCdcBase, Check): def __init__( self, base_version: MzVersion, rng: Random | None, features: Features | None ) -> None: super().__init__( wait=False, base_version=base_version, rng=rng, features=features ) @externally_idempotent(False) class MySqlCdcMzNow(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql # create the database if it does not exist yet but do not drop it CREATE DATABASE IF NOT EXISTS public; USE public; CREATE USER mysql2 IDENTIFIED BY 'mysql'; GRANT REPLICATION SLAVE ON *.* TO mysql2; GRANT ALL ON public.* TO mysql2; DROP TABLE IF EXISTS mysql_mz_now_table; CREATE TABLE mysql_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2)); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'A1'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'B1'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'C1'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'D1'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'E1'); > CREATE SECRET mysql_mz_now_pass AS 'mysql'; > CREATE CONNECTION mysql_mz_now_conn TO MYSQL ( HOST 'mysql', USER mysql2, PASSWORD SECRET mysql_mz_now_pass ) > CREATE SOURCE mysql_mz_now_source FROM MYSQL CONNECTION mysql_mz_now_conn; > CREATE TABLE mysql_mz_now_table FROM SOURCE mysql_mz_now_source (REFERENCE public.mysql_mz_now_table); # Return all rows fresher than 60 seconds > CREATE MATERIALIZED VIEW mysql_mz_now_view AS SELECT * FROM mysql_mz_now_table WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000) """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_mz_now_table VALUES (NOW(), 'A2'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'B2'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'C2'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'D2'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'E2'); DELETE FROM mysql_mz_now_table WHERE f2 = 'B1'; UPDATE mysql_mz_now_table SET f1 = NOW() WHERE f2 = 'C1'; """, f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_mz_now_table VALUES (NOW(), 'A3'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'B3'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'C3'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'D3'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'E3'); DELETE FROM mysql_mz_now_table WHERE f2 = 'B2'; UPDATE mysql_mz_now_table SET f1 = NOW() WHERE f2 = 'D1'; """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( f""" > SELECT COUNT(*) FROM mysql_mz_now_table; 13 $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_mz_now_table VALUES (NOW(), 'A4'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'B4'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'C4'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'D4'); INSERT INTO mysql_mz_now_table VALUES (NOW(), 'E4'); DELETE FROM mysql_mz_now_table WHERE f2 = 'B3'; UPDATE mysql_mz_now_table SET f1 = NOW() WHERE f2 = 'E1' # Expect some rows newer than 60 seconds in view > SELECT COUNT(*) >= 6 FROM mysql_mz_now_view WHERE f1 > NOW() - INTERVAL '60' SECOND; true # Expect no rows older than 60 seconds in view > SELECT COUNT(*) FROM mysql_mz_now_view WHERE f1 < NOW() - INTERVAL '60' SECOND; 0 # Rollback the last INSERTs so that validate() can be called multiple times $ mysql-execute name=mysql INSERT INTO mysql_mz_now_table VALUES (NOW(), 'B3'); DELETE FROM mysql_mz_now_table WHERE f2 LIKE '%4%'; """ ) ) @externally_idempotent(False) class MySqlBitType(Check): def _can_run(self, e: Executor) -> bool: return self.base_version > MzVersion.parse_mz("v0.131.0-dev") def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql # create the database if it does not exist yet but do not drop it CREATE DATABASE IF NOT EXISTS public; USE public; CREATE USER mysql3 IDENTIFIED BY 'mysql'; GRANT REPLICATION SLAVE ON *.* TO mysql3; GRANT ALL ON public.* TO mysql3; DROP TABLE IF EXISTS mysql_bit_table; CREATE TABLE mysql_bit_table (f1 BIT(11), f2 BIT(1)); INSERT INTO mysql_bit_table VALUES (8, 0); INSERT INTO mysql_bit_table VALUES (13, 1) INSERT INTO mysql_bit_table VALUES (b'11100000100', b'1'); INSERT INTO mysql_bit_table VALUES (b'0000', b'0'); INSERT INTO mysql_bit_table VALUES (b'11111111111', b'0'); > CREATE SECRET mysql_bit_pass AS 'mysql'; > CREATE CONNECTION mysql_bit_conn TO MYSQL ( HOST 'mysql', USER mysql3, PASSWORD SECRET mysql_bit_pass ) > CREATE SOURCE mysql_bit_source FROM MYSQL CONNECTION mysql_bit_conn; > CREATE TABLE mysql_bit_table FROM SOURCE mysql_bit_source (REFERENCE public.mysql_bit_table); # Return all rows > CREATE MATERIALIZED VIEW mysql_bit_view AS SELECT * FROM mysql_bit_table """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_bit_table VALUES (20, 1); """, f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_bit_table VALUES (30, 1); """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( f""" > SELECT * FROM mysql_bit_table; 0 0 8 0 13 1 20 1 30 1 1796 1 2047 0 $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_bit_table VALUES (40, 1); > SELECT * FROM mysql_bit_table; 0 0 8 0 13 1 20 1 30 1 40 1 1796 1 2047 0 # Rollback the last INSERTs so that validate() can be called multiple times $ mysql-execute name=mysql DELETE FROM mysql_bit_table WHERE f1 = 40; > SELECT * FROM mysql_bit_table; 0 0 8 0 13 1 20 1 30 1 1796 1 2047 0 """ ) ) @externally_idempotent(False) class MySqlInvisibleColumn(Check): def _can_run(self, e: Executor) -> bool: return self.base_version > MzVersion.parse_mz("v0.133.0-dev") def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql # create the database if it does not exist yet but do not drop it CREATE DATABASE IF NOT EXISTS public; USE public; CREATE USER mysql4 IDENTIFIED BY 'mysql'; GRANT REPLICATION SLAVE ON *.* TO mysql4; GRANT ALL ON public.* TO mysql4; DROP TABLE IF EXISTS mysql_invisible_table; CREATE TABLE mysql_invisible_table (f1 INT, f2 FLOAT INVISIBLE, f3 DATE INVISIBLE, f4 TEXT INVISIBLE); INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (1, 0.1, '2025-01-01', 'one'); > CREATE SECRET mysql_invisible_pass AS 'mysql'; > CREATE CONNECTION mysql_invisible_conn TO MYSQL ( HOST 'mysql', USER mysql4, PASSWORD SECRET mysql_invisible_pass ) > CREATE SOURCE mysql_invisible_source FROM MYSQL CONNECTION mysql_invisible_conn; > CREATE TABLE mysql_invisible_table FROM SOURCE mysql_invisible_source (REFERENCE public.mysql_invisible_table); # Return all rows > CREATE MATERIALIZED VIEW mysql_invisible_view AS SELECT * FROM mysql_invisible_table """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (2, 0.2, '2025-02-02', 'two'); """, f""" $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (3, 0.3, '2025-03-03', 'three'); """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( f""" > SELECT * FROM mysql_invisible_table; 1 0.1 2025-01-01 one 2 0.2 2025-02-02 two 3 0.3 2025-03-03 three $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD} $ mysql-execute name=mysql USE public; ALTER TABLE mysql_invisible_table ALTER COLUMN f2 SET VISIBLE; INSERT INTO mysql_invisible_table (f1, f2, f3, f4) VALUES (4, 0.4, '2025-04-04', 'four'); > SELECT * FROM mysql_invisible_table; 1 0.1 2025-01-01 one 2 0.2 2025-02-02 two 3 0.3 2025-03-03 three 4 0.4 2025-04-04 four # Rollback the last INSERTs so that validate() can be called multiple times $ mysql-execute name=mysql DELETE FROM mysql_invisible_table WHERE f1 = 4; ALTER TABLE mysql_invisible_table ALTER COLUMN f2 SET INVISIBLE; > SELECT * FROM mysql_invisible_table; 1 0.1 2025-01-01 one 2 0.2 2025-02-02 two 3 0.3 2025-03-03 three """ ) ) def remove_target_cluster_from_explain(sql: str) -> str: return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)