10-sql-server-cdc.td 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Setup SQL Server state.
  10. #
  11. # Create a table that has CDC enabled.
  12. $ sql-server-connect name=sql-server
  13. server=tcp:sql-server,1433;IntegratedSecurity=true;TrustServerCertificate=true;User ID=${arg.default-sql-server-user};Password=${arg.default-sql-server-password}
  14. $ sql-server-execute name=sql-server
  15. DROP DATABASE IF EXISTS test;
  16. CREATE DATABASE test;
  17. USE test;
  18. EXEC sys.sp_cdc_enable_db;
  19. ALTER DATABASE test SET ALLOW_SNAPSHOT_ISOLATION ON;
  20. CREATE TABLE t1_pk (key_col VARCHAR(20) PRIMARY KEY, val_col VARCHAR(1024));
  21. EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't1_pk', @role_name = 'SA', @supports_net_changes = 0;
  22. INSERT INTO t1_pk VALUES ('a', 'hello world'), ('b', 'foobar'), ('c', 'anotha one');
  23. CREATE TABLE t2_no_cdc (key_col VARCHAR(20) PRIMARY KEY, val_col VARCHAR(1024));
  24. CREATE TABLE t3_text (value VARCHAR(100));
  25. EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 't3_text', @role_name = 'SA', @supports_net_changes = 0;
  26. CREATE TABLE dummy (data int);
  27. EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dummy', @role_name = 'SA', @supports_net_changes = 0;
  28. # Exercise Materialize.
  29. > CREATE SECRET IF NOT EXISTS sql_server_pass AS '${arg.default-sql-server-password}'
  30. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  31. ALTER SYSTEM SET enable_sql_server_source = true;
  32. > CREATE CONNECTION sql_server_test_connection TO SQL SERVER (
  33. HOST 'sql-server',
  34. PORT 1433,
  35. DATABASE test,
  36. USER '${arg.default-sql-server-user}',
  37. PASSWORD = SECRET sql_server_pass
  38. );
  39. > VALIDATE CONNECTION sql_server_test_connection;
  40. > SELECT name, type from mz_connections WHERE name = 'sql_server_test_connection';
  41. name type
  42. ---------------------------------------
  43. sql_server_test_connection sql-server
  44. # Create a SQL Server Source.
  45. > CREATE SOURCE t1_pk_sql_server
  46. FROM SQL SERVER CONNECTION sql_server_test_connection
  47. FOR ALL TABLES;
  48. > SHOW SOURCES
  49. dummy subsource quickstart ""
  50. t1_pk subsource quickstart ""
  51. t1_pk_sql_server sql-server quickstart ""
  52. t1_pk_sql_server_progress progress <null> ""
  53. t3_text subsource quickstart ""
  54. > SELECT schema_name, table_name FROM mz_internal.mz_sql_server_source_tables;
  55. dbo dummy
  56. dbo t1_pk
  57. dbo t3_text
  58. $ set-from-sql var=source-id
  59. SELECT id FROM mz_sources WHERE name = 't1_pk_sql_server';
  60. $ set-from-sql var=subsource-id
  61. SELECT id FROM mz_sources WHERE name = 't1_pk';
  62. # Wait until snapshot has emitted stats and then insert a new row
  63. # to force LSN in MS SQL to progress.
  64. > SELECT s.name, ss.snapshot_records_known = ss.snapshot_records_staged
  65. FROM mz_internal.mz_source_statistics ss JOIN mz_sources s using (id)
  66. WHERE s.name = 't1_pk_sql_server' AND ss.snapshot_records_staged > 0;
  67. t1_pk_sql_server true
  68. $ sql-server-execute name=sql-server
  69. INSERT INTO dummy VALUES (1);
  70. > SELECT snapshot_records_known, snapshot_records_staged, snapshot_committed FROM mz_internal.mz_source_statistics WHERE id = '${source-id}';
  71. 3 3 true
  72. > SELECT * FROM t1_pk;
  73. a "hello world"
  74. b "foobar"
  75. c "anotha one"
  76. $ sql-server-execute name=sql-server
  77. UPDATE t1_pk SET val_col = 'I am an updated value' WHERE key_col = 'a';
  78. > SELECT messages_received FROM mz_internal.mz_source_statistics WHERE id = '${source-id}';
  79. 6
  80. > SELECT updates_staged, updates_committed FROM mz_internal.mz_source_statistics WHERE id = '${subsource-id}';
  81. 5 5
  82. > SELECT offset_known IS NULL, offset_committed IS NULL FROM mz_internal.mz_source_statistics WHERE id = '${subsource-id}';
  83. false false
  84. > SELECT * FROM t1_pk;
  85. a "I am an updated value"
  86. b "foobar"
  87. c "anotha one"
  88. $ sql-server-execute name=sql-server
  89. DELETE t1_pk WHERE key_col = 'a';
  90. > SELECT * FROM t1_pk;
  91. b "foobar"
  92. c "anotha one"
  93. $ sql-server-execute name=sql-server
  94. INSERT INTO t1_pk VALUES ('😊', 'lets see what happens');
  95. # Note: VARCHAR columns in SQL Server do not support emojis, hence the '??'.
  96. > SELECT * FROM t1_pk;
  97. b "foobar"
  98. c "anotha one"
  99. "??" "lets see what happens"
  100. # Insert a lot of data upstream.
  101. $ sql-server-execute name=sql-server
  102. WITH Tally(n) AS (SELECT 1 UNION ALL SELECT n + 1 FROM Tally WHERE n < 1000) INSERT INTO t3_text (value) SELECT 'a longer string that will be a bit of data, cool ' + CAST(n AS VARCHAR) FROM Tally OPTION (MAXRECURSION 1000);
  103. > SELECT COUNT(*) FROM t3_text;
  104. 1000
  105. $ sql-server-execute name=sql-server
  106. INSERT INTO t3_text (value) SELECT value FROM t3_text;
  107. INSERT INTO t3_text (value) SELECT value FROM t3_text;
  108. INSERT INTO t3_text (value) SELECT value FROM t3_text;
  109. INSERT INTO t3_text (value) SELECT value FROM t3_text;
  110. INSERT INTO t3_text (value) SELECT value FROM t3_text;
  111. > SELECT COUNT(*) FROM t3_text;
  112. 32000
  113. > DROP SOURCE t1_pk_sql_server CASCADE;