pg-cdc.td 26 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # IMPORTANT: The Postgres server has a custom pg_hba.conf that only
  10. # accepts connections from specific users. You will have to update
  11. # pg_hba.conf if you modify the existing user names or add new ones.
  12. > CREATE SECRET pgpass AS 'postgres'
  13. ! CREATE CONNECTION pg TO POSTGRES (
  14. HOST postgres,
  15. DATABASE postgres,
  16. USER postgres,
  17. PASSWORD SECRET pgpass,
  18. BROKER '${testdrive.kafka-addr}'
  19. )
  20. contains:POSTGRES connections do not support BROKER values
  21. > CREATE CONNECTION pg TO POSTGRES (
  22. HOST postgres,
  23. DATABASE postgres,
  24. USER postgres,
  25. PASSWORD SECRET pgpass
  26. )
  27. > CREATE CLUSTER cdc_cluster SIZE '${arg.default-replica-size}'
  28. $ postgres-execute connection=postgres://postgres:postgres@postgres
  29. ALTER USER postgres WITH replication;
  30. DROP SCHEMA IF EXISTS public CASCADE;
  31. CREATE SCHEMA public;
  32. DROP PUBLICATION IF EXISTS mz_source;
  33. CREATE PUBLICATION mz_source FOR ALL TABLES;
  34. CREATE TABLE pk_table (pk INTEGER PRIMARY KEY, f2 TEXT);
  35. INSERT INTO pk_table VALUES (1, 'one');
  36. ALTER TABLE pk_table REPLICA IDENTITY FULL;
  37. INSERT INTO pk_table VALUES (2, 'two');
  38. INSERT INTO pk_table VALUES (3, 'three');
  39. CREATE TABLE nonpk_table (f1 INTEGER, f2 INTEGER, f3 INTEGER GENERATED ALWAYS AS (f1 * 2) STORED);
  40. INSERT INTO nonpk_table VALUES (1, 1), (1, 1);
  41. ALTER TABLE nonpk_table REPLICA IDENTITY FULL;
  42. INSERT INTO nonpk_table VALUES (2, 2), (2, 2);
  43. CREATE TABLE types_table (char_col char(3), date_col DATE, time_col TIME, timestamp_col TIMESTAMP, uuid_col UUID, double_col DOUBLE PRECISION, numeric NUMERIC(8,4), int4range_col INT4RANGE, int8range_col INT8RANGE, daterange_col DATERANGE, numrange_col NUMRANGE);
  44. INSERT INTO types_table VALUES ('foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, '(,)', '(,)', '(,)', '(,)');
  45. ALTER TABLE types_table REPLICA IDENTITY FULL;
  46. CREATE TABLE array_types_table (date_col DATE[], time_col TIME[], timestamp_col TIMESTAMP[], uuid_col UUID[], double_col DOUBLE PRECISION[], numeric NUMERIC[], int4range_col INT4RANGE[], int8range_col INT8RANGE[], daterange_col DATERANGE[], numrange_col NUMRANGE[]);
  47. INSERT INTO array_types_table VALUES ('{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  48. ALTER TABLE array_types_table REPLICA IDENTITY FULL;
  49. CREATE TABLE large_text (f1 TEXT, f2 TEXT);
  50. INSERT INTO large_text VALUES (REPEAT('x', 16 * 1024 * 1024), REPEAT('y', 1 * 1024 * 1024));
  51. INSERT INTO large_text VALUES (REPEAT('a', 3 * 1024 * 1024), REPEAT('b', 2 * 1024 * 1024));
  52. ALTER TABLE large_text REPLICA IDENTITY FULL;
  53. CREATE TABLE trailing_space_pk (f1 TEXT PRIMARY KEY);
  54. INSERT INTO trailing_space_pk VALUES ('abc ');
  55. ALTER TABLE trailing_space_pk REPLICA IDENTITY FULL;
  56. CREATE TABLE trailing_space_nopk (f1 TEXT);
  57. INSERT INTO trailing_space_nopk VALUES ('abc ');
  58. ALTER TABLE trailing_space_nopk REPLICA IDENTITY FULL;
  59. CREATE TABLE multipart_pk(f1 INTEGER, f2 TEXT, f3 TEXT, PRIMARY KEY (f1, f2));
  60. INSERT INTO multipart_pk VALUES (1, 'abc', 'xyz');
  61. ALTER TABLE multipart_pk REPLICA IDENTITY FULL;
  62. CREATE TABLE nulls_table (f1 TEXT, f2 INTEGER);
  63. INSERT INTO nulls_table VALUES (NULL, NULL);
  64. ALTER TABLE nulls_table REPLICA IDENTITY FULL;
  65. CREATE TABLE utf8_table (f1 TEXT PRIMARY KEY, f2 TEXT);
  66. INSERT INTO utf8_table VALUES ('това е текст', 'това ''е'' "текст"');
  67. ALTER TABLE utf8_table REPLICA IDENTITY FULL;
  68. CREATE TABLE no_replica_identity (f1 INTEGER);
  69. INSERT INTO no_replica_identity VALUES (1), (2);
  70. CREATE TABLE "таблица" ("колона" TEXT);
  71. ALTER TABLE "таблица" REPLICA IDENTITY FULL;
  72. INSERT INTO "таблица" VALUES ('стойност');
  73. CREATE TABLE tstzrange_table (a TSTZRANGE);
  74. ALTER TABLE tstzrange_table REPLICA IDENTITY FULL;
  75. INSERT INTO tstzrange_table VALUES ('["2024-02-13 17:01:58.37848+00!?!",)');
  76. CREATE TABLE """literal_quotes""" (a TEXT);
  77. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  78. INSERT INTO """literal_quotes""" VALUES ('v');
  79. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  80. CREATE TABLE "create" (a TEXT);
  81. ALTER TABLE "create" REPLICA IDENTITY FULL;
  82. INSERT INTO "create" VALUES ('v');
  83. CREATE TABLE escaped_text_table (f1 TEXT, f2 TEXt);
  84. ALTER TABLE escaped_text_table REPLICA IDENTITY FULL;
  85. INSERT INTO escaped_text_table VALUES ('escaped\ntext\twith\nnewlines\tand\ntabs', 'more\tescaped\ntext');
  86. INSERT INTO escaped_text_table VALUES ('second\nrow\twith\tmore\ttabs', 'and\nmore\n\nnewlines\n');
  87. CREATE TABLE conflict_table (f1 INTEGER);
  88. ALTER TABLE conflict_table REPLICA IDENTITY FULL;
  89. INSERT INTO conflict_table VALUES (123);
  90. DROP SCHEMA IF EXISTS conflict_schema CASCADE;
  91. CREATE SCHEMA conflict_schema;
  92. CREATE TABLE conflict_schema.conflict_table (f1 TEXT);
  93. ALTER TABLE conflict_schema.conflict_table REPLICA IDENTITY FULL;
  94. INSERT INTO conflict_schema.conflict_table VALUES ('234');
  95. CREATE TABLE "space table" ("space column" INTEGER);
  96. ALTER TABLE "space table" REPLICA IDENTITY FULL;
  97. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  98. CREATE TABLE enum_table (a an_enum);
  99. INSERT INTO enum_table VALUES ('var1'), ('var0');
  100. ALTER TABLE enum_table REPLICA IDENTITY FULL;
  101. CREATE TYPE another_enum AS ENUM ('var2', 'var3');
  102. CREATE TABLE another_enum_table ("колона" another_enum);
  103. INSERT INTO another_enum_table VALUES ('var2'), ('var3');
  104. ALTER TABLE another_enum_table REPLICA IDENTITY FULL;
  105. CREATE TABLE conflict_schema.another_enum_table ("колона" another_enum);
  106. INSERT INTO conflict_schema.another_enum_table VALUES ('var2'), ('var3');
  107. ALTER TABLE conflict_schema.another_enum_table REPLICA IDENTITY FULL;
  108. DROP PUBLICATION IF EXISTS mz_source_narrow;
  109. CREATE PUBLICATION mz_source_narrow FOR TABLE enum_table, public.another_enum_table, pk_table;
  110. DROP SCHEMA IF EXISTS another_schema CASCADE;
  111. CREATE SCHEMA another_schema;
  112. CREATE TABLE another_schema.another_table (f1 TEXT);
  113. ALTER TABLE another_schema.another_table REPLICA IDENTITY FULL;
  114. INSERT INTO another_schema.another_table VALUES ('123');
  115. DROP PUBLICATION IF EXISTS another_publication;
  116. CREATE PUBLICATION another_publication FOR TABLE another_schema.another_table;
  117. #
  118. # Test that slots created for replication sources are deleted on DROP: create a source plus one table from it, then drop the source with CASCADE
  119. # TODO: enable once we land database-issues#7327 (before the source exists, no materialize_% slot should be active)
  120. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false
  121. # Sneak in a test for pg_source_snapshot_statement_timeout, pg_source_wal_sender_timeout by overriding both before the source is created. NOTE(review): only the snapshot timeout is set back to 0 at the end of this section; pg_source_wal_sender_timeout keeps the value 0 assigned here - confirm that is intended.
  122. $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  123. $ postgres-execute connection=mz_system
  124. ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 1000;
  125. ALTER SYSTEM SET pg_source_wal_sender_timeout = 0;
  126. > CREATE SOURCE "test_slot_source"
  127. IN CLUSTER cdc_cluster
  128. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  129. > CREATE TABLE pk_table FROM SOURCE "test_slot_source" (REFERENCE pk_table);
  130. # TODO: enable once we land database-issues#7327 (while the source exists, its slot should be active)
  131. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=true
  132. > SHOW SUBSOURCES ON test_slot_source
  133. test_slot_source_progress progress
  134. > DROP SOURCE test_slot_source CASCADE
  135. # TODO: enable once we land database-issues#7327 (after the DROP, no materialize_% slot should be active)
  136. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false
  137. $ postgres-execute connection=mz_system
  138. ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 0
  139. #
  140. # Error checking
  141. #
  142. ! CREATE CONNECTION no_such_host TO POSTGRES (
  143. HOST 'no_such_postgres.mtrlz.com',
  144. DATABASE postgres,
  145. USER postgres,
  146. PASSWORD SECRET pgpass
  147. )
  148. contains:failed to lookup address information
  149. ! CREATE CONNECTION no_such_port TO POSTGRES (
  150. HOST postgres,
  151. PORT 65534,
  152. DATABASE postgres,
  153. USER postgres,
  154. PASSWORD SECRET pgpass
  155. )
  156. contains:error connecting to server: Connection refused
  157. ! CREATE CONNECTION no_such_user TO POSTGRES (
  158. HOST postgres,
  159. DATABASE postgres,
  160. USER no_such_user,
  161. PASSWORD SECRET pgpass
  162. )
  163. contains:password authentication failed for user "no_such_user"
  164. > CREATE SECRET badpass AS 'badpass'
  165. ! CREATE CONNECTION no_such_password TO POSTGRES (
  166. HOST postgres,
  167. DATABASE postgres,
  168. USER postgres,
  169. PASSWORD SECRET badpass
  170. )
  171. contains:password authentication failed for user "postgres"
  172. ! CREATE CONNECTION no_such_dbname TO POSTGRES (
  173. HOST postgres,
  174. DATABASE no_such_dbname,
  175. USER postgres,
  176. PASSWORD SECRET pgpass
  177. )
  178. contains:database "no_such_dbname" does not exist
  179. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  180. ALTER SYSTEM SET storage_enforce_external_addresses = true
  181. ! CREATE CONNECTION private_address TO POSTGRES (
  182. HOST postgres,
  183. DATABASE postgres,
  184. USER postgres,
  185. PASSWORD SECRET pgpass
  186. )
  187. contains:Address resolved to a private IP
  188. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  189. ALTER SYSTEM SET storage_enforce_external_addresses = false
  190. ! CREATE SOURCE "no_such_publication"
  191. IN CLUSTER cdc_cluster
  192. FROM POSTGRES CONNECTION pg (PUBLICATION 'no_such_publication');
  193. # TODO: assert on `detail` here.
  194. contains:failed to connect to PostgreSQL database
  195. > CREATE SOURCE "mz_source"
  196. IN CLUSTER cdc_cluster
  197. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  198. ! CREATE TABLE enum_table FROM SOURCE "mz_source" (REFERENCE enum_table);
  199. contains:referenced tables use unsupported types
  200. ! CREATE TABLE another_enum_table FROM SOURCE "mz_source" (REFERENCE public.another_enum_table);
  201. contains:referenced tables use unsupported types
  202. ! CREATE SOURCE mz_source_2
  203. IN CLUSTER cdc_cluster
  204. FROM POSTGRES CONNECTION pg (
  205. PUBLICATION 'mz_source',
  206. DETAILS 'abc'
  207. );
  208. contains: CREATE SOURCE specifies DETAILS option
  209. ! CREATE TABLE no_replica_identity FROM SOURCE "mz_source" (REFERENCE no_replica_identity);
  210. contains:referenced items not tables with REPLICA IDENTITY FULL
  211. detail:referenced items: public.no_replica_identity
  212. #
  213. # Establish direct replication
  214. #
  215. #
  216. # Note: This implicitly tests that enum_table being part of the publication does not
  217. # prevent us from using other tables as subsources.
  218. #
  219. > CREATE TABLE "pk_table" FROM SOURCE mz_source (REFERENCE "pk_table");
  220. > CREATE TABLE "nonpk_table" FROM SOURCE mz_source (REFERENCE "nonpk_table");
  221. > CREATE TABLE "types_table" FROM SOURCE mz_source (REFERENCE "types_table");
  222. > CREATE TABLE "array_types_table" FROM SOURCE mz_source (REFERENCE "array_types_table");
  223. > CREATE TABLE "large_text" FROM SOURCE mz_source (REFERENCE "large_text");
  224. > CREATE TABLE "trailing_space_pk" FROM SOURCE mz_source (REFERENCE "trailing_space_pk");
  225. > CREATE TABLE "trailing_space_nopk" FROM SOURCE mz_source (REFERENCE "trailing_space_nopk");
  226. > CREATE TABLE "multipart_pk" FROM SOURCE mz_source (REFERENCE "multipart_pk");
  227. > CREATE TABLE "nulls_table" FROM SOURCE mz_source (REFERENCE "nulls_table");
  228. > CREATE TABLE "utf8_table" FROM SOURCE mz_source (REFERENCE "utf8_table");
  229. > CREATE TABLE "таблица" FROM SOURCE mz_source (REFERENCE "таблица");
  230. > CREATE TABLE "escaped_text_table" FROM SOURCE mz_source (REFERENCE "escaped_text_table");
  231. > CREATE TABLE conflict_table FROM SOURCE mz_source (REFERENCE conflict_schema.conflict_table);
  232. > CREATE TABLE "space table" FROM SOURCE mz_source (REFERENCE "space table");
  233. > CREATE TABLE """literal_quotes""" FROM SOURCE mz_source (REFERENCE """literal_quotes""");
  234. > CREATE TABLE "create" FROM SOURCE mz_source (REFERENCE "create");
  235. > CREATE TABLE tstzrange_table FROM SOURCE mz_source (REFERENCE "tstzrange_table");
  236. > SHOW SOURCES
  237. mz_source postgres cdc_cluster ""
  238. mz_source_progress progress <null> ""
  239. > SHOW TABLES
  240. array_types_table ""
  241. conflict_table ""
  242. create ""
  243. escaped_text_table ""
  244. large_text ""
  245. multipart_pk ""
  246. nonpk_table ""
  247. nulls_table ""
  248. pk_table ""
  249. "space table" ""
  250. trailing_space_nopk ""
  251. trailing_space_pk ""
  252. "\"literal_quotes\"" ""
  253. tstzrange_table ""
  254. types_table ""
  255. utf8_table ""
  256. таблица ""
  257. > SELECT schema_name, table_name FROM mz_internal.mz_postgres_source_tables
  258. public create
  259. public pk_table
  260. public large_text
  261. public utf8_table
  262. public nonpk_table
  263. public types_table
  264. public nulls_table
  265. public multipart_pk
  266. public "\"space table\""
  267. public tstzrange_table
  268. public "\"таблица\""
  269. public array_types_table
  270. public trailing_space_pk
  271. public escaped_text_table
  272. public trailing_space_nopk
  273. public "\"\"\"literal_quotes\"\"\""
  274. conflict_schema conflict_table
  275. # Ensure every table created from a source (mz_tables rows with a non-null source_id) has an ID greater than the primary source ID. NOTE(review): the ids appear to be text such as 'u123', in which case `<` compares lexicographically - confirm this cannot misorder ids of different lengths.
  276. > SELECT bool_and(primary_source_id < subsource_id)
  277. FROM
  278. (SELECT id AS primary_source_id FROM mz_sources WHERE type = 'postgres')
  279. CROSS JOIN (SELECT id AS subsource_id FROM mz_tables WHERE source_id IS NOT NULL);
  280. true
  281. # Ensure the progress source has an ID less than the primary source ID; the scalar subqueries rely on exactly one postgres source and one progress source existing at this point
  282. > SELECT progress_source_id < primary_source_id
  283. FROM (
  284. SELECT
  285. (SELECT id FROM mz_sources WHERE type = 'postgres') AS primary_source_id,
  286. (SELECT id FROM mz_sources WHERE type = 'progress') AS progress_source_id
  287. );
  288. true
  289. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source';
  290. running
  291. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source_progress';
  292. running
  293. > SELECT lsn > 0 FROM mz_source_progress
  294. true
  295. # Ensure we report the write frontier of the progress subsource
  296. $ set-regex match=(\s{12}0|\d{13,20}|u\d{1,5}|\(\d+-\d\d-\d\d\s\d\d:\d\d:\d\d\.\d\d\d\)|true|false) replacement=<>
  297. > EXPLAIN TIMESTAMP FOR SELECT * FROM mz_source_progress
  298. " query timestamp: <> <>\n oracle read timestamp: <> <>\nlargest not in advance of upper: <> <>\n upper:[<> <>]\n since:[<> <>]\n can respond immediately: <>\n timeline: Some(EpochMilliseconds)\n session wall time: <> <>\n\nsource materialize.public.mz_source_progress (<>, storage):\n read frontier:[<> <>]\n write frontier:[<> <>]\n\nbinding constraints:\nlower:\n (IsolationLevel(StrictSerializable)): [<> <>]\n"
  299. $ set-regex match=[0-9]+|_[a-f0-9]+ replacement=<SUPPRESSED>
  300. > SELECT * FROM mz_internal.mz_postgres_sources
  301. id replication_slot timeline_id
  302. ---------------------------------------------------
  303. u<SUPPRESSED> materialize<SUPPRESSED> <SUPPRESSED>
  304. $ unset-regex
  305. #
  306. # Perform sanity checks of the initial snapshot
  307. #
  308. > SELECT * FROM pk_table;
  309. 1 one
  310. 2 two
  311. 3 three
  312. > SELECT * FROM nonpk_table;
  313. 1 1
  314. 1 1
  315. 2 2
  316. 2 2
  317. > SELECT * FROM types_table;
  318. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  319. > SELECT pg_typeof(numeric) FROM types_table;
  320. "numeric"
  321. > SELECT * FROM array_types_table;
  322. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  323. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  324. 16777216 1048576
  325. 3145728 2097152
  326. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  327. "4" "abc "
  328. > SELECT LENGTH(f1), f1 FROM trailing_space_nopk;
  329. "4" "abc "
  330. > SELECT * FROM multipart_pk;
  331. 1 abc xyz
  332. > SELECT f1, f2, f1 IS NULL, f2 IS NULL FROM nulls_table;
  333. <null> <null> true true
  334. > SELECT * FROM utf8_table;
  335. "това е текст" "това \'е\' \"текст\""
  336. > SELECT * FROM "таблица";
  337. стойност
  338. > SELECT * FROM escaped_text_table;
  339. "escaped\\ntext\\twith\\nnewlines\\tand\\ntabs" "more\\tescaped\\ntext"
  340. "second\\nrow\\twith\\tmore\\ttabs" "and\\nmore\\n\\nnewlines\\n"
  341. > SELECT * FROM conflict_table;
  342. 234
  343. > SELECT * FROM """literal_quotes"""
  344. v
  345. > SELECT * FROM "create"
  346. v
  347. > SELECT * FROM tstzrange_table
  348. "[2024-02-13 17:01:58.378480 UTC,)"
  349. #
  350. # Confirm that tables created from the source can feed a materialized view
  351. #
  352. > CREATE MATERIALIZED VIEW join_view (a, b, c, d) AS SELECT * FROM pk_table INNER JOIN nonpk_table ON pk_table.pk = nonpk_table.f1;
  353. > SELECT * FROM join_view;
  354. "1" "one" "1" "1"
  355. "1" "one" "1" "1"
  356. "2" "two" "2" "2"
  357. "2" "two" "2" "2"
  358. #
  359. # Basic sanity check that the timestamps are reasonable
  360. #
  361. > SELECT COUNT(*) > 0 FROM pk_table;
  362. true
  363. > SELECT COUNT(*) > 0 FROM nonpk_table;
  364. true
  365. > SELECT COUNT(*) > 0 FROM join_view;
  366. true
  367. #
  368. # Modify the tables on the Postgres side
  369. #
  370. $ postgres-execute connection=postgres://postgres:postgres@postgres
  371. INSERT INTO pk_table VALUES (4, 'four');
  372. INSERT INTO pk_table VALUES (5, 'five');
  373. DELETE FROM pk_table WHERE pk = 1;
  374. UPDATE pk_table SET f2 = 'two_two' WHERE pk = 2;
  375. UPDATE pk_table SET pk = pk + 10 WHERE pk BETWEEN 3 AND 4;
  376. INSERT INTO nonpk_table VALUES (3, 3), (3, 3);
  377. DELETE FROM nonpk_table WHERE ctid = '(0,1)';
  378. UPDATE nonpk_table SET f1 = f1 + 10 WHERE ctid = '(0,2)';
  379. UPDATE nonpk_table SET f1 = f1 + 100 WHERE f1 = 3;
  380. INSERT INTO types_table VALUES ('foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, 'empty', 'empty', 'empty', 'empty');
  381. INSERT INTO array_types_table VALUES ('{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  382. INSERT INTO large_text VALUES (REPEAT('x', 16 * 1024 * 1024), 'abc');
  383. INSERT INTO trailing_space_pk VALUES ('klm ');
  384. UPDATE trailing_space_pk SET f1 = 'xyz ' WHERE f1 = 'klm ';
  385. DELETE FROM trailing_space_pk WHERE f1 = 'abc ';
  386. INSERT INTO trailing_space_nopk VALUES ('klm ');
  387. UPDATE trailing_space_nopk SET f1 = 'xyz ' WHERE f1 = 'klm ';
  388. DELETE FROM trailing_space_nopk WHERE f1 = 'abc ';
  389. INSERT INTO multipart_pk VALUES (2, 'klm', 'xyz');
  390. DELETE FROM multipart_pk WHERE f1 = 1;
  391. UPDATE nulls_table SET f2 = 3 WHERE f2 IS NULL;
  392. INSERT INTO nulls_table VALUES (NULL, 1), (NULL, 2);
  393. UPDATE nulls_table SET f2 = NULL WHERE f2 = 2;
  394. INSERT INTO utf8_table VALUES ('това е текст 2', 'това ''е'' "текст" 2');
  395. UPDATE utf8_table SET f1 = f1 || f1 , f2 = f2 || f2;
  396. INSERT INTO "таблица" SELECT * FROM "таблица";
  397. #
  398. # Check the updated data on the Materialize side
  399. #
  400. > SELECT * FROM pk_table;
  401. 13 three
  402. 14 four
  403. 2 two_two
  404. 5 five
  405. > SELECT * FROM nonpk_table;
  406. 103 3
  407. 103 3
  408. 11 1
  409. 2 2
  410. 2 2
  411. > SELECT * FROM types_table;
  412. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  413. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "empty" "empty" "empty" "empty"
  414. > SELECT * FROM array_types_table;
  415. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  416. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  417. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  418. 16777216 1048576
  419. 3145728 2097152
  420. 16777216 3
  421. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  422. "4" "xyz "
  423. > SELECT LENGTH(f1), f1 FROM trailing_space_nopk;
  424. "4" "xyz "
  425. > SELECT * FROM multipart_pk;
  426. "2" "klm" "xyz"
  427. > SELECT f1, f2, f1 IS NULL, f2 IS NULL FROM nulls_table;
  428. "<null>" "1" "true" "false"
  429. "<null>" "3" "true" "false"
  430. "<null>" "<null>" "true" "true"
  431. > SELECT * FROM utf8_table;
  432. "това е текст 2това е текст 2" "това \'е\' \"текст\" 2това \'е\' \"текст\" 2"
  433. "това е тексттова е текст" "това \'е\' \"текст\"това \'е\' \"текст\""
  434. > SELECT * FROM "таблица";
  435. стойност
  436. стойност
  437. > SELECT * FROM join_view;
  438. "2" "two_two" "2" "2"
  439. "2" "two_two" "2" "2"
  440. #
  441. # Check that the timestamps continue to be reasonable in the face of incoming updates
  442. #
  443. > SELECT COUNT(*) > 0 FROM pk_table;
  444. true
  445. > SELECT COUNT(*) > 0 FROM nonpk_table;
  446. true
  447. > SELECT COUNT(*) > 0 FROM join_view;
  448. true
  449. #
  450. # Ensure we can start a source with more workers than the default max_wal_senders param (10)
  451. #
  452. > CREATE CLUSTER large_cluster SIZE '16';
  453. > CREATE SOURCE large_cluster_source
  454. IN CLUSTER large_cluster
  455. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  456. > CREATE TABLE large_cluster_source_pk_table
  457. FROM SOURCE large_cluster_source
  458. (REFERENCE "pk_table");
  459. > SELECT * FROM large_cluster_source_pk_table;
  460. 13 three
  461. 14 four
  462. 2 two_two
  463. 5 five
  464. > SELECT status = 'running' FROM mz_internal.mz_source_statuses WHERE name = 'large_cluster_source_pk_table' AND type = 'table';
  465. true
  466. > DROP SOURCE large_cluster_source CASCADE;
  467. #
  468. # Remove all data on the Postgres side
  469. #
  470. $ postgres-execute connection=postgres://postgres:postgres@postgres
  471. DELETE FROM pk_table;
  472. DELETE FROM nonpk_table;
  473. DELETE FROM large_text;
  474. DELETE FROM trailing_space_pk;
  475. DELETE FROM trailing_space_nopk;
  476. DELETE FROM multipart_pk;
  477. DELETE FROM nulls_table;
  478. DELETE FROM utf8_table;
  479. DELETE FROM "таблица";
  480. DELETE FROM conflict_schema.conflict_table;
  481. DELETE FROM tstzrange_table;
  482. #
  483. # Check that all data sources empty out on the Materialize side
  484. #
  485. > SELECT COUNT(*) FROM pk_table;
  486. 0
  487. > SELECT COUNT(*) FROM nonpk_table;
  488. 0
  489. > SELECT COUNT(*) FROM large_text;
  490. 0
  491. > SELECT COUNT(*) FROM trailing_space_nopk;
  492. 0
  493. > SELECT COUNT(*) FROM trailing_space_pk;
  494. 0
  495. > SELECT COUNT(*) FROM multipart_pk;
  496. 0
  497. > SELECT COUNT(*) FROM nulls_table;
  498. 0
  499. > SELECT COUNT(*) FROM utf8_table;
  500. 0
  501. > SELECT COUNT(*) FROM join_view;
  502. 0
  503. > SELECT COUNT(*) FROM "таблица";
  504. 0
  505. > SELECT COUNT(*) FROM conflict_table;
  506. 0
  507. > SELECT COUNT(*) FROM tstzrange_table;
  508. 0
  509. #
  510. # Support enum values as strings
  511. #
  512. #
  513. > CREATE SOURCE enum_source
  514. IN CLUSTER cdc_cluster
  515. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  516. ! CREATE TABLE "enum_table"
  517. FROM SOURCE enum_source
  518. (REFERENCE "enum_table")
  519. WITH (TEXT COLUMNS [col_dne]);
  520. contains: invalid TEXT COLUMNS option value: column "enum_table.col_dne" does not exist
  521. ! CREATE TABLE "enum_table"
  522. FROM SOURCE enum_source
  523. (REFERENCE "enum_table")
  524. WITH (TEXT COLUMNS ["F2"]);
  525. contains: invalid TEXT COLUMNS option value: column "enum_table.F2" does not exist
  526. ! CREATE TABLE "enum_table"
  527. FROM SOURCE enum_source
  528. (REFERENCE "enum_table")
  529. WITH (TEXT COLUMNS [col_dne]);
  530. contains: invalid TEXT COLUMNS option value: column "enum_table.col_dne" does not exist
  531. ! CREATE TABLE pk_table
  532. FROM SOURCE enum_source
  533. (REFERENCE pk_table)
  534. WITH (TEXT COLUMNS [foo]);
  535. contains: invalid TEXT COLUMNS option value: column "pk_table.foo" does not exist
  536. ! CREATE TABLE pk_table
  537. FROM SOURCE enum_source
  538. (REFERENCE pk_table)
  539. WITH (TEXT COLUMNS [foo]);
  540. contains: invalid TEXT COLUMNS option value: column "pk_table.foo" does not exist
  541. ! CREATE TABLE enum_table
  542. FROM SOURCE enum_source
  543. (REFERENCE enum_table)
  544. WITH (TEXT COLUMNS [a, a]);
  545. contains: invalid TEXT COLUMNS option value: unexpected multiple references to postgres.public.enum_table.a
  546. # enum_table has no column f1 (its only column is a), so the second TEXT COLUMNS entry is rejected
  547. ! CREATE TABLE enum_table
  548. FROM SOURCE enum_source
  549. (REFERENCE enum_table)
  550. WITH (TEXT COLUMNS [a, f1]);
  551. contains: invalid TEXT COLUMNS option value: column "enum_table.f1" does not exist
  552. # n.b. the list also names "колона" (a column of another_enum_table) and pk (a
  553. # column of pk_table); neither exists in enum_table, and the error names "колона".
  554. ! CREATE TABLE enum_table
  555. FROM SOURCE enum_source
  556. (REFERENCE enum_table)
  557. WITH (TEXT COLUMNS [a, "колона", pk]);
  558. contains:invalid TEXT COLUMNS option value: column "enum_table.колона" does not exist
  559. > CREATE TABLE enum_table
  560. FROM SOURCE enum_source
  561. (REFERENCE enum_table)
  562. WITH (TEXT COLUMNS [a]);
  563. > CREATE TABLE another_enum_table
  564. FROM SOURCE enum_source
  565. (REFERENCE public.another_enum_table)
  566. WITH (TEXT COLUMNS ["колона"]);
  567. > SELECT * FROM (SHOW SOURCES) WHERE name LIKE '%enum%';
  568. enum_source postgres cdc_cluster ""
  569. enum_source_progress progress <null> ""
  570. > SELECT * FROM (SHOW TABLES) WHERE name LIKE '%enum%';
  571. another_enum_table ""
  572. enum_table ""
  573. > SELECT * FROM enum_table
  574. var0
  575. var1
  576. $ set-regex match="DETAILS = '[a-f0-9]+'" replacement=<DETAILS>
  577. > SHOW CREATE TABLE enum_table
  578. materialize.public.enum_table "CREATE TABLE materialize.public.enum_table (a pg_catalog.text) FROM SOURCE materialize.public.enum_source (REFERENCE = postgres.public.enum_table) WITH (TEXT COLUMNS = (a), <DETAILS>);"
  579. # Test that TEXT COLUMN types can change
  580. $ postgres-execute connection=postgres://postgres:postgres@postgres
  581. BEGIN;
  582. ALTER TYPE an_enum RENAME TO an_enum_old;
  583. CREATE TYPE an_enum AS ENUM ('var0', 'var1', 'var2');
  584. ALTER TABLE enum_table ALTER COLUMN a TYPE an_enum USING a::text::an_enum;
  585. DROP TYPE an_enum_old;
  586. COMMIT;
  587. INSERT INTO enum_table VALUES ('var2');
  588. > SELECT * FROM enum_table
  589. var0
  590. var1
  591. var2
  592. > SELECT "колона" FROM another_enum_table
  593. var2
  594. var3
  595. #
  596. # Cleanup
  597. #
  598. #
  599. $ postgres-execute connection=postgres://postgres:postgres@postgres
  600. DROP PUBLICATION mz_source;
  601. DROP PUBLICATION mz_source_narrow;
  602. INSERT INTO pk_table VALUES (99999);
  603. # Ensure that every non-progress source in mz_sources reports the missing-publication error
  604. > SELECT bool_and(error ~* 'publication .+ does not exist')
  605. FROM mz_internal.mz_source_statuses
  606. WHERE id IN ( SELECT id FROM mz_sources WHERE type != 'progress' );
  607. true
  608. > DROP SOURCE enum_source CASCADE;
  609. > DROP SOURCE "mz_source" CASCADE;
  610. #
  611. # Check schema scoped tables
  612. > CREATE SOURCE another_source
  613. IN CLUSTER cdc_cluster
  614. FROM POSTGRES CONNECTION pg (
  615. PUBLICATION 'another_publication'
  616. );
  617. > CREATE TABLE another_table FROM SOURCE another_source (REFERENCE another_schema.another_table);
  618. > SHOW SOURCES
  619. another_source postgres cdc_cluster ""
  620. another_source_progress progress <null> ""
  621. > SHOW TABLES
  622. another_table ""
  623. > DROP SOURCE another_source CASCADE
  624. $ postgres-execute connection=postgres://postgres:postgres@postgres
  625. DROP SCHEMA conflict_schema CASCADE;