source-tables.td 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-replica-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. # The two defaults above are fallbacks for the `arg.*` variables; `single-replica-cluster` is the cluster named by every CREATE SOURCE in the visible part of this file.
  12. # Validate feature flag: with enable_create_table_from_source = false, CREATE TABLE .. FROM SOURCE must fail with "not supported".
  13. # Afterwards the flag is switched back on, together with enable_load_generator_key_value, for the remaining sections.
  14. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  15. ALTER SYSTEM SET enable_create_table_from_source = false
  16. > CREATE SOURCE auction_house
  17. IN CLUSTER ${arg.single-replica-cluster}
  18. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301) FOR ALL TABLES;
  19. ! CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "auction"."bids");
  20. contains: not supported
  21. > DROP SOURCE auction_house CASCADE;
  22. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  23. ALTER SYSTEM SET enable_create_table_from_source = true
  24. ALTER SYSTEM SET enable_load_generator_key_value = true
  25. #
  26. # Multi-output load generator source using source-fed tables
  27. # The AUCTION source is created without a FOR ALL TABLES clause; tables are then attached one by one via CREATE TABLE .. FROM SOURCE.
  28. > CREATE SOURCE auction_house
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
  31. > CREATE TABLE bids FROM SOURCE auction_house (REFERENCE "auction"."bids");
  32. > SELECT count(*) FROM bids
  33. 255
  34. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'bids';
  35. running
  36. # Create another table from the same bids upstream using a less qualified reference ("bids" instead of "auction"."bids"); it must keep working after the first table is dropped
  37. > CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "bids");
  38. > SHOW TABLES ON auction_house;
  39. bids ""
  40. bids2 ""
  41. > DROP TABLE bids;
  42. > SELECT count(*) FROM bids2
  43. 255
  44. # Validate that the available source references table was correctly populated: it lists every upstream output of the source, not only the ones that have tables; dropping the source must empty it again
  45. > SELECT u.name, u.namespace, u.columns
  46. FROM mz_sources s
  47. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  48. WHERE
  49. s.name = 'auction_house'
  50. ORDER BY u.name;
  51. accounts auction {id,org_id,balance}
  52. auctions auction {id,seller,item,end_time}
  53. bids auction {id,buyer,auction_id,amount,bid_time}
  54. organizations auction {id,name}
  55. users auction {id,org_id,name}
  56. > DROP SOURCE auction_house CASCADE;
  57. > SELECT count(*) FROM mz_internal.mz_source_references;
  58. 0
  59. #
  60. # Single-output load generator source using source-fed tables
  61. # Two tables reference the same "counter" output; both are expected to contain the same 5 rows.
  62. > CREATE SOURCE counter
  63. IN CLUSTER ${arg.single-replica-cluster}
  64. FROM LOAD GENERATOR COUNTER (AS OF 4, UP TO 5);
  65. > CREATE TABLE counter_1 FROM SOURCE counter (REFERENCE "counter");
  66. > CREATE TABLE counter_2 FROM SOURCE counter (REFERENCE "counter");
  67. > SELECT count(*) from counter_1;
  68. 5
  69. > SELECT count(*) from counter_2;
  70. 5
  71. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'counter_2';
  72. running
  73. > SHOW TABLES ON counter;
  74. counter_1 ""
  75. counter_2 ""
  76. > DROP SOURCE counter CASCADE;
  77. # Covers: an enum column that requires TEXT COLUMNS, several tables on one upstream table, a table created after an upstream ADD COLUMN,
  78. # Postgres source using source-fed tables
  79. # a name clash with an existing table, a reference that does not exist upstream, and an upstream table created after the source.
  80. > CREATE SECRET pgpass AS 'postgres'
  81. > CREATE CONNECTION pg TO POSTGRES (
  82. HOST postgres,
  83. DATABASE postgres,
  84. USER postgres,
  85. PASSWORD SECRET pgpass
  86. )
  87. $ postgres-execute connection=postgres://postgres:postgres@postgres
  88. ALTER USER postgres WITH replication;
  89. DROP SCHEMA IF EXISTS public CASCADE;
  90. CREATE SCHEMA public;
  91. DROP PUBLICATION IF EXISTS mz_source;
  92. CREATE PUBLICATION mz_source FOR ALL TABLES;
  93. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  94. CREATE TABLE pg_table (a an_enum, b INTEGER);
  95. INSERT INTO pg_table VALUES ('var1', 1234), ('var0', 0);
  96. ALTER TABLE pg_table REPLICA IDENTITY FULL;
  97. > CREATE SOURCE pg_source
  98. IN CLUSTER ${arg.single-replica-cluster}
  99. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  100. > SHOW SUBSOURCES ON pg_source
  101. pg_source_progress progress
  102. ! CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table");
  103. contains:referenced tables use unsupported types
  104. > CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  105. > SELECT * FROM pg_table_1;
  106. var0 0
  107. var1 1234
  108. $ postgres-execute connection=postgres://postgres:postgres@postgres
  109. INSERT INTO pg_table VALUES ('var1', 5678), ('var0', 1);
  110. > SELECT * FROM pg_table_1;
  111. var0 0
  112. var1 1234
  113. var0 1
  114. var1 5678
  115. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'pg_table_1';
  116. running
  117. > CREATE TABLE pg_table_2 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  118. > SELECT * FROM pg_table_2;
  119. var0 0
  120. var1 1234
  121. var0 1
  122. var1 5678
  123. $ postgres-execute connection=postgres://postgres:postgres@postgres
  124. ALTER TABLE pg_table ADD COLUMN c INTEGER;
  125. INSERT INTO pg_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  126. > CREATE TABLE pg_table_3 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  127. > SELECT * FROM pg_table_2;
  128. var0 0
  129. var1 1234
  130. var0 1
  131. var1 5678
  132. var0 5555
  133. var1 4444
  134. > SELECT * FROM pg_table_3;
  135. var0 0 <null>
  136. var1 1234 <null>
  137. var0 1 <null>
  138. var1 5678 <null>
  139. var0 5555 6666
  140. var1 4444 12
  141. > CREATE TABLE pg_table_4 (a INTEGER);
  142. ! CREATE TABLE pg_table_4 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  143. contains:already exists
  144. ! CREATE TABLE pg_table_5 FROM SOURCE pg_source (REFERENCE "public"."pg_table_5000");
  145. contains:not found in source
  146. $ postgres-execute connection=postgres://postgres:postgres@postgres
  147. CREATE TABLE pg_table_added_later (a INTEGER);
  148. INSERT INTO pg_table_added_later VALUES (555), (666);
  149. ALTER TABLE pg_table_added_later REPLICA IDENTITY FULL;
  150. > CREATE TABLE pg_table_6a FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  151. > CREATE TABLE pg_table_6b FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  152. > SELECT * FROM pg_table_6a;
  153. 555
  154. 666
  155. $ postgres-execute connection=postgres://postgres:postgres@postgres
  156. INSERT INTO pg_table_added_later VALUES (777);
  157. > SELECT * FROM pg_table_6a;
  158. 555
  159. 666
  160. 777
  161. > SELECT * FROM pg_table_6b;
  162. 555
  163. 666
  164. 777
  165. > SHOW TABLES on pg_source;
  166. pg_table_1 ""
  167. pg_table_2 ""
  168. pg_table_3 ""
  169. pg_table_6a ""
  170. pg_table_6b ""
  171. > DROP SOURCE pg_source CASCADE;
  172. #
  173. # MySql source using source-fed tables
  174. # Mirrors the Postgres section (enum column needing TEXT COLUMNS, several tables per upstream table, upstream ADD COLUMN) and adds ALTER SOURCE .. REFRESH REFERENCES.
  175. > CREATE SECRET mysqlpass AS 'p@ssw0rd';
  176. > CREATE CONNECTION mysql_conn TO MYSQL (
  177. HOST mysql,
  178. USER root,
  179. PASSWORD SECRET mysqlpass
  180. )
  181. $ mysql-connect name=mysql url=mysql://root@mysql password=p@ssw0rd
  182. $ mysql-execute name=mysql
  183. DROP DATABASE IF EXISTS public;
  184. CREATE DATABASE public;
  185. USE public;
  186. CREATE TABLE mysql_table (a ENUM ('var0', 'var1'), b INTEGER);
  187. INSERT INTO mysql_table VALUES ('var1', 1234), ('var0', 0);
  188. > CREATE SOURCE mysql_source
  189. IN CLUSTER ${arg.single-replica-cluster}
  190. FROM MYSQL CONNECTION mysql_conn;
  191. > SHOW SUBSOURCES ON mysql_source
  192. mysql_source_progress progress
  193. ! CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table");
  194. contains:referenced tables use unsupported types
  195. > CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  196. > SELECT * FROM mysql_table_1;
  197. var0 0
  198. var1 1234
  199. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mysql_table_1';
  200. running
  201. $ mysql-execute name=mysql
  202. INSERT INTO mysql_table VALUES ('var1', 5678), ('var0', 1);
  203. > SELECT * FROM mysql_table_1;
  204. var0 0
  205. var1 1234
  206. var0 1
  207. var1 5678
  208. > CREATE TABLE mysql_table_2 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  209. > SELECT * FROM mysql_table_2;
  210. var0 0
  211. var1 1234
  212. var0 1
  213. var1 5678
  214. > DROP TABLE mysql_table_1;
  215. $ mysql-execute name=mysql
  216. ALTER TABLE mysql_table ADD COLUMN c INTEGER;
  217. INSERT INTO mysql_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  218. > CREATE TABLE mysql_table_3 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  219. > SELECT * FROM mysql_table_2;
  220. var0 0
  221. var1 1234
  222. var0 1
  223. var1 5678
  224. var0 5555
  225. var1 4444
  226. > SELECT * FROM mysql_table_3;
  227. var0 0 <null>
  228. var1 1234 <null>
  229. var0 1 <null>
  230. var1 5678 <null>
  231. var0 5555 6666
  232. var1 4444 12
  233. # Validate that the available source references table reflects the state of the upstream
  234. # when the create source was run (column c was added upstream afterwards, so only a and b are expected here)
  235. > SELECT u.name, u.namespace, u.columns
  236. FROM mz_sources s
  237. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  238. WHERE
  239. s.name = 'mysql_source'
  240. ORDER BY u.name;
  241. mysql_table public {a,b}
  242. # Create a new table in MySQL and refresh to see if the new available reference is picked up; the refresh is also expected to surface column c on mysql_table
  243. $ mysql-execute name=mysql
  244. USE public;
  245. CREATE TABLE mysql_table_new (foo INTEGER, bar INTEGER);
  246. > ALTER SOURCE mysql_source REFRESH REFERENCES;
  247. > SELECT u.name, u.namespace, u.columns
  248. FROM mz_sources s
  249. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  250. WHERE
  251. s.name = 'mysql_source'
  252. ORDER BY u.name;
  253. mysql_table public {a,b,c}
  254. mysql_table_new public {foo,bar}
  255. > SHOW TABLES ON mysql_source;
  256. mysql_table_2 ""
  257. mysql_table_3 ""
  258. > DROP SOURCE mysql_source CASCADE;
  259. # Ensure that CASCADE propagates to the tables created from the source, and that the source's references are removed as well
  260. ! SELECT * FROM mysql_table_3;
  261. contains:unknown catalog item 'mysql_table_3'
  262. > SELECT count(*) FROM mz_internal.mz_source_references;
  263. 0
  264. # Three tables read the same Avro topic: one with ENVELOPE UPSERT, one with ENVELOPE NONE, and one with ENVELOPE NONE plus renamed columns (a, b).
  265. # Kafka source using source-fed tables
  266. # Expected output below: the upsert table shows only the latest value per key and omits keys whose last record has no value; the append tables show every record that has a value.
  267. $ set keyschema={
  268. "type": "record",
  269. "name": "Key",
  270. "fields": [
  271. {"name": "key", "type": "string"}
  272. ]
  273. }
  274. $ set schema={
  275. "type" : "record",
  276. "name" : "test",
  277. "fields" : [
  278. {"name":"f1", "type":"string"},
  279. {"name":"f2", "type":"long"}
  280. ]
  281. }
  282. > CREATE CONNECTION kafka_conn
  283. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  284. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  285. URL '${testdrive.schema-registry-url}'
  286. );
  287. $ kafka-create-topic topic=avroavro
  288. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  289. {"key": "fish"} {"f1": "fish", "f2": 1000}
  290. {"key": "bird1"} {"f1":"goose", "f2": 1}
  291. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  292. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  293. {"key": "bird1"}
  294. {"key": "birdmore"} {"f1":"geese", "f2": 56}
  295. {"key": "mammalmore"} {"f1": "moose", "f2": 42}
  296. {"key": "mammal1"}
  297. {"key": "mammalmore"} {"f1":"moose", "f2": 2}
  298. > CREATE SOURCE avro_source
  299. IN CLUSTER ${arg.single-replica-cluster}
  300. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');
  301. > CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  302. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  303. ENVELOPE UPSERT
  304. > CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  305. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  306. ENVELOPE NONE
  307. > CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  308. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  309. ENVELOPE NONE
  310. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'avro_table_upsert';
  311. running
  312. > SELECT * from avro_table_upsert
  313. key f1 f2
  314. ---------------------------
  315. fish fish 1000
  316. birdmore geese 56
  317. mammalmore moose 2
  318. > SELECT * from avro_table_append
  319. f1 f2
  320. ---------------
  321. fish 1000
  322. geese 2
  323. geese 56
  324. goose 1
  325. moose 1
  326. moose 2
  327. moose 42
  328. > SELECT * from avro_table_append_cols
  329. a b
  330. ---------------
  331. fish 1000
  332. geese 2
  333. geese 56
  334. goose 1
  335. moose 1
  336. moose 2
  337. moose 42
  338. > SHOW TABLES ON avro_source;
  339. avro_table_append ""
  340. avro_table_append_cols ""
  341. avro_table_upsert ""
  342. > DROP SOURCE avro_source CASCADE
  343. # kv_1 (ENVELOPE UPSERT) is expected to hold exactly 4 rows per partition; kv_2 (ENVELOPE NONE) is only checked for more than 10 rows per partition.
  344. # Key-value load generator source using source-fed tables
  345. # NOTE(review): both flags set below are already set to true earlier in this file; presumably repeated to keep this section self-contained - confirm.
  346. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  347. ALTER SYSTEM SET enable_create_table_from_source = true
  348. ALTER SYSTEM SET enable_load_generator_key_value = true
  349. > CREATE SOURCE keyvalue
  350. IN CLUSTER ${arg.single-replica-cluster}
  351. FROM LOAD GENERATOR KEY VALUE (
  352. KEYS 16,
  353. PARTITIONS 4,
  354. SNAPSHOT ROUNDS 3,
  355. SEED 123,
  356. VALUE SIZE 10,
  357. BATCH SIZE 2,
  358. TICK INTERVAL '1s'
  359. );
  360. > CREATE TABLE kv_1 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE UPSERT;
  361. > CREATE TABLE kv_2 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE NONE;
  362. > SELECT partition, count(*) FROM kv_1 GROUP BY partition
  363. 0 4
  364. 1 4
  365. 2 4
  366. 3 4
  367. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'kv_1';
  368. running
  369. > SELECT partition, count(*) > 10 FROM kv_2 GROUP BY partition
  370. 0 true
  371. 1 true
  372. 2 true
  373. 3 true
  374. > SHOW TABLES ON keyvalue;
  375. kv_1 ""
  376. kv_2 ""
  377. > DROP SOURCE keyvalue CASCADE;
  378. # With force_source_table_syntax = true, each legacy form below (FOR TABLES, FOR ALL TABLES, FOR SCHEMAS, and FORMAT/ENVELOPE on CREATE SOURCE) must be rejected.
  379. # Force usage of the new syntax and check that old statements are disallowed
  380. # NOTE(review): the flag is not reset to false within the visible part of this file - confirm nothing that runs afterwards depends on the default.
  381. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  382. ALTER SYSTEM SET force_source_table_syntax = true
  383. ! CREATE SOURCE auction_legacy
  384. IN CLUSTER ${arg.single-replica-cluster}
  385. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301)
  386. FOR TABLES (bids AS bids_3);
  387. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  388. ! CREATE SOURCE pg_source_2
  389. IN CLUSTER ${arg.single-replica-cluster}
  390. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source', TEXT COLUMNS = (public.pg_table.a))
  391. FOR ALL TABLES
  392. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  393. ! CREATE SOURCE mysql_source_2
  394. IN CLUSTER ${arg.single-replica-cluster}
  395. FROM MYSQL CONNECTION mysql_conn
  396. FOR SCHEMAS (public);
  397. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  398. ! CREATE SOURCE avro_source_2
  399. IN CLUSTER ${arg.single-replica-cluster}
  400. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  401. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  402. ENVELOPE UPSERT;
  403. contains:not supported; use CREATE TABLE .. FROM SOURCE instead
  403. contains:not supported; use CREATE TABLE .. FROM SOURCE instead