# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Explicit deterministic tests for read-only mode and zero-downtime deploys (same
version, no upgrade).
"""

import time
from datetime import datetime, timedelta
from textwrap import dedent
from threading import Thread

from psycopg.errors import OperationalError

from materialize import buildkite
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import (
    LEADER_STATUS_HEALTHCHECK,
    DeploymentStatus,
    Materialized,
)
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
    CockroachOrPostgresMetadata,
    Postgres,
)
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.ui import CommandFailureCausedUIError

DEFAULT_TIMEOUT = "300s"

SYSTEM_PARAMETER_DEFAULTS = get_default_system_parameters(zero_downtime=True)
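# System parameter defaults shared by both generations below;
# zero_downtime=True selects the defaults used for 0dt deployments.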

SERVICES = [
    MySql(),
    Postgres(),
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    CockroachOrPostgresMetadata(),
    Mz(app_password=""),
    Materialized(
        name="mz_old",
        sanity_restart=False,
        deploy_generation=0,
        system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
        external_metadata_store=True,
        default_replication_factor=2,
    ),
    Materialized(
        name="mz_new",
        sanity_restart=False,
        deploy_generation=1,
        system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
        restart="on-failure",
        external_metadata_store=True,
        default_replication_factor=2,
    ),
    Testdrive(
        materialize_url="postgres://materialize@mz_old:6875",
        materialize_url_internal="postgres://materialize@mz_old:6877",
        mz_service="mz_old",
        materialize_params={"cluster": "cluster"},
        no_reset=True,
        seed=1,
        default_timeout=DEFAULT_TIMEOUT,
    ),
]
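
# The workflows below repeatedly construct the same Testdrive override pointed
# at `mz_new`. A helper along these lines (an illustrative sketch, not used by
# the workflows in this file) captures that pattern:
def testdrive_for(mz_service: str) -> Testdrive:
    """Build a Testdrive service wired to the given Materialize service."""
    return Testdrive(
        materialize_url=f"postgres://materialize@{mz_service}:6875",
        materialize_url_internal=f"postgres://materialize@{mz_service}:6877",
        mz_service=mz_service,
        materialize_params={"cluster": "cluster"},
        no_reset=True,
        seed=1,
        default_timeout=DEFAULT_TIMEOUT,
    )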


def workflow_default(c: Composition) -> None:
    def process(name: str) -> None:
        if name == "default":
            return
        with c.test_case(name):
            c.workflow(name)

    workflows = buildkite.shard_list(
        list(c.workflows.keys()), lambda workflow: workflow
    )
    c.test_parts(workflows, process)
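
# In CI, `workflow_default` drives everything: `buildkite.shard_list` splits
# the workflow names across parallel Buildkite jobs, and `c.test_parts` runs
# each workflow as its own test case.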


def workflow_read_only(c: Composition) -> None:
    """Verify read-only mode."""
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )

    # Make sure the cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    # Inserts should be reflected when writes are allowed.
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            > CREATE TABLE t (a int, b int);
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SINK kafka_sink
              IN CLUSTER cluster
              FROM t
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;
            > INSERT INTO t VALUES (1, 2);
            > CREATE INDEX t_idx ON t (a, b);
            > CREATE MATERIALIZED VIEW mv AS SELECT sum(a) FROM t;
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SELECT * FROM mv;
            1
            > SELECT max(b) FROM t;
            2
            $ kafka-create-topic topic=kafka
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key1A,key1B:value1A,value1B
            > CREATE SOURCE kafka_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-${{testdrive.seed}}');
            > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
              FROM SOURCE kafka_source (REFERENCE "testdrive-kafka-${{testdrive.seed}}")
              KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              ENVELOPE UPSERT;
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres1 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_source;
            DROP TABLE IF EXISTS postgres_source_table;
            CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER);
            ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;
            INSERT INTO postgres_source_table SELECT 'A', 0;
            CREATE PUBLICATION postgres_source FOR ALL TABLES;
            > CREATE SECRET pgpass AS 'postgres';
            > CREATE CONNECTION pg FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass;
            > CREATE SOURCE postgres_source
              IN CLUSTER cluster
              FROM POSTGRES CONNECTION pg
              (PUBLICATION 'postgres_source');
            > CREATE TABLE postgres_source_table FROM SOURCE postgres_source (REFERENCE postgres_source_table)
            > SELECT * FROM postgres_source_table;
            A 0
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            # create the database if it does not exist yet but do not drop it
            CREATE DATABASE IF NOT EXISTS public;
            USE public;
            CREATE USER mysql1 IDENTIFIED BY 'mysql';
            GRANT REPLICATION SLAVE ON *.* TO mysql1;
            GRANT ALL ON public.* TO mysql1;
            CREATE TABLE mysql_source_table (f1 VARCHAR(32), f2 INTEGER);
            INSERT INTO mysql_source_table VALUES ('A', 0);
            > CREATE SECRET mysqlpass AS 'mysql';
            > CREATE CONNECTION mysql TO MYSQL (
              HOST 'mysql',
              USER mysql1,
              PASSWORD SECRET mysqlpass);
            > CREATE SOURCE mysql_source
              IN CLUSTER cluster
              FROM MYSQL CONNECTION mysql;
            > CREATE TABLE mysql_source_table FROM SOURCE mysql_source (REFERENCE public.mysql_source_table);
            > SELECT * FROM mysql_source_table;
            A 0
            $ kafka-verify-topic sink=materialize.public.kafka_sink
            > CREATE SOURCE kafka_sink_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
            > CREATE TABLE kafka_sink_source_tbl FROM SOURCE kafka_sink_source (REFERENCE "testdrive-kafka-sink-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            > CREATE SOURCE webhook_source
              IN CLUSTER cluster_singlereplica
              FROM WEBHOOK BODY FORMAT TEXT
            $ webhook-append database=materialize schema=public name=webhook_source
            AAA
            > SELECT * FROM webhook_source
            AAA
            """
        )
    )

    # Restart in a new deploy generation, which will cause Materialize to
    # boot in read-only mode.
    with c.override(
        Materialized(
            name="mz_old",
            deploy_generation=1,
            external_metadata_store=True,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            default_replication_factor=2,
        )
    ):
        c.up("mz_old")

        c.testdrive(
            dedent(
                f"""
                $ webhook-append database=materialize schema=public name=webhook_source status=500
                BBB
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
                key2A,key2B:value2A,value2B
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table VALUES ('B', 1);
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                INSERT INTO mysql_source_table VALUES ('B', 1);
                > SET CLUSTER = cluster;
                > SELECT 1
                1
                ! INSERT INTO t VALUES (3, 4);
                contains: cannot write in read-only mode
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SELECT * FROM mv;
                1
                # TODO: Currently hangs
                # > SELECT max(b) FROM t;
                # 2
                > SELECT mz_unsafe.mz_sleep(5)
                <null>
                ! INSERT INTO t VALUES (5, 6);
                contains: cannot write in read-only mode
                > SELECT * FROM mv;
                1
                ! DROP INDEX t_idx
                contains: cannot write in read-only mode
                ! CREATE INDEX t_idx2 ON t (a, b)
                contains: cannot write in read-only mode
                ! CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
                contains: cannot write in read-only mode
                $ set-regex match=(s\\d+|\\d{{13}}|[ ]{{12}}0|u\\d{{1,3}}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
                > EXPLAIN TIMESTAMP FOR SELECT * FROM mv;
                " query timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: true\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.mv (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User(6)])): [<> <>]\\n"
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                > SELECT * FROM postgres_source_table
                A 0
                > SELECT * FROM mysql_source_table;
                A 0
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                > SELECT * FROM webhook_source
                AAA
                """
            )
        )

        c.up("mz_old")
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_old")
        c.promote_mz("mz_old")

    # After promotion, the deployment should boot with writes allowed.
    with c.override(
        Materialized(
            name="mz_old",
            healthcheck=[
                "CMD-SHELL",
                """[ "$(curl -f localhost:6878/api/leader/status)" = '{"status":"IsLeader"}' ]""",
            ],
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            external_metadata_store=True,
            default_replication_factor=2,
        )
    ):
        c.up("mz_old")

        c.testdrive(
            dedent(
                f"""
                $ webhook-append database=materialize schema=public name=webhook_source
                CCC
                > SET CLUSTER = cluster;
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
                > SELECT * FROM mv;
                1
                > SELECT * FROM mv2;
                1
                > SELECT max(b) FROM t;
                2
                > INSERT INTO t VALUES (7, 8);
                > SELECT * FROM mv;
                8
                > SELECT * FROM mv2;
                8
                > SELECT max(b) FROM t;
                8
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                <null> <null> 7 8
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
                key3A,key3B:value3A,value3B
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table VALUES ('C', 2);
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                INSERT INTO mysql_source_table VALUES ('C', 2);
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                C 2
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                C 2
                > SELECT * FROM webhook_source
                AAA
                CCC
                """
            )
        )


def workflow_basic(c: Composition) -> None:
    """Verify the basic 0dt deployment flow."""
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )

    # Make sure the cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    # Inserts should be reflected when writes are allowed.
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            > CREATE TABLE t (a int, b int);
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SINK kafka_sink
              IN CLUSTER cluster
              FROM t
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;
            > INSERT INTO t VALUES (1, 2);
            > CREATE INDEX t_idx ON t (a, b);
            > CREATE MATERIALIZED VIEW mv AS SELECT sum(a) FROM t;
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SELECT * FROM mv;
            1
            > SELECT max(b) FROM t;
            2
            $ kafka-create-topic topic=kafka
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key1A,key1B:value1A,value1B
            > CREATE SOURCE kafka_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-${{testdrive.seed}}');
            > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
              FROM SOURCE kafka_source (REFERENCE "testdrive-kafka-${{testdrive.seed}}")
              KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              ENVELOPE UPSERT;
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres1 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_source;
            DROP TABLE IF EXISTS postgres_source_table;
            CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER);
            ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;
            INSERT INTO postgres_source_table SELECT 'A', 0;
            CREATE PUBLICATION postgres_source FOR ALL TABLES;
            > CREATE SECRET pgpass AS 'postgres';
            > CREATE CONNECTION pg FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass;
            > CREATE SOURCE postgres_source
              IN CLUSTER cluster
              FROM POSTGRES CONNECTION pg
              (PUBLICATION 'postgres_source');
            > CREATE TABLE postgres_source_table FROM SOURCE postgres_source (REFERENCE postgres_source_table)
            > SELECT * FROM postgres_source_table;
            A 0
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            # create the database if it does not exist yet but do not drop it
            CREATE DATABASE IF NOT EXISTS public;
            USE public;
            CREATE USER mysql1 IDENTIFIED BY 'mysql';
            GRANT REPLICATION SLAVE ON *.* TO mysql1;
            GRANT ALL ON public.* TO mysql1;
            CREATE TABLE mysql_source_table (f1 VARCHAR(32), f2 INTEGER);
            INSERT INTO mysql_source_table VALUES ('A', 0);
            > CREATE SECRET mysqlpass AS 'mysql';
            > CREATE CONNECTION mysql TO MYSQL (
              HOST 'mysql',
              USER mysql1,
              PASSWORD SECRET mysqlpass);
            > CREATE SOURCE mysql_source1
              IN CLUSTER cluster
              FROM MYSQL CONNECTION mysql;
            > CREATE TABLE mysql_source_table FROM SOURCE mysql_source1 (REFERENCE public.mysql_source_table);
            > SELECT * FROM mysql_source_table;
            A 0
            $ kafka-verify-topic sink=materialize.public.kafka_sink
            > CREATE SOURCE kafka_sink_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
            > CREATE TABLE kafka_sink_source_tbl FROM SOURCE kafka_sink_source (REFERENCE "testdrive-kafka-sink-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            > CREATE SOURCE webhook_source
              IN CLUSTER cluster_singlereplica
              FROM WEBHOOK BODY FORMAT TEXT
            $ webhook-append database=materialize schema=public name=webhook_source
            AAA
            > SELECT * FROM webhook_source
            AAA
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            > COMMIT
            """
        )
    )

    # Start the new Materialize in a new deploy generation, which will cause
    # it to boot in read-only mode.
    c.up("mz_new")

    # Verify that the new Materialize is in read-only mode.
    with c.override(
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        )
    ):
        c.up({"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                f"""
                $ webhook-append database=materialize schema=public name=webhook_source status=500
                BBB
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
                key2A,key2B:value2A,value2B
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table VALUES ('B', 1);
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                INSERT INTO mysql_source_table VALUES ('B', 1);
                > SET CLUSTER = cluster;
                > SELECT 1
                1
                ! INSERT INTO t VALUES (3, 4);
                contains: cannot write in read-only mode
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SELECT * FROM mv;
                1
                # TODO: Currently hangs
                # > SELECT max(b) FROM t;
                # 2
                > SELECT mz_unsafe.mz_sleep(5)
                <null>
                ! INSERT INTO t VALUES (5, 6);
                contains: cannot write in read-only mode
                > SELECT * FROM mv;
                1
                ! DROP INDEX t_idx
                contains: cannot write in read-only mode
                ! CREATE INDEX t_idx2 ON t (a, b)
                contains: cannot write in read-only mode
                ! CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
                contains: cannot write in read-only mode
                $ set-regex match=(s\\d+|\\d{{13}}|[ ]{{12}}0|u\\d{{1,3}}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
                > EXPLAIN TIMESTAMP FOR SELECT * FROM mv;
                " query timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: true\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.mv (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User(6)])): [<> <>]\\n"
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                > SELECT * FROM webhook_source
                AAA
                $ set-max-tries max-tries=1
                $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
                > BEGIN
                ! DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
                contains: cannot write in read-only mode
                > ROLLBACK
                # Plain SUBSCRIBEs without a DECLARE still work, though.
                > SUBSCRIBE (WITH a(x) AS (SELECT 'a') SELECT generate_series(1, 2), x FROM a)
                <TIMESTAMP> 1 1 a
                <TIMESTAMP> 1 2 a
                """
            )
        )

    # The old Materialize still accepts writes, though.
    c.up({"name": "testdrive", "persistent": True})
    c.testdrive(
        dedent(
            f"""
            $ webhook-append database=materialize schema=public name=webhook_source
            CCC
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key3A,key3B:value3A,value3B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table VALUES ('C', 2);
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            USE public;
            INSERT INTO mysql_source_table VALUES ('C', 2);
            > SET CLUSTER = cluster;
            > SELECT 1
            1
            > INSERT INTO t VALUES (3, 4);
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SELECT * FROM mv;
            4
            > SELECT max(b) FROM t;
            4
            > SELECT mz_unsafe.mz_sleep(5)
            <null>
            > INSERT INTO t VALUES (5, 6);
            > SELECT * FROM mv;
            9
            > DROP INDEX t_idx
            > CREATE INDEX t_idx2 ON t (a, b)
            > CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
            $ set-regex match=(s\\d+|\\d{{13}}|[ ]{{12}}0|u\\d{{1,3}}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
            > EXPLAIN TIMESTAMP FOR SELECT * FROM mv;
            " query timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: true\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.mv (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User(6)])): [<> <>]\\n"
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            <null> <null> 3 4
            <null> <null> 5 6
            > SELECT * FROM webhook_source
            AAA
            CCC
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            <TIMESTAMP> 1 3
            <TIMESTAMP> 1 5
            > COMMIT
            """
        )
    )

    with c.override(
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        )
    ):
        c.up({"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                """
                $ webhook-append database=materialize schema=public name=webhook_source status=500
                DDD
                > SET CLUSTER = cluster;
                > SELECT 1
                1
                ! INSERT INTO t VALUES (3, 4);
                contains: cannot write in read-only mode
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SELECT * FROM mv;
                9
                > SELECT max(b) FROM t;
                6
                > SELECT * FROM mv;
                9
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                C 2
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                C 2
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                <null> <null> 3 4
                <null> <null> 5 6
                > SELECT * FROM webhook_source
                AAA
                CCC
                $ set-max-tries max-tries=1
                $ set-regex match=\\d{13,20} replacement=<TIMESTAMP>
                > BEGIN
                ! DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
                contains: cannot write in read-only mode
                > ROLLBACK
                # Plain SUBSCRIBEs without a DECLARE still work, though.
                > SUBSCRIBE (WITH a(x) AS (SELECT 'a') SELECT generate_series(1, 2), x FROM a)
                <TIMESTAMP> 1 1 a
                <TIMESTAMP> 1 2 a
                """
            )
        )

    c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
    c.promote_mz("mz_new")

    # Give Materialize some time to restart after the promotion: first wait
    # for mz_old to stop serving connections.
    for i in range(10):
        try:
            c.sql("SELECT 1", service="mz_old")
        except OperationalError as e:
            assert (
                "server closed the connection unexpectedly" in str(e)
                or "Can't create a connection to host" in str(e)
                or "Connection refused" in str(e)
            ), f"Unexpected error: {e}"
            break
        except CommandFailureCausedUIError as e:
            # service "mz_old" is not running
            assert "running docker compose failed" in str(
                e
            ), f"Unexpected error: {e}"
            break
        time.sleep(1)
    else:
        raise RuntimeError("mz_old didn't stop running within 10 seconds")

    # ...then wait for mz_new to start accepting queries.
    for i in range(10):
        try:
            c.sql("SELECT 1", service="mz_new")
            break
        except CommandFailureCausedUIError:
            pass
        except OperationalError:
            pass
        time.sleep(1)
    else:
        raise RuntimeError("mz_new didn't come up within 10 seconds")

    c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, "mz_new")
    c.testdrive(
        dedent(
            f"""
            $ webhook-append database=materialize schema=public name=webhook_source
            EEE
            > SET CLUSTER = cluster;
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > CREATE MATERIALIZED VIEW mv3 AS SELECT sum(a) FROM t;
            > SELECT * FROM mv;
            9
            > SELECT * FROM mv2;
            9
            > SELECT * FROM mv3;
            9
            > SELECT max(b) FROM t;
            6
            > INSERT INTO t VALUES (7, 8);
            > SELECT * FROM mv;
            16
            > SELECT * FROM mv2;
            16
            > SELECT max(b) FROM t;
            8
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key4A,key4B:value4A,value4B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table VALUES ('D', 3);
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            USE public;
            INSERT INTO mysql_source_table VALUES ('D', 3);
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            key4A key4B value4A value4B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            D 3
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            D 3
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            <null> <null> 3 4
            <null> <null> 5 6
            <null> <null> 7 8
            > SELECT * FROM webhook_source
            AAA
            CCC
            EEE
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            <TIMESTAMP> 1 3
            <TIMESTAMP> 1 5
            <TIMESTAMP> 1 7
            > COMMIT
            """
        )
    )
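
# The two for/else loops above poll by hand. A generic helper along these
# lines (a sketch, not used elsewhere in this file) expresses the same
# wait-until pattern:
def wait_until(check, timeout_secs: int = 10, description: str = "condition") -> None:
    """Poll `check` once per second until it returns True or we time out."""
    for _ in range(timeout_secs):
        if check():
            return
        time.sleep(1)
    raise RuntimeError(f"{description} not satisfied within {timeout_secs} seconds")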


def workflow_kafka_source_rehydration(c: Composition) -> None:
    """Verify Kafka source rehydration in a 0dt deployment."""
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )

    count = 1000000
    repeats = 20

    # Make sure the cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    start_time = time.time()
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            $ kafka-create-topic topic=kafka-large
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
            key0A,key${{kafka-ingest.iteration}}:value0A,${{kafka-ingest.iteration}}
            > CREATE SOURCE kafka_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-large-${{testdrive.seed}}');
            > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
              FROM SOURCE kafka_source (REFERENCE "testdrive-kafka-large-${{testdrive.seed}}")
              KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              ENVELOPE UPSERT;
            > CREATE VIEW kafka_source_cnt AS SELECT count(*) FROM kafka_source_tbl
            > CREATE DEFAULT INDEX on kafka_source_cnt
            > SELECT * FROM kafka_source_cnt
            {count}
            """
        )
    )
    for i in range(1, repeats):
        c.testdrive(
            dedent(
                f"""
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
                key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
                > SELECT * FROM kafka_source_cnt
                {count*(i+1)}
                """
            )
        )
    elapsed = time.time() - start_time
    print(f"initial ingestion took {elapsed} seconds")

    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        ),
    ):
        c.up("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        elapsed = time.time() - start_time
        print(f"re-hydration took {elapsed} seconds")

        c.promote_mz("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None
        )
        elapsed = time.time() - start_time
        print(f"promotion took {elapsed} seconds")

        start_time = time.time()
        result = c.sql_query("SELECT * FROM kafka_source_cnt", service="mz_new")
        elapsed = time.time() - start_time
        print(f"final check took {elapsed} seconds")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        result = c.sql_query("SELECT count(*) FROM kafka_source_tbl", service="mz_new")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        # The indexed count should be served promptly if the source is hydrated.
        assert (
            elapsed < 3
        ), f"Took {elapsed}s to SELECT on Kafka source after 0dt upgrade, is it hydrated?"

        start_time = time.time()
        result = c.sql_query("SELECT 1", service="mz_new")
        elapsed = time.time() - start_time
        print(f"bootstrapping (checked via SELECT 1) took {elapsed} seconds")
        assert result[0][0] == 1, f"Wrong result: {result}"

        print("Ingesting again")
        for i in range(repeats, repeats * 2):
            c.testdrive(
                dedent(
                    f"""
                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
                    key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
                    """
                )
            )
        c.testdrive(
            dedent(
                f"""
                > SET CLUSTER = cluster;
                > SELECT * FROM kafka_source_cnt
                {2*count*repeats}
                > SELECT count(*) FROM kafka_source_tbl
                {2*count*repeats}
                """
            )
        )
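
# The variant below differs from workflow_kafka_source_rehydration only in
# when the data arrives: the full volume is ingested *before* the source is
# created, so the new generation has to hydrate one large initial snapshot
# instead of state built up incrementally under an existing source.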


def workflow_kafka_source_rehydration_large_initial(c: Composition) -> None:
    """Verify Kafka source rehydration in a 0dt deployment when the entire
    data volume is ingested before the source is created."""
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )

    count = 1000000
    repeats = 20

    # Make sure the cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    start_time = time.time()
    c.testdrive(
        dedent(
            """
            $ kafka-create-topic topic=kafka-large
            """
        )
    )
    for i in range(repeats):
        c.testdrive(
            dedent(
                f"""
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
                key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
                """
            )
        )
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SOURCE kafka_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-large-${{testdrive.seed}}');
            > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
              FROM SOURCE kafka_source (REFERENCE "testdrive-kafka-large-${{testdrive.seed}}")
              KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              ENVELOPE UPSERT;
            > CREATE VIEW kafka_source_cnt AS SELECT count(*) FROM kafka_source_tbl
            > CREATE DEFAULT INDEX on kafka_source_cnt
            > SELECT * FROM kafka_source_cnt
            {count*repeats}
            """
        )
    )
    elapsed = time.time() - start_time
    print(f"initial ingestion took {elapsed} seconds")

    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        ),
    ):
        c.up("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        elapsed = time.time() - start_time
        print(f"re-hydration took {elapsed} seconds")

        c.promote_mz("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None
        )
        elapsed = time.time() - start_time
        print(f"promotion took {elapsed} seconds")

        start_time = time.time()
        result = c.sql_query("SELECT * FROM kafka_source_cnt", service="mz_new")
        elapsed = time.time() - start_time
        print(f"final check took {elapsed} seconds")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        result = c.sql_query("SELECT count(*) FROM kafka_source_tbl", service="mz_new")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        # The indexed count should be served promptly if the source is hydrated.
        assert (
            elapsed < 3
        ), f"Took {elapsed}s to SELECT on Kafka source after 0dt upgrade, is it hydrated?"

        start_time = time.time()
        result = c.sql_query("SELECT 1", service="mz_new")
        elapsed = time.time() - start_time
        print(f"bootstrapping (checked via SELECT 1) took {elapsed} seconds")
        assert result[0][0] == 1, f"Wrong result: {result}"

        print("Ingesting again")
        for i in range(repeats, repeats * 2):
            c.testdrive(
                dedent(
                    f"""
                    $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-large repeat={count}
                    key{i}A,key{i}${{kafka-ingest.iteration}}:value{i}A,${{kafka-ingest.iteration}}
                    """
                )
            )
        c.testdrive(
            dedent(
                f"""
                > SET CLUSTER = cluster;
                > SELECT * FROM kafka_source_cnt
                {2*count*repeats}
                > SELECT count(*) FROM kafka_source_tbl
                {2*count*repeats}
                """
            )
        )


def workflow_pg_source_rehydration(c: Composition) -> None:
    """Verify Postgres source rehydration in a 0dt deployment."""
    c.down(destroy_volumes=True)
    c.up("postgres", "mz_old", {"name": "testdrive", "persistent": True})

    count = 1000000
    repeats = 100

    # Make sure the cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )

    inserts = (
        "INSERT INTO postgres_source_table VALUES "
        + ", ".join([f"({i})" for i in range(count)])
        + ";"
    )
    start_time = time.time()
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres1 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_source;
            DROP TABLE IF EXISTS postgres_source_table;
            CREATE TABLE postgres_source_table (f1 INTEGER);
            ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;
            {inserts}
            CREATE PUBLICATION postgres_source FOR ALL TABLES;
            > CREATE SECRET pgpass AS 'postgres';
            > CREATE CONNECTION pg FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass;
            > CREATE SOURCE postgres_source
              IN CLUSTER cluster
              FROM POSTGRES CONNECTION pg
              (PUBLICATION 'postgres_source');
            > CREATE TABLE postgres_source_table FROM SOURCE postgres_source (REFERENCE postgres_source_table)
            > CREATE VIEW postgres_source_cnt AS SELECT count(*) FROM postgres_source_table
            > CREATE DEFAULT INDEX ON postgres_source_cnt
            > SELECT * FROM postgres_source_cnt;
            {count}
            """
        ),
        quiet=True,
    )
    for i in range(1, repeats):
        c.testdrive(
            dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                {inserts}
                > SELECT * FROM postgres_source_cnt
                {count*(i+1)}
                """
            ),
            quiet=True,
        )
    elapsed = time.time() - start_time
    print(f"initial ingestion took {elapsed} seconds")

    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        ),
    ):
        c.up("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        elapsed = time.time() - start_time
        print(f"re-hydration took {elapsed} seconds")

        c.promote_mz("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None
        )
        elapsed = time.time() - start_time
        print(f"promotion took {elapsed} seconds")

        start_time = time.time()
        result = c.sql_query("SELECT * FROM postgres_source_cnt", service="mz_new")
        elapsed = time.time() - start_time
        print(f"final check took {elapsed} seconds")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        assert (
            elapsed < 4
        ), f"Took {elapsed}s to SELECT on Postgres source after 0dt upgrade, is it hydrated?"
        result = c.sql_query(
            "SELECT count(*) FROM postgres_source_table", service="mz_new"
        )
        assert result[0][0] == count * repeats, f"Wrong result: {result}"

        print("Ingesting again")
        for i in range(repeats, repeats * 2):
            c.testdrive(
                dedent(
                    f"""
                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    {inserts}
                    > SELECT * FROM postgres_source_cnt
                    {count*(i+1)}
                    """
                ),
                quiet=True,
            )
        result = c.sql_query("SELECT * FROM postgres_source_cnt", service="mz_new")
        assert result[0][0] == 2 * count * repeats, f"Wrong result: {result}"
        result = c.sql_query(
            "SELECT count(*) FROM postgres_source_table", service="mz_new"
        )
        assert result[0][0] == 2 * count * repeats, f"Wrong result: {result}"


def workflow_mysql_source_rehydration(c: Composition) -> None:
    """Verify MySQL source rehydration in a 0dt deployment."""
  1207. c.down(destroy_volumes=True)
  1208. c.up("mysql", "mz_old", {"name": "testdrive", "persistent": True})
  1209. count = 1000000
  1210. repeats = 100
  1211. # Make sure cluster is owned by the system so it doesn't get dropped
  1212. # between testdrive runs.
  1213. c.sql(
  1214. """
  1215. DROP CLUSTER IF EXISTS cluster CASCADE;
  1216. CREATE CLUSTER cluster SIZE '1';
  1217. GRANT ALL ON CLUSTER cluster TO materialize;
  1218. ALTER SYSTEM SET cluster = cluster;
  1219. CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
  1220. GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
  1221. """,
  1222. service="mz_old",
  1223. port=6877,
  1224. user="mz_system",
  1225. )
  1226. inserts = (
  1227. "INSERT INTO mysql_source_table VALUES "
  1228. + ", ".join([f"({i})" for i in range(count)])
  1229. + ";"
  1230. )
    start_time = time.time()
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            # create the database if it does not exist yet but do not drop it
            CREATE DATABASE IF NOT EXISTS public;
            USE public;
            CREATE USER mysql1 IDENTIFIED BY 'mysql';
            GRANT REPLICATION SLAVE ON *.* TO mysql1;
            GRANT ALL ON public.* TO mysql1;
            CREATE TABLE mysql_source_table (f1 INTEGER);
            {inserts}
            > CREATE SECRET mysqlpass AS 'mysql';
            > CREATE CONNECTION mysql TO MYSQL (
              HOST 'mysql',
              USER mysql1,
              PASSWORD SECRET mysqlpass);
            > CREATE SOURCE mysql_source
              IN CLUSTER cluster
              FROM MYSQL CONNECTION mysql;
            > CREATE TABLE mysql_source_table FROM SOURCE mysql_source (REFERENCE public.mysql_source_table);
            > CREATE VIEW mysql_source_cnt AS SELECT count(*) FROM mysql_source_table
            > CREATE DEFAULT INDEX ON mysql_source_cnt
            > SELECT * FROM mysql_source_cnt;
            {count}
            """
        ),
        quiet=True,
    )
    for i in range(1, repeats):
        c.testdrive(
            dedent(
                f"""
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                {inserts}
                > SELECT * FROM mysql_source_cnt;
                {count*(i+1)}
                """
            ),
            quiet=True,
        )
    elapsed = time.time() - start_time
    print(f"initial ingestion took {elapsed} seconds")
    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        ),
    ):
        c.up("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        elapsed = time.time() - start_time
        print(f"re-hydration took {elapsed} seconds")
        c.promote_mz("mz_new")
        start_time = time.time()
        c.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None
        )
        elapsed = time.time() - start_time
        print(f"promotion took {elapsed} seconds")
        start_time = time.time()
        result = c.sql_query("SELECT * FROM mysql_source_cnt", service="mz_new")
        elapsed = time.time() - start_time
        print(f"final check took {elapsed} seconds")
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        assert (
            elapsed < 4
        ), f"Took {elapsed}s to SELECT on MySQL source after 0dt upgrade, is it hydrated?"
        result = c.sql_query(
            "SELECT count(*) FROM mysql_source_table", service="mz_new"
        )
        assert result[0][0] == count * repeats, f"Wrong result: {result}"
        print("Ingesting again")
        for i in range(repeats, repeats * 2):
            c.testdrive(
                dedent(
                    f"""
                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                    $ mysql-execute name=mysql
                    USE public;
                    {inserts}
                    > SELECT * FROM mysql_source_cnt;
                    {count*(i+1)}
                    """
                ),
                quiet=True,
            )
        result = c.sql_query("SELECT * FROM mysql_source_cnt", service="mz_new")
        assert result[0][0] == 2 * count * repeats, f"Wrong result: {result}"
        result = c.sql_query(
            "SELECT count(*) FROM mysql_source_table", service="mz_new"
        )
        assert result[0][0] == 2 * count * repeats, f"Wrong result: {result}"


def workflow_kafka_source_failpoint(c: Composition) -> None:
    """Verify that source status updates of the newly deployed environment take
    precedence over older source status updates when promoted.

    The original Materialized instance (mz_old) is started with a failpoint
    that simulates a failure during state multi-put. After creating a Kafka
    source, we promote a new deployment (mz_new) and verify that the source
    status in mz_source_statuses is marked as 'running', indicating that the
    source has rehydrated correctly despite the injected failure."""
    c.down(destroy_volumes=True)
    # Start the required services.
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        {"name": "testdrive", "persistent": True},
    )
    # Start the original Materialized instance with the failpoint enabled.
    with c.override(
        Materialized(
            name="mz_old",
            sanity_restart=False,
            deploy_generation=0,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            external_metadata_store=True,
            environment_extra=["FAILPOINTS=fail_state_multi_put=return"],
            default_replication_factor=2,
        )
    ):
        c.up("mz_old")
        # Make sure cluster is owned by the system so it doesn't get dropped
        # between testdrive runs.
        c.sql(
            dedent(
                """
                DROP CLUSTER IF EXISTS cluster CASCADE;
                CREATE CLUSTER cluster SIZE '1';
                GRANT ALL ON CLUSTER cluster TO materialize;
                ALTER SYSTEM SET cluster = cluster;
                CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
                GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
                """
            ),
            service="mz_old",
            port=6877,
            user="mz_system",
        )
        c.testdrive(
            dedent(
                """
                > SET CLUSTER = cluster;
                > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT';
                $ kafka-create-topic topic=kafka-fp
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka-fp
                keyA,keyA:valA,valA
                > CREATE SOURCE kafka_source_fp
                  IN CLUSTER cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-fp-${testdrive.seed}');
                > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
                  FROM SOURCE kafka_source_fp (REFERENCE "testdrive-kafka-fp-${testdrive.seed}")
                  KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  ENVELOPE UPSERT;
                > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'kafka_source_fp';
                stalled
                """
            )
        )
    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        ),
    ):
        c.up("mz_new")
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        c.promote_mz("mz_new")
        c.await_mz_deployment_status(
            DeploymentStatus.IS_LEADER, "mz_new", sleep_time=None
        )
        # Verify that the Kafka source's status is marked as "running" in mz_source_statuses.
        c.testdrive(
            dedent(
                """
                > SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'kafka_source_fp';
                running
                """
            )
        )


def fetch_reconciliation_metrics(c: Composition, process: str) -> tuple[int, int]:
    # TODO: Replace me with mz_internal.mz_cluster_replica_ports when it exists
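    # Locate the clusterd process running with cluster_id=s2 and pull the unix
    # socket path out of its --internal-http-listen-addr argument.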
    internal_http = c.exec(
        process,
        "bash",
        "-c",
        'ps aux | grep -v grep | grep "cluster_id=s2" | sed -e "s#.* --internal-http-listen-addr=\\([^ ]*\\) .*#\\1#"',
        capture=True,
    ).stdout.strip()
    metrics = c.exec(
        process,
        "curl",
        "--silent",
        "--unix-socket",
        internal_http,
        "localhost/metrics",
        capture=True,
    ).stdout
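    # The response is in Prometheus exposition format; the relevant lines look
    # roughly like this (any label sets elided):
    #   mz_compute_reconciliation_reused_dataflows_count_total 42
    #   mz_compute_reconciliation_replaced_dataflows_count_total 0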
    reused = 0
    replaced = 0
    for metric in metrics.splitlines():
        if metric.startswith("mz_compute_reconciliation_reused_dataflows_count_total"):
            reused += int(metric.split()[1])
        elif metric.startswith(
            "mz_compute_reconciliation_replaced_dataflows_count_total"
        ):
            replaced += int(metric.split()[1])
    return reused, replaced


def workflow_builtin_item_migrations(c: Composition) -> None:
    """Verify builtin item migrations"""
    c.down(destroy_volumes=True)
    c.up("mz_old")
    c.sql(
        "CREATE MATERIALIZED VIEW mv AS SELECT name FROM mz_tables;",
        service="mz_old",
        port=6877,
        user="mz_system",
    )
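    # Record the global IDs and storage shard IDs of a builtin table and a user
    # object before the upgrade, so we can check what survives the migration.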
    mz_tables_gid = c.sql_query(
        "SELECT id FROM mz_tables WHERE name = 'mz_tables'",
        service="mz_old",
    )[0][0]
    mv_gid = c.sql_query(
        "SELECT id FROM mz_materialized_views WHERE name = 'mv'",
        service="mz_old",
    )[0][0]
    mz_tables_shard_id = c.sql_query(
        f"SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id = '{mz_tables_gid}'",
        service="mz_old",
    )[0][0]
    mv_shard_id = c.sql_query(
        f"SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id = '{mv_gid}'",
        service="mz_old",
    )[0][0]
    with c.override(
        Materialized(
            name="mz_new",
            sanity_restart=False,
            deploy_generation=1,
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
            force_migrations="all",
            healthcheck=LEADER_STATUS_HEALTHCHECK,
            default_replication_factor=2,
        ),
    ):
        c.up("mz_new")
        new_mz_tables_gid = c.sql_query(
            "SELECT id FROM mz_tables WHERE name = 'mz_tables'",
            service="mz_new",
        )[0][0]
        new_mv_gid = c.sql_query(
            "SELECT id FROM mz_materialized_views WHERE name = 'mv'",
            service="mz_new",
        )[0][0]
        assert new_mz_tables_gid == mz_tables_gid
        assert new_mv_gid == mv_gid
        # mz_internal.mz_storage_shards won't update until this instance becomes the leader
        c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
        c.promote_mz("mz_new")
        c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, "mz_new")
        new_mz_tables_gid = c.sql_query(
            "SELECT id FROM mz_tables WHERE name = 'mz_tables'",
            service="mz_new",
            reuse_connection=False,
        )[0][0]
        new_mv_gid = c.sql_query(
            "SELECT id FROM mz_materialized_views WHERE name = 'mv'",
            service="mz_new",
            reuse_connection=False,
        )[0][0]
        assert new_mz_tables_gid == mz_tables_gid
        assert new_mv_gid == mv_gid
        new_mz_tables_shard_id = c.sql_query(
            f"SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id = '{mz_tables_gid}'",
            service="mz_new",
            reuse_connection=False,
        )[0][0]
        new_mv_shard_id = c.sql_query(
            f"SELECT shard_id FROM mz_internal.mz_storage_shards WHERE object_id = '{mv_gid}'",
            service="mz_new",
            reuse_connection=False,
        )[0][0]
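        # Forced migrations recreate builtin collections, so the builtin table
        # gets a new shard while the user materialized view keeps its shard.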
        assert new_mz_tables_shard_id != mz_tables_shard_id
        assert new_mv_shard_id == mv_shard_id
        reused, replaced = fetch_reconciliation_metrics(c, "mz_new")
        assert reused > 0
        assert (
            replaced == 0
        ), f"{replaced} dataflows have been replaced, expected all to be reused"


def workflow_materialized_view_correction_pruning(c: Composition) -> None:
    """
    Verify that the MV sink consolidates away the snapshot updates in read-only
    mode.
    """
    c.down(destroy_volumes=True)
    c.up("mz_old")
    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        service="mz_old",
        port=6877,
        user="mz_system",
    )
    c.sql(
        """
        CREATE TABLE t (a int);
        INSERT INTO t SELECT generate_series(1, 1000);
        CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
        SELECT * FROM mv LIMIT 1;
        """,
        service="mz_old",
    )
    c.up("mz_new")
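    # mz_new comes up in read-only mode; reading from the MV ensures its
    # dataflow (and thus the MV sink's correction buffer) is up before we
    # scrape metrics.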
    c.sql("SELECT * FROM mv LIMIT 1", service="mz_new")

    def get_clusterd_internal_http_address():
        logs = c.invoke("logs", "mz_new", capture=True).stdout
        for line in logs.splitlines():
            # quickstart must be u1 since it's the only non-system cluster
            if (
                "cluster-u1-replica-u1-gen-1" in line
                and "mz_clusterd: serving internal HTTP server on" in line
            ):
                return line.split(" ")[-1]
        raise RuntimeError("No HTTP endpoint for quickstart clusterd found in logs")

    def get_correction_metrics():
        address = get_clusterd_internal_http_address()
        resp = c.exec(
            "mz_new",
            "curl",
            "--unix-socket",
            address,
            "http:/prof/metrics",
            capture=True,
        ).stdout
        metrics = {}
        for line in resp.splitlines():
            key, value = line.split(maxsplit=1)
            metrics[key] = value
        insertions = int(metrics["mz_persist_sink_correction_insertions_total"])
        deletions = int(metrics["mz_persist_sink_correction_deletions_total"])
        return (insertions, deletions)

    insertions = None
    deletions = None
    # The correction buffer should stabilize in a state where it has seen 2000
    # insertions (positive + negative updates) and as many deletions, i.e. the
    # net number of records in the correction buffer should be zero.
    for _ in range(10):
        time.sleep(1)
        insertions, deletions = get_correction_metrics()
        if insertions > 1000 and insertions - deletions == 0:
            break
    else:
        raise AssertionError(
            f"unexpected correction metrics: {insertions=}, {deletions=}"
        )


def workflow_upsert_sources(c: Composition) -> None:
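    """Exercise upsert sources under continuous ingestion while repeatedly
    performing 0dt deployments between two Materialize instances."""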
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )
    num_threads = 50
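    # Each worker thread creates a source, a source table with an index, and a
    # materialized view, so raise the object limits accordingly below.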
    c.sql(
        f"""
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        ALTER SYSTEM SET max_sources = {num_threads * 2};
        ALTER SYSTEM SET max_materialized_views = {num_threads * 2};
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )
    c.testdrive(
        dedent(
            """
            > SET CLUSTER = cluster;
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${testdrive.schema-registry-url}';
            """
        )
    )
    end_time = datetime.now() + timedelta(seconds=200)
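    # The deployment loop below ping-pongs between the two instances: mz1 is
    # always the current leader, mz2 the next deploy generation.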
    mz1 = "mz_old"
    mz2 = "mz_new"
    def worker(i: int) -> None:
        c.testdrive(
            dedent(
                f"""
                $ kafka-create-topic topic=kafka{i}
                > CREATE SOURCE kafka_source{i}
                  IN CLUSTER cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka{i}-${{testdrive.seed}}');
                > CREATE TABLE kafka_source_tbl{i} (key1, key2, value1, value2)
                  FROM SOURCE kafka_source{i} (REFERENCE "testdrive-kafka{i}-${{testdrive.seed}}")
                  KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  ENVELOPE UPSERT;
                > CREATE DEFAULT INDEX ON kafka_source_tbl{i}
                > CREATE MATERIALIZED VIEW mv{i} AS SELECT * FROM kafka_source_tbl{i}
                """
            )
        )
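        # Keep ingesting until the deadline; testdrive can fail transiently
        # while the deployment flips below, so errors are swallowed and the
        # loop retries.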
        while datetime.now() < end_time:
            try:
                c.testdrive(
                    dedent(
                        f"""
                        $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka{i} repeat=10000
                        key1A,key1B:value1A,value1B
                        """
                    )
                )
            except Exception:
                pass
    threads = []
    for i in range(num_threads):
        thread = Thread(name=f"worker_{i}", target=worker, args=(i,))
        threads.append(thread)
    for thread in threads:
        thread.start()

    i = 1
    while datetime.now() < end_time:
        with c.override(
            Materialized(
                name=mz2,
                sanity_restart=False,
                deploy_generation=i,
                system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
                restart="on-failure",
                external_metadata_store=True,
                default_replication_factor=2,
            ),
            Testdrive(
                materialize_url=f"postgres://materialize@{mz1}:6875",
                materialize_url_internal=f"postgres://materialize@{mz1}:6877",
                mz_service=mz1,
                materialize_params={"cluster": "cluster"},
                no_consistency_checks=True,
                no_reset=True,
                seed=1,
                default_timeout=DEFAULT_TIMEOUT,
            ),
        ):
            c.up(mz2)
            c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, mz2)
            c.promote_mz(mz2)
            c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, mz2)
        i += 1
        mz1, mz2 = mz2, mz1

    for thread in threads:
        thread.join()


def workflow_ddl(c: Composition) -> None:
    """Verify the basic 0dt deployment flow with DDL statements running while
    the deployment is in progress."""
    c.down(destroy_volumes=True)
    c.up(
        "zookeeper",
        "kafka",
        "schema-registry",
        "postgres",
        "mysql",
        "mz_old",
        {"name": "testdrive", "persistent": True},
    )
    # Make sure cluster is owned by the system so it doesn't get dropped
    # between testdrive runs.
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster CASCADE;
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
        user="mz_system",
    )
    # Inserts should be reflected when writes are allowed.
    c.testdrive(
        dedent(
            f"""
            > SET CLUSTER = cluster;
            > CREATE TABLE t (a int, b int);
            > CREATE CONNECTION IF NOT EXISTS kafka_conn FOR KAFKA BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL = 'PLAINTEXT';
            > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SINK kafka_sink
              IN CLUSTER cluster
              FROM t
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;
            > INSERT INTO t VALUES (1, 2);
            > CREATE INDEX t_idx ON t (a, b);
            > CREATE MATERIALIZED VIEW mv AS SELECT sum(a) FROM t;
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SELECT * FROM mv;
            1
            > SELECT max(b) FROM t;
            2
            $ kafka-create-topic topic=kafka
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key1A,key1B:value1A,value1B
            > CREATE SOURCE kafka_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-${{testdrive.seed}}');
            > CREATE TABLE kafka_source_tbl (key1, key2, value1, value2)
              FROM SOURCE kafka_source (REFERENCE "testdrive-kafka-${{testdrive.seed}}")
              KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
              ENVELOPE UPSERT;
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres1 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_source;
            DROP TABLE IF EXISTS postgres_source_table;
            CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER);
            ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;
            INSERT INTO postgres_source_table SELECT 'A', 0;
            CREATE PUBLICATION postgres_source FOR ALL TABLES;
            > CREATE SECRET pgpass AS 'postgres';
            > CREATE CONNECTION pg FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass;
            > CREATE SOURCE postgres_source
              IN CLUSTER cluster
              FROM POSTGRES CONNECTION pg
              (PUBLICATION 'postgres_source');
            > CREATE TABLE postgres_source_table FROM SOURCE postgres_source (REFERENCE postgres_source_table)
            > SELECT * FROM postgres_source_table;
            A 0
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            # create the database if it does not exist yet but do not drop it
            CREATE DATABASE IF NOT EXISTS public;
            USE public;
            CREATE USER mysql1 IDENTIFIED BY 'mysql';
            GRANT REPLICATION SLAVE ON *.* TO mysql1;
            GRANT ALL ON public.* TO mysql1;
            CREATE TABLE mysql_source_table (f1 VARCHAR(32), f2 INTEGER);
            INSERT INTO mysql_source_table VALUES ('A', 0);
            > CREATE SECRET mysqlpass AS 'mysql';
            > CREATE CONNECTION mysql TO MYSQL (
              HOST 'mysql',
              USER mysql1,
              PASSWORD SECRET mysqlpass);
            > CREATE SOURCE mysql_source1
              IN CLUSTER cluster
              FROM MYSQL CONNECTION mysql;
            > CREATE TABLE mysql_source_table FROM SOURCE mysql_source1 (REFERENCE public.mysql_source_table);
            > SELECT * FROM mysql_source_table;
            A 0
            $ kafka-verify-topic sink=materialize.public.kafka_sink
            > CREATE SOURCE kafka_sink_source
              IN CLUSTER cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${{testdrive.seed}}')
            > CREATE TABLE kafka_sink_source_tbl FROM SOURCE kafka_sink_source (REFERENCE "testdrive-kafka-sink-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            > CREATE SOURCE webhook_source
              IN CLUSTER cluster_singlereplica
              FROM WEBHOOK BODY FORMAT TEXT
            $ webhook-append database=materialize schema=public name=webhook_source
            AAA
            > SELECT * FROM webhook_source
            AAA
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            > COMMIT
            """
        )
    )
    # Start new Materialize in a new deploy generation, which will cause
    # Materialize to boot in read-only mode.
    c.up("mz_new")

    # Verify against new Materialize that it is in read-only mode
    with c.override(
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        )
    ):
        c.up({"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                f"""
                $ webhook-append database=materialize schema=public name=webhook_source status=500
                BBB
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
                key2A,key2B:value2A,value2B
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table VALUES ('B', 1);
                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                $ mysql-execute name=mysql
                USE public;
                INSERT INTO mysql_source_table VALUES ('B', 1);
                > SET CLUSTER = cluster;
                > SELECT 1
                1
                ! INSERT INTO t VALUES (3, 4);
                contains: cannot write in read-only mode
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SELECT * FROM mv;
                1
                # TODO: Currently hangs
                # > SELECT max(b) FROM t;
                # 2
                > SELECT mz_unsafe.mz_sleep(5)
                <null>
                ! INSERT INTO t VALUES (5, 6);
                contains: cannot write in read-only mode
                > SELECT * FROM mv;
                1
                ! DROP INDEX t_idx
                contains: cannot write in read-only mode
                ! CREATE INDEX t_idx2 ON t (a, b)
                contains: cannot write in read-only mode
                ! CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
                contains: cannot write in read-only mode
                $ set-regex match=(s\\d+|\\d{{13}}|[ ]{{12}}0|u\\d{{1,3}}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
                > EXPLAIN TIMESTAMP FOR SELECT * FROM mv;
                " query timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: true\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.mv (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User(6)])): [<> <>]\\n"
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                > SELECT * FROM webhook_source
                AAA
                $ set-max-tries max-tries=1
                $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
                > BEGIN
                ! DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
                contains: cannot write in read-only mode
                > ROLLBACK
                # Actual subscribes without a declare still work though
                > SUBSCRIBE (WITH a(x) AS (SELECT 'a') SELECT generate_series(1, 2), x FROM a)
                <TIMESTAMP> 1 1 a
                <TIMESTAMP> 1 2 a
                """
            )
        )
    # Run DDLs against the old Materialize, which should restart the new one
    c.up({"name": "testdrive", "persistent": True})
    c.testdrive(
        dedent(
            f"""
            > CREATE TABLE t1 (a INT);
            $ webhook-append database=materialize schema=public name=webhook_source
            CCC
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key3A,key3B:value3A,value3B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table VALUES ('C', 2);
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            USE public;
            INSERT INTO mysql_source_table VALUES ('C', 2);
            > CREATE TABLE t2 (a INT);
            > SET CLUSTER = cluster;
            > SELECT 1
            1
            > INSERT INTO t VALUES (3, 4);
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SELECT * FROM mv;
            4
            > SELECT max(b) FROM t;
            4
            > SELECT mz_unsafe.mz_sleep(5)
            <null>
            > INSERT INTO t VALUES (5, 6);
            > SELECT * FROM mv;
            9
            > DROP INDEX t_idx
            > CREATE INDEX t_idx2 ON t (a, b)
            > CREATE MATERIALIZED VIEW mv2 AS SELECT sum(a) FROM t;
            $ set-regex match=(s\\d+|\\d{{13}}|[ ]{{12}}0|u\\d{{1,3}}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
            > EXPLAIN TIMESTAMP FOR SELECT * FROM mv;
            " query timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: true\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.mv (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User(6)])): [<> <>]\\n"
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            <null> <null> 3 4
            <null> <null> 5 6
            > SELECT * FROM webhook_source
            AAA
            CCC
            > CREATE TABLE t3 (a INT);
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            <TIMESTAMP> 1 3
            <TIMESTAMP> 1 5
            > COMMIT
            > CREATE TABLE t4 (a INT);
            """
        )
    )
    with c.override(
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
            materialize_url_internal="postgres://materialize@mz_new:6877",
            mz_service="mz_new",
            materialize_params={"cluster": "cluster"},
            no_reset=True,
            seed=1,
            default_timeout=DEFAULT_TIMEOUT,
        )
    ):
        c.up({"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                """
                $ webhook-append database=materialize schema=public name=webhook_source status=500
                DDD
                > SET CLUSTER = cluster;
                > SELECT 1
                1
                ! INSERT INTO t VALUES (3, 4);
                contains: cannot write in read-only mode
                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SELECT * FROM mv;
                9
                > SELECT max(b) FROM t;
                6
                > SELECT * FROM mv;
                9
                > SELECT * FROM kafka_source_tbl
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B
                > SELECT * FROM postgres_source_table
                A 0
                B 1
                C 2
                > SELECT * FROM mysql_source_table;
                A 0
                B 1
                C 2
                > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
                <null> <null> 1 2
                <null> <null> 3 4
                <null> <null> 5 6
                > SELECT * FROM webhook_source
                AAA
                CCC
                $ set-max-tries max-tries=1
                $ set-regex match=\\d{13,20} replacement=<TIMESTAMP>
                > BEGIN
                ! DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
                contains: cannot write in read-only mode
                > ROLLBACK
                # Actual subscribes without a declare still work though
                > SUBSCRIBE (WITH a(x) AS (SELECT 'a') SELECT generate_series(1, 2), x FROM a)
                <TIMESTAMP> 1 1 a
                <TIMESTAMP> 1 2 a
                """
            )
        )
    c.await_mz_deployment_status(DeploymentStatus.READY_TO_PROMOTE, "mz_new")
    c.promote_mz("mz_new")

    # Give some time for Mz to restart after promotion
    for i in range(10):
        try:
            c.sql("SELECT 1", service="mz_old")
        except OperationalError as e:
            assert (
                "server closed the connection unexpectedly" in str(e)
                or "Can't create a connection to host" in str(e)
                or "Connection refused" in str(e)
            ), f"Unexpected error: {e}"
        except CommandFailureCausedUIError as e:
            # service "mz_old" is not running
            assert "running docker compose failed" in str(
                e
            ), f"Unexpected error: {e}"
            break
        time.sleep(1)
    else:
        raise RuntimeError("mz_old didn't stop running within 10 seconds")
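    # Wait until the newly promoted deployment accepts connections.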
    for i in range(10):
        try:
            c.sql("SELECT 1", service="mz_new")
            break
        except CommandFailureCausedUIError:
            pass
        except OperationalError:
            pass
        time.sleep(1)
    else:
        raise RuntimeError("mz_new didn't come up within 10 seconds")
    c.await_mz_deployment_status(DeploymentStatus.IS_LEADER, "mz_new")
    c.testdrive(
        dedent(
            f"""
            $ webhook-append database=materialize schema=public name=webhook_source
            EEE
            > SET CLUSTER = cluster;
            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > CREATE MATERIALIZED VIEW mv3 AS SELECT sum(a) FROM t;
            > SELECT * FROM mv;
            9
            > SELECT * FROM mv2;
            9
            > SELECT * FROM mv3;
            9
            > SELECT max(b) FROM t;
            6
            > INSERT INTO t VALUES (7, 8);
            > SELECT * FROM mv;
            16
            > SELECT * FROM mv2;
            16
            > SELECT max(b) FROM t;
            8
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=kafka
            key4A,key4B:value4A,value4B
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table VALUES ('D', 3);
            $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
            $ mysql-execute name=mysql
            USE public;
            INSERT INTO mysql_source_table VALUES ('D', 3);
            > SELECT * FROM kafka_source_tbl
            key1A key1B value1A value1B
            key2A key2B value2A value2B
            key3A key3B value3A value3B
            key4A key4B value4A value4B
            > SELECT * FROM postgres_source_table
            A 0
            B 1
            C 2
            D 3
            > SELECT * FROM mysql_source_table;
            A 0
            B 1
            C 2
            D 3
            > SELECT (before).a, (before).b, (after).a, (after).b FROM kafka_sink_source_tbl
            <null> <null> 1 2
            <null> <null> 3 4
            <null> <null> 5 6
            <null> <null> 7 8
            > SELECT * FROM webhook_source
            AAA
            CCC
            EEE
            $ set-max-tries max-tries=1
            $ set-regex match=\\d{{13,20}} replacement=<TIMESTAMP>
            > BEGIN
            > DECLARE c CURSOR FOR SUBSCRIBE (SELECT a FROM t);
            > FETCH ALL c WITH (timeout='5s');
            <TIMESTAMP> 1 1
            <TIMESTAMP> 1 3
            <TIMESTAMP> 1 5
            <TIMESTAMP> 1 7
            > COMMIT
            """
        )
    )