# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Test cluster isolation by introducing faults of various kinds in cluster1 and
then making sure that cluster2 continues to operate properly.
"""

from collections.abc import Callable
from dataclasses import dataclass

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.ui import UIError
from materialize.util import selected_by_name
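

# Two isolated clusters, each backed by two clusterd processes: the
# clusterd_1_* processes serve cluster1 (the cluster we disrupt) and the
# clusterd_2_* processes serve cluster2 (the cluster that must keep working).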
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    # We use mz_panic() in some test scenarios, so environmentd must stay up.
    Materialized(
        propagate_crashes=False,
        additional_system_parameter_defaults={
            "unsafe_enable_unsafe_functions": "true",
            "unsafe_enable_unstable_dependencies": "true",
        },
    ),
    Clusterd(name="clusterd_1_1"),
    Clusterd(name="clusterd_1_2"),
    Clusterd(name="clusterd_2_1"),
    Clusterd(name="clusterd_2_2"),
    Testdrive(),
]


@dataclass
class Disruption:
    name: str
    disruption: Callable
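

# Fault scenarios to inject into cluster1: pausing or killing its clusterd
# processes, wedging it with long-running dataflows, dropping the cluster
# outright, and panicking it from inside a query.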
disruptions = [
    Disruption(
        name="pause-one-cluster",
        disruption=lambda c: c.pause("clusterd_1_1"),
    ),
    Disruption(
        name="kill-all-clusters",
        disruption=lambda c: c.kill("clusterd_1_1", "clusterd_1_2"),
    ),
    Disruption(
        name="pause-in-materialized-view",
        disruption=lambda c: c.testdrive(
            """
            > SET cluster=cluster1
            > CREATE TABLE sleep_table (sleep INTEGER);
            > CREATE MATERIALIZED VIEW sleep_view AS SELECT mz_unsafe.mz_sleep(sleep) FROM sleep_table;
            > INSERT INTO sleep_table SELECT 1200 FROM generate_series(1,32)
            """,
        ),
    ),
    Disruption(
        name="drop-cluster",
        disruption=lambda c: c.testdrive(
            """
            > DROP CLUSTER cluster1 CASCADE
            """,
        ),
    ),
    Disruption(
        name="panic-in-insert-select",
        disruption=lambda c: c.testdrive(
            """
            > SET cluster=cluster1
            > SET statement_timeout='1s'
            > CREATE TABLE panic_table (f1 TEXT);
            > INSERT INTO panic_table VALUES ('forced panic');
            ! INSERT INTO panic_table SELECT mz_unsafe.mz_panic(f1) FROM panic_table;
            contains: statement timeout
            """,
        ),
    ),
]
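

# By default every disruption scenario runs in sequence; a subset can be
# selected by passing scenario names on the command line.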
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument("disruptions", nargs="*", default=[d.name for d in disruptions])
    args = parser.parse_args()

    c.up("zookeeper", "kafka", "schema-registry")

    for id, disruption in enumerate(selected_by_name(args.disruptions, disruptions)):
        run_test(c, disruption, id)
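

# Create the objects used for validation. They are created from cluster1, but
# the index on v1 lives on cluster2, so reads keep working after cluster1 is
# disrupted.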
def populate(c: Composition) -> None:
    # Create some database objects
    c.testdrive(
        """
        > SET cluster=cluster1
        > DROP TABLE IF EXISTS t1 CASCADE;
        > CREATE TABLE t1 (f1 TEXT);
        > INSERT INTO t1 VALUES (1), (2);
        > CREATE VIEW v1 AS SELECT COUNT(*) AS c1 FROM t1;
        > CREATE DEFAULT INDEX i1 IN CLUSTER cluster2 ON v1;
        """,
    )
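

# Exercise reads, writes, DDL, sources, and sinks on cluster2 to confirm that
# it is unaffected by whatever happened to cluster1.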
def validate(c: Composition) -> None:
    # Validate that cluster2 continues to operate
    c.testdrive(
        """
        # Dataflows
        $ set-regex match=\\d{13} replacement=<TIMESTAMP>
        > SET cluster=cluster2
        > SELECT * FROM v1;
        2

        # Tables
        > INSERT INTO t1 VALUES (3);
        > SELECT * FROM t1;
        1
        2
        3

        # Introspection tables
        > SELECT name FROM (SHOW CLUSTERS LIKE 'cluster2')
        cluster2

        > SELECT name FROM mz_tables WHERE name = 't1';
        t1

        # DDL statements
        > CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(*) AS c1 FROM t1;
        > SELECT * FROM v2;
        3

        > CREATE MATERIALIZED VIEW v1mat AS SELECT * FROM v1;

        > CREATE INDEX i2 IN CLUSTER cluster2 ON t1 (f1);
        > SELECT f1 FROM t1;
        1
        2
        3

        # Tables
        > CREATE TABLE t2 (f1 INTEGER);
        > INSERT INTO t2 VALUES (1);
        > INSERT INTO t2 SELECT * FROM t2;
        > SELECT * FROM t2;
        1
        1

        # Sources
        # Explicitly set a progress topic to ensure multiple runs (which the
        # mzcompose.py driver does) do not clash.
        # TODO: remove this once we add some form of nonce to the default
        # progress topic name.
        > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (
            BROKER '${testdrive.kafka-addr}',
            PROGRESS TOPIC 'testdrive-progress-${testdrive.seed}',
            SECURITY PROTOCOL PLAINTEXT
          );

        > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
            URL '${testdrive.schema-registry-url}'
          );

        $ kafka-create-topic topic=source1 partitions=1

        $ kafka-ingest format=bytes topic=source1
        A

        > CREATE SOURCE source1
          FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source1-${testdrive.seed}')

        > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-source1-${testdrive.seed}")
          FORMAT BYTES

        > SELECT * FROM source1_tbl
        A

        # TODO: This should be made reliable without sleeping, database-issues#7611
        $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s

        # Sinks
        > CREATE SINK sink1 FROM v1mat
          INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink1')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
          ENVELOPE DEBEZIUM

        $ kafka-verify-data format=avro sink=materialize.public.sink1 sort-messages=true
        {"before": null, "after": {"row":{"c1": 3}}}
        """,
    )
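

# Run one disruption scenario end to end: bring up a fresh set of services,
# create the two clusters, populate, inject the fault into cluster1, validate
# cluster2, then tear everything down.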
def run_test(c: Composition, disruption: Disruption, id: int) -> None:
    print(f"+++ Running disruption scenario {disruption.name}")

    with c.override(
        Testdrive(
            no_reset=True,
            materialize_params={"cluster": "cluster2"},
            seed=id,
        ),
        Clusterd(
            name="clusterd_1_1",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_1_2",
            process_names=["clusterd_1_1", "clusterd_1_2"],
        ),
        Clusterd(
            name="clusterd_2_1",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
        Clusterd(
            name="clusterd_2_2",
            process_names=["clusterd_2_1", "clusterd_2_2"],
        ),
    ):
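        # Bring up all services fresh for this scenario; testdrive runs as a
        # persistent service so the repeated scripts below reuse one instance.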
        c.up(
            "materialized",
            "clusterd_1_1",
            "clusterd_1_2",
            "clusterd_2_1",
            "clusterd_2_2",
            {"name": "testdrive", "persistent": True},
        )
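
        # Unorchestrated (manually addressed) cluster replicas are gated
        # behind a system parameter; enable it via the internal SQL port as
        # the mz_system user.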
        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
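
        # Each cluster is a single replica spread across two clusterd
        # processes, addressed explicitly on the storagectl, computectl,
        # compute, and storage ports (2100-2103) of each process.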
        c.sql(
            """
            DROP CLUSTER IF EXISTS cluster1 CASCADE;
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd_1_1:2100', 'clusterd_1_2:2100'],
                STORAGE ADDRESSES ['clusterd_1_1:2103', 'clusterd_1_2:2103'],
                COMPUTECTL ADDRESSES ['clusterd_1_1:2101', 'clusterd_1_2:2101'],
                COMPUTE ADDRESSES ['clusterd_1_1:2102', 'clusterd_1_2:2102']
            ));
            """
        )

        c.sql(
            """
            DROP CLUSTER IF EXISTS cluster2 CASCADE;
            CREATE CLUSTER cluster2 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd_2_1:2100', 'clusterd_2_2:2100'],
                STORAGE ADDRESSES ['clusterd_2_1:2103', 'clusterd_2_2:2103'],
                COMPUTECTL ADDRESSES ['clusterd_2_1:2101', 'clusterd_2_2:2101'],
                COMPUTE ADDRESSES ['clusterd_2_1:2102', 'clusterd_2_2:2102']
            ));
            """
        )

        populate(c)

        # Disrupt cluster1 by some means
        disruption.disruption(c)

        validate(c)

        cleanup_list = [
            "materialized",
            "testdrive",
            "clusterd_1_1",
            "clusterd_1_2",
            "clusterd_2_1",
            "clusterd_2_2",
        ]

        try:
            c.kill(*cleanup_list)
        except UIError as e:
            # Killing multiple clusterds from the same cluster may fail, as
            # the second clusterd may have already exited after the first one
            # was killed, due to the 'shared-fate' mechanism.
            print(e)

        c.rm(*cleanup_list, destroy_volumes=True)

        c.rm_volumes("mzdata")