source-tables.td 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ set-arg-default default-replica-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. #
  12. # Validate feature flag
  13. #
  14. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  15. ALTER SYSTEM SET enable_create_table_from_source = false
  16. > CREATE SOURCE auction_house
  17. IN CLUSTER ${arg.single-replica-cluster}
  18. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301) FOR ALL TABLES;
  19. ! CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "auction"."bids");
  20. contains: not supported
  21. > DROP SOURCE auction_house CASCADE;
  22. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  23. ALTER SYSTEM SET enable_create_table_from_source = true
  24. ALTER SYSTEM SET enable_load_generator_key_value = true
  25. #
  26. # Multi-output load generator source using source-fed tables
  27. #
  28. > CREATE SOURCE auction_house
  29. IN CLUSTER ${arg.single-replica-cluster}
  30. FROM LOAD GENERATOR AUCTION (AS OF 300, UP TO 301);
  31. > CREATE TABLE bids FROM SOURCE auction_house (REFERENCE "auction"."bids");
  32. > SELECT count(*) FROM bids
  33. 255
  34. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'bids';
  35. running
  36. # Create another table from the same bids upstream using a less qualified reference
  37. > CREATE TABLE bids2 FROM SOURCE auction_house (REFERENCE "bids");
  38. > DROP TABLE bids;
  39. > SELECT count(*) FROM bids2
  40. 255
  41. # Validate that the available source references table was correctly populated
  42. > SELECT u.name, u.namespace, u.columns
  43. FROM mz_sources s
  44. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  45. WHERE
  46. s.name = 'auction_house'
  47. ORDER BY u.name;
  48. accounts auction {id,org_id,balance}
  49. auctions auction {id,seller,item,end_time}
  50. bids auction {id,buyer,auction_id,amount,bid_time}
  51. organizations auction {id,name}
  52. users auction {id,org_id,name}
  53. > DROP SOURCE auction_house CASCADE;
  54. > SELECT count(*) FROM mz_internal.mz_source_references;
  55. 0
  56. #
  57. # Single-output load generator source using source-fed tables
  58. #
  59. > CREATE SOURCE counter
  60. IN CLUSTER ${arg.single-replica-cluster}
  61. FROM LOAD GENERATOR COUNTER (AS OF 4, UP TO 5);
  62. > CREATE TABLE counter_1 FROM SOURCE counter (REFERENCE "counter");
  63. > CREATE TABLE counter_2 FROM SOURCE counter (REFERENCE "counter");
  64. > SELECT count(*) from counter_1;
  65. 5
  66. > SELECT count(*) from counter_2;
  67. 5
  68. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'counter_2';
  69. running
  70. > DROP SOURCE counter CASCADE;
  71. #
  72. # Postgres source using source-fed tables
  73. #
  74. > CREATE SECRET pgpass AS 'postgres'
  75. > CREATE CONNECTION pg TO POSTGRES (
  76. HOST postgres,
  77. DATABASE postgres,
  78. USER postgres,
  79. PASSWORD SECRET pgpass
  80. )
  81. $ postgres-execute connection=postgres://postgres:postgres@postgres
  82. ALTER USER postgres WITH replication;
  83. DROP SCHEMA IF EXISTS public CASCADE;
  84. CREATE SCHEMA public;
  85. DROP PUBLICATION IF EXISTS mz_source;
  86. CREATE PUBLICATION mz_source FOR ALL TABLES;
  87. CREATE TYPE an_enum AS ENUM ('var0', 'var1');
  88. CREATE TABLE pg_table (a an_enum, b INTEGER);
  89. INSERT INTO pg_table VALUES ('var1', 1234), ('var0', 0);
  90. ALTER TABLE pg_table REPLICA IDENTITY FULL;
  91. > CREATE SOURCE pg_source
  92. IN CLUSTER ${arg.single-replica-cluster}
  93. FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
  94. > SHOW SUBSOURCES ON pg_source
  95. pg_source_progress progress
  96. ! CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table");
  97. contains:referenced tables use unsupported types
  98. > CREATE TABLE pg_table_1 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  99. > SELECT * FROM pg_table_1;
  100. var0 0
  101. var1 1234
  102. $ postgres-execute connection=postgres://postgres:postgres@postgres
  103. INSERT INTO pg_table VALUES ('var1', 5678), ('var0', 1);
  104. > SELECT * FROM pg_table_1;
  105. var0 0
  106. var1 1234
  107. var0 1
  108. var1 5678
  109. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'pg_table_1';
  110. running
  111. > CREATE TABLE pg_table_2 FROM SOURCE pg_source (REFERENCE "pg_table") WITH (TEXT COLUMNS = (a));
  112. > SELECT * FROM pg_table_2;
  113. var0 0
  114. var1 1234
  115. var0 1
  116. var1 5678
  117. $ postgres-execute connection=postgres://postgres:postgres@postgres
  118. ALTER TABLE pg_table ADD COLUMN c INTEGER;
  119. INSERT INTO pg_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  120. > CREATE TABLE pg_table_3 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  121. > SELECT * FROM pg_table_2;
  122. var0 0
  123. var1 1234
  124. var0 1
  125. var1 5678
  126. var0 5555
  127. var1 4444
  128. > SELECT * FROM pg_table_3;
  129. var0 0 <null>
  130. var1 1234 <null>
  131. var0 1 <null>
  132. var1 5678 <null>
  133. var0 5555 6666
  134. var1 4444 12
  135. > CREATE TABLE pg_table_4 (a INTEGER);
  136. ! CREATE TABLE pg_table_4 FROM SOURCE pg_source (REFERENCE "public"."pg_table") WITH (TEXT COLUMNS = (a));
  137. contains:already exists
  138. ! CREATE TABLE pg_table_5 FROM SOURCE pg_source (REFERENCE "public"."pg_table_5000");
  139. contains:not found in source
  140. $ postgres-execute connection=postgres://postgres:postgres@postgres
  141. CREATE TABLE pg_table_added_later (a INTEGER);
  142. INSERT INTO pg_table_added_later VALUES (555), (666);
  143. ALTER TABLE pg_table_added_later REPLICA IDENTITY FULL;
  144. > CREATE TABLE pg_table_6a FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  145. > CREATE TABLE pg_table_6b FROM SOURCE pg_source (REFERENCE "public"."pg_table_added_later");
  146. > SELECT * FROM pg_table_6a;
  147. 555
  148. 666
  149. $ postgres-execute connection=postgres://postgres:postgres@postgres
  150. INSERT INTO pg_table_added_later VALUES (777);
  151. > SELECT * FROM pg_table_6a;
  152. 555
  153. 666
  154. 777
  155. > SELECT * FROM pg_table_6b;
  156. 555
  157. 666
  158. 777
  159. > DROP SOURCE pg_source CASCADE;
  160. #
  161. # MySql source using source-fed tables
  162. #
  163. > CREATE SECRET mysqlpass AS 'p@ssw0rd';
  164. > CREATE CONNECTION mysql_conn TO MYSQL (
  165. HOST mysql,
  166. USER root,
  167. PASSWORD SECRET mysqlpass
  168. )
  169. $ mysql-connect name=mysql url=mysql://root@mysql password=p@ssw0rd
  170. $ mysql-execute name=mysql
  171. DROP DATABASE IF EXISTS public;
  172. CREATE DATABASE public;
  173. USE public;
  174. CREATE TABLE mysql_table (a ENUM ('var0', 'var1'), b INTEGER);
  175. INSERT INTO mysql_table VALUES ('var1', 1234), ('var0', 0);
  176. > CREATE SOURCE mysql_source
  177. IN CLUSTER ${arg.single-replica-cluster}
  178. FROM MYSQL CONNECTION mysql_conn;
  179. > SHOW SUBSOURCES ON mysql_source
  180. mysql_source_progress progress
  181. ! CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table");
  182. contains:referenced tables use unsupported types
  183. > CREATE TABLE mysql_table_1 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  184. > SELECT * FROM mysql_table_1;
  185. var0 0
  186. var1 1234
  187. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'mysql_table_1';
  188. running
  189. $ mysql-execute name=mysql
  190. INSERT INTO mysql_table VALUES ('var1', 5678), ('var0', 1);
  191. > SELECT * FROM mysql_table_1;
  192. var0 0
  193. var1 1234
  194. var0 1
  195. var1 5678
  196. > CREATE TABLE mysql_table_2 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  197. > SELECT * FROM mysql_table_2;
  198. var0 0
  199. var1 1234
  200. var0 1
  201. var1 5678
  202. > DROP TABLE mysql_table_1;
  203. $ mysql-execute name=mysql
  204. ALTER TABLE mysql_table ADD COLUMN c INTEGER;
  205. INSERT INTO mysql_table VALUES ('var0', 5555, 6666), ('var1', 4444, 12);
  206. > CREATE TABLE mysql_table_3 FROM SOURCE mysql_source (REFERENCE "public"."mysql_table") WITH (TEXT COLUMNS = (a));
  207. > SELECT * FROM mysql_table_2;
  208. var0 0
  209. var1 1234
  210. var0 1
  211. var1 5678
  212. var0 5555
  213. var1 4444
  214. > SELECT * FROM mysql_table_3;
  215. var0 0 <null>
  216. var1 1234 <null>
  217. var0 1 <null>
  218. var1 5678 <null>
  219. var0 5555 6666
  220. var1 4444 12
  221. # Validate that the available source references table reflects the state of the upstream
  222. # when the create source was run
  223. > SELECT u.name, u.namespace, u.columns
  224. FROM mz_sources s
  225. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  226. WHERE
  227. s.name = 'mysql_source'
  228. ORDER BY u.name;
  229. mysql_table public {a,b}
  230. # Create a new table in MySQL and refresh to see if the new available reference is picked up
  231. $ mysql-execute name=mysql
  232. USE public;
  233. CREATE TABLE mysql_table_new (foo INTEGER, bar INTEGER);
  234. > ALTER SOURCE mysql_source REFRESH REFERENCES;
  235. > SELECT u.name, u.namespace, u.columns
  236. FROM mz_sources s
  237. JOIN mz_internal.mz_source_references u ON s.id = u.source_id
  238. WHERE
  239. s.name = 'mysql_source'
  240. ORDER BY u.name;
  241. mysql_table public {a,b,c}
  242. mysql_table_new public {foo,bar}
  243. > DROP SOURCE mysql_source CASCADE;
  244. # ensure that CASCADE propagates to the tables
  245. ! SELECT * FROM mysql_table_3;
  246. contains:unknown catalog item 'mysql_table_3'
  247. > SELECT count(*) FROM mz_internal.mz_source_references;
  248. 0
  249. #
  250. # Kafka source using source-fed tables
  251. #
  252. $ set keyschema={
  253. "type": "record",
  254. "name": "Key",
  255. "fields": [
  256. {"name": "key", "type": "string"}
  257. ]
  258. }
  259. $ set schema={
  260. "type" : "record",
  261. "name" : "test",
  262. "fields" : [
  263. {"name":"f1", "type":"string"},
  264. {"name":"f2", "type":"long"}
  265. ]
  266. }
  267. > CREATE CONNECTION kafka_conn
  268. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  269. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  270. URL '${testdrive.schema-registry-url}'
  271. );
  272. $ kafka-create-topic topic=avroavro
  273. $ kafka-ingest format=avro topic=avroavro key-format=avro key-schema=${keyschema} schema=${schema}
  274. {"key": "fish"} {"f1": "fish", "f2": 1000}
  275. {"key": "bird1"} {"f1":"goose", "f2": 1}
  276. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  277. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  278. {"key": "bird1"}
  279. {"key": "birdmore"} {"f1":"geese", "f2": 56}
  280. {"key": "mammalmore"} {"f1": "moose", "f2": 42}
  281. {"key": "mammal1"}
  282. {"key": "mammalmore"} {"f1":"moose", "f2": 2}
  283. > CREATE SOURCE avro_source
  284. IN CLUSTER ${arg.single-replica-cluster}
  285. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avroavro-${testdrive.seed}');
  286. > CREATE TABLE avro_table_upsert FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  287. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  288. ENVELOPE UPSERT
  289. > CREATE TABLE avro_table_append FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  290. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  291. ENVELOPE NONE
  292. > CREATE TABLE avro_table_append_cols (a, b) FROM SOURCE avro_source (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  293. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  294. ENVELOPE NONE
  295. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'avro_table_upsert';
  296. running
  297. > SELECT * from avro_table_upsert
  298. key f1 f2
  299. ---------------------------
  300. fish fish 1000
  301. birdmore geese 56
  302. mammalmore moose 2
  303. > SELECT * from avro_table_append
  304. f1 f2
  305. ---------------
  306. fish 1000
  307. geese 2
  308. geese 56
  309. goose 1
  310. moose 1
  311. moose 2
  312. moose 42
  313. > SELECT * from avro_table_append_cols
  314. a b
  315. ---------------
  316. fish 1000
  317. geese 2
  318. geese 56
  319. goose 1
  320. moose 1
  321. moose 2
  322. moose 42
  323. #
  324. # Key-value load generator source using source-fed tables
  325. #
  326. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  327. ALTER SYSTEM SET enable_create_table_from_source = true
  328. ALTER SYSTEM SET enable_load_generator_key_value = true
  329. > CREATE SOURCE keyvalue
  330. IN CLUSTER ${arg.single-replica-cluster}
  331. FROM LOAD GENERATOR KEY VALUE (
  332. KEYS 16,
  333. PARTITIONS 4,
  334. SNAPSHOT ROUNDS 3,
  335. SEED 123,
  336. VALUE SIZE 10,
  337. BATCH SIZE 2,
  338. TICK INTERVAL '1s'
  339. );
  340. > CREATE TABLE kv_1 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE UPSERT;
  341. > CREATE TABLE kv_2 FROM SOURCE keyvalue INCLUDE KEY ENVELOPE NONE;
  342. > SELECT partition, count(*) FROM kv_1 GROUP BY partition
  343. 0 4
  344. 1 4
  345. 2 4
  346. 3 4
  347. > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'kv_1';
  348. running
  349. > SELECT partition, count(*) > 10 FROM kv_2 GROUP BY partition
  350. 0 true
  351. 1 true
  352. 2 true
  353. 3 true
  354. > DROP SOURCE keyvalue CASCADE;