# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test replica isolation by introducing faults of various kinds in replica1 and
then making sure that the cluster continues to operate properly.
"""

import time
from collections.abc import Callable
from dataclasses import dataclass
from textwrap import dedent
from typing import Any

from psycopg import Cursor

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import selected_by_name

SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Localstack(),
    Materialized(
        additional_system_parameter_defaults={
            "log_filter": "mz_cluster::client=debug,info",
        },
    ),
    Clusterd(name="clusterd_1_1"),
    Clusterd(name="clusterd_1_2"),
    Clusterd(name="clusterd_2_1"),
    Clusterd(name="clusterd_2_2"),
    Testdrive(),
]
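

# Each check below asserts that `AllowCompaction` commands for a particular set
# of collection IDs eventually show up in a clusterd container's log. This
# relies on the `log_filter` default configured in SERVICES above
# (`mz_cluster::client=debug`), which makes the
# `ClusterClient send=AllowCompaction` lines visible in the logs.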
class AllowCompactionCheck:
    # replica: a string describing the SQL accessible name of the replica. Example: "cluster1.replica1"
    # host: docker container name from which to check the log. Example: "clusterd_1_1"
    def __init__(self, replica: str, host: str):
        assert "." in replica
        self.replica = replica
        self.host = host
        self.ids: list[str] | None = None
        self.missing_ids: list[str] = []
        self.satisfied = False

    def find_ids(self, c: Composition) -> None:
        raise NotImplementedError

    def print_error(self) -> None:
        raise NotImplementedError

    def check_log(self, c: Composition) -> None:
        self.find_ids(c)
        assert self.ids is not None
        log: str = c.invoke("logs", self.host, capture=True).stdout
        self.missing_ids = []
        self.satisfied = all([self._log_contains_id(log, x) for x in self.ids])

    def replica_id(self, c: Composition) -> str:
        cursor = c.sql_cursor()
        (cluster, replica) = self.replica.split(".")
        cursor.execute(
            f"""
            SELECT mz_cluster_replicas.id FROM mz_clusters, mz_cluster_replicas
            WHERE cluster_id = mz_clusters.id AND mz_clusters.name = '{cluster}'
            AND mz_cluster_replicas.name = '{replica}'""".encode(),
        )
        return str(get_single_value_from_cursor(cursor))

    def cluster_id(self, c: Composition) -> str:
        cursor = c.sql_cursor()
        cluster = self.replica.split(".")[0]
        cursor.execute(
            f"SELECT id FROM mz_clusters WHERE mz_clusters.name = '{cluster}'".encode(),
        )
        return str(get_single_value_from_cursor(cursor))

    def _log_contains_id(self, log: str, the_id: str) -> bool:
        for line in [
            x for x in log.splitlines() if "ClusterClient send=AllowCompaction" in x
        ]:
            if the_id in line:
                return True
        self.missing_ids.append(the_id)
        return False
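
    # Catalog IDs as shown in SQL (e.g. "u1", "s1", "si1") appear in the log
    # lines in a structured form: "User(1)", "System(1)", and
    # "IntrospectionSourceIndex(1)" respectively. Convert accordingly.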
    @staticmethod
    def _format_id(iid: str) -> str:
        if iid.startswith("si"):
            return "IntrospectionSourceIndex(" + iid[2:] + ")"
        elif iid.startswith("s"):
            return "System(" + iid[1:] + ")"
        elif iid.startswith("u"):
            return "User(" + iid[1:] + ")"
        raise RuntimeError(f"Unexpected iid: {iid}")

    @staticmethod
    def all_checks(replica: str, host: str) -> list["AllowCompactionCheck"]:
        return [
            MaterializedView(replica, host),
            ArrangedIntro(replica, host),
            ArrangedIndex(replica, host),
        ]


class MaterializedView(AllowCompactionCheck):
    """
    Checks that clusterd receives AllowCompaction commands for materialized views.

    For materialized views we hold back compaction until slow replicas have caught
    up. Hence we don't expect these messages if there is another failing replica in
    the cluster.
    """

    def find_ids(self, c: Composition) -> None:
        cursor = c.sql_cursor()
        cursor.execute(
            """
            SELECT id, shard_id FROM mz_internal.mz_storage_shards, mz_catalog.mz_materialized_views
            WHERE object_id = id AND name = 'v3';
            """
        )
        self.ids = [self._format_id(get_single_value_from_cursor(cursor))]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for materialized view with ids {self.missing_ids}"
        )


class ArrangedIntro(AllowCompactionCheck):
    """
    Checks that clusterd receives AllowCompaction commands for introspection sources.

    This is a purely per-replica property. Other failing replicas in the same
    cluster should not influence the result of this check.
    """

    def find_ids(self, c: Composition) -> None:
        cluster_id = self.cluster_id(c)
        cursor = c.sql_cursor()
        cursor.execute(
            f"""
            SELECT idx.id FROM mz_catalog.mz_sources AS src, mz_catalog.mz_indexes AS idx
            WHERE src.id = idx.on_id AND idx.cluster_id = '{cluster_id}'""".encode()
        )
        self.ids = [self._format_id(x[0]) for x in cursor.fetchall()]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for introspection with ids {self.missing_ids}"
        )


class ArrangedIndex(AllowCompactionCheck):
    """
    Checks that the arrangement of an index receives AllowCompaction.

    For arrangements, we hold back compaction until all replicas have caught up.
    Thus, with a failing replica these messages are no longer guaranteed.
    """

    def find_ids(self, c: Composition) -> None:
        cursor = c.sql_cursor()
        cursor.execute(
            """
            SELECT idx.id FROM mz_catalog.mz_views AS views, mz_catalog.mz_indexes AS idx
            WHERE views.name = 'ct1' AND views.id = idx.on_id
            """
        )
        self.ids = [self._format_id(x[0]) for x in cursor.fetchall()]

    def print_error(self) -> None:
        print(
            f"!! AllowCompaction not found for index arrangement with ids {self.missing_ids}"
        )
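

# The objects created here feed the checks above: ct1 and its default index are
# what ArrangedIndex looks for, and source1/v2 exercise a Kafka source. (The
# materialized view v3 checked by MaterializedView is created later, in
# validate().)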
def populate(c: Composition) -> None:
    # Create some database objects
    c.testdrive(
        dedent(
            """
            > CREATE TABLE t1 (f1 INTEGER);
            > INSERT INTO t1 SELECT * FROM generate_series(1, 10);
            > CREATE VIEW ct1 AS SELECT COUNT(*) AS c1 FROM t1;
            > CREATE DEFAULT INDEX ON ct1;
            > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c1 FROM t1;
            > CREATE TABLE ten (f1 INTEGER);
            > INSERT INTO ten SELECT * FROM generate_series(1, 10);
            > CREATE MATERIALIZED VIEW expensive AS SELECT (a1.f1 * 1) +
              (a2.f1 * 10) +
              (a3.f1 * 100) +
              (a4.f1 * 1000) +
              (a5.f1 * 10000) +
              (a6.f1 * 100000) +
              (a7.f1 * 1000000)
              FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4, ten AS a5, ten AS a6, ten AS a7;
            $ kafka-create-topic topic=source1
            $ kafka-ingest format=bytes topic=source1 repeat=1000000
            A${kafka-ingest.iteration}
            > CREATE CLUSTER c SIZE '1';
            > CREATE CONNECTION IF NOT EXISTS kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
            > CREATE SOURCE source1
              IN CLUSTER c
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source1-${testdrive.seed}')
              FORMAT BYTES
            > CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(*) FROM source1
            """
        ),
    )
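

# Each replica consists of two clusterd processes, so disrupting or restarting
# a replica means acting on both of its containers.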
def restart_replica(c: Composition) -> None:
    c.kill("clusterd_1_1", "clusterd_1_2")
    c.up("clusterd_1_1", "clusterd_1_2")


def restart_environmentd(c: Composition) -> None:
    c.kill("materialized")
    c.up("materialized")
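

# Creating a replica with explicit addresses requires the unsafe
# "unorchestrated cluster replicas" mode, which the next two scenarios
# enable first.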
def drop_create_replica(c: Composition) -> None:
    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.testdrive(
        dedent(
            """
            > DROP CLUSTER REPLICA cluster1.replica1
            > CREATE CLUSTER REPLICA cluster1.replica3
              STORAGECTL ADDRESSES ['clusterd_1_1:2100', 'clusterd_1_2:2100'],
              STORAGE ADDRESSES ['clusterd_1_1:2103', 'clusterd_1_2:2103'],
              COMPUTECTL ADDRESSES ['clusterd_1_1:2101', 'clusterd_1_2:2101'],
              COMPUTE ADDRESSES ['clusterd_1_1:2102', 'clusterd_1_2:2102']
            """
        )
    )
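

# The addresses below point at a host that doesn't resolve, simulating a
# replica that can never connect.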
def create_invalid_replica(c: Composition) -> None:
    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.testdrive(
        dedent(
            """
            > CREATE CLUSTER REPLICA cluster1.replica3
              STORAGECTL ADDRESSES ['no_such_host:2100'],
              STORAGE ADDRESSES ['no_such_host:2103'],
              COMPUTECTL ADDRESSES ['no_such_host:2101'],
              COMPUTE ADDRESSES ['no_such_host:2102']
            """
        )
    )


def validate(c: Composition) -> None:
    # Validate that the cluster continues to operate
    c.testdrive(
        dedent(
            """
            # Dataflows
            > SELECT * FROM ct1;
            10
            > SELECT * FROM v1;
            10

            # Existing sources
            $ kafka-ingest format=bytes topic=source1 repeat=1000000
            B${kafka-ingest.iteration}
            > SELECT * FROM v2;
            2000000

            # Existing tables
            > INSERT INTO t1 VALUES (20);
            > SELECT * FROM ct1;
            11
            > SELECT * FROM v1;
            11

            # New materialized views
            > CREATE MATERIALIZED VIEW v3 AS SELECT COUNT(*) AS c1 FROM t1;
            > SELECT * FROM v3;
            11

            # New tables
            > CREATE TABLE t2 (f1 INTEGER);
            > INSERT INTO t2 SELECT * FROM t1;
            > SELECT COUNT(*) FROM t2;
            11

            > CREATE CONNECTION IF NOT EXISTS kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

            # New sources
            > CREATE SOURCE source2
              IN CLUSTER c
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source1-${testdrive.seed}')
              FORMAT BYTES
            > SELECT COUNT(*) FROM source2
            2000000
            """
        ),
    )


def validate_introspection_compaction(
    c: Composition, checks: list[AllowCompactionCheck]
) -> None:
    # Validate that the AllowCompaction commands arrive at the corresponding
    # replicas. Allow up to 5 seconds for the commands to appear in the logs.
    start = time.time()
    while time.time() < start + 5:
        for check in checks:
            check.check_log(c)
        if all([check.satisfied for check in checks]):
            return
    for check in checks:
        if not check.satisfied:
            check.print_error()
    assert all([check.satisfied for check in checks])
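

# A disruption scenario: `disruption` is applied to the running composition
# (typically breaking replica1 in some way), after which the given compaction
# checks must still pass.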
@dataclass
class Disruption:
    name: str
    disruption: Callable
    compaction_checks: list[AllowCompactionCheck]
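

# Scenarios that leave a broken (or just-recreated) replica in the cluster keep
# only the per-replica ArrangedIntro check on replica2: MaterializedView and
# ArrangedIndex compaction is held back until all replicas have caught up, so
# those checks only apply when every remaining replica is healthy.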
disruptions = [
    Disruption(
        name="none",
        disruption=lambda c: None,
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
    Disruption(
        name="drop-create-replica",
        disruption=lambda c: drop_create_replica(c),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="create-invalid-replica",
        disruption=lambda c: create_invalid_replica(c),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="restart-replica",
        disruption=lambda c: restart_replica(c),
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
    Disruption(
        name="pause-one-clusterd",
        disruption=lambda c: c.pause("clusterd_1_1"),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="kill-replica",
        disruption=lambda c: c.kill("clusterd_1_1", "clusterd_1_2"),
        compaction_checks=[
            ArrangedIntro("cluster1.replica2", "clusterd_2_1"),
        ],
    ),
    Disruption(
        name="drop-replica",
        disruption=lambda c: c.testdrive("> DROP CLUSTER REPLICA cluster1.replica1"),
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica2", "clusterd_2_1"
        ),
    ),
    Disruption(
        name="restart-environmentd",
        disruption=restart_environmentd,
        compaction_checks=AllowCompactionCheck.all_checks(
            "cluster1.replica1", "clusterd_1_1"
        )
        + AllowCompactionCheck.all_checks("cluster1.replica2", "clusterd_2_1"),
    ),
]
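

# By default all disruptions run in sequence; a subset can be selected by
# passing scenario names on the command line (e.g. `./mzcompose run default
# kill-replica`; the exact invocation depends on the mzcompose wrapper in use).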
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
    args = parser.parse_args()

    c.up("zookeeper", "kafka", "schema-registry", "localstack")

    for id, disruption in enumerate(selected_by_name(args.disruptions, disruptions)):
        run_test(c, disruption, id)


def run_test(c: Composition, disruption: Disruption, id: int) -> None:
    # Clean up here instead of at the end of the test, so that we keep state
    # and logs in case something goes wrong.
    cleanup_list = [
        "materialized",
        "testdrive",
        "clusterd_1_1",
        "clusterd_1_2",
        "clusterd_2_1",
        "clusterd_2_2",
    ]
    c.kill(*cleanup_list)
    c.rm(*cleanup_list, destroy_volumes=True)
    c.rm_volumes("mzdata")

    print(f"+++ Running disruption scenario {disruption.name}")

    with c.override(
        Testdrive(
            no_reset=True,
            materialize_params={"cluster": "cluster1"},
            seed=id,
            default_timeout="300s",
        ),
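        # The `process_names` lists are an assumption about the Clusterd
        # service wrapper: they tell each clusterd which peer processes make
        # up its two-process replica.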
        Clusterd(
            name="clusterd_1_1",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_1_2",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_2_1",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
        Clusterd(
            name="clusterd_2_2",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
    ):
        c.up(
            "materialized",
            "clusterd_1_1",
            "clusterd_1_2",
            "clusterd_2_1",
            "clusterd_2_2",
            {"name": "testdrive", "persistent": True},
        )
        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        if any(
            isinstance(check, ArrangedIntro) for check in disruption.compaction_checks
        ):
            # Disable introspection subscribes because they break the
            # `ArrangedIntro` check by disabling compaction of logging indexes
            # on all replicas if one of the replicas is failing. That's because
            # of a defect of replica-targeted subscribes: They get installed on
            # all replicas but only the targeted replica can drive the write
            # frontier forward. If the targeted replica is crashing, the write
            # frontier cannot advance and thus the read frontier cannot either.
            #
            # TODO(database-issues#8091): Fix this by installing targeted
            # subscribes only on the targeted replica.
            c.sql(
                "ALTER SYSTEM SET enable_introspection_subscribes = false;",
                port=6877,
                user="mz_system",
            )
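
        # Create a cluster with two unorchestrated replicas, each spanning two
        # clusterd processes. Port roles, as readable from the ADDRESSES
        # clauses below: 2100 storagectl, 2101 computectl, 2102 compute,
        # 2103 storage.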
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                replica1 (
                    STORAGECTL ADDRESSES ['clusterd_1_1:2100', 'clusterd_1_2:2100'],
                    STORAGE ADDRESSES ['clusterd_1_1:2103', 'clusterd_1_2:2103'],
                    COMPUTECTL ADDRESSES ['clusterd_1_1:2101', 'clusterd_1_2:2101'],
                    COMPUTE ADDRESSES ['clusterd_1_1:2102', 'clusterd_1_2:2102']
                ),
                replica2 (
                    STORAGECTL ADDRESSES ['clusterd_2_1:2100', 'clusterd_2_2:2100'],
                    STORAGE ADDRESSES ['clusterd_2_1:2103', 'clusterd_2_2:2103'],
                    COMPUTECTL ADDRESSES ['clusterd_2_1:2101', 'clusterd_2_2:2101'],
                    COMPUTE ADDRESSES ['clusterd_2_1:2102', 'clusterd_2_2:2102']
                )
            )
            """
        )

        populate(c)

        # Disrupt replica1 by some means.
        disruption.disruption(c)

        validate(c)
        validate_introspection_compaction(c, disruption.compaction_checks)


def get_single_value_from_cursor(cursor: Cursor) -> Any:
    result = cursor.fetchone()
    assert result is not None
    return result[0]