# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

  9. """
  10. Test the detection and reporting of source/sink errors by introducing a
  11. Disruption and then checking the mz_internal.mz_*_statuses tables
  12. """

import random
from collections.abc import Callable
from dataclasses import dataclass
from textwrap import dedent
from typing import Protocol

from materialize import buildkite
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import selected_by_name


def schema() -> str:
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


SERVICES = [
    Redpanda(),
    Materialized(),
    Testdrive(),
    Clusterd(),
    Postgres(),
    Zookeeper(),
    Kafka(
        name="badkafka",
        environment=[
            "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
            # Setting the following two values to 3 triggers a failure, since
            # only a single broker is running.
            # Sets the transaction.state.log.min.isr config:
            "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=3",
            # Sets the transaction.state.log.replication.factor config:
            "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3",
        ],
    ),
    SchemaRegistry(kafka_servers=[("badkafka", "9092")]),
]


class Disruption(Protocol):
    name: str

    def run_test(self, c: Composition) -> None: ...
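
# Note: the concrete disruption classes below satisfy `Disruption`
# structurally (it is a `typing.Protocol`), so they do not inherit from it.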


@dataclass
class KafkaTransactionLogGreaterThan1:
    name: str

    # Unlike the other scenarios, this one needs `Kafka` (not `Redpanda`) and
    # a differently configured Testdrive, so it defines its own `run_test`.
    def run_test(self, c: Composition) -> None:
        print(f"+++ Running disruption scenario {self.name}")
        seed = random.randint(0, 256**4)

        c.up({"name": "testdrive", "persistent": True})

        with c.override(
            Testdrive(
                no_reset=True,
                seed=seed,
                kafka_url="badkafka",
                entrypoint_extra=[
                    "--initial-backoff=1s",
                    "--backoff-factor=0",
                ],
            ),
        ):
            c.up("zookeeper", "badkafka", "schema-registry", "materialized")
            self.populate(c)
            self.assert_error(c, "transaction error", "running a single Kafka broker")
            c.down(sanity_restart_mz=False)

    def populate(self, c: Composition) -> None:
        # Create a source and a sink
        c.testdrive(
            dedent(
                """
                > CREATE CONNECTION kafka_conn
                  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

                > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${testdrive.schema-registry-url}'
                  );

                > CREATE TABLE sink_table (f1 INTEGER);

                > INSERT INTO sink_table VALUES (1);

                > INSERT INTO sink_table VALUES (2);

                > CREATE SINK kafka_sink FROM sink_table
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            ),
        )

    def assert_error(self, c: Composition, error: str, hint: str) -> None:
        # `{{hints,0}}` renders as the JSON path `{hints,0}` once the f-string
        # is formatted.
        c.testdrive(
            dedent(
                f"""
                $ set-sql-timeout duration=120s
                > SELECT bool_or(error ~* '{error}'), bool_or(details::json#>>'{{hints,0}}' ~* '{hint}')
                  FROM mz_internal.mz_sink_status_history
                  JOIN mz_sinks ON mz_sinks.id = sink_id
                  WHERE name = 'kafka_sink' and status = 'stalled'
                true true
                """
            )
        )


@dataclass
class KafkaDisruption:
    name: str
    breakage: Callable
    expected_error: str
    fixage: Callable | None

    def run_test(self, c: Composition) -> None:
        print(f"+++ Running disruption scenario {self.name}")
        seed = random.randint(0, 256**4)

        c.down(destroy_volumes=True, sanity_restart_mz=False)
        c.up(
            "redpanda",
            "materialized",
            "clusterd",
            {"name": "testdrive", "persistent": True},
        )

        with c.override(
            Testdrive(
                no_reset=True,
                seed=seed,
                entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
            )
        ):
            self.populate(c)
            self.breakage(c, seed)
            self.assert_error(c, self.expected_error)

            if self.fixage:
                self.fixage(c, seed)
                self.assert_recovery(c)

    def populate(self, c: Composition) -> None:
        # Create a source and a sink
        c.testdrive(
            dedent(
                """
                # We specify the progress topic explicitly so we can delete it in a test later,
                # and confirm that the sink stalls. (Deleting the output topic is not enough if
                # we're not actively publishing new messages to the sink.)
                > CREATE CONNECTION kafka_conn
                  TO KAFKA (
                  BROKER '${testdrive.kafka-addr}',
                  SECURITY PROTOCOL PLAINTEXT,
                  PROGRESS TOPIC 'testdrive-progress-topic-${testdrive.seed}'
                  );

                > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${testdrive.schema-registry-url}'
                  );

                $ kafka-create-topic topic=source-topic

                $ kafka-ingest topic=source-topic format=bytes
                ABC

                > CREATE SOURCE source1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source-topic-${testdrive.seed}')

                > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-source-topic-${testdrive.seed}")
                  FORMAT BYTES
                  ENVELOPE NONE
                # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/database-issues/issues/4800

                # Ensure the source makes _real_ progress before we disrupt it. This also
                # ensures the sink makes progress, which is required to hit certain stalls.
                # As of implementing correctness property #2, this is required.
                > SELECT count(*) from source1_tbl
                1

                > CREATE SINK sink1 FROM source1_tbl
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-topic-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/database-issues/issues/4800

                $ kafka-verify-topic sink=materialize.public.sink1
                """
            )
        )

    def assert_error(self, c: Composition, error: str) -> None:
        c.testdrive(
            dedent(
                f"""
                $ set-sql-timeout duration=60s
                > SELECT status, error ~* '{error}'
                  FROM mz_internal.mz_source_statuses
                  WHERE name = 'source1'
                stalled true
                """
            )
        )

    def assert_recovery(self, c: Composition) -> None:
        c.testdrive(
            dedent(
                """
                $ kafka-ingest topic=source-topic format=bytes
                ABC

                > SELECT COUNT(*) FROM source1_tbl;
                2

                > SELECT status, error
                  FROM mz_internal.mz_source_statuses
                  WHERE name = 'source1'
                running <null>
                """
            )
        )


@dataclass
class KafkaSinkDisruption:
    name: str
    breakage: Callable
    expected_error: str
    fixage: Callable | None

    def run_test(self, c: Composition) -> None:
        print(f"+++ Running Kafka sink disruption scenario {self.name}")
        seed = random.randint(0, 256**4)

        c.down(destroy_volumes=True, sanity_restart_mz=False)
        c.up(
            "redpanda",
            "materialized",
            "clusterd",
            {"name": "testdrive", "persistent": True},
        )

        with c.override(
            Testdrive(
                no_reset=True,
                seed=seed,
                entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
            )
        ):
            self.populate(c)
            self.breakage(c, seed)
            self.assert_error(c, self.expected_error)

            if self.fixage:
                self.fixage(c, seed)
                self.assert_recovery(c)

    def populate(self, c: Composition) -> None:
        # Create a source and a sink
        c.testdrive(
            schema()
            + dedent(
                """
                # We specify the progress topic explicitly so we can delete it in a test later,
                # and confirm that the sink stalls. (Deleting the output topic is not enough if
                # we're not actively publishing new messages to the sink.)
                > CREATE CONNECTION kafka_conn
                  TO KAFKA (
                  BROKER '${testdrive.kafka-addr}',
                  SECURITY PROTOCOL PLAINTEXT,
                  PROGRESS TOPIC 'testdrive-progress-topic-${testdrive.seed}'
                  );

                > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${testdrive.schema-registry-url}'
                  );

                $ kafka-create-topic topic=source-topic

                $ kafka-ingest topic=source-topic format=avro schema=${schema}
                {"f1": "A"}

                > CREATE SOURCE source1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source-topic-${testdrive.seed}')

                > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-source-topic-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/database-issues/issues/4800

                > CREATE SINK sink1 FROM source1_tbl
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-topic-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/database-issues/issues/4800

                $ kafka-verify-data format=avro sink=materialize.public.sink1 sort-messages=true
                {"before": null, "after": {"row":{"f1": "A"}}}
                """
            )
        )

    def assert_error(self, c: Composition, error: str) -> None:
        c.testdrive(
            dedent(
                f"""
                $ set-sql-timeout duration=60s
                # Sinks generally halt after receiving an error, which means that they may alternate
                # between `stalled` and `starting`. Instead of relying on the current status, we
                # check that there is a stalled status with the expected error.
                > SELECT bool_or(error ~* '{error}'), bool_or(details->'namespaced'->>'kafka' ~* '{error}')
                  FROM mz_internal.mz_sink_status_history
                  JOIN mz_sinks ON mz_sinks.id = sink_id
                  WHERE name = 'sink1' and status = 'stalled'
                true true
                """
            )
        )

    def assert_recovery(self, c: Composition) -> None:
        c.testdrive(
            dedent(
                """
                > SELECT status, error
                  FROM mz_internal.mz_sink_statuses
                  WHERE name = 'sink1'
                running <null>
                """
            )
        )


@dataclass
class PgDisruption:
    name: str
    breakage: Callable
    expected_error: str
    fixage: Callable | None

    def run_test(self, c: Composition) -> None:
        print(f"+++ Running disruption scenario {self.name}")
        seed = random.randint(0, 256**4)

        c.down(destroy_volumes=True, sanity_restart_mz=False)
        c.up(
            "postgres",
            "materialized",
            "clusterd",
            {"name": "testdrive", "persistent": True},
        )

        with c.override(
            Testdrive(
                no_reset=True,
                seed=seed,
                entrypoint_extra=["--initial-backoff=1s", "--backoff-factor=0"],
            )
        ):
            self.populate(c)
            self.breakage(c, seed)
            self.assert_error(c, self.expected_error)

            if self.fixage:
                self.fixage(c, seed)
                self.assert_recovery(c)

    def populate(self, c: Composition) -> None:
        # Create a source and a sink
        c.testdrive(
            dedent(
                """
                > CREATE SECRET pgpass AS 'postgres'

                > CREATE CONNECTION pg TO POSTGRES (
                  HOST postgres,
                  DATABASE postgres,
                  USER postgres,
                  PASSWORD SECRET pgpass
                  )

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                DROP SCHEMA IF EXISTS public CASCADE;
                CREATE SCHEMA public;
                DROP PUBLICATION IF EXISTS mz_source;
                CREATE PUBLICATION mz_source FOR ALL TABLES;
                CREATE TABLE source1 (f1 INTEGER PRIMARY KEY, f2 integer[]);
                INSERT INTO source1 VALUES (1, NULL);
                ALTER TABLE source1 REPLICA IDENTITY FULL;
                INSERT INTO source1 VALUES (2, NULL);

                > CREATE SOURCE "pg_source"
                  FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');

                > CREATE TABLE "source1_tbl" FROM SOURCE "pg_source" (REFERENCE "source1");
                """
            )
        )

    def assert_error(self, c: Composition, error: str) -> None:
        c.testdrive(
            dedent(
                f"""
                $ set-sql-timeout duration=60s
                # Postgres sources may halt after receiving an error, which means that they may
                # alternate between `stalled` and `starting`. Instead of relying on the current
                # status, we check that the latest stall has the error we expect.
                > SELECT error ~* '{error}'
                  FROM mz_internal.mz_source_status_history
                  JOIN (
                    SELECT name, id FROM mz_sources UNION SELECT name, id FROM mz_tables
                  ) ON id = source_id
                  WHERE (
                    name = 'source1_tbl' OR name = 'pg_source'
                  ) AND (status = 'stalled' OR status = 'ceased')
                  ORDER BY occurred_at DESC LIMIT 1;
                true
                """
            )
        )

    def assert_recovery(self, c: Composition) -> None:
        c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO source1 VALUES (3);

                > SELECT status, error
                  FROM mz_internal.mz_source_statuses
                  WHERE name = 'source1_tbl'
                  AND type = 'table'
                running <null>

                > SELECT f1 FROM source1_tbl;
                1
                2
                3
                """
            )
        )


disruptions: list[Disruption] = [
    KafkaSinkDisruption(
        name="delete-sink-topic-delete-progress-fix",
        breakage=lambda c, seed: delete_sink_topic(c, seed),
        expected_error="topic testdrive-sink-topic-\\d+ does not exist",
        # If we delete the progress topic, we will re-create the sink as if it
        # were new.
        fixage=lambda c, seed: c.exec(
            "redpanda", "rpk", "topic", "delete", f"testdrive-progress-topic-{seed}"
        ),
    ),
    KafkaSinkDisruption(
        name="delete-sink-topic-recreate-topic-fix",
        breakage=lambda c, seed: delete_sink_topic(c, seed),
        expected_error="topic testdrive-sink-topic-\\d+ does not exist",
        # If we recreate the sink topic, the sink will work but will likely be
        # inconsistent.
        fixage=lambda c, seed: c.exec(
            "redpanda", "rpk", "topic", "create", f"testdrive-sink-topic-{seed}"
        ),
    ),
    KafkaDisruption(
        name="delete-source-topic",
        breakage=lambda c, seed: c.exec(
            "redpanda", "rpk", "topic", "delete", f"testdrive-source-topic-{seed}"
        ),
        expected_error="UnknownTopicOrPartition|topic",
        fixage=None,
        # Re-creating the topic does not restart the source:
        # fixage=lambda c, seed: redpanda_topics(c, "create", seed),
    ),
    # `docker compose pause` has become unreliable recently:
    # KafkaDisruption(
    #     name="pause-redpanda",
    #     breakage=lambda c, _: c.pause("redpanda"),
    #     expected_error="OperationTimedOut|BrokerTransportFailure|transaction",
    #     fixage=lambda c, _: c.unpause("redpanda"),
    # ),
    KafkaDisruption(
        name="sigstop-redpanda",
        breakage=lambda c, _: c.kill("redpanda", signal="SIGSTOP", wait=False),
        expected_error="OperationTimedOut|BrokerTransportFailure|transaction",
        fixage=lambda c, _: c.kill("redpanda", signal="SIGCONT", wait=False),
    ),
    KafkaDisruption(
        name="kill-redpanda",
        breakage=lambda c, _: c.kill("redpanda"),
        expected_error="BrokerTransportFailure|Resolve|Broker transport failure|Timed out",
        fixage=lambda c, _: c.up("redpanda"),
    ),
    # https://github.com/MaterializeInc/database-issues/issues/4800
    # KafkaDisruption(
    #     name="kill-redpanda-clusterd",
    #     breakage=lambda c, _: c.kill("redpanda", "clusterd"),
    #     expected_error="???",
    #     fixage=lambda c, _: c.up("redpanda", "clusterd"),
    # ),
    PgDisruption(
        name="kill-postgres",
        breakage=lambda c, _: c.kill("postgres"),
        expected_error="error connecting to server|connection closed|deadline has elapsed|failed to lookup address information",
        fixage=lambda c, _: c.up("postgres"),
    ),
    PgDisruption(
        name="drop-publication-postgres",
        breakage=lambda c, _: c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DROP PUBLICATION mz_source;
                INSERT INTO source1 VALUES (3, NULL);
                """
            )
        ),
        expected_error="publication .+ does not exist",
        # Can't recover when the publication state is deleted.
        fixage=None,
    ),
    PgDisruption(
        name="alter-postgres",
        breakage=lambda c, _: alter_pg_table(c),
        expected_error="source table source1 with oid .+ has been altered",
        fixage=None,
    ),
    PgDisruption(
        name="unsupported-postgres",
        breakage=lambda c, _: unsupported_pg_table(c),
        expected_error="invalid input syntax for type array",
        fixage=None,
    ),
    # One-off disruption with a badly configured Kafka sink
    KafkaTransactionLogGreaterThan1(
        name="bad-kafka-sink",
    ),
]
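
# A new scenario needs only a `name` plus `breakage`/`fixage` callables.
# Hypothetical sketch, not part of the test matrix (the expected error would
# have to be determined empirically):
#
# PgDisruption(
#     name="sigstop-postgres",
#     breakage=lambda c, _: c.kill("postgres", signal="SIGSTOP", wait=False),
#     expected_error="...",
#     fixage=lambda c, _: c.kill("postgres", signal="SIGCONT", wait=False),
# ),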


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
    args = parser.parse_args()

    sharded_disruptions = buildkite.shard_list(
        list(selected_by_name(args.disruptions, disruptions)), lambda s: s.name
    )
    print(
        f"Disruptions in shard with index {buildkite.get_parallelism_index()}: {[d.name for d in sharded_disruptions]}"
    )

    for disruption in sharded_disruptions:
        c.override_current_testcase_name(
            f"Disruption '{disruption.name}' in workflow_default"
        )
        disruption.run_test(c)
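
# `buildkite.shard_list` splits the scenarios across parallel CI jobs; outside
# of Buildkite it is expected to return the full list, so a local run executes
# every selected disruption in a single process.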


def delete_sink_topic(c: Composition, seed: int) -> None:
    c.exec("redpanda", "rpk", "topic", "delete", f"testdrive-sink-topic-{seed}")

    # Write new data to the source; otherwise nothing will encounter the
    # missing topic.
    c.testdrive(
        schema()
        + dedent(
            """
            $ kafka-ingest topic=source-topic format=avro schema=${schema}
            {"f1": "B"}

            > SELECT COUNT(*) FROM source1_tbl;
            2
            """
        )
    )


def alter_pg_table(c: Composition) -> None:
    c.testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER TABLE source1 DROP COLUMN f1;
            INSERT INTO source1 VALUES (NULL)
            """
        )
    )


def unsupported_pg_table(c: Composition) -> None:
    c.testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO source1 VALUES (3, '[2:3]={2,2}')
            """
        )
    )