123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Native SQL Server source tests, functional.
- """
- import glob
- import random
- import threading
- from textwrap import dedent
- from materialize import MZ_ROOT
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.sql_server import SqlServer
- from materialize.mzcompose.services.test_certs import TestCerts
- from materialize.mzcompose.services.testdrive import Testdrive
- TLS_CONF_PATH = MZ_ROOT / "test" / "sql-server-cdc" / "tls-mssconfig.conf"
- SERVICES = [
- Mz(app_password=""),
- Materialized(
- additional_system_parameter_defaults={
- "log_filter": "mz_storage::source::sql_server=trace,mz_storage::source::sql_server::replication=trace,mz_sql_server_util=debug,info"
- },
- ),
- Testdrive(),
- TestCerts(),
- SqlServer(
- volumes_extra=[
- "secrets:/var/opt/mssql/certs",
- f"{TLS_CONF_PATH}:/var/opt/mssql/mssql.conf",
- ]
- ),
- ]
- #
- # Test that SQL Server ingestion works
- #
- def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
- for name in c.workflows:
- if name == "default":
- continue
- with c.test_case(name):
- c.workflow(name)
- def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
- parser.add_argument(
- "filter",
- nargs="*",
- default=["*.td"],
- help="limit to only the files matching filter",
- )
- args = parser.parse_args()
- matching_files = []
- for filter in args.filter:
- matching_files.extend(
- glob.glob(filter, root_dir=MZ_ROOT / "test" / "sql-server-cdc")
- )
- matching_files = sorted(matching_files)
- print(f"Filter: {args.filter} Files: {matching_files}")
- # Start with a fresh state
- c.kill("sql-server")
- c.rm("sql-server")
- c.kill("materialized")
- c.rm("materialized")
- # must start test-certs, otherwise the certificates needed by sql-server may not be avaiable
- # in the secrets volume when it starts up
- c.up("materialized", "test-certs", "sql-server")
- seed = random.getrandbits(16)
- ssl_ca = c.exec(
- "sql-server", "cat", "/var/opt/mssql/certs/ca.crt", capture=True
- ).stdout
- alt_ssl_ca = c.exec(
- "sql-server", "cat", "/var/opt/mssql/certs/ca-selective.crt", capture=True
- ).stdout
- c.test_parts(
- matching_files,
- lambda file: c.run_testdrive_files(
- "--no-reset",
- "--max-errors=1",
- f"--seed={seed}",
- f"--var=ssl-ca={ssl_ca}",
- f"--var=alt-ssl-ca={alt_ssl_ca}",
- f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
- f"--var=default-sql-server-user={SqlServer.DEFAULT_USER}",
- f"--var=default-sql-server-password={SqlServer.DEFAULT_SA_PASSWORD}",
- str(file),
- ),
- )
- def workflow_snapshot_consistency(
- c: Composition, parser: WorkflowArgumentParser
- ) -> None:
- """
- Tests the scenario where a new source creates a snapshot and transitions to replication
- while the upstream source table is seeing updates. This test validates that the source snapshot
- sees a consistent view of the table during snapshot and identifies the correct LSN to start
- replication.
- """
- # Start with a fresh state
- c.kill("sql-server")
- c.rm("sql-server")
- c.kill("materialized")
- c.rm("materialized")
- initial_rows = 100
- with c.override(SqlServer()):
- c.up("materialized", "sql-server", {"name": "testdrive", "persistent": True})
- # Setup MS SQL server and materialize
- c.testdrive(
- dedent(
- f"""
- $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
- ALTER SYSTEM SET enable_sql_server_source = true;
- $ sql-server-connect name=sql-server
- server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}
- $ sql-server-execute name=sql-server
- DROP DATABASE IF EXISTS consistency_test;
- CREATE DATABASE consistency_test;
- USE consistency_test;
- ALTER DATABASE consistency_test SET ALLOW_SNAPSHOT_ISOLATION ON;
- EXEC sys.sp_cdc_enable_db;
- CREATE TABLE t1 (id bigint, val bigint);
- EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't1', @role_name = 'SA', @supports_net_changes = 0;
- WITH nums AS (SELECT 1 as n UNION ALL SELECT n+1 FROM nums where n < {initial_rows}) INSERT INTO t1 SELECT n, n+1000 FROM nums;
- > CREATE SECRET IF NOT EXISTS mssql_pass AS '{SqlServer.DEFAULT_SA_PASSWORD}';
- > CREATE CONNECTION mssql_connection TO SQL SERVER (
- HOST 'sql-server',
- PORT 1433,
- DATABASE consistency_test,
- USER '{SqlServer.DEFAULT_USER}',
- PASSWORD = SECRET mssql_pass);
- > DROP SOURCE IF EXISTS mssql_source CASCADE;
- """
- )
- )
- # Create a concurrent workload with 2 variaties of updates
- # - insert of new rows
- # - insert and delete of a row (the same row)
- update_id_offset = 10000
- update_val_offset = 100000
- insert_delete = lambda i: dedent(
- f"""
- INSERT INTO t1 VALUES (999999999,666666666), ({i+update_id_offset}, {i+update_val_offset});
- DELETE FROM t1 WHERE id = 999999999;
- """
- )
- # The number of update rows is based on local testing. While this doesn't guarantee that updates
- # to SQL server are ocurring throughout the snapshot -> replication transition of the source, there is
- # a generous overlap here. The possibility that they don't overlap should be exremely unlikely.
- update_rows = 1500
- upstream_updates = "\n".join([insert_delete(i) for i in range(update_rows)])
- def concurrent_updates(c: Composition) -> None:
- input = (
- dedent(
- f"""
- $ sql-server-connect name=sql-server
- server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}
- $ sql-server-execute name=sql-server
- USE consistency_test;
- """
- )
- + upstream_updates
- )
- c.testdrive(args=["--no-reset"], input=input)
- driver_thread = threading.Thread(target=concurrent_updates, args=(c,))
- print("==== Starting concurrent updates")
- driver_thread.start()
- # create the subsource that will create a snapshot and start replicating
- c.testdrive(
- args=["--no-reset"],
- input=dedent(
- """
- > CREATE SOURCE mssql_source
- FROM SQL SERVER CONNECTION mssql_connection
- FOR TABLES (dbo.t1);
- """
- ),
- )
- # validate MZ sees the correct results once the conccurent load is complete
- driver_thread.join()
- print("==== Validate concurrent updates")
- c.testdrive(
- args=["--no-reset"],
- input=dedent(
- f"""
- > SELECT COUNT(*), MIN(id), MAX(id) FROM t1;
- {update_rows+initial_rows} 1 {update_rows + update_id_offset - 1}
- """
- ),
- )
|