force-source-tables.td 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # This whole file is copied and pasted from source-tables.td, with the added force_source_table_syntax flag.
  10. $ set-arg-default default-replica-size=1
  11. $ set-arg-default single-replica-cluster=quickstart
  12. #
  13. # Validate feature flag: while enable_create_table_from_source is false,
  14. # CREATE TABLE .. FROM SOURCE must be rejected with an error containing "not supported".
  15. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  16. ALTER SYSTEM SET enable_create_table_from_source = false
  17. > CREATE SOURCE auction_house
  18. IN CLUSTER ${arg.single-replica-cluster}
  19. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301) FOR ALL TABLES;
  20. ! CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "auction"."bids");
  21. contains: not supported
  22. > DROP SOURCE auction_house CASCADE;
  23. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  24. ALTER SYSTEM SET enable_create_table_from_source = true
  25. ALTER SYSTEM SET enable_load_generator_key_value = true
  26. ALTER SYSTEM SET force_source_table_syntax = true
  27. #
  28. # Multi-output load generator source using source-fed tables
  29. # Two tables read the same upstream "bids" output; dropping one must leave the other readable.
  30. > CREATE SOURCE auction_house
  31. IN CLUSTER ${arg.single-replica-cluster}
  32. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
  33. > CREATE TABLE bids FROM SOURCE auction_house (REFERENCE "auction"."bids");
  34. > SELECT count(*) FROM bids
  35. 255
  36. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'bids';
  37. running
  38. # Create another table from the same bids upstream using a less-qualified reference ("bids" rather than "auction"."bids")
  39. > CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "bids");
  40. > SHOW TABLES ON auction_house;
  41. bids ""
  42. bids2 ""
  43. > DROP TABLE bids;
  44. > SELECT count(*) FROM bids2
  45. 255
  46. # Validate that the available source references table (mz_internal.mz_source_references) was correctly populated
  47. > SELECT u.name, u.namespace, u.columns
  48. FROM mz_sources s
  49. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  50. WHERE
  51. s.name = 'auction_house'
  52. ORDER BY u.name;
  53. accounts auction {id,org_id,balance}
  54. auctions auction {id,seller,item,end_time}
  55. bids auction {id,buyer,auction_id,amount,bid_time}
  56. organizations auction {id,name}
  57. users auction {id,org_id,name}
  58. > DROP SOURCE auction_house CASCADE;
  59. > SELECT count(*) FROM mz_internal.mz_source_references;
  60. 0
  61. #
  62. # Single-output load generator source using source-fed tables
  63. # Both tables reference the same "counter" output and each must return 5 rows.
  64. > CREATE SOURCE counter
  65. IN CLUSTER ${arg.single-replica-cluster}
  66. FROM LOAD GENERATOR COUNTER (AS OF 4, UP TO 5);
  67. > CREATE TABLE counter_1 FROM SOURCE counter (REFERENCE "counter");
  68. > CREATE TABLE counter_2 FROM SOURCE counter (REFERENCE "counter");
  69. > SELECT count(*) from counter_1;
  70. 5
  71. > SELECT count(*) from counter_2;
  72. 5
  73. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'counter_2';
  74. running
  75. > SHOW TABLES ON counter;
  76. counter_1 ""
  77. counter_2 ""
  78. > DROP SOURCE counter CASCADE;
  79. #
  80. # Postgres source using source-fed tables
  81. # Covers TEXT COLUMNS for an enum column, an upstream ADD COLUMN, and an upstream table created after the source.
  82. > CREATE SECRET pgpass AS 'postgres'
  83. > CREATE CONNECTION pg TO POSTGRES (
  84. HOST postgres,
  85. DATABASE postgres,
  86. USER postgres,
  87. PASSWORD SECRET pgpass
  88. )
  89. $ postgres-execute connection=postgres://postgres:postgres@postgres
  90. ALTER USER postgres WITH replication;
  91. DROP SCHEMA IF EXISTS public CASCADE;
  92. CREATE SCHEMA public;
  93. DROP PUBLICATION IF EXISTS mz_source;
  94. CREATE PUBLICATION mz_source FOR ALL TABLES;
  95. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  96. CREATE TABLE pg_table (a an_enum, b INTEGER);
  97. INSERT INTO pg_table VALUES ('var1', 1234), ('var0', 0);
  98. ALTER TABLE pg_table REPLICA IDENTITY FULL;
  99. > CREATE SOURCE pg_source
  100. IN CLUSTER ${arg.single-replica-cluster}
  101. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  102. > SHOW SUBSOURCES ON pg_source
  103. pg_source_progress progress
  104. ! CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table");
  105. contains:referenced tables use unsupported types
  106. > CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  107. > SELECT * FROM pg_table_1;
  108. var0 0
  109. var1 1234
  110. $ postgres-execute connection=postgres://postgres:postgres@postgres
  111. INSERT INTO pg_table VALUES ('var1', 5678), ('var0', 1);
  112. > SELECT * FROM pg_table_1;
  113. var0 0
  114. var1 1234
  115. var0 1
  116. var1 5678
  117. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'pg_table_1';
  118. running
  119. > CREATE TABLE pg_table_2 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  120. > SELECT * FROM pg_table_2;
  121. var0 0
  122. var1 1234
  123. var0 1
  124. var1 5678
  125. $ postgres-execute connection=postgres://postgres:postgres@postgres
  126. ALTER TABLE pg_table ADD COLUMN c INTEGER;
  127. INSERT INTO pg_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  128. > CREATE TABLE pg_table_3 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  129. > SELECT * FROM pg_table_2;
  130. var0 0
  131. var1 1234
  132. var0 1
  133. var1 5678
  134. var0 5555
  135. var1 4444
  136. > SELECT * FROM pg_table_3;
  137. var0 0 <null>
  138. var1 1234 <null>
  139. var0 1 <null>
  140. var1 5678 <null>
  141. var0 5555 6666
  142. var1 4444 12
  143. > CREATE TABLE pg_table_4 (a INTEGER);
  144. ! CREATE TABLE pg_table_4 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  145. contains:already exists
  146. ! CREATE TABLE pg_table_5 FROM SOURCE pg_source (REFERENCE "public"."pg_table_5000");
  147. contains:not found in source
  148. $ postgres-execute connection=postgres://postgres:postgres@postgres
  149. CREATE TABLE pg_table_added_later (a INTEGER);
  150. INSERT INTO pg_table_added_later VALUES (555), (666);
  151. ALTER TABLE pg_table_added_later REPLICA IDENTITY FULL;
  152. > CREATE TABLE pg_table_6a FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  153. > CREATE TABLE pg_table_6b FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  154. > SELECT * FROM pg_table_6a;
  155. 555
  156. 666
  157. $ postgres-execute connection=postgres://postgres:postgres@postgres
  158. INSERT INTO pg_table_added_later VALUES (777);
  159. > SELECT * FROM pg_table_6a;
  160. 555
  161. 666
  162. 777
  163. > SELECT * FROM pg_table_6b;
  164. 555
  165. 666
  166. 777
  167. > SHOW TABLES on pg_source;
  168. pg_table_1 ""
  169. pg_table_2 ""
  170. pg_table_3 ""
  171. pg_table_6a ""
  172. pg_table_6b ""
  173. > DROP SOURCE pg_source CASCADE;
  174. #
  175. # MySQL source using source-fed tables
  176. # Covers TEXT COLUMNS for an enum column, an upstream ADD COLUMN, REFRESH REFERENCES, and DROP SOURCE .. CASCADE.
  177. > CREATE SECRET mysqlpass AS 'p@ssw0rd';
  178. > CREATE CONNECTION mysql_conn TO MYSQL (
  179. HOST mysql,
  180. USER root,
  181. PASSWORD SECRET mysqlpass
  182. )
  183. $ mysql-connect name=mysql url=mysql://root@mysql password=p@ssw0rd
  184. $ mysql-execute name=mysql
  185. DROP DATABASE IF EXISTS public;
  186. CREATE DATABASE public;
  187. USE public;
  188. CREATE TABLE mysql_table (a ENUM ('var0', 'var1'), b INTEGER);
  189. INSERT INTO mysql_table VALUES ('var1', 1234), ('var0', 0);
  190. > CREATE SOURCE mysql_source
  191. IN CLUSTER ${arg.single-replica-cluster}
  192. FROM MYSQL CONNECTION mysql_conn;
  193. > SHOW SUBSOURCES ON mysql_source
  194. mysql_source_progress progress
  195. ! CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table");
  196. contains:referenced tables use unsupported types
  197. > CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  198. > SELECT * FROM mysql_table_1;
  199. var0 0
  200. var1 1234
  201. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mysql_table_1';
  202. running
  203. $ mysql-execute name=mysql
  204. INSERT INTO mysql_table VALUES ('var1', 5678), ('var0', 1);
  205. > SELECT * FROM mysql_table_1;
  206. var0 0
  207. var1 1234
  208. var0 1
  209. var1 5678
  210. > CREATE TABLE mysql_table_2 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  211. > SELECT * FROM mysql_table_2;
  212. var0 0
  213. var1 1234
  214. var0 1
  215. var1 5678
  216. > DROP TABLE mysql_table_1;
  217. $ mysql-execute name=mysql
  218. ALTER TABLE mysql_table ADD COLUMN c INTEGER;
  219. INSERT INTO mysql_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  220. > CREATE TABLE mysql_table_3 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  221. > SELECT * FROM mysql_table_2;
  222. var0 0
  223. var1 1234
  224. var0 1
  225. var1 5678
  226. var0 5555
  227. var1 4444
  228. > SELECT * FROM mysql_table_3;
  229. var0 0 <null>
  230. var1 1234 <null>
  231. var0 1 <null>
  232. var1 5678 <null>
  233. var0 5555 6666
  234. var1 4444 12
  235. # Validate that the available source references table reflects the state of the upstream
  236. # when the create source was run
  237. > SELECT u.name, u.namespace, u.columns
  238. FROM mz_sources s
  239. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  240. WHERE
  241. s.name = 'mysql_source'
  242. ORDER BY u.name;
  243. mysql_table public {a,b}
  244. # Create a new table in MySQL and refresh to see if the new available reference is picked up
  245. $ mysql-execute name=mysql
  246. USE public;
  247. CREATE TABLE mysql_table_new (foo INTEGER, bar INTEGER);
  248. > ALTER SOURCE mysql_source REFRESH REFERENCES;
  249. > SELECT u.name, u.namespace, u.columns
  250. FROM mz_sources s
  251. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  252. WHERE
  253. s.name = 'mysql_source'
  254. ORDER BY u.name;
  255. mysql_table public {a,b,c}
  256. mysql_table_new public {foo,bar}
  257. > SHOW TABLES ON mysql_source;
  258. mysql_table_2 ""
  259. mysql_table_3 ""
  260. > DROP SOURCE mysql_source CASCADE;
  261. # Ensure that CASCADE propagates to the tables
  262. ! SELECT * FROM mysql_table_3;
  263. contains:unknown catalog item 'mysql_table_3'
  264. > SELECT count(*) FROM mz_internal.mz_source_references;
  265. 0
  266. #
  267. # Kafka source using source-fed tables
  268. # Three tables read the same topic: ENVELOPE UPSERT, ENVELOPE NONE, and ENVELOPE NONE with columns renamed to (a, b).
  269. $ set keyschema={
  270. "type": "record",
  271. "name": "Key",
  272. "fields": [
  273. {"name": "key", "type": "string"}
  274. ]
  275. }
  276. $ set schema={
  277. "type" : "record",
  278. "name" : "test",
  279. "fields" : [
  280. {"name":"f1", "type":"string"},
  281. {"name":"f2", "type":"long"}
  282. ]
  283. }
  284. > CREATE CONNECTION kafka_conn
  285. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  286. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  287. URL '${testdrive.schema-registry-url}'
  288. );
  289. $ kafka-create-topic topic=avroavro
  290. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  291. {"key": "fish"} {"f1": "fish", "f2": 1000}
  292. {"key": "bird1"} {"f1":"goose", "f2": 1}
  293. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  294. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  295. {"key": "bird1"}
  296. {"key": "birdmore"} {"f1":"geese", "f2": 56}
  297. {"key": "mammalmore"} {"f1": "moose", "f2": 42}
  298. {"key": "mammal1"}
  299. {"key": "mammalmore"} {"f1":"moose", "f2": 2}
  300. > CREATE SOURCE avro_source
  301. IN CLUSTER ${arg.single-replica-cluster}
  302. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');
  303. > CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  304. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  305. ENVELOPE UPSERT
  306. > CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  307. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  308. ENVELOPE NONE
  309. > CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  310. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  311. ENVELOPE NONE
  312. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'avro_table_upsert';
  313. running
  314. > SELECT * from avro_table_upsert
  315. key f1 f2
  316. ---------------------------
  317. fish fish 1000
  318. birdmore geese 56
  319. mammalmore moose 2
  320. > SELECT * from avro_table_append
  321. f1 f2
  322. ---------------
  323. fish 1000
  324. geese 2
  325. geese 56
  326. goose 1
  327. moose 1
  328. moose 2
  329. moose 42
  330. > SELECT * from avro_table_append_cols
  331. a b
  332. ---------------
  333. fish 1000
  334. geese 2
  335. geese 56
  336. goose 1
  337. moose 1
  338. moose 2
  339. moose 42
  340. > SHOW TABLES ON avro_source;
  341. avro_table_append ""
  342. avro_table_append_cols ""
  343. avro_table_upsert ""
  344. > DROP SOURCE avro_source CASCADE
  345. #
  346. # Key-value load generator source using source-fed tables
  347. # The three flags set below repeat the values already applied near the top of this file.
  348. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  349. ALTER SYSTEM SET enable_create_table_from_source = true
  350. ALTER SYSTEM SET enable_load_generator_key_value = true
  351. ALTER SYSTEM SET force_source_table_syntax = true
  352. > CREATE SOURCE keyvalue
  353. IN CLUSTER ${arg.single-replica-cluster}
  354. FROM LOAD GENERATOR KEY VALUE (
  355. KEYS 16,
  356. PARTITIONS 4,
  357. SNAPSHOT ROUNDS 3,
  358. SEED 123,
  359. VALUE SIZE 10,
  360. BATCH SIZE 2,
  361. TICK INTERVAL '1s'
  362. );
  363. > CREATE TABLE kv_1 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE UPSERT;
  364. > CREATE TABLE kv_2 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE NONE;
  365. > SELECT partition, count(*) FROM kv_1 GROUP BY partition
  366. 0 4
  367. 1 4
  368. 2 4
  369. 3 4
  370. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'kv_1';
  371. running
  372. > SELECT partition, count(*) > 10 FROM kv_2 GROUP BY partition
  373. 0 true
  374. 1 true
  375. 2 true
  376. 3 true
  377. > SHOW TABLES ON keyvalue;
  378. kv_1 ""
  379. kv_2 ""
  380. > DROP SOURCE keyvalue CASCADE;
  381. #
  382. # Force usage of the new syntax and check that old statements are disallowed
  383. # Every CREATE SOURCE below must fail with "not supported; use CREATE TABLE .. FROM SOURCE instead".
  384. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  385. ALTER SYSTEM SET force_source_table_syntax = true
  386. ! CREATE SOURCE auction_legacy
  387. IN CLUSTER ${arg.single-replica-cluster}
  388. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301)
  389. FOR TABLES (bids AS bids_3);
  390. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  391. ! CREATE SOURCE pg_source_2
  392. IN CLUSTER ${arg.single-replica-cluster}
  393. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source', TEXT COLUMNS = (public.pg_table.a))
  394. FOR ALL TABLES
  395. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  396. ! CREATE SOURCE mysql_source_2
  397. IN CLUSTER ${arg.single-replica-cluster}
  398. FROM MYSQL CONNECTION mysql_conn
  399. FOR SCHEMAS (public);
  400. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  401. ! CREATE SOURCE avro_source_2
  402. IN CLUSTER ${arg.single-replica-cluster}
  403. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  404. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  405. ENVELOPE UPSERT;
  406. contains:not supported; use CREATE TABLE .. FROM SOURCE instead