12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Functional test for the native (non-Debezium) MySQL sources, using Toxiproxy to
- simulate bad network as well as checking how Materialize handles binary log
- corruptions.
- """
- import time
- from typing import Any
- import pymysql
- from pg8000 import Cursor
- from materialize import buildkite
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.alpine import Alpine
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mysql import MySql, create_mysql_server_args
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.toxiproxy import Toxiproxy
# Services available to every workflow in this composition. The two extra
# MySQL containers act as read replicas; each MySQL server needs a unique
# server_id for replication (the default master implicitly uses "1").
SERVICES = [
    Alpine(),
    Mz(app_password=""),
    Materialized(default_replication_factor=2),
    MySql(),
    MySql(
        name="mysql-replica-1",
        version=MySql.DEFAULT_VERSION,
        additional_args=create_mysql_server_args(server_id="2", is_master=False),
    ),
    MySql(
        name="mysql-replica-2",
        version=MySql.DEFAULT_VERSION,
        additional_args=create_mysql_server_args(server_id="3", is_master=False),
    ),
    Toxiproxy(),
    # no_reset: state (sources, tables) must survive across the many
    # testdrive invocations within a single scenario.
    Testdrive(
        no_reset=True,
        default_timeout="300s",
    ),
]
def workflow_default(c: Composition) -> None:
    """Run all workflows of this composition, sharded for Buildkite parallelism."""

    # Workflows that shard their own scenario lists internally; they are
    # always listed first and excluded from the generic sharding below.
    internally_sharded = [
        "disruptions",
        "bin-log-manipulations",
        "short-bin-log-retention",
    ]

    def run_one(name: str) -> None:
        # "default" is this workflow itself; don't recurse into it.
        if name == "default":
            return
        # TODO(def-): Reenable when database-issues#7775 is fixed
        if name in ("bin-log-manipulations", "short-bin-log-retention"):
            return
        # clear to avoid issues
        c.kill("mysql")
        c.rm("mysql")
        with c.test_case(name):
            c.workflow(name)

    remaining = [w for w in c.workflows if w not in internally_sharded]
    sharded_workflows = internally_sharded + buildkite.shard_list(
        remaining, lambda w: w
    )
    print(
        f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
    )
    c.test_parts(sharded_workflows, run_one)
def workflow_disruptions(c: Composition) -> None:
    """Test MySQL direct replication's failure handling by
    disrupting replication at various stages using Toxiproxy or service restarts
    """
    all_scenarios = [
        mysql_out_of_disk_space,
        disconnect_mysql_during_snapshot,
        disconnect_mysql_during_replication,
        restart_mysql_during_snapshot,
        restart_mz_during_snapshot,
        restart_mysql_during_replication,
        restart_mz_during_replication,
        fix_mysql_schema_while_mz_restarts,
        verify_no_snapshot_reingestion,
        transaction_with_rollback,
    ]
    scenarios = buildkite.shard_list(all_scenarios, lambda s: s.__name__)
    print(
        f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
    )
    for scenario in scenarios:
        # The disk-space scenario needs a deliberately small data volume.
        if scenario == mysql_out_of_disk_space:
            overrides = [MySql(volumes=["sourcedata_512Mb:/var/lib/mysql"])]
        else:
            overrides = []
        with c.override(*overrides):
            print(
                f"--- Running scenario {scenario.__name__} with overrides: {overrides}"
            )
            initialize(c)
            scenario(c)
            end(c)
def workflow_backup_restore(c: Composition) -> None:
    """Run the backup/restore scenario; the source is expected to end up broken."""
    # sanity_restart is off because the scenario leaves the source unhealthy.
    with c.override(
        Materialized(sanity_restart=False, default_replication_factor=2),
    ):
        print(f"--- Running scenario {backup_restore_mysql.__name__}")
        initialize(c)
        backup_restore_mysql(c)
        # No end confirmation here, since we expect the source to be in a bad state
def workflow_bin_log_manipulations(c: Composition) -> None:
    """Run scenarios that tamper with the MySQL binary log."""
    with c.override(
        Materialized(sanity_restart=False, default_replication_factor=2),
    ):
        scenarios = buildkite.shard_list(
            [
                reset_master_gtid,
                corrupt_bin_log_to_stall_source,
                corrupt_bin_log_and_add_sub_source,
            ],
            lambda s: s.__name__,
        )
        print(
            f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
        )
        for scenario in scenarios:
            print(f"--- Running scenario {scenario.__name__}")
            initialize(c)
            scenario(c)
            # No end confirmation here, since we expect the source to be in a bad state
def workflow_short_bin_log_retention(c: Composition) -> None:
    """Run scenarios against a MySQL whose bin logs expire after seconds."""
    bin_log_expiration_in_sec = 2
    mysql_args = [
        *MySql.DEFAULT_ADDITIONAL_ARGS,
        f"--binlog_expire_logs_seconds={bin_log_expiration_in_sec}",
    ]
    with c.override(
        Materialized(sanity_restart=False, default_replication_factor=2),
        MySql(additional_args=mysql_args),
    ):
        scenarios = buildkite.shard_list(
            [logs_expiration_while_mz_down, create_source_after_logs_expiration],
            lambda s: s.__name__,
        )
        print(
            f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
        )
        for scenario in scenarios:
            print(f"--- Running scenario {scenario.__name__}")
            # Sources are created inside the scenarios themselves, at the
            # point in time each scenario requires.
            initialize(c, create_source=False)
            scenario(c, bin_log_expiration_in_sec)
        # No end confirmation here
def workflow_master_changes(c: Composition) -> None:
    """
    mysql-replica-1 and mysql-replica-2 replicate mysql. The source is attached to mysql-replica-2. mysql is
    killed and mysql-replica-1 becomes the new master. mysql-replica-2 is configured to replicate from mysql-replica-1.
    """
    with c.override(
        # sanity_restart is off because this workflow kills containers on purpose
        Materialized(sanity_restart=False, default_replication_factor=2),
        MySql(
            name="mysql-replica-1",
            version=MySql.DEFAULT_VERSION,
            additional_args=create_mysql_server_args(server_id="2", is_master=False),
        ),
        MySql(
            name="mysql-replica-2",
            version=MySql.DEFAULT_VERSION,
            additional_args=create_mysql_server_args(server_id="3", is_master=False),
        ),
    ):
        # no source yet: it is created further below, after replication is set up
        initialize(c, create_source=False)
        host_data_master = "mysql"
        host_for_mz_source = "mysql-replica-2"
        c.up("mysql-replica-1", "mysql-replica-2")
        # configure mysql-replica-1 to replicate mysql
        run_testdrive_files(
            c,
            f"--var=mysql-replication-master-host={host_data_master}",
            "configure-replica.td",
            mysql_host="mysql-replica-1",
        )
        # configure mysql-replica-2 to replicate mysql
        run_testdrive_files(
            c,
            f"--var=mysql-replication-master-host={host_data_master}",
            "configure-replica.td",
            mysql_host="mysql-replica-2",
        )
        # wait for mysql-replica-2 to replicate the current state
        time.sleep(15)
        # sanity check: the replica serves the same data as the master
        run_testdrive_files(
            c,
            f"--var=mysql-source-host={host_for_mz_source}",
            "verify-mysql-source-select-all.td",
        )
        # create source pointing to mysql-replica-2
        run_testdrive_files(
            c,
            f"--var=mysql-source-host={host_for_mz_source}",
            "create-source.td",
        )
        run_testdrive_files(
            c,
            "verify-source-running.td",
        )
        # fail over: kill the master; mysql-replica-1 takes over that role
        c.kill("mysql")
        host_data_master = "mysql-replica-1"
        # let mysql-replica-2 replicate from mysql-replica-1
        run_testdrive_files(
            c,
            f"--var=mysql-replication-master-host={host_data_master}",
            "configure-replica.td",
            mysql_host=host_for_mz_source,
        )
        # the source must survive the master change
        run_testdrive_files(
            c,
            "verify-source-running.td",
        )
        # delete rows in mysql-replica-1
        run_testdrive_files(c, "delete-rows-t1.td", mysql_host=host_data_master)
        # It may take some time until mysql-replica-2 catches up.
        time.sleep(15)
        # the deletion must propagate new-master -> replica-2 -> Materialize
        run_testdrive_files(
            c,
            "verify-rows-deleted-t1.td",
        )
def workflow_switch_to_replica_and_kill_master(c: Composition) -> None:
    """
    mysql-replica-1 replicates mysql. The source is attached to mysql. mz switches the connection to mysql-replica-1.
    Changing the connection should not brick the source and the source should still work if mysql is killed.
    """
    with c.override(
        # sanity_restart is off because this workflow kills containers on purpose
        Materialized(sanity_restart=False, default_replication_factor=2),
        MySql(
            name="mysql-replica-1",
            version=MySql.DEFAULT_VERSION,
            additional_args=create_mysql_server_args(server_id="2", is_master=False),
        ),
    ):
        # creates the source pointing at the master ("mysql") via toxiproxy
        initialize(c)
        host_data_master = "mysql"
        host_for_mz_source = "mysql"
        c.up("mysql-replica-1")
        # configure replica
        run_testdrive_files(
            c,
            f"--var=mysql-replication-master-host={host_data_master}",
            "configure-replica.td",
            mysql_host="mysql-replica-1",
        )
        # give the replica some time to replicate the current state
        time.sleep(3)
        run_testdrive_files(
            c,
            "delete-rows-t1.td",
            "verify-rows-deleted-t1.td",
        )
        # change connection to replica
        host_for_mz_source = "mysql-replica-1"
        run_testdrive_files(
            c,
            f"--var=mysql-source-host={host_for_mz_source}",
            "alter-source-connection.td",
        )
        run_testdrive_files(
            c,
            "verify-source-running.td",
        )
        # killing the old master must not affect the re-pointed source
        c.kill("mysql")
        time.sleep(3)
        run_testdrive_files(
            c,
            "verify-source-running.td",
        )
def initialize(c: Composition, create_source: bool = True) -> None:
    """Reset the environment: wipe volumes, start the services, configure
    Toxiproxy and MySQL, and populate the tables.

    When create_source is True, also create the source through toxiproxy so
    scenarios can inject network faults between Materialize and MySQL.
    """
    c.down(destroy_volumes=True)
    c.up("materialized", "mysql", "toxiproxy")
    setup_steps = [
        "configure-toxiproxy.td",
        "configure-mysql.td",
        "populate-tables.td",
    ]
    run_testdrive_files(c, *setup_steps)
    if create_source:
        run_testdrive_files(
            c, "--var=mysql-source-host=toxiproxy", "create-source.td"
        )
def restart_mysql(c: Composition) -> None:
    """Hard-restart the MySQL container (kill, then bring it back up)."""
    for action in (c.kill, c.up):
        action("mysql")
def restart_mz(c: Composition) -> None:
    """Hard-restart the Materialize container (kill, then bring it back up)."""
    service = "materialized"
    c.kill(service)
    c.up(service)
def end(c: Composition) -> None:
    """Validate the ingested data at the end of a scenario, then clean up."""
    final_steps = ["verify-data.td", "cleanup.td"]
    run_testdrive_files(c, *final_steps)
def disconnect_mysql_during_snapshot(c: Composition) -> None:
    """Drop the MySQL connection mid-snapshot, restore it, and keep
    changing the upstream tables."""
    steps = [
        "toxiproxy-close-connection.td",
        "toxiproxy-restore-connection.td",
        "delete-rows-t1.td",
        "delete-rows-t2.td",
        "alter-table.td",
        "alter-mz.td",
    ]
    run_testdrive_files(c, *steps)
def restart_mysql_during_snapshot(c: Composition) -> None:
    """Bounce MySQL while the snapshot is in progress, then keep applying
    upstream changes."""
    restart_mysql(c)
    follow_up = (
        "delete-rows-t1.td",
        "delete-rows-t2.td",
        "alter-table.td",
        "alter-mz.td",
    )
    run_testdrive_files(c, *follow_up)
def restart_mz_during_snapshot(c: Composition) -> None:
    """Bounce Materialize while the snapshot is in progress; it must
    resume cleanly."""
    run_testdrive_files(c, "alter-mz.td")
    restart_mz(c)
    run_testdrive_files(
        c, "delete-rows-t1.td", "delete-rows-t2.td", "alter-table.td"
    )
def disconnect_mysql_during_replication(c: Composition) -> None:
    """Wait for the snapshot to finish, apply upstream changes, then drop
    and restore the network connection during steady-state replication."""
    steps = [
        "wait-for-snapshot.td",
        "delete-rows-t1.td",
        "delete-rows-t2.td",
        "alter-table.td",
        "alter-mz.td",
        "toxiproxy-close-connection.td",
        "toxiproxy-restore-connection.td",
    ]
    run_testdrive_files(c, *steps)
def restart_mysql_during_replication(c: Composition) -> None:
    """Bounce MySQL during steady-state replication."""
    pre_restart = (
        "wait-for-snapshot.td",
        "delete-rows-t1.td",
        "alter-table.td",
        "alter-mz.td",
    )
    run_testdrive_files(c, *pre_restart)
    restart_mysql(c)
    # changes after the restart must still be ingested
    run_testdrive_files(c, "delete-rows-t2.td")
def restart_mz_during_replication(c: Composition) -> None:
    """Bounce Materialize during steady-state replication."""
    pre_restart = (
        "wait-for-snapshot.td",
        "delete-rows-t1.td",
        "alter-table.td",
        "alter-mz.td",
    )
    run_testdrive_files(c, *pre_restart)
    restart_mz(c)
    # changes after the restart must still be ingested
    run_testdrive_files(c, "delete-rows-t2.td")
def fix_mysql_schema_while_mz_restarts(c: Composition) -> None:
    """Break the upstream schema, verify the data, fix the schema again,
    and then restart Materialize."""
    steps = [
        "delete-rows-t1.td",
        "delete-rows-t2.td",
        "alter-table.td",
        "alter-mz.td",
        "verify-data.td",
        "alter-table-fix.td",
    ]
    run_testdrive_files(c, *steps)
    restart_mz(c)
def verify_no_snapshot_reingestion(c: Composition) -> None:
    """Confirm that Mz does not reingest the entire snapshot on restart by
    revoking its SELECT privileges: a re-snapshot would then fail loudly.
    """
    run_testdrive_files(c, "wait-for-snapshot.td", "mysql-disable-select-permission.td")
    restart_mz(c)
    post_restart = (
        "delete-rows-t1.td",
        "delete-rows-t2.td",
        "alter-table.td",
        "alter-mz.td",
    )
    run_testdrive_files(c, *post_restart)
def reset_master_gtid(c: Composition) -> None:
    """Confirm behavior after resetting GTID in MySQL: the source stalls."""
    run_testdrive_files(c, "reset-master.td")
    follow_up = ("delete-rows-t1.td", "verify-source-stalled.td")
    run_testdrive_files(c, *follow_up)
def corrupt_bin_log_to_stall_source(c: Composition) -> None:
    """
    Switch off mz, modify data in mysql, and purge the bin-log so that mz
    hasn't seen all entries in the replication stream; the source is
    expected to stall as a result.
    """
    _corrupt_bin_log(c)
    run_testdrive_files(c, "verify-source-stalled.td")
def corrupt_bin_log_and_add_sub_source(c: Composition) -> None:
    """
    Corrupt the bin log, add a sub source, and expect it to be working.
    """
    _corrupt_bin_log(c)
    follow_up = [
        "populate-table-t3.td",
        "alter-source-add-subsource.td",
        "verify-t3.td",
    ]
    run_testdrive_files(c, *follow_up)
def _corrupt_bin_log(c: Composition) -> None:
    """With mz down, insert a row, purge the bin logs (so that row is lost
    from the replication stream), insert another row, and restart mz."""
    run_testdrive_files(c, "wait-for-snapshot.td")
    c.kill("materialized")
    conn = create_mysql_connection(c)
    conn.autocommit(True)
    with conn.cursor() as cursor:
        cursor.execute("INSERT INTO public.t1 VALUES (1, 'text')")
        # this insert is purged away before mz can ever see it
        _purge_bin_logs(cursor)
        cursor.execute("INSERT INTO public.t1 VALUES (2, 'text')")
    c.up("materialized")
def _purge_bin_logs(cur: Any) -> None:
    """Rotate and purge all MySQL binary logs via the given cursor.

    Fix: callers pass a pymysql cursor (see _corrupt_bin_log), but the
    parameter was annotated with pg8000's ``Cursor``; any DB-API cursor
    exposing ``execute`` works, hence ``Any``.
    """
    # Rotate to a fresh log file so everything older becomes purgeable.
    cur.execute("FLUSH BINARY LOGS")
    # Small delay so now() is strictly after the rotation timestamp.
    time.sleep(2)
    cur.execute("PURGE BINARY LOGS BEFORE now()")
    cur.execute("FLUSH BINARY LOGS")
def transaction_with_rollback(c: Composition) -> None:
    """
    Rollback a tx in MySQL.
    """
    # needed for verify-data to succeed in the end (triggered by the workflow)
    run_testdrive_files(c, "delete-rows-t1.td", "delete-rows-t2.td")
    conn = create_mysql_connection(c)
    # keep the transaction open across the inserts below
    conn.autocommit(False)
    with conn.cursor() as cursor:
        cursor.execute("INSERT INTO public.t1 VALUES (1, 'text')")
        time.sleep(2)
        cursor.execute("INSERT INTO public.t1 VALUES (2, 'text')")
        time.sleep(2)
        # discard both inserts; they must never reach Materialize
        cursor.execute("ROLLBACK")
    run_testdrive_files(c, "verify-source-running.td")
    # needed for verify-data to succeed in the end (triggered by the workflow)
    run_testdrive_files(c, "alter-table.td", "alter-mz.td")
def mysql_out_of_disk_space(c: Composition) -> None:
    """Fill MySQL's data disk during replication, then free it again; the
    source must recover afterwards."""
    run_testdrive_files(c, "wait-for-snapshot.td", "delete-rows-t1.td")
    fill_file = "/var/lib/mysql/fill_file"
    # write 512 MiB of zeroes; `|| true` because dd fails once the disk is full
    c.exec(
        "mysql",
        "bash",
        "-c",
        f"dd if=/dev/zero of={fill_file} bs=1024 count=$[1024*512] || true",
    )
    print("Sleeping for 30 seconds ...")
    time.sleep(30)
    # free the space again
    c.exec("mysql", "bash", "-c", f"rm {fill_file}")
    run_testdrive_files(c, "delete-rows-t2.td", "alter-table.td", "alter-mz.td")
def backup_restore_mysql(c: Composition) -> None:
    """Take a mysqldump backup, change data, then restore the backup so the
    upstream state moves backwards underneath the source."""
    # Backup MySQL, wait for completion
    backup_file = "backup.sql"
    c.exec(
        "mysql",
        "bash",
        "-c",
        f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD} && mysqldump --all-databases -u root --set-gtid-purged=OFF > {backup_file}",
    )
    # modify data after the backup so the restore below rolls it back
    run_testdrive_files(c, "delete-rows-t1.td")
    run_testdrive_files(c, "verify-rows-deleted-t1.td")
    # Restart MySQL service
    c.stop("mysql")
    c.up("mysql")
    # restore the pre-deletion state from the dump
    c.exec(
        "mysql",
        "bash",
        "-c",
        f"export MYSQL_PWD={MySql.DEFAULT_ROOT_PASSWORD} && mysql -u root < {backup_file}",
    )
    run_testdrive_files(c, "verify-mysql-select.td")
    # TODO: database-issues#7683: one of the two following commands must succeed
    # run_testdrive_files(c, "verify-rows-after-restore-t1.td")
    # run_testdrive_files(c, "verify-source-failed.td")
def create_source_after_logs_expiration(
    c: Composition, bin_log_expiration_in_sec: int
) -> None:
    """Populate tables, delete rows, and create the source after the log expiration in MySQL took place"""
    run_testdrive_files(c, "delete-rows-t1.td")
    # wait past the configured retention so the bin logs can expire
    sleep_duration = bin_log_expiration_in_sec + 2
    print(f"Sleeping for {sleep_duration} sec")
    time.sleep(sleep_duration)
    # rotate the logs so the expired files become eligible for purging
    mysql_conn = create_mysql_connection(c)
    with mysql_conn.cursor() as cur:
        cur.execute("FLUSH BINARY LOGS")
    restart_mysql(c)
    # not really necessary, still do it
    mysql_conn = create_mysql_connection(c)
    mysql_conn.autocommit(True)
    with mysql_conn.cursor() as cur:
        cur.execute("FLUSH BINARY LOGS")
    # the snapshot happens only now, after expiration, so the source must
    # still see the correct (post-deletion) state
    run_testdrive_files(
        c,
        "--var=mysql-source-host=toxiproxy",
        "create-source.td",
    )
    run_testdrive_files(c, "verify-rows-deleted-t1.td")
def logs_expiration_while_mz_down(
    c: Composition, bin_log_expiration_in_sec: int
) -> None:
    """Switch off mz, conduct changes in MySQL, let MySQL bin logs expire, and start mz"""
    run_testdrive_files(
        c,
        "--var=mysql-source-host=toxiproxy",
        "create-source.td",
    )
    c.kill("materialized")
    # change data while mz is down; these events land only in the bin log
    mysql_conn = create_mysql_connection(c)
    mysql_conn.autocommit(True)
    with mysql_conn.cursor() as cur:
        cur.execute("DELETE FROM public.t1 WHERE f1 % 2 = 0;")
    # wait past the retention window so those bin-log entries expire
    sleep_duration = bin_log_expiration_in_sec + 2
    print(f"Sleeping for {sleep_duration} sec")
    time.sleep(sleep_duration)
    restart_mysql(c)
    mysql_conn = create_mysql_connection(c)
    mysql_conn.autocommit(True)
    # conduct a further change to be added to the bin log
    with mysql_conn.cursor() as cur:
        cur.execute("UPDATE public.t1 SET f2 = NULL;")
        cur.execute("FLUSH BINARY LOGS")
    c.up("materialized")
    # mz missed the expired bin-log entries, so the source must stall
    run_testdrive_files(c, "verify-source-stalled.td")
def run_testdrive_files(c: Composition, *files: str, mysql_host: str = "mysql") -> None:
    """Run the given testdrive files with the MySQL connection vars pre-set."""
    default_vars = [
        f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
        f"--var=mysql-host={mysql_host}",
    ]
    c.run_testdrive_files(*default_vars, *files)
def create_mysql_connection(c: Composition) -> Any:
    """Open a root connection to the MySQL master through its mapped port."""
    connection_params = dict(
        host="localhost",
        user="root",
        password=MySql.DEFAULT_ROOT_PASSWORD,
        database="mysql",
        port=c.default_port("mysql"),
    )
    return pymysql.connect(**connection_params)
|