# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. # IMPORTANT: The Postgres server has a custom pg_hba.conf that only
  10. # accepts connections from specific users. You will have to update
  11. # pg_hba.conf if you modify the existing user names or add new ones.
  12. > CREATE SECRET pgpass AS 'postgres'
  13. ! CREATE CONNECTION pg TO POSTGRES (
  14. HOST postgres,
  15. DATABASE postgres,
  16. USER postgres,
  17. PASSWORD SECRET pgpass,
  18. BROKER '${testdrive.kafka-addr}'
  19. )
  20. contains:POSTGRES connections do not support BROKER values
  21. > CREATE CONNECTION pg TO POSTGRES (
  22. HOST postgres,
  23. DATABASE postgres,
  24. USER postgres,
  25. PASSWORD SECRET pgpass
  26. )
  27. > CREATE CLUSTER cdc_cluster SIZE '${arg.default-replica-size}'
  28. $ postgres-execute connection=postgres://postgres:postgres@postgres
  29. ALTER USER postgres WITH replication;
  30. DROP SCHEMA IF EXISTS public CASCADE;
  31. CREATE SCHEMA public;
  32. DROP PUBLICATION IF EXISTS mz_source;
  33. CREATE PUBLICATION mz_source FOR ALL TABLES;
  34. CREATE TABLE pk_table (pk INTEGER PRIMARY KEY, f2 TEXT);
  35. INSERT INTO pk_table VALUES (1, 'one');
  36. ALTER TABLE pk_table REPLICA IDENTITY FULL;
  37. INSERT INTO pk_table VALUES (2, 'two');
  38. INSERT INTO pk_table VALUES (3, 'three');
  39. CREATE TABLE nonpk_table (f1 INTEGER, f2 INTEGER);
  40. INSERT INTO nonpk_table VALUES (1, 1), (1, 1);
  41. ALTER TABLE nonpk_table REPLICA IDENTITY FULL;
  42. INSERT INTO nonpk_table VALUES (2, 2), (2, 2);
  43. CREATE TABLE types_table (char_col char(3), date_col DATE, time_col TIME, timestamp_col TIMESTAMP, uuid_col UUID, double_col DOUBLE PRECISION, numeric NUMERIC(8,4), int4range_col INT4RANGE, int8range_col INT8RANGE, daterange_col DATERANGE, numrange_col NUMRANGE);
  44. INSERT INTO types_table VALUES ('foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, '(,)', '(,)', '(,)', '(,)');
  45. ALTER TABLE types_table REPLICA IDENTITY FULL;
  46. CREATE TABLE array_types_table (date_col DATE[], time_col TIME[], timestamp_col TIMESTAMP[], uuid_col UUID[], double_col DOUBLE PRECISION[], numeric NUMERIC[], int4range_col INT4RANGE[], int8range_col INT8RANGE[], daterange_col DATERANGE[], numrange_col NUMRANGE[]);
  47. INSERT INTO array_types_table VALUES ('{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  48. ALTER TABLE array_types_table REPLICA IDENTITY FULL;
  49. CREATE TABLE large_text (f1 TEXT, f2 TEXT);
  50. INSERT INTO large_text VALUES (REPEAT('x', 16 * 1024 * 1024), REPEAT('y', 1 * 1024 * 1024));
  51. INSERT INTO large_text VALUES (REPEAT('a', 3 * 1024 * 1024), REPEAT('b', 2 * 1024 * 1024));
  52. ALTER TABLE large_text REPLICA IDENTITY FULL;
  53. CREATE TABLE trailing_space_pk (f1 TEXT PRIMARY KEY);
  54. INSERT INTO trailing_space_pk VALUES ('abc ');
  55. ALTER TABLE trailing_space_pk REPLICA IDENTITY FULL;
  56. CREATE TABLE trailing_space_nopk (f1 TEXT);
  57. INSERT INTO trailing_space_nopk VALUES ('abc ');
  58. ALTER TABLE trailing_space_nopk REPLICA IDENTITY FULL;
  59. CREATE TABLE multipart_pk(f1 INTEGER, f2 TEXT, f3 TEXT, PRIMARY KEY (f1, f2));
  60. INSERT INTO multipart_pk VALUES (1, 'abc', 'xyz');
  61. ALTER TABLE multipart_pk REPLICA IDENTITY FULL;
  62. CREATE TABLE nulls_table (f1 TEXT, f2 INTEGER);
  63. INSERT INTO nulls_table VALUES (NULL, NULL);
  64. ALTER TABLE nulls_table REPLICA IDENTITY FULL;
  65. CREATE TABLE utf8_table (f1 TEXT PRIMARY KEY, f2 TEXT);
  66. INSERT INTO utf8_table VALUES ('това е текст', 'това ''е'' "текст"');
  67. ALTER TABLE utf8_table REPLICA IDENTITY FULL;
  68. CREATE TABLE no_replica_identity (f1 INTEGER);
  69. INSERT INTO no_replica_identity VALUES (1), (2);
  70. CREATE TABLE "таблица" ("колона" TEXT);
  71. ALTER TABLE "таблица" REPLICA IDENTITY FULL;
  72. INSERT INTO "таблица" VALUES ('стойност');
  73. CREATE TABLE tstzrange_table (a TSTZRANGE);
  74. ALTER TABLE tstzrange_table REPLICA IDENTITY FULL;
  75. INSERT INTO tstzrange_table VALUES ('["2024-02-13 17:01:58.37848+00!?!",)');
  76. CREATE TABLE """literal_quotes""" (a TEXT);
  77. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  78. INSERT INTO """literal_quotes""" VALUES ('v');
  79. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  80. CREATE TABLE "create" (a TEXT);
  81. ALTER TABLE "create" REPLICA IDENTITY FULL;
  82. INSERT INTO "create" VALUES ('v');
  83. CREATE TABLE escaped_text_table (f1 TEXT, f2 TEXt);
  84. ALTER TABLE escaped_text_table REPLICA IDENTITY FULL;
  85. INSERT INTO escaped_text_table VALUES ('escaped\ntext\twith\nnewlines\tand\ntabs', 'more\tescaped\ntext');
  86. INSERT INTO escaped_text_table VALUES ('second\nrow\twith\tmore\ttabs', 'and\nmore\n\nnewlines\n');
  87. CREATE TABLE conflict_table (f1 INTEGER);
  88. ALTER TABLE conflict_table REPLICA IDENTITY FULL;
  89. INSERT INTO conflict_table VALUES (123);
  90. DROP SCHEMA IF EXISTS conflict_schema CASCADE;
  91. CREATE SCHEMA conflict_schema;
  92. CREATE TABLE conflict_schema.conflict_table (f1 TEXT);
  93. ALTER TABLE conflict_schema.conflict_table REPLICA IDENTITY FULL;
  94. INSERT INTO conflict_schema.conflict_table VALUES ('234');
  95. CREATE TABLE "space table" ("space column" INTEGER);
  96. ALTER TABLE "space table" REPLICA IDENTITY FULL;
  97. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  98. CREATE TABLE enum_table (a an_enum);
  99. INSERT INTO enum_table VALUES ('var1'), ('var0');
  100. ALTER TABLE enum_table REPLICA IDENTITY FULL;
  101. CREATE TYPE another_enum AS ENUM ('var2', 'var3');
  102. CREATE TABLE another_enum_table ("колона" another_enum);
  103. INSERT INTO another_enum_table VALUES ('var2'), ('var3');
  104. ALTER TABLE another_enum_table REPLICA IDENTITY FULL;
  105. CREATE TABLE conflict_schema.another_enum_table ("колона" another_enum);
  106. INSERT INTO conflict_schema.another_enum_table VALUES ('var2'), ('var3');
  107. ALTER TABLE conflict_schema.another_enum_table REPLICA IDENTITY FULL;
  108. DROP PUBLICATION IF EXISTS mz_source_narrow;
  109. CREATE PUBLICATION mz_source_narrow FOR TABLE enum_table, public.another_enum_table, pk_table;
  110. DROP SCHEMA IF EXISTS another_schema CASCADE;
  111. CREATE SCHEMA another_schema;
  112. CREATE TABLE another_schema.another_table (f1 TEXT);
  113. ALTER TABLE another_schema.another_table REPLICA IDENTITY FULL;
  114. INSERT INTO another_schema.another_table VALUES ('123');
  115. DROP PUBLICATION IF EXISTS another_publication;
  116. CREATE PUBLICATION another_publication FOR TABLE another_schema.another_table;
  117. #
  118. # Test that slots created for replication sources are deleted on DROP
  119. # TODO: enable once we land database-issues#7327
  120. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false
  121. # Sneak in a test for pg_source_snapshot_statement_timeout, pg_source_wal_sender_timeout
  122. $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  123. $ postgres-execute connection=mz_system
  124. ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 1000;
  125. ALTER SYSTEM SET pg_source_wal_sender_timeout = 0;
  126. > CREATE SOURCE "test_slot_source"
  127. IN CLUSTER cdc_cluster
  128. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  129. FOR TABLES ("pk_table");
  130. # TODO: enable once we land database-issues#7327
  131. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=true
  132. > SHOW SUBSOURCES ON test_slot_source
  133. pk_table subsource
  134. test_slot_source_progress progress
  135. > DROP SOURCE test_slot_source CASCADE
  136. # TODO: enable once we land database-issues#7327
  137. # $ postgres-verify-slot connection=postgres://postgres:postgres@postgres slot=materialize_% active=false
  138. $ postgres-execute connection=mz_system
  139. ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 0
  140. #
  141. # Error checking
  142. #
  143. ! CREATE CONNECTION no_such_host TO POSTGRES (
  144. HOST 'no_such_postgres.mtrlz.com',
  145. DATABASE postgres,
  146. USER postgres,
  147. PASSWORD SECRET pgpass
  148. )
  149. contains:failed to lookup address information
  150. ! CREATE CONNECTION no_such_port TO POSTGRES (
  151. HOST postgres,
  152. PORT 65534,
  153. DATABASE postgres,
  154. USER postgres,
  155. PASSWORD SECRET pgpass
  156. )
  157. contains:error connecting to server: Connection refused
  158. ! CREATE CONNECTION no_such_user TO POSTGRES (
  159. HOST postgres,
  160. DATABASE postgres,
  161. USER no_such_user,
  162. PASSWORD SECRET pgpass
  163. )
  164. contains:password authentication failed for user "no_such_user"
  165. > CREATE SECRET badpass AS 'badpass'
  166. ! CREATE CONNECTION no_such_password TO POSTGRES (
  167. HOST postgres,
  168. DATABASE postgres,
  169. USER postgres,
  170. PASSWORD SECRET badpass
  171. )
  172. contains:password authentication failed for user "postgres"
  173. ! CREATE CONNECTION no_such_dbname TO POSTGRES (
  174. HOST postgres,
  175. DATABASE no_such_dbname,
  176. USER postgres,
  177. PASSWORD SECRET pgpass
  178. )
  179. contains:database "no_such_dbname" does not exist
  180. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  181. ALTER SYSTEM SET storage_enforce_external_addresses = true
  182. ! CREATE CONNECTION private_address TO POSTGRES (
  183. HOST postgres,
  184. DATABASE postgres,
  185. USER postgres,
  186. PASSWORD SECRET pgpass
  187. )
  188. contains:Address resolved to a private IP
  189. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  190. ALTER SYSTEM SET storage_enforce_external_addresses = false
  191. ! CREATE SOURCE "no_such_publication"
  192. IN CLUSTER cdc_cluster
  193. FROM POSTGRES CONNECTION pg (PUBLICATION 'no_such_publication');
  194. # TODO: assert on `detail` here.
  195. contains:failed to connect to PostgreSQL database
  196. ! CREATE SOURCE "mz_source"
  197. IN CLUSTER cdc_cluster
  198. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  199. FOR TABLES (
  200. "enum_table"
  201. );
  202. contains:referenced tables use unsupported types
  203. ! CREATE SOURCE "mz_source"
  204. IN CLUSTER cdc_cluster
  205. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  206. FOR TABLES (
  207. "enum_table",
  208. public.another_enum_table
  209. );
  210. contains:referenced tables use unsupported types
  211. ! CREATE SOURCE mz_source
  212. IN CLUSTER cdc_cluster
  213. FROM POSTGRES CONNECTION pg (
  214. PUBLICATION 'mz_source',
  215. DETAILS 'abc'
  216. )
  217. FOR TABLES (
  218. pk_table
  219. );
  220. contains: CREATE SOURCE specifies DETAILS option
  221. ! CREATE SOURCE "mz_source"
  222. IN CLUSTER cdc_cluster
  223. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  224. FOR TABLES (
  225. "no_replica_identity",
  226. "pk_table"
  227. );
  228. contains:referenced items not tables with REPLICA IDENTITY FULL
  229. detail:referenced items: public.no_replica_identity
  230. #
  231. # Establish direct replication
  232. #
  233. #
  234. # Note: This implicitly tests that enum_table being part of the publication does not
  235. # prevent us from using other tables as subsources.
  236. #
  237. > CREATE SOURCE "mz_source"
  238. IN CLUSTER cdc_cluster
  239. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  240. FOR TABLES (
  241. "pk_table",
  242. "nonpk_table",
  243. "types_table",
  244. "array_types_table",
  245. "large_text",
  246. "trailing_space_pk",
  247. "trailing_space_nopk",
  248. "multipart_pk",
  249. "nulls_table",
  250. "utf8_table",
  251. "таблица",
  252. "escaped_text_table",
  253. conflict_schema.conflict_table AS public.conflict_table,
  254. "space table",
  255. """literal_quotes""",
  256. "create",
  257. tstzrange_table
  258. );
  259. > SHOW SOURCES
  260. array_types_table subsource cdc_cluster ""
  261. conflict_table subsource cdc_cluster ""
  262. create subsource cdc_cluster ""
  263. escaped_text_table subsource cdc_cluster ""
  264. large_text subsource cdc_cluster ""
  265. multipart_pk subsource cdc_cluster ""
  266. mz_source postgres cdc_cluster ""
  267. mz_source_progress progress <null> ""
  268. nonpk_table subsource cdc_cluster ""
  269. nulls_table subsource cdc_cluster ""
  270. pk_table subsource cdc_cluster ""
  271. "space table" subsource cdc_cluster ""
  272. trailing_space_nopk subsource cdc_cluster ""
  273. trailing_space_pk subsource cdc_cluster ""
  274. "\"literal_quotes\"" subsource cdc_cluster ""
  275. tstzrange_table subsource cdc_cluster ""
  276. types_table subsource cdc_cluster ""
  277. utf8_table subsource cdc_cluster ""
  278. таблица subsource cdc_cluster ""
  279. > SELECT schema_name, table_name FROM mz_internal.mz_postgres_source_tables
  280. public create
  281. public pk_table
  282. public large_text
  283. public utf8_table
  284. public nonpk_table
  285. public types_table
  286. public nulls_table
  287. public multipart_pk
  288. public "\"space table\""
  289. public tstzrange_table
  290. public "\"таблица\""
  291. public array_types_table
  292. public trailing_space_pk
  293. public escaped_text_table
  294. public trailing_space_nopk
  295. public "\"\"\"literal_quotes\"\"\""
  296. conflict_schema conflict_table
  297. # Ensure all ingestion export subsources have an ID greater than the primary source ID
  298. > SELECT bool_and(primary_source_id < subsource_id)
  299. FROM
  300. (SELECT id AS primary_source_id FROM mz_sources WHERE type = 'postgres')
  301. CROSS JOIN (SELECT id AS subsource_id FROM mz_sources WHERE type = 'subsource');
  302. true
  303. # Ensure progress subsources have an ID less than the primary source ID
  304. > SELECT progress_source_id < primary_source_id
  305. FROM (
  306. SELECT
  307. (SELECT id FROM mz_sources WHERE type = 'postgres') AS primary_source_id,
  308. (SELECT id FROM mz_sources WHERE type = 'progress') AS progress_source_id
  309. );
  310. true
  311. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source';
  312. running
  313. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source_progress';
  314. running
  315. > SELECT lsn > 0 FROM mz_source_progress
  316. true
  317. # Ensure we report the write frontier of the progress subsource
  318. $ set-regex match=(\s{12}0|\d{13,20}|u\d{1,5}|\(\d+-\d\d-\d\d\s\d\d:\d\d:\d\d\.\d\d\d\)|true|false) replacement=<>
  319. > EXPLAIN TIMESTAMP FOR SELECT * FROM mz_source_progress
  320. " query timestamp: <> <>\n oracle read timestamp: <> <>\nlargest not in advance of upper: <> <>\n upper:[<> <>]\n since:[<> <>]\n can respond immediately: <>\n timeline: Some(EpochMilliseconds)\n session wall time: <> <>\n\nsource materialize.public.mz_source_progress (<>, storage):\n read frontier:[<> <>]\n write frontier:[<> <>]\n\nbinding constraints:\nlower:\n (IsolationLevel(StrictSerializable)): [<> <>]\n"
  321. $ set-regex match=[0-9]+|_[a-f0-9]+ replacement=<SUPPRESSED>
  322. > SELECT * FROM mz_internal.mz_postgres_sources
  323. id replication_slot timeline_id
  324. ---------------------------------------------------
  325. u<SUPPRESSED> materialize<SUPPRESSED> <SUPPRESSED>
  326. $ unset-regex
  327. #
  328. # Perform sanity checks of the initial snapshot
  329. #
  330. > SELECT * FROM pk_table;
  331. 1 one
  332. 2 two
  333. 3 three
  334. > SELECT * FROM nonpk_table;
  335. 1 1
  336. 1 1
  337. 2 2
  338. 2 2
  339. > SELECT * FROM types_table;
  340. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  341. > SELECT pg_typeof(numeric) FROM types_table;
  342. "numeric"
  343. > SELECT * FROM array_types_table;
  344. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  345. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  346. 16777216 1048576
  347. 3145728 2097152
  348. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  349. "6" "abc "
  350. > SELECT LENGTH(f1), f1 FROM trailing_space_nopk;
  351. "6" "abc "
  352. > SELECT * FROM multipart_pk;
  353. 1 abc xyz
  354. > SELECT f1, f2, f1 IS NULL, f2 IS NULL FROM nulls_table;
  355. <null> <null> true true
  356. > SELECT * FROM utf8_table;
  357. "това е текст" "това \'е\' \"текст\""
  358. > SELECT * FROM "таблица";
  359. стойност
  360. > SELECT * FROM escaped_text_table;
  361. "escaped\\ntext\\twith\\nnewlines\\tand\\ntabs" "more\\tescaped\\ntext"
  362. "second\\nrow\\twith\\tmore\\ttabs" "and\\nmore\\n\\nnewlines\\n"
  363. > SELECT * FROM conflict_table;
  364. 234
  365. > SELECT * FROM """literal_quotes"""
  366. v
  367. > SELECT * FROM "create"
  368. v
  369. > SELECT * FROM tstzrange_table
  370. "[2024-02-13 17:01:58.378480 UTC,)"
  371. #
  372. # Confirm that the new sources can be used to build upon
  373. #
  374. > CREATE MATERIALIZED VIEW join_view (a, b, c, d) AS SELECT * FROM pk_table, nonpk_table WHERE pk_table.pk = nonpk_table.f1;
  375. > SELECT * FROM join_view;
  376. "1" "one" "1" "1"
  377. "1" "one" "1" "1"
  378. "2" "two" "2" "2"
  379. "2" "two" "2" "2"
  380. #
  381. # Basic sanity check that the timestamps are reasonable
  382. #
  383. > SELECT COUNT(*) > 0 FROM pk_table;
  384. true
  385. > SELECT COUNT(*) > 0 FROM nonpk_table;
  386. true
  387. > SELECT COUNT(*) > 0 FROM join_view;
  388. true
  389. #
  390. # Modify the tables on the Postgres side
  391. #
  392. $ postgres-execute connection=postgres://postgres:postgres@postgres
  393. INSERT INTO pk_table VALUES (4, 'four');
  394. INSERT INTO pk_table VALUES (5, 'five');
  395. DELETE FROM pk_table WHERE pk = 1;
  396. UPDATE pk_table SET f2 = 'two_two' WHERE pk = 2;
  397. UPDATE pk_table SET pk = pk + 10 WHERE pk BETWEEN 3 AND 4;
  398. INSERT INTO nonpk_table VALUES (3, 3), (3, 3);
  399. DELETE FROM nonpk_table WHERE ctid = '(0,1)';
  400. UPDATE nonpk_table SET f1 = f1 + 10 WHERE ctid = '(0,2)';
  401. UPDATE nonpk_table SET f1 = f1 + 100 WHERE f1 = 3;
  402. INSERT INTO types_table VALUES ('foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, 'empty', 'empty', 'empty', 'empty');
  403. INSERT INTO array_types_table VALUES ('{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  404. INSERT INTO large_text VALUES (REPEAT('x', 16 * 1024 * 1024), 'abc');
  405. INSERT INTO trailing_space_pk VALUES ('klm ');
  406. UPDATE trailing_space_pk SET f1 = 'xyz ' WHERE f1 = 'klm ';
  407. DELETE FROM trailing_space_pk WHERE f1 = 'abc ';
  408. INSERT INTO trailing_space_nopk VALUES ('klm ');
  409. UPDATE trailing_space_nopk SET f1 = 'xyz ' WHERE f1 = 'klm ';
  410. DELETE FROM trailing_space_nopk WHERE f1 = 'abc ';
  411. INSERT INTO multipart_pk VALUES (2, 'klm', 'xyz');
  412. DELETE FROM multipart_pk WHERE f1 = 1;
  413. UPDATE nulls_table SET f2 = 3 WHERE f2 IS NULL;
  414. INSERT INTO nulls_table VALUES (NULL, 1), (NULL, 2);
  415. UPDATE nulls_table SET f2 = NULL WHERE f2 = 2;
  416. INSERT INTO utf8_table VALUES ('това е текст 2', 'това ''е'' "текст" 2');
  417. UPDATE utf8_table SET f1 = f1 || f1 , f2 = f2 || f2;
  418. INSERT INTO "таблица" SELECT * FROM "таблица";
  419. #
  420. # Check the updated data on the Materialize side
  421. #
  422. > SELECT * FROM pk_table;
  423. 13 three
  424. 14 four
  425. 2 two_two
  426. 5 five
  427. > SELECT * FROM nonpk_table;
  428. 103 3
  429. 103 3
  430. 11 1
  431. 2 2
  432. 2 2
  433. > SELECT * FROM types_table;
  434. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  435. "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "empty" "empty" "empty" "empty"
  436. > SELECT * FROM array_types_table;
  437. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  438. "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  439. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  440. 16777216 1048576
  441. 3145728 2097152
  442. 16777216 3
  443. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  444. "6" "xyz "
  445. > SELECT LENGTH(f1), f1 FROM trailing_space_nopk;
  446. "6" "xyz "
  447. > SELECT * FROM multipart_pk;
  448. "2" "klm" "xyz"
  449. > SELECT f1, f2, f1 IS NULL, f2 IS NULL FROM nulls_table;
  450. "<null>" "1" "true" "false"
  451. "<null>" "3" "true" "false"
  452. "<null>" "<null>" "true" "true"
  453. > SELECT * FROM utf8_table;
  454. "това е текст 2това е текст 2" "това \'е\' \"текст\" 2това \'е\' \"текст\" 2"
  455. "това е тексттова е текст" "това \'е\' \"текст\"това \'е\' \"текст\""
  456. > SELECT * FROM "таблица";
  457. стойност
  458. стойност
  459. > SELECT * FROM join_view;
  460. "2" "two_two" "2" "2"
  461. "2" "two_two" "2" "2"
  462. #
  463. # Check that the timestamps continue to be reasonable in the face of incoming updates
  464. #
  465. > SELECT COUNT(*) > 0 FROM pk_table;
  466. true
  467. > SELECT COUNT(*) > 0 FROM nonpk_table;
  468. true
  469. > SELECT COUNT(*) > 0 FROM join_view;
  470. true
  471. #
  472. # Ensure we can start a source with more workers than the default max_wal_senders param (10)
  473. #
  474. > CREATE CLUSTER large_cluster SIZE '16';
  475. > CREATE SOURCE large_cluster_source
  476. IN CLUSTER large_cluster
  477. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
  478. FOR TABLES ("pk_table" AS large_cluster_source_pk_table);
  479. > SELECT * FROM large_cluster_source_pk_table;
  480. 13 three
  481. 14 four
  482. 2 two_two
  483. 5 five
  484. > SELECT status = 'running' FROM mz_internal.mz_source_statuses WHERE name = 'large_cluster_source_pk_table';
  485. true
  486. > DROP SOURCE large_cluster_source CASCADE;
  487. #
  488. # Remove all data on the Postgres side
  489. #
  490. $ postgres-execute connection=postgres://postgres:postgres@postgres
  491. DELETE FROM pk_table;
  492. DELETE FROM nonpk_table;
  493. DELETE FROM large_text;
  494. DELETE FROM trailing_space_pk;
  495. DELETE FROM trailing_space_nopk;
  496. DELETE FROM multipart_pk;
  497. DELETE FROM nulls_table;
  498. DELETE FROM utf8_table;
  499. DELETE FROM "таблица";
  500. DELETE FROM conflict_schema.conflict_table;
  501. DELETE FROM tstzrange_table;
  502. #
  503. # Check that all data sources empty out on the Materialize side
  504. #
  505. > SELECT COUNT(*) FROM pk_table;
  506. 0
  507. > SELECT COUNT(*) FROM nonpk_table;
  508. 0
  509. > SELECT COUNT(*) FROM large_text;
  510. 0
  511. > SELECT COUNT(*) FROM trailing_space_nopk;
  512. 0
  513. > SELECT COUNT(*) FROM trailing_space_pk;
  514. 0
  515. > SELECT COUNT(*) FROM multipart_pk;
  516. 0
  517. > SELECT COUNT(*) FROM nulls_table;
  518. 0
  519. > SELECT COUNT(*) FROM utf8_table;
  520. 0
  521. > SELECT COUNT(*) FROM join_view;
  522. 0
  523. > SELECT COUNT(*) FROM "таблица";
  524. 0
  525. > SELECT COUNT(*) FROM conflict_table;
  526. 0
  527. > SELECT COUNT(*) FROM tstzrange_table;
  528. 0
  529. #
  530. # Support enum values as strings
  531. #
  532. #
  533. ! CREATE SOURCE enum_source
  534. IN CLUSTER cdc_cluster
  535. FROM POSTGRES CONNECTION pg (
  536. PUBLICATION 'mz_source',
  537. TEXT COLUMNS [pk_table.col_dne]
  538. )
  539. FOR TABLES (
  540. "enum_table"
  541. );
  542. contains: invalid TEXT COLUMNS option value: column "pk_table.col_dne" does not exist
  543. ! CREATE SOURCE case_sensitive_names
  544. IN CLUSTER cdc_cluster
  545. FROM POSTGRES CONNECTION pg (
  546. PUBLICATION 'mz_source',
  547. TEXT COLUMNS [pk_table."F2"]
  548. )
  549. FOR TABLES (
  550. "enum_table"
  551. );
  552. contains: invalid TEXT COLUMNS option value: column "pk_table.F2" does not exist
  553. hint: The similarly named column "pk_table.f2" does exist.
  554. ! CREATE SOURCE enum_source
  555. IN CLUSTER cdc_cluster
  556. FROM POSTGRES CONNECTION pg (
  557. PUBLICATION 'mz_source',
  558. TEXT COLUMNS [table_dne.col_dne]
  559. )
  560. FOR TABLES (
  561. "enum_table"
  562. );
  563. contains: reference to table_dne not found in source
  564. # Reference exists in two schemas, so is not unambiguous
  565. ! CREATE SOURCE enum_source
  566. IN CLUSTER cdc_cluster
  567. FROM POSTGRES CONNECTION pg (
  568. PUBLICATION 'mz_source',
  569. TEXT COLUMNS [another_enum_table."колона"]
  570. )
  571. FOR TABLES(
  572. conflict_schema.another_enum_table AS conflict_enum,
  573. public.another_enum_table AS public_enum
  574. );
  575. contains: reference another_enum_table is ambiguous, consider specifying an additional layer of qualification
  576. ! CREATE SOURCE enum_source
  577. IN CLUSTER cdc_cluster
  578. FROM POSTGRES CONNECTION pg (
  579. PUBLICATION 'mz_source',
  580. TEXT COLUMNS [foo]
  581. )
  582. FOR TABLES (pk_table);
  583. contains: invalid TEXT COLUMNS option value: column name 'foo' must have at least a table qualification
  584. ! CREATE SOURCE enum_source
  585. IN CLUSTER cdc_cluster
  586. FROM POSTGRES CONNECTION pg (
  587. PUBLICATION 'mz_source',
  588. TEXT COLUMNS [foo.bar.qux.qax.foo]
  589. )
  590. FOR TABLES (pk_table);
  591. contains: reference to foo.bar.qux.qax not found in source
  592. ! CREATE SOURCE enum_source
  593. IN CLUSTER cdc_cluster
  594. FROM POSTGRES CONNECTION pg (
  595. PUBLICATION 'mz_source',
  596. TEXT COLUMNS [enum_table.a, enum_table.a]
  597. )
  598. FOR TABLES (enum_table);
  599. contains: invalid TEXT COLUMNS option value: unexpected multiple references to postgres.public.enum_table.a
  600. # utf8_table is not part of mz_source_narrow publication
  601. ! CREATE SOURCE enum_source
  602. IN CLUSTER cdc_cluster
  603. FROM POSTGRES CONNECTION pg (
  604. PUBLICATION 'mz_source_narrow',
  605. TEXT COLUMNS [enum_table.a, utf8_table.f1]
  606. )
  607. FOR TABLES (enum_table);
  608. contains: reference to utf8_table not found in source
  609. # n.b includes a reference to pk_table, which is not a table that's part of the
  610. # source, but is part of the publication.
  611. ! CREATE SOURCE enum_source
  612. IN CLUSTER cdc_cluster
  613. FROM POSTGRES CONNECTION pg (
  614. PUBLICATION 'mz_source',
  615. TEXT COLUMNS [
  616. enum_table.a,
  617. public.another_enum_table."колона",
  618. pk_table.pk
  619. ]
  620. )
  621. FOR TABLES (
  622. "enum_table",
  623. public.another_enum_table
  624. );
  625. contains:TEXT COLUMNS refers to table not currently being added
  626. > CREATE SOURCE enum_source
  627. IN CLUSTER cdc_cluster
  628. FROM POSTGRES CONNECTION pg (
  629. PUBLICATION 'mz_source',
  630. TEXT COLUMNS [
  631. enum_table.a,
  632. public.another_enum_table."колона"
  633. ]
  634. )
  635. FOR TABLES (
  636. "enum_table",
  637. public.another_enum_table
  638. );
  639. > SELECT * FROM (SHOW SOURCES) WHERE name LIKE '%enum%';
  640. another_enum_table subsource cdc_cluster ""
  641. enum_source postgres cdc_cluster ""
  642. enum_source_progress progress <null> ""
  643. enum_table subsource cdc_cluster ""
  644. > SELECT * FROM enum_table
  645. var0
  646. var1
  647. >[version>=14000] SHOW CREATE SOURCE enum_table
  648. materialize.public.enum_table "CREATE SUBSOURCE materialize.public.enum_table (a pg_catalog.text) OF SOURCE materialize.public.enum_source WITH (EXTERNAL REFERENCE = postgres.public.enum_table, TEXT COLUMNS = (a));"
  649. >[version<14000] SHOW CREATE SOURCE enum_table
  650. materialize.public.enum_table "CREATE SUBSOURCE \"materialize\".\"public\".\"enum_table\" (\"a\" \"pg_catalog\".\"text\") OF SOURCE \"materialize\".\"public\".\"enum_source\" WITH (EXTERNAL REFERENCE = \"postgres\".\"public\".\"enum_table\", TEXT COLUMNS = (\"a\"))"
  651. # Test that TEXT COLUMN types can change
  652. $ postgres-execute connection=postgres://postgres:postgres@postgres
  653. BEGIN;
  654. ALTER TYPE an_enum RENAME TO an_enum_old;
  655. CREATE TYPE an_enum AS ENUM ('var0', 'var1', 'var2');
  656. ALTER TABLE enum_table ALTER COLUMN a TYPE an_enum USING a::text::an_enum;
  657. DROP TYPE an_enum_old;
  658. COMMIT;
  659. INSERT INTO enum_table VALUES ('var2');
  660. > SELECT * FROM enum_table
  661. var0
  662. var1
  663. var2
  664. > SELECT "колона" FROM another_enum_table
  665. var2
  666. var3
  667. #
  668. # Cleanup
  669. #
  670. #
  671. $ postgres-execute connection=postgres://postgres:postgres@postgres
  672. DROP PUBLICATION mz_source;
  673. DROP PUBLICATION mz_source_narrow;
  674. INSERT INTO pk_table VALUES (99999);
  675. # Ensure that source + all subsources have error
  676. > SELECT bool_and(error ~* 'publication .+ does not exist')
  677. FROM mz_internal.mz_source_statuses
  678. WHERE id IN ( SELECT id FROM mz_sources WHERE type != 'progress' );
  679. true
  680. > DROP SOURCE enum_source CASCADE;
  681. > DROP SOURCE "mz_source" CASCADE;
  682. #
  683. # Check schema scoped tables
  684. > CREATE SOURCE another_source
  685. IN CLUSTER cdc_cluster
  686. FROM POSTGRES CONNECTION pg (
  687. PUBLICATION 'another_publication'
  688. )
  689. FOR SCHEMAS (
  690. another_schema
  691. );
  692. > SHOW SOURCES
  693. another_source postgres cdc_cluster ""
  694. another_source_progress progress <null> ""
  695. another_table subsource cdc_cluster ""
  696. > DROP SOURCE another_source CASCADE
  697. $ postgres-execute connection=postgres://postgres:postgres@postgres
  698. DROP SCHEMA conflict_schema CASCADE;