mzcompose.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Native SQL Server source tests, functional.
  11. """
  12. import glob
  13. import random
  14. import threading
  15. from textwrap import dedent
  16. from materialize import MZ_ROOT
  17. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  18. from materialize.mzcompose.services.materialized import Materialized
  19. from materialize.mzcompose.services.mz import Mz
  20. from materialize.mzcompose.services.sql_server import SqlServer
  21. from materialize.mzcompose.services.test_certs import TestCerts
  22. from materialize.mzcompose.services.testdrive import Testdrive
  23. TLS_CONF_PATH = MZ_ROOT / "test" / "sql-server-cdc" / "tls-mssconfig.conf"
  24. SERVICES = [
  25. Mz(app_password=""),
  26. Materialized(
  27. additional_system_parameter_defaults={
  28. "log_filter": "mz_storage::source::sql_server=trace,mz_storage::source::sql_server::replication=trace,mz_sql_server_util=debug,info"
  29. },
  30. ),
  31. Testdrive(),
  32. TestCerts(),
  33. SqlServer(
  34. volumes_extra=[
  35. "secrets:/var/opt/mssql/certs",
  36. f"{TLS_CONF_PATH}:/var/opt/mssql/mssql.conf",
  37. ]
  38. ),
  39. ]
  40. #
  41. # Test that SQL Server ingestion works
  42. #
  43. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  44. for name in c.workflows:
  45. if name == "default":
  46. continue
  47. with c.test_case(name):
  48. c.workflow(name)
  49. def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
  50. parser.add_argument(
  51. "filter",
  52. nargs="*",
  53. default=["*.td"],
  54. help="limit to only the files matching filter",
  55. )
  56. args = parser.parse_args()
  57. matching_files = []
  58. for filter in args.filter:
  59. matching_files.extend(
  60. glob.glob(filter, root_dir=MZ_ROOT / "test" / "sql-server-cdc")
  61. )
  62. matching_files = sorted(matching_files)
  63. print(f"Filter: {args.filter} Files: {matching_files}")
  64. # Start with a fresh state
  65. c.kill("sql-server")
  66. c.rm("sql-server")
  67. c.kill("materialized")
  68. c.rm("materialized")
  69. # must start test-certs, otherwise the certificates needed by sql-server may not be avaiable
  70. # in the secrets volume when it starts up
  71. c.up("materialized", "test-certs", "sql-server")
  72. seed = random.getrandbits(16)
  73. ssl_ca = c.exec(
  74. "sql-server", "cat", "/var/opt/mssql/certs/ca.crt", capture=True
  75. ).stdout
  76. alt_ssl_ca = c.exec(
  77. "sql-server", "cat", "/var/opt/mssql/certs/ca-selective.crt", capture=True
  78. ).stdout
  79. c.test_parts(
  80. matching_files,
  81. lambda file: c.run_testdrive_files(
  82. "--no-reset",
  83. "--max-errors=1",
  84. f"--seed={seed}",
  85. f"--var=ssl-ca={ssl_ca}",
  86. f"--var=alt-ssl-ca={alt_ssl_ca}",
  87. f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
  88. f"--var=default-sql-server-user={SqlServer.DEFAULT_USER}",
  89. f"--var=default-sql-server-password={SqlServer.DEFAULT_SA_PASSWORD}",
  90. str(file),
  91. ),
  92. )
  93. def workflow_snapshot_consistency(
  94. c: Composition, parser: WorkflowArgumentParser
  95. ) -> None:
  96. """
  97. Tests the scenario where a new source creates a snapshot and transitions to replication
  98. while the upstream source table is seeing updates. This test validates that the source snapshot
  99. sees a consistent view of the table during snapshot and identifies the correct LSN to start
  100. replication.
  101. """
  102. # Start with a fresh state
  103. c.kill("sql-server")
  104. c.rm("sql-server")
  105. c.kill("materialized")
  106. c.rm("materialized")
  107. initial_rows = 100
  108. with c.override(SqlServer()):
  109. c.up("materialized", "sql-server", {"name": "testdrive", "persistent": True})
  110. # Setup MS SQL server and materialize
  111. c.testdrive(
  112. dedent(
  113. f"""
  114. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  115. ALTER SYSTEM SET enable_sql_server_source = true;
  116. $ sql-server-connect name=sql-server
  117. server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}
  118. $ sql-server-execute name=sql-server
  119. DROP DATABASE IF EXISTS consistency_test;
  120. CREATE DATABASE consistency_test;
  121. USE consistency_test;
  122. ALTER DATABASE consistency_test SET ALLOW_SNAPSHOT_ISOLATION ON;
  123. EXEC sys.sp_cdc_enable_db;
  124. CREATE TABLE t1 (id bigint, val bigint);
  125. EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't1', @role_name = 'SA', @supports_net_changes = 0;
  126. WITH nums AS (SELECT 1 as n UNION ALL SELECT n+1 FROM nums where n < {initial_rows}) INSERT INTO t1 SELECT n, n+1000 FROM nums;
  127. > CREATE SECRET IF NOT EXISTS mssql_pass AS '{SqlServer.DEFAULT_SA_PASSWORD}';
  128. > CREATE CONNECTION mssql_connection TO SQL SERVER (
  129. HOST 'sql-server',
  130. PORT 1433,
  131. DATABASE consistency_test,
  132. USER '{SqlServer.DEFAULT_USER}',
  133. PASSWORD = SECRET mssql_pass);
  134. > DROP SOURCE IF EXISTS mssql_source CASCADE;
  135. """
  136. )
  137. )
  138. # Create a concurrent workload with 2 variaties of updates
  139. # - insert of new rows
  140. # - insert and delete of a row (the same row)
  141. update_id_offset = 10000
  142. update_val_offset = 100000
  143. insert_delete = lambda i: dedent(
  144. f"""
  145. INSERT INTO t1 VALUES (999999999,666666666), ({i+update_id_offset}, {i+update_val_offset});
  146. DELETE FROM t1 WHERE id = 999999999;
  147. """
  148. )
  149. # The number of update rows is based on local testing. While this doesn't guarantee that updates
  150. # to SQL server are ocurring throughout the snapshot -> replication transition of the source, there is
  151. # a generous overlap here. The possibility that they don't overlap should be exremely unlikely.
  152. update_rows = 1500
  153. upstream_updates = "\n".join([insert_delete(i) for i in range(update_rows)])
  154. def concurrent_updates(c: Composition) -> None:
  155. input = (
  156. dedent(
  157. f"""
  158. $ sql-server-connect name=sql-server
  159. server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}
  160. $ sql-server-execute name=sql-server
  161. USE consistency_test;
  162. """
  163. )
  164. + upstream_updates
  165. )
  166. c.testdrive(args=["--no-reset"], input=input)
  167. driver_thread = threading.Thread(target=concurrent_updates, args=(c,))
  168. print("==== Starting concurrent updates")
  169. driver_thread.start()
  170. # create the subsource that will create a snapshot and start replicating
  171. c.testdrive(
  172. args=["--no-reset"],
  173. input=dedent(
  174. """
  175. > CREATE SOURCE mssql_source
  176. FROM SQL SERVER CONNECTION mssql_connection
  177. FOR TABLES (dbo.t1);
  178. """
  179. ),
  180. )
  181. # validate MZ sees the correct results once the conccurent load is complete
  182. driver_thread.join()
  183. print("==== Validate concurrent updates")
  184. c.testdrive(
  185. args=["--no-reset"],
  186. input=dedent(
  187. f"""
  188. > SELECT COUNT(*), MIN(id), MAX(id) FROM t1;
  189. {update_rows+initial_rows} 1 {update_rows + update_id_offset - 1}
  190. """
  191. ),
  192. )