yugabyte-cdc.td 23 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-sql-timeout duration=60s
  10. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  11. ALTER SYSTEM SET enable_yugabyte_connection = true;
  12. > CREATE SECRET pgpass AS 'yugabyte'
  13. > CREATE CONNECTION yb TO YUGABYTE (
  14. HOST yugabyte,
  15. PORT 5433,
  16. DATABASE yugabyte,
  17. USER yugabyte,
  18. PASSWORD SECRET pgpass
  19. )
  20. > CREATE CLUSTER cdc_cluster SIZE '${arg.default-replica-size}'
  21. $ postgres-execute connection=postgres://yugabyte:yugabyte@yugabyte:5433/yugabyte
  22. ALTER USER yugabyte WITH replication;
  23. DROP SCHEMA IF EXISTS public CASCADE;
  24. CREATE SCHEMA public;
  25. DROP PUBLICATION IF EXISTS mz_source;
  26. CREATE PUBLICATION mz_source FOR ALL TABLES;
  27. CREATE TABLE pk_table (pk INTEGER PRIMARY KEY, f2 TEXT);
  28. INSERT INTO pk_table VALUES (1, 'one');
  29. ALTER TABLE pk_table REPLICA IDENTITY FULL;
  30. INSERT INTO pk_table VALUES (2, 'two');
  31. INSERT INTO pk_table VALUES (3, 'three');
  32. CREATE TABLE types_table (pk INTEGER PRIMARY KEY, char_col char(3), date_col DATE, time_col TIME, timestamp_col TIMESTAMP, uuid_col UUID, double_col DOUBLE PRECISION, numeric NUMERIC(8,4), int4range_col INT4RANGE, int8range_col INT8RANGE, daterange_col DATERANGE, numrange_col NUMRANGE);
  33. INSERT INTO types_table VALUES (1, 'foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, '(,)', '(,)', '(,)', '(,)');
  34. ALTER TABLE types_table REPLICA IDENTITY FULL;
  35. CREATE TABLE array_types_table (pk INTEGER PRIMARY KEY, date_col DATE[], time_col TIME[], timestamp_col TIMESTAMP[], uuid_col UUID[], double_col DOUBLE PRECISION[], numeric NUMERIC[], int4range_col INT4RANGE[], int8range_col INT8RANGE[], daterange_col DATERANGE[], numrange_col NUMRANGE[]);
  36. INSERT INTO array_types_table VALUES (1, '{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  37. ALTER TABLE array_types_table REPLICA IDENTITY FULL;
  38. CREATE TABLE large_text (pk INTEGER PRIMARY KEY, f1 TEXT, f2 TEXT);
  39. INSERT INTO large_text VALUES (1, REPEAT('x', 16 * 1024 * 1024), REPEAT('y', 1 * 1024 * 1024));
  40. INSERT INTO large_text VALUES (2, REPEAT('a', 3 * 1024 * 1024), REPEAT('b', 2 * 1024 * 1024));
  41. ALTER TABLE large_text REPLICA IDENTITY FULL;
  42. CREATE TABLE trailing_space_pk (f1 TEXT PRIMARY KEY);
  43. INSERT INTO trailing_space_pk VALUES ('abc ');
  44. ALTER TABLE trailing_space_pk REPLICA IDENTITY FULL;
  45. CREATE TABLE multipart_pk(f1 INTEGER, f2 TEXT, f3 TEXT, PRIMARY KEY (f1, f2));
  46. INSERT INTO multipart_pk VALUES (1, 'abc', 'xyz');
  47. ALTER TABLE multipart_pk REPLICA IDENTITY FULL;
  48. CREATE TABLE nulls_table (pk INTEGER PRIMARY KEY, f2 TEXT, f3 INTEGER);
  49. INSERT INTO nulls_table VALUES (1, NULL, NULL);
  50. ALTER TABLE nulls_table REPLICA IDENTITY FULL;
  51. CREATE TABLE utf8_table (f1 TEXT PRIMARY KEY, f2 TEXT);
  52. INSERT INTO utf8_table VALUES ('това е текст', 'това ''е'' "текст"');
  53. ALTER TABLE utf8_table REPLICA IDENTITY FULL;
  54. CREATE TABLE "таблица" ("колона" TEXT PRIMARY KEY);
  55. ALTER TABLE "таблица" REPLICA IDENTITY FULL;
  56. INSERT INTO "таблица" VALUES ('стойност');
  57. CREATE TABLE tstzrange_table (pk INTEGER PRIMARY KEY, a TSTZRANGE);
  58. ALTER TABLE tstzrange_table REPLICA IDENTITY FULL;
  59. INSERT INTO tstzrange_table VALUES (1, '["2024-02-13 17:01:58.37848+00!?!",)');
  60. CREATE TABLE """literal_quotes""" (a TEXT PRIMARY KEY);
  61. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  62. INSERT INTO """literal_quotes""" VALUES ('v');
  63. ALTER TABLE """literal_quotes""" REPLICA IDENTITY FULL;
  64. CREATE TABLE "create" (a TEXT PRIMARY KEY);
  65. ALTER TABLE "create" REPLICA IDENTITY FULL;
  66. INSERT INTO "create" VALUES ('v');
  67. CREATE TABLE escaped_text_table (pk INTEGER PRIMARY KEY, f1 TEXT, f2 TEXT);
  68. ALTER TABLE escaped_text_table REPLICA IDENTITY FULL;
  69. INSERT INTO escaped_text_table VALUES (1, 'escaped\ntext\twith\nnewlines\tand\ntabs', 'more\tescaped\ntext');
  70. INSERT INTO escaped_text_table VALUES (2, 'second\nrow\twith\tmore\ttabs', 'and\nmore\n\nnewlines\n');
  71. CREATE TABLE conflict_table (f1 INTEGER PRIMARY KEY);
  72. ALTER TABLE conflict_table REPLICA IDENTITY FULL;
  73. INSERT INTO conflict_table VALUES (123);
  74. DROP SCHEMA IF EXISTS conflict_schema CASCADE;
  75. CREATE SCHEMA conflict_schema;
  76. CREATE TABLE conflict_schema.conflict_table (f1 TEXT PRIMARY KEY);
  77. ALTER TABLE conflict_schema.conflict_table REPLICA IDENTITY FULL;
  78. INSERT INTO conflict_schema.conflict_table VALUES ('234');
  79. CREATE TABLE "space table" ("space column" INTEGER PRIMARY KEY);
  80. ALTER TABLE "space table" REPLICA IDENTITY FULL;
  81. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  82. CREATE TABLE enum_table (pk INTEGER PRIMARY KEY, a an_enum);
  83. INSERT INTO enum_table VALUES (1, 'var0'), (2, 'var1');
  84. ALTER TABLE enum_table REPLICA IDENTITY FULL;
  85. CREATE TYPE another_enum AS ENUM ('var2', 'var3');
  86. CREATE TABLE another_enum_table (pk INTEGER PRIMARY KEY, "колона" another_enum);
  87. INSERT INTO another_enum_table VALUES (1, 'var2'), (2, 'var3');
  88. ALTER TABLE another_enum_table REPLICA IDENTITY FULL;
  89. CREATE TABLE conflict_schema.another_enum_table (pk INTEGER PRIMARY KEY, "колона" another_enum);
  90. INSERT INTO conflict_schema.another_enum_table VALUES (1, 'var2'), (2, 'var3');
  91. ALTER TABLE conflict_schema.another_enum_table REPLICA IDENTITY FULL;
  92. DROP PUBLICATION IF EXISTS mz_source_narrow;
  93. CREATE PUBLICATION mz_source_narrow FOR TABLE enum_table, public.another_enum_table, pk_table;
  94. DROP SCHEMA IF EXISTS another_schema CASCADE;
  95. CREATE SCHEMA another_schema;
  96. CREATE TABLE another_schema.another_table (f1 TEXT PRIMARY KEY);
  97. ALTER TABLE another_schema.another_table REPLICA IDENTITY FULL;
  98. INSERT INTO another_schema.another_table VALUES ('123');
  99. DROP PUBLICATION IF EXISTS another_publication;
  100. CREATE PUBLICATION another_publication FOR TABLE another_schema.another_table;
  101. # Error checking
  102. # Each statement prefixed with "!" below must fail, and its error must contain the text on
  103. # the "contains:" line that follows the statement.
  104. ! CREATE CONNECTION no_such_host TO YUGABYTE (
  105. HOST 'no_such_postgres.mtrlz.com',
  106. PORT 5433,
  107. DATABASE yugabyte,
  108. USER yugabyte,
  109. PASSWORD SECRET pgpass
  110. )
  111. contains:failed to lookup address information
  112. ! CREATE CONNECTION no_such_port TO YUGABYTE (
  113. HOST yugabyte,
  114. PORT 65534,
  115. DATABASE yugabyte,
  116. USER yugabyte,
  117. PASSWORD SECRET pgpass
  118. )
  119. contains:error connecting to server: Connection refused
  120. ! CREATE CONNECTION no_such_user TO YUGABYTE (
  121. HOST yugabyte,
  122. PORT 5433,
  123. DATABASE yugabyte,
  124. USER no_such_user,
  125. PASSWORD SECRET pgpass
  126. )
  127. contains:role "no_such_user" does not exist
  128. ! CREATE CONNECTION no_such_dbname TO YUGABYTE (
  129. HOST yugabyte,
  130. PORT 5433,
  131. DATABASE no_such_dbname,
  132. USER yugabyte,
  133. PASSWORD SECRET pgpass
  134. )
  135. contains:database "no_such_dbname" does not exist
  136. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  137. ALTER SYSTEM SET storage_enforce_external_addresses = true;
  138. ! CREATE CONNECTION private_address TO YUGABYTE (
  139. HOST yugabyte,
  140. PORT 5433,
  141. DATABASE yugabyte,
  142. USER yugabyte,
  143. PASSWORD SECRET pgpass
  144. )
  145. contains:Address resolved to a private IP
  146. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  147. ALTER SYSTEM SET storage_enforce_external_addresses = false;
  148. ! CREATE SOURCE "no_such_publication"
  149. IN CLUSTER cdc_cluster
  150. FROM YUGABYTE CONNECTION yb (PUBLICATION 'no_such_publication');
  151. # TODO: assert on `detail` here.
  152. contains:failed to connect to PostgreSQL database
  153. ! CREATE SOURCE "mz_source"
  154. IN CLUSTER cdc_cluster
  155. FROM YUGABYTE CONNECTION yb (PUBLICATION 'mz_source')
  156. FOR TABLES (
  157. "enum_table"
  158. );
  159. contains:referenced tables use unsupported types
  160. ! CREATE SOURCE "mz_source"
  161. IN CLUSTER cdc_cluster
  162. FROM YUGABYTE CONNECTION yb (PUBLICATION 'mz_source')
  163. FOR TABLES (
  164. "enum_table",
  165. public.another_enum_table
  166. );
  167. contains:referenced tables use unsupported types
  168. ! CREATE SOURCE mz_source
  169. IN CLUSTER cdc_cluster
  170. FROM YUGABYTE CONNECTION yb (
  171. PUBLICATION 'mz_source',
  172. DETAILS 'abc'
  173. )
  174. FOR TABLES (
  175. pk_table
  176. );
  177. contains: CREATE SOURCE specifies DETAILS option
  178. ! CREATE SOURCE mz_source
  179. IN CLUSTER cdc_cluster
  180. FROM POSTGRES CONNECTION yb (PUBLICATION 'mz_source')
  181. FOR ALL TABLES;
  182. contains:materialize.public.yb is not a POSTGRES CONNECTION
  183. #
  184. # Establish direct replication from the 'mz_source' publication
  185. #
  186. # enum_table is deliberately absent from FOR TABLES below.
  187. # Note: This implicitly tests that enum_table being part of the publication does not
  188. # prevent us from using other tables as subsources.
  189. #
  190. > CREATE SOURCE "mz_source"
  191. IN CLUSTER cdc_cluster
  192. FROM YUGABYTE CONNECTION yb (PUBLICATION 'mz_source')
  193. FOR TABLES (
  194. "pk_table",
  195. "types_table",
  196. "array_types_table",
  197. "large_text",
  198. "trailing_space_pk",
  199. "multipart_pk",
  200. "nulls_table",
  201. "utf8_table",
  202. "таблица",
  203. "escaped_text_table",
  204. conflict_schema.conflict_table AS public.conflict_table,
  205. "space table",
  206. """literal_quotes""",
  207. "create",
  208. tstzrange_table
  209. );
  210. ! ALTER SOURCE mz_source ADD SUBSOURCE foobar;
  211. contains:source mz_source does not support ALTER SOURCE.
  212. > SHOW SOURCES
  213. array_types_table subsource cdc_cluster ""
  214. conflict_table subsource cdc_cluster ""
  215. create subsource cdc_cluster ""
  216. escaped_text_table subsource cdc_cluster ""
  217. large_text subsource cdc_cluster ""
  218. multipart_pk subsource cdc_cluster ""
  219. mz_source postgres cdc_cluster ""
  220. mz_source_progress progress <null> ""
  221. nulls_table subsource cdc_cluster ""
  222. pk_table subsource cdc_cluster ""
  223. "space table" subsource cdc_cluster ""
  224. trailing_space_pk subsource cdc_cluster ""
  225. "\"literal_quotes\"" subsource cdc_cluster ""
  226. tstzrange_table subsource cdc_cluster ""
  227. types_table subsource cdc_cluster ""
  228. utf8_table subsource cdc_cluster ""
  229. таблица subsource cdc_cluster ""
  230. > SELECT schema_name, table_name FROM mz_internal.mz_postgres_source_tables
  231. public create
  232. public pk_table
  233. public large_text
  234. public utf8_table
  235. public types_table
  236. public nulls_table
  237. public multipart_pk
  238. public "\"space table\""
  239. public tstzrange_table
  240. public "\"таблица\""
  241. public array_types_table
  242. public trailing_space_pk
  243. public escaped_text_table
  244. public "\"\"\"literal_quotes\"\"\""
  245. conflict_schema conflict_table
  246. # Ensure every ingestion export subsource has an ID greater than the primary source ID
  247. > SELECT bool_and(primary_source_id < subsource_id)
  248. FROM
  249. (SELECT id AS primary_source_id FROM mz_sources WHERE type = 'postgres')
  250. CROSS JOIN (SELECT id AS subsource_id FROM mz_sources WHERE type = 'subsource');
  251. true
  252. # Ensure the progress subsource has an ID less than the primary source ID
  253. > SELECT progress_source_id < primary_source_id
  254. FROM (
  255. SELECT
  256. (SELECT id FROM mz_sources WHERE type = 'postgres') AS primary_source_id,
  257. (SELECT id FROM mz_sources WHERE type = 'progress') AS progress_source_id
  258. );
  259. true
  260. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source';
  261. running
  262. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mz_source_progress';
  263. running
  264. > SELECT lsn > 0 FROM mz_source_progress
  265. true
  266. # Ensure we report the write frontier of the progress subsource; set-regex masks the volatile values as <>
  267. $ set-regex match=(\s{12}0|\d{13,20}|u\d{1,5}|\(\d+-\d\d-\d\d\s\d\d:\d\d:\d\d\.\d\d\d\)|true|false) replacement=<>
  268. > EXPLAIN TIMESTAMP FOR SELECT * FROM mz_source_progress
  269. " query timestamp: <> <>\n oracle read timestamp: <> <>\nlargest not in advance of upper: <> <>\n upper:[<> <>]\n since:[<> <>]\n can respond immediately: <>\n timeline: Some(EpochMilliseconds)\n session wall time: <> <>\n\nsource materialize.public.mz_source_progress (<>, storage):\n read frontier:[<> <>]\n write frontier:[<> <>]\n\nbinding constraints:\nlower:\n (IsolationLevel(StrictSerializable)): [<> <>]\n"
  270. $ set-regex match=[0-9]+|_[a-f0-9]+ replacement=<SUPPRESSED>
  271. > SELECT * FROM mz_internal.mz_postgres_sources
  272. id replication_slot timeline_id
  273. ---------------------------------------------------
  274. u<SUPPRESSED> materialize<SUPPRESSED> <SUPPRESSED>
  275. $ unset-regex
  276. #
  277. # Perform sanity checks of the initial snapshot
  278. # (rows inserted on the Yugabyte side before the source was created)
  279. > SELECT * FROM pk_table;
  280. 1 one
  281. 2 two
  282. 3 three
  283. > SELECT * FROM types_table;
  284. 1 "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  285. > SELECT pg_typeof(numeric) FROM types_table;
  286. "numeric"
  287. > SELECT * FROM array_types_table;
  288. 1 "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  289. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  290. 16777216 1048576
  291. 3145728 2097152
  292. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  293. "4" "abc "
  294. > SELECT * FROM multipart_pk;
  295. 1 abc xyz
  296. > SELECT f2, f3, f2 IS NULL, f3 IS NULL FROM nulls_table;
  297. <null> <null> true true
  298. > SELECT * FROM utf8_table;
  299. "това е текст" "това \'е\' \"текст\""
  300. > SELECT * FROM "таблица";
  301. стойност
  302. > SELECT * FROM escaped_text_table;
  303. 1 "escaped\\ntext\\twith\\nnewlines\\tand\\ntabs" "more\\tescaped\\ntext"
  304. 2 "second\\nrow\\twith\\tmore\\ttabs" "and\\nmore\\n\\nnewlines\\n"
  305. > SELECT * FROM conflict_table;
  306. 234
  307. > SELECT * FROM """literal_quotes"""
  308. v
  309. > SELECT * FROM "create"
  310. v
  311. > SELECT * FROM tstzrange_table
  312. 1 "[2024-02-13 17:01:58.378480 UTC,)"
  313. #
  314. # Basic sanity check that the timestamps are reasonable
  315. # (pk_table must be readable and non-empty)
  316. > SELECT COUNT(*) > 0 FROM pk_table;
  317. true
  318. #
  319. # Modify the tables on the Yugabyte side
  320. # (inserts, updates - including primary-key changes - and deletes)
  321. $ postgres-execute connection=postgres://yugabyte:yugabyte@yugabyte:5433/yugabyte
  322. INSERT INTO pk_table VALUES (4, 'four');
  323. INSERT INTO pk_table VALUES (5, 'five');
  324. DELETE FROM pk_table WHERE pk = 1;
  325. UPDATE pk_table SET f2 = 'two_two' WHERE pk = 2;
  326. UPDATE pk_table SET pk = pk + 10 WHERE pk BETWEEN 3 AND 4;
  327. INSERT INTO types_table VALUES (2, 'foo', '2011-11-11', '11:11:11', '2011-11-11 11:11:11', 'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11', 1234.56768, 1234.5678, 'empty', 'empty', 'empty', 'empty');
  328. INSERT INTO array_types_table VALUES (2, '{2011-11-11}', '{11:11:11}', '{2011-11-11 11:11:11}', '{A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11}', '{1234.56768}', '{1234.5678}', '{"(,)"}', '{"(,)"}', '{"(,)"}', '{"(,)"}');
  329. INSERT INTO large_text VALUES (3, REPEAT('x', 16 * 1024 * 1024), 'abc');
  330. INSERT INTO trailing_space_pk VALUES ('klm ');
  331. UPDATE trailing_space_pk SET f1 = 'xyz ' WHERE f1 = 'klm ';
  332. DELETE FROM trailing_space_pk WHERE f1 = 'abc ';
  333. INSERT INTO multipart_pk VALUES (2, 'klm', 'xyz');
  334. DELETE FROM multipart_pk WHERE f1 = 1;
  335. UPDATE nulls_table SET f3 = 3 WHERE f3 IS NULL;
  336. INSERT INTO nulls_table VALUES (2, NULL, 1), (3, NULL, 2);
  337. UPDATE nulls_table SET f3 = NULL WHERE f3 = 2;
  338. INSERT INTO utf8_table VALUES ('това е текст 2', 'това ''е'' "текст" 2');
  339. UPDATE utf8_table SET f1 = f1 || f1 , f2 = f2 || f2;
  340. INSERT INTO "таблица" VALUES ('наздраве');
  341. #
  342. # Check the updated data on the Materialize side
  343. #
  344. > SELECT * FROM pk_table;
  345. 13 three
  346. 14 four
  347. 2 two_two
  348. 5 five
  349. > SELECT * FROM types_table;
  350. 1 "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "(,)" "(,)" "(,)" "(,)"
  351. 2 "foo" "2011-11-11" "11:11:11" "2011-11-11 11:11:11" "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11" "1234.56768" "1234.5678" "empty" "empty" "empty" "empty"
  352. > SELECT * FROM array_types_table;
  353. 1 "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  354. 2 "{2011-11-11}" "{11:11:11}" "{2011-11-11 11:11:11}" "{a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11}" "{1234.56768}" "{1234.5678}" "{(,)}" "{(,)}" "{(,)}" "{(,)}"
  355. > SELECT LENGTH(f1), LENGTH(f2) FROM large_text;
  356. 16777216 1048576
  357. 3145728 2097152
  358. 16777216 3
  359. > SELECT LENGTH(f1), f1 FROM trailing_space_pk;
  360. "4" "xyz "
  361. > SELECT * FROM multipart_pk;
  362. "2" "klm" "xyz"
  363. > SELECT f2, f3, f2 IS NULL, f3 IS NULL FROM nulls_table;
  364. "<null>" "1" "true" "false"
  365. "<null>" "3" "true" "false"
  366. "<null>" "<null>" "true" "true"
  367. > SELECT * FROM utf8_table;
  368. "това е текст 2това е текст 2" "това \'е\' \"текст\" 2това \'е\' \"текст\" 2"
  369. "това е тексттова е текст" "това \'е\' \"текст\"това \'е\' \"текст\""
  370. > SELECT * FROM "таблица";
  371. стойност
  372. наздраве
  373. #
  374. # Check that the timestamps continue to be reasonable in the face of incoming updates
  375. # (pk_table must still be readable and non-empty)
  376. > SELECT COUNT(*) > 0 FROM pk_table;
  377. true
  378. #
  379. # Ensure we can start a source with more workers than the default max_wal_senders param (10)
  380. # The source is dropped again once its subsource has been verified.
  381. > CREATE CLUSTER large_cluster SIZE '16';
  382. > CREATE SOURCE large_cluster_source
  383. IN CLUSTER large_cluster
  384. FROM YUGABYTE CONNECTION yb (PUBLICATION 'mz_source')
  385. FOR TABLES ("pk_table" AS large_cluster_source_pk_table);
  386. > SELECT * FROM large_cluster_source_pk_table;
  387. 13 three
  388. 14 four
  389. 2 two_two
  390. 5 five
  391. > SELECT status = 'running' FROM mz_internal.mz_source_statuses WHERE name = 'large_cluster_source_pk_table';
  392. true
  393. > DROP SOURCE large_cluster_source CASCADE;
  394. #
  395. # Remove all rows from the tables checked below on the Yugabyte side
  396. # (the unfiltered DELETEs are intentional)
  397. $ postgres-execute connection=postgres://yugabyte:yugabyte@yugabyte:5433/yugabyte
  398. DELETE FROM pk_table;
  399. DELETE FROM large_text;
  400. DELETE FROM trailing_space_pk;
  401. DELETE FROM multipart_pk;
  402. DELETE FROM nulls_table;
  403. DELETE FROM utf8_table;
  404. DELETE FROM "таблица";
  405. DELETE FROM conflict_schema.conflict_table;
  406. DELETE FROM tstzrange_table;
  407. #
  408. # Check that the corresponding subsources empty out on the Materialize side
  409. #
  410. > SELECT COUNT(*) FROM pk_table;
  411. 0
  412. > SELECT COUNT(*) FROM large_text;
  413. 0
  414. > SELECT COUNT(*) FROM trailing_space_pk;
  415. 0
  416. > SELECT COUNT(*) FROM multipart_pk;
  417. 0
  418. > SELECT COUNT(*) FROM nulls_table;
  419. 0
  420. > SELECT COUNT(*) FROM utf8_table;
  421. 0
  422. > SELECT COUNT(*) FROM "таблица";
  423. 0
  424. > SELECT COUNT(*) FROM conflict_table;
  425. 0
  426. > SELECT COUNT(*) FROM tstzrange_table;
  427. 0
  428. #
  429. # Support enum values as strings
  430. # Enum columns are ingested as text when listed in the TEXT COLUMNS option; the invalid
  431. # TEXT COLUMNS references are rejected first, then a valid source is created.
  432. ! CREATE SOURCE enum_source
  433. IN CLUSTER cdc_cluster
  434. FROM YUGABYTE CONNECTION yb (
  435. PUBLICATION 'mz_source',
  436. TEXT COLUMNS [pk_table.col_dne]
  437. )
  438. FOR TABLES (
  439. "enum_table"
  440. );
  441. contains: invalid TEXT COLUMNS option value: column "pk_table.col_dne" does not exist
  442. ! CREATE SOURCE case_sensitive_names
  443. IN CLUSTER cdc_cluster
  444. FROM YUGABYTE CONNECTION yb (
  445. PUBLICATION 'mz_source',
  446. TEXT COLUMNS [pk_table."F2"]
  447. )
  448. FOR TABLES (
  449. "enum_table"
  450. );
  451. contains: invalid TEXT COLUMNS option value: column "pk_table.F2" does not exist
  452. hint: The similarly named column "pk_table.f2" does exist.
  453. ! CREATE SOURCE enum_source
  454. IN CLUSTER cdc_cluster
  455. FROM YUGABYTE CONNECTION yb (
  456. PUBLICATION 'mz_source',
  457. TEXT COLUMNS [table_dne.col_dne]
  458. )
  459. FOR TABLES (
  460. "enum_table"
  461. );
  462. contains: reference to table_dne not found in source
  463. # another_enum_table exists in both public and conflict_schema, so the unqualified reference is ambiguous
  464. ! CREATE SOURCE enum_source
  465. IN CLUSTER cdc_cluster
  466. FROM YUGABYTE CONNECTION yb (
  467. PUBLICATION 'mz_source',
  468. TEXT COLUMNS [another_enum_table."колона"]
  469. )
  470. FOR TABLES(
  471. conflict_schema.another_enum_table AS conflict_enum,
  472. public.another_enum_table AS public_enum
  473. );
  474. contains: reference another_enum_table is ambiguous, consider specifying an additional layer of qualification
  475. ! CREATE SOURCE enum_source
  476. IN CLUSTER cdc_cluster
  477. FROM YUGABYTE CONNECTION yb (
  478. PUBLICATION 'mz_source',
  479. TEXT COLUMNS [foo]
  480. )
  481. FOR TABLES (pk_table);
  482. contains: column name 'foo' must have at least a table qualification
  483. ! CREATE SOURCE enum_source
  484. IN CLUSTER cdc_cluster
  485. FROM YUGABYTE CONNECTION yb (
  486. PUBLICATION 'mz_source',
  487. TEXT COLUMNS [foo.bar.qux.qax.foo]
  488. )
  489. FOR TABLES (pk_table);
  490. contains: reference to foo.bar.qux.qax not found in source
  491. ! CREATE SOURCE enum_source
  492. IN CLUSTER cdc_cluster
  493. FROM YUGABYTE CONNECTION yb (
  494. PUBLICATION 'mz_source',
  495. TEXT COLUMNS [enum_table.a, enum_table.a]
  496. )
  497. FOR TABLES (enum_table);
  498. contains: unexpected multiple references to yugabyte.public.enum_table.a
  499. # utf8_table is not part of the mz_source_narrow publication
  500. ! CREATE SOURCE enum_source
  501. IN CLUSTER cdc_cluster
  502. FROM YUGABYTE CONNECTION yb (
  503. PUBLICATION 'mz_source_narrow',
  504. TEXT COLUMNS [enum_table.a, utf8_table.f1]
  505. )
  506. FOR TABLES (enum_table);
  507. contains: reference to utf8_table not found in source
  508. # N.B.: TEXT COLUMNS includes a reference to pk_table, which is part of the publication
  509. # but is not one of the tables being added to the source.
  510. ! CREATE SOURCE enum_source
  511. IN CLUSTER cdc_cluster
  512. FROM YUGABYTE CONNECTION yb (
  513. PUBLICATION 'mz_source',
  514. TEXT COLUMNS [
  515. enum_table.a,
  516. public.another_enum_table."колона",
  517. pk_table.pk
  518. ]
  519. )
  520. FOR TABLES (
  521. "enum_table",
  522. public.another_enum_table
  523. );
  524. contains:TEXT COLUMNS refers to table not currently being added
  525. > CREATE SOURCE enum_source
  526. IN CLUSTER cdc_cluster
  527. FROM YUGABYTE CONNECTION yb (
  528. PUBLICATION 'mz_source',
  529. TEXT COLUMNS [
  530. enum_table.a,
  531. public.another_enum_table."колона"
  532. ]
  533. )
  534. FOR TABLES (
  535. "enum_table",
  536. public.another_enum_table
  537. );
  538. > SELECT * FROM (SHOW SOURCES) WHERE name LIKE '%enum%';
  539. another_enum_table subsource cdc_cluster ""
  540. enum_source postgres cdc_cluster ""
  541. enum_source_progress progress <null> ""
  542. enum_table subsource cdc_cluster ""
  543. > SELECT * FROM enum_table
  544. 1 var0
  545. 2 var1
  546. > SHOW CREATE SOURCE enum_table
  547. materialize.public.enum_table "CREATE SUBSOURCE materialize.public.enum_table (pk pg_catalog.int4 NOT NULL, a pg_catalog.text, CONSTRAINT enum_table_pkey PRIMARY KEY (pk)) OF SOURCE materialize.public.enum_source WITH (EXTERNAL REFERENCE = yugabyte.public.enum_table, TEXT COLUMNS = (a));"
  548. > SELECT "колона" FROM another_enum_table
  549. var2
  550. var3
  551. #
  552. # Cleanup
  553. # Drop both sources (and, via CASCADE, their subsources) before the schema-scoped check.
  554. #
  555. > DROP SOURCE enum_source CASCADE;
  556. > DROP SOURCE "mz_source" CASCADE;
  557. #
  558. # Check schema-scoped tables: FOR SCHEMAS (another_schema) should yield another_table as a subsource
  559. > CREATE SOURCE another_source
  560. IN CLUSTER cdc_cluster
  561. FROM YUGABYTE CONNECTION yb (
  562. PUBLICATION 'another_publication'
  563. )
  564. FOR SCHEMAS (
  565. another_schema
  566. );
  567. > SHOW SOURCES
  568. another_source postgres cdc_cluster ""
  569. another_source_progress progress <null> ""
  570. another_table subsource cdc_cluster ""
  571. > DROP SOURCE another_source CASCADE
  572. $ postgres-execute connection=postgres://yugabyte:yugabyte@yugabyte:5433/yugabyte
  573. DROP SCHEMA conflict_schema CASCADE;