pg_cdc.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import re
  10. from random import Random
  11. from textwrap import dedent
  12. from typing import Any
  13. from materialize.checks.actions import Testdrive
  14. from materialize.checks.checks import Check, externally_idempotent
  15. from materialize.checks.features import Features
  16. from materialize.mz_version import MzVersion
class PgCdcBase:
    """Shared initialize/manipulate/validate steps for the Postgres CDC checks.

    The class is written as a mixin: ``__init__`` keeps ``wait`` for itself and
    passes every other keyword argument to the next class in the MRO.

    ``wait`` selects the scenario:

    * the suffix (``_true`` / ``_false``) appended to every Postgres and
      Materialize object name created by the scripts below;
    * the ``REPEAT`` length used to fill column ``f3`` (1024 or 16384);
    * whether ``manipulate`` appends queries that wait until the newly
      created tables return at least one row.
    """

    # Declared without a value; nothing in this class assigns them.
    # NOTE(review): presumably provided by Check -- confirm.
    base_version: MzVersion
    current_version: MzVersion
    # True -> wait for rows after creating the tables in manipulate().
    wait: bool
    # "_true" or "_false"; appended to all object names in the scripts.
    suffix: str
    # Base length passed to REPEAT() when generating f3 values.
    repeats: int
    # Expected SUM(LENGTH(f3)) for each f1 group in validate().
    expects: int

    def __init__(self, wait: bool, **kwargs: Any) -> None:
        """Derive the scenario parameters from ``wait``.

        Args:
            wait: scenario switch, see the class docstring.
            **kwargs: passed unchanged to the next ``__init__`` in the MRO.
        """
        self.wait = wait
        self.repeats = 1024 if wait else 16384
        # Each batch inserts REPEAT(c, repeats - i) for i in 1..100, so the
        # total length per batch is 100 * repeats - 5050.
        self.expects = 97350 if wait else 1633350
        self.suffix = f"_{str(wait).lower()}"
        super().__init__(**kwargs)  # forward unused args to Check

    def initialize(self) -> Testdrive:
        """Return the script that prepares Postgres and the first connection.

        On the Postgres side it creates a replication-capable superuser,
        (re)creates the source table with REPLICA IDENTITY FULL, inserts the
        100 'A' rows and creates a publication FOR ALL TABLES. In Materialize
        it creates the ``pgpass1`` secret and the ``pg1`` connection.
        """
        return Testdrive(
            dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres1{self.suffix} WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres1{self.suffix} WITH replication;
                DROP PUBLICATION IF EXISTS postgres_source{self.suffix};
                DROP TABLE IF EXISTS postgres_source_table{self.suffix};
                CREATE TABLE postgres_source_table{self.suffix} (f1 TEXT, f2 INTEGER, f3 TEXT UNIQUE NOT NULL, f4 JSONB, PRIMARY KEY(f1, f2));
                ALTER TABLE postgres_source_table{self.suffix} REPLICA IDENTITY FULL;
                INSERT INTO postgres_source_table{self.suffix} SELECT 'A', i, REPEAT('A', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                CREATE PUBLICATION postgres_source{self.suffix} FOR ALL TABLES;
                > CREATE SECRET pgpass1{self.suffix} AS 'postgres';
                > CREATE CONNECTION pg1{self.suffix} FOR POSTGRES
                HOST 'postgres',
                DATABASE postgres,
                USER postgres1{self.suffix},
                PASSWORD SECRET pgpass1{self.suffix}
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation scripts.

        Every batch is an INSERT of 100 rows for one letter followed by an
        UPDATE that adds 100 to ``f2`` of every row in the table, so earlier
        batches end up with larger ``f2`` values than later ones.

        * Script 1: source 1 / table A (plus default index), batches B-D,
          secret ``pgpass2`` and connection ``pg2``, source 2 / table B and
          the ``table_a_b_count_sum`` view.
        * Script 2: batches E-H, secret ``pgpass3`` and connection ``pg3``,
          source 3 / table C.

        When ``self.wait`` is set, each script ends with queries that expect
        the newly created tables to contain at least one row.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # NOTE(review): connection pg2 below uses PASSWORD SECRET
                # pgpass1 although pgpass2 is created right before it and is
                # not referenced anywhere else in the visible code. Both
                # secrets hold 'postgres'; confirm whether this is intended.
                f"""
                > CREATE SOURCE postgres_source1{self.suffix}
                FROM POSTGRES CONNECTION pg1{self.suffix}
                (PUBLICATION 'postgres_source{self.suffix}');
                > CREATE TABLE postgres_source_tableA{self.suffix} FROM SOURCE postgres_source1{self.suffix} (REFERENCE postgres_source_table{self.suffix});
                > CREATE DEFAULT INDEX ON postgres_source_tableA{self.suffix};
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'B', i, REPEAT('B', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                > CREATE SECRET pgpass2{self.suffix} AS 'postgres';
                > CREATE CONNECTION pg2{self.suffix} FOR POSTGRES
                HOST 'postgres',
                DATABASE postgres,
                USER postgres1{self.suffix},
                PASSWORD SECRET pgpass1{self.suffix}
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'C', i, REPEAT('C', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}}
                GRANT USAGE ON CONNECTION pg2{self.suffix} TO materialize
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'D', i, REPEAT('D', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                > CREATE SOURCE postgres_source2{self.suffix}
                FROM POSTGRES CONNECTION pg2{self.suffix}
                (PUBLICATION 'postgres_source{self.suffix}');
                > CREATE TABLE postgres_source_tableB{self.suffix} FROM SOURCE postgres_source2{self.suffix} (REFERENCE postgres_source_table{self.suffix});
                # Create a view with a complex dependency structure
                > CREATE VIEW IF NOT EXISTS table_a_b_count_sum AS SELECT SUM(total_count) AS total_rows FROM (
                SELECT COUNT(*) AS total_count FROM postgres_source_tableA{self.suffix}
                UNION ALL
                SELECT COUNT(*) AS total_count FROM postgres_source_tableB{self.suffix}
                );
                """
                # Only the waiting variant appends the row-presence queries.
                + (
                    f"""
                # Wait until Pg snapshot is complete in order to avoid database-issues#5601
                > SELECT COUNT(*) > 0 FROM postgres_source_tableA{self.suffix}
                true
                # Wait until Pg snapshot is complete in order to avoid database-issues#5601
                > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix}
                true
                """
                    if self.wait
                    else ""
                ),
                # NOTE(review): the 'H' batch fills f3 with 'X' rather than
                # 'H'; validate() only checks LENGTH(f3), so its expected
                # output does not depend on the character used.
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'E', i, REPEAT('E', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'F', i, REPEAT('F', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                > CREATE SECRET pgpass3{self.suffix} AS 'postgres';
                > CREATE CONNECTION pg3{self.suffix} FOR POSTGRES
                HOST 'postgres',
                DATABASE postgres,
                USER postgres1{self.suffix},
                PASSWORD SECRET pgpass3{self.suffix}
                > CREATE SOURCE postgres_source3{self.suffix}
                FROM POSTGRES CONNECTION pg3{self.suffix}
                (PUBLICATION 'postgres_source{self.suffix}');
                > CREATE TABLE postgres_source_tableC{self.suffix} FROM SOURCE postgres_source3{self.suffix} (REFERENCE postgres_source_table{self.suffix});
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'G', i, REPEAT('G', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'H', i, REPEAT('X', {self.repeats} - i), NULL FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                """
                # Only the waiting variant appends the row-presence queries.
                + (
                    f"""
                # Wait until Pg snapshot is complete in order to avoid database-issues#5601
                > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix}
                true
                > SELECT COUNT(*) > 0 FROM postgres_source_tableC{self.suffix}
                true
                """
                    if self.wait
                    else ""
                ),
            ]
        ]

    def validate(self) -> Testdrive:
        """Return the script that checks tables A, B and C and the view.

        For each table the script expects one row per letter A-H with
        ``max(f2)`` reflecting how many ``f2 + 100`` updates the batch went
        through (800 for A and B down to 200 for H) and
        ``SUM(LENGTH(f3))`` equal to ``self.expects``. It also expects the
        view to report 1600 rows (800 each in tables A and B), the index key
        on table A to be ``{f1,f2}``, and a fast-path EXPLAIN plan, with the
        EXPLAIN syntax chosen by a version guard.
        """
        sql = dedent(
            f"""
            $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}}
            GRANT SELECT ON postgres_source_tableA{self.suffix} TO materialize
            GRANT SELECT ON postgres_source_tableB{self.suffix} TO materialize
            GRANT SELECT ON postgres_source_tableC{self.suffix} TO materialize
            # Can take longer after a restart
            $ set-sql-timeout duration=600s
            # Trying to narrow down whether
            # https://github.com/MaterializeInc/database-issues/issues/8102
            # is a problem with the source or elsewhere.
            > WITH t AS (SELECT * FROM postgres_source_tableA{self.suffix})
            SELECT COUNT(*)
            FROM t
            GROUP BY row(t.*)
            HAVING COUNT(*) < 0;
            > WITH t AS (SELECT * FROM postgres_source_tableB{self.suffix})
            SELECT COUNT(*)
            FROM t
            GROUP BY row(t.*)
            HAVING COUNT(*) < 0;
            > WITH t AS (SELECT * FROM postgres_source_tableC{self.suffix})
            SELECT COUNT(*)
            FROM t
            GROUP BY row(t.*)
            HAVING COUNT(*) < 0;
            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableA{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}
            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableB{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}
            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableC{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}
            > SELECT total_rows FROM table_a_b_count_sum;
            1600
            # TODO: Figure out the quoting here -- it returns "f4" when done using the SQL shell
            # > SELECT regexp_match(create_sql, 'TEXT COLUMNS = \\((.*?)\\)')[1] FROM (SHOW CREATE SOURCE postgres_source_tableA{self.suffix});
            # "\"f4\""
            # Confirm that the primary key information has been propagated from Pg
            > SELECT key FROM (SHOW INDEXES ON postgres_source_tableA{self.suffix});
            {{f1,f2}}
            ?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT DISTINCT f1, f2 FROM postgres_source_tableA{self.suffix};
            Explained Query (fast path):
            Project (#0, #1)
            ReadIndex on=materialize.public.postgres_source_tablea{self.suffix} postgres_source_tablea{self.suffix}_primary_idx=[*** full scan ***]
            Used Indexes:
            - materialize.public.postgres_source_tablea{self.suffix}_primary_idx (*** full scan ***)
            Target cluster: quickstart
            ?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT DISTINCT f1, f2 FROM postgres_source_tableA{self.suffix};
            Explained Query (fast path):
            Project (#0, #1)
            ReadIndex on=materialize.public.postgres_source_tablea{self.suffix} postgres_source_tablea{self.suffix}_primary_idx=[*** full scan ***]
            Used Indexes:
            - materialize.public.postgres_source_tablea{self.suffix}_primary_idx (*** full scan ***)
            Target cluster: quickstart
            """
        )
        return Testdrive(sql)
  218. @externally_idempotent(False)
  219. class PgCdc(PgCdcBase, Check):
  220. def __init__(
  221. self, base_version: MzVersion, rng: Random | None, features: Features | None
  222. ) -> None:
  223. super().__init__(
  224. wait=True, base_version=base_version, rng=rng, features=features
  225. )
  226. @externally_idempotent(False)
  227. class PgCdcNoWait(PgCdcBase, Check):
  228. def __init__(
  229. self, base_version: MzVersion, rng: Random | None, features: Features | None
  230. ) -> None:
  231. super().__init__(
  232. wait=False, base_version=base_version, rng=rng, features=features
  233. )
@externally_idempotent(False)
class PgCdcMzNow(Check):
    """Check a Postgres-backed table filtered through an ``mz_now()`` view.

    The Postgres table holds (timestamp, label) rows; the materialized view
    ``postgres_mz_now_view`` keeps only rows whose timestamp plus 60 seconds
    is not before ``mz_now()``.
    """

    def initialize(self) -> Testdrive:
        """Return the script that sets up Postgres, the source and the view.

        Creates the Postgres user, table (REPLICA IDENTITY FULL), the five
        ``*1`` rows and the publication, then the Materialize secret,
        connection, source, table and the ``mz_now()`` materialized view.
        """
        return Testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres2 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_mz_now_publication;
                DROP TABLE IF EXISTS postgres_mz_now_table;
                CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2));
                ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL;
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1');
                CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES;
                > CREATE SECRET postgres_mz_now_pass AS 'postgres';
                > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES
                HOST 'postgres',
                DATABASE postgres,
                USER postgres2,
                PASSWORD SECRET postgres_mz_now_pass
                > CREATE SOURCE postgres_mz_now_source
                FROM POSTGRES CONNECTION postgres_mz_now_conn
                (PUBLICATION 'postgres_mz_now_publication');
                > CREATE TABLE postgres_mz_now_table FROM SOURCE postgres_mz_now_source (REFERENCE postgres_mz_now_table);
                # Return all rows fresher than 60 seconds
                > CREATE MATERIALIZED VIEW postgres_mz_now_view AS
                SELECT * FROM postgres_mz_now_table
                WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000)
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return two scripts that each insert, delete and refresh rows.

        Each script inserts five new rows, deletes one 'B' row from the
        previous round and sets the timestamp of one ``*1`` row to ``NOW()``,
        so the table grows by four rows per script (5 -> 9 -> 13).
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Round 2: add *2 rows, drop B1, refresh C1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B1';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1';
                """,
                # Round 3: add *3 rows, drop B2, refresh D1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B2';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1';
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Return the script that checks the row count and the view's window.

        The script first expects 13 rows in the table, then writes a fourth
        round of rows, checks that the view holds at least six rows newer
        than 180 seconds and none older, and finally re-inserts 'B3' and
        deletes the ``*4`` rows so the table is back at 13 rows for the next
        call.
        """
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM postgres_mz_now_table;
                13
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B3';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1'
                # Expect some rows newer than 180 seconds in view
                > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view
                WHERE f1 > NOW() - INTERVAL '180' SECOND;
                true
                # Expect no rows older than 180 seconds in view
                > SELECT COUNT(*) FROM postgres_mz_now_view
                WHERE f1 < NOW() - INTERVAL '180' SECOND;
                0
                # Rollback the last INSERTs so that validate() can be called multiple times
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%';
                """
            )
        )
  325. def remove_target_cluster_from_explain(sql: str) -> str:
  326. return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)