sql_server_cdc.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from random import Random
  10. from textwrap import dedent
  11. from typing import Any
  12. from materialize.checks.actions import Testdrive
  13. from materialize.checks.checks import Check, externally_idempotent
  14. from materialize.checks.executors import Executor
  15. from materialize.checks.features import Features
  16. from materialize.mz_version import MzVersion
  17. from materialize.mzcompose.services.sql_server import SqlServer
  18. class SqlServerCdcBase:
  19. base_version: MzVersion
  20. current_version: MzVersion
  21. features: Features
  22. wait: bool
  23. suffix: str
  24. repeats: int
  25. expects: int
  26. def __init__(self, wait: bool, **kwargs: Any) -> None:
  27. self.wait = wait
  28. self.repeats = 1024 if wait else 16384
  29. self.expects = 97350 if wait else 1633350
  30. self.suffix = f"_{str(wait).lower()}"
  31. super().__init__(**kwargs) # forward unused args to Check
  32. def _can_run(self, e: Executor) -> bool:
  33. return (
  34. self.base_version >= MzVersion.parse_mz("v0.142.0-dev")
  35. and self.features.sql_server_enabled()
  36. )
  37. def initialize(self) -> Testdrive:
  38. return Testdrive(
  39. dedent(
  40. f"""
  41. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  42. ALTER SYSTEM SET enable_sql_server_source = true;
  43. $ sql-server-connect name=sql-server
  44. server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID={SqlServer.DEFAULT_USER};Password={SqlServer.DEFAULT_SA_PASSWORD}
  45. $ sql-server-execute name=sql-server
  46. DROP DATABASE IF EXISTS test;
  47. CREATE DATABASE test;
  48. USE test;
  49. > CREATE SECRET sql_server_password_{self.suffix} AS '{SqlServer.DEFAULT_SA_PASSWORD}';
  50. > CREATE CONNECTION sql_server_connection_{self.suffix} TO SQL SERVER (
  51. HOST 'sql-server',
  52. DATABASE test,
  53. USER {SqlServer.DEFAULT_USER},
  54. PASSWORD SECRET sql_server_password_{self.suffix}
  55. )
  56. """
  57. )
  58. )
  59. def manipulate(self) -> list[Testdrive]:
  60. return [
  61. Testdrive(dedent(s))
  62. for s in [
  63. f"""
  64. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  65. ALTER SYSTEM SET enable_sql_server_source = true;
  66. > VALIDATE CONNECTION sql_server_connection_{self.suffix};
  67. """,
  68. f"""
  69. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  70. ALTER SYSTEM SET enable_sql_server_source = true;
  71. > DROP CONNECTION sql_server_connection_{self.suffix};
  72. > CREATE CONNECTION sql_server_connection2_{self.suffix} TO SQL SERVER (
  73. HOST 'sql-server',
  74. DATABASE test,
  75. USER {SqlServer.DEFAULT_USER},
  76. PASSWORD SECRET sql_server_password_{self.suffix}
  77. );
  78. """,
  79. ]
  80. ]
  81. def validate(self) -> Testdrive:
  82. sql = dedent(
  83. f"""
  84. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  85. ALTER SYSTEM SET enable_sql_server_source = true;
  86. > VALIDATE CONNECTION sql_server_connection2_{self.suffix};
  87. """
  88. )
  89. return Testdrive(sql)
  90. @externally_idempotent(False)
  91. class SqlServerCdc(SqlServerCdcBase, Check):
  92. def __init__(
  93. self, base_version: MzVersion, rng: Random | None, features: Features | None
  94. ) -> None:
  95. super().__init__(
  96. wait=True, base_version=base_version, rng=rng, features=features
  97. )