mzcompose.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Disrupt Cockroach and verify that Materialize recovers from it.
  11. """
  12. from collections.abc import Callable
  13. from dataclasses import dataclass
  14. from textwrap import dedent
  15. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  16. from materialize.mzcompose.service import ServiceHealthcheck
  17. from materialize.mzcompose.services.cockroach import Cockroach
  18. from materialize.mzcompose.services.materialized import Materialized
  19. from materialize.mzcompose.services.testdrive import Testdrive
  20. from materialize.ui import UIError
  21. from materialize.util import selected_by_name
  # Number of CockroachDB services in the composition; they are named
  # cockroach0 .. cockroach{CRDB_NODE_COUNT - 1}.
  CRDB_NODE_COUNT = 4

  # We expect any CRDB disruption to not disrupt Mz for more than this timeout.
  # It is passed to Testdrive as its default_timeout.
  TESTDRIVE_TIMEOUT = "80s"

  # Healthcheck whose test command is /bin/true, i.e. a check that cannot
  # report the node as unhealthy. It is attached to every Cockroach service.
  COCKROACH_HEALTHCHECK_DISABLED = ServiceHealthcheck(
      test="/bin/true",
      interval="1s",
      start_period="30s",
  )
  # Testdrive fragment executed once per disruption scenario, before any CRDB
  # node is disrupted. It (re)creates two clusters, each holding one COUNTER
  # load-generator source, and checks that both sources produce data.
  # `dedent` strips the common indentation, so the fragment starts at column 0.
  INIT_SCRIPT = dedent(
      """
      # This source will persist throughout the CRDB rolling restart
      > DROP CLUSTER IF EXISTS s_old_cluster CASCADE;
      > CREATE CLUSTER s_old_cluster SIZE = '4-4';
      > CREATE SOURCE s_old IN CLUSTER s_old_cluster FROM LOAD GENERATOR COUNTER (TICK INTERVAL '0.1s');
      > SELECT COUNT(*) > 1 FROM s_old;
      true
      # This source is recreated periodically
      > DROP CLUSTER IF EXISTS s_new_cluster CASCADE;
      > CREATE CLUSTER s_new_cluster SIZE ='4-4';
      > CREATE SOURCE s_new IN CLUSTER s_new_cluster FROM LOAD GENERATOR COUNTER (TICK INTERVAL '0.1s');
      > SELECT COUNT(*) > 1 FROM s_new;
      true
      """
  )
  # Testdrive fragment executed after each node has been disrupted and brought
  # back. It checks that the long-lived source s_old still returns data, then
  # drops and recreates s_new in its existing cluster and checks it as well.
  VALIDATE_SCRIPT = dedent(
      """
      > SELECT COUNT(*) > 1 FROM s_old;
      true
      # This source is recreated periodically
      > DROP SOURCE s_new CASCADE;
      > CREATE SOURCE s_new IN CLUSTER s_new_cluster FROM LOAD GENERATOR COUNTER (TICK INTERVAL '0.1s');
      > SELECT COUNT(*) > 1 FROM s_new;
      true
      """
  )
  # Comma-separated "host:port" list covering every CRDB node. It is handed to
  # each node's --join flag when the Cockroach services are declared.
  ALL_COCKROACH_NODES = ",".join(
      f"cockroach{node}:26257" for node in range(CRDB_NODE_COUNT)
  )
  # Services of this composition: one Testdrive, one Materialized and
  # CRDB_NODE_COUNT Cockroach nodes named cockroach0 .. cockroach{N-1}.
  SERVICES = [
      # no_reset=True is passed so that objects created by one testdrive fragment
      # are still there for the next one (presumably that is what the flag
      # controls -- confirm against the Testdrive service).
      Testdrive(default_timeout=TESTDRIVE_TIMEOUT, no_reset=True),
      Materialized(
          depends_on=[f"cockroach{node}" for node in range(CRDB_NODE_COUNT)],
          # NOTE(review): both URLs use the host name `cockroach`, while the
          # services declared below are named cockroach0..N-1 -- confirm that
          # `cockroach` resolves inside the composition network.
          options=[
              "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus",
              "--timestamp-oracle-url=postgres://root@cockroach:26257?options=--search_path=tsoracle",
          ],
      ),
      # One Cockroach service per node. Every node gets its own store and
      # advertised address, and joins via the full node list.
      *(
          Cockroach(
              name=f"cockroach{node}",
              setup_materialize=True,
              command=[
                  "start",
                  "--insecure",
                  f"--store=cockroach{node}",
                  "--listen-addr=0.0.0.0:26257",
                  f"--advertise-addr=cockroach{node}:26257",
                  "--http-addr=0.0.0.0:8080",
                  f"--join={ALL_COCKROACH_NODES}",
              ],
              healthcheck=COCKROACH_HEALTHCHECK_DISABLED,
          )
          for node in range(CRDB_NODE_COUNT)
      ),
  ]
  @dataclass
  class CrdbDisruption:
      """A named way of taking a single CockroachDB node out of service."""

      # Name used to select the disruption and shown in the log output.
      name: str
      # Called as `disruption(composition, node_index)` to disrupt the
      # service named cockroach{node_index}.
      disruption: Callable
  def _sigterm_node(c, id):
      """Send SIGTERM to the service `cockroach{id}` of composition `c`."""
      return c.kill(f"cockroach{id}", signal="SIGTERM")


  def _drain_node(c, id):
      """Run `cockroach node drain` for the node behind service `cockroach{id}`.

      The command is issued with check=False: draining may sometimes time out,
      but we continue with the restart in that case, as a real-life CRDB upgrade
      procedure will most likely also ignore such a timeout.
      """
      # Execute the 'drain' command on a different node from the one that we
      # are draining.
      issuing_service = f"cockroach{(id % 2) + 1}"
      # presumably CRDB node ids are 1-based, so service cockroach{id} is node
      # id + 1 -- confirm against the cluster's node numbering.
      target_node_id = str(id + 1)
      return c.exec(
          issuing_service,
          "cockroach",
          "node",
          "drain",
          target_node_id,
          "--insecure",
          check=False,
      )


  # All disruptions known to this test; each one is run by default.
  DISRUPTIONS = [
      # Unfortunately the "sigkill" disruption is too aggressive and causes CRDB
      # to enter a state where it is no longer able to service queries, with
      # either no error or errors about 'lost quorum' or 'encountered poisoned
      # latch'.
      #
      # Most likely the test kills and restarts the nodes too fast for CRDB to
      # handle, even though the nodes are taken out in succession one by one and
      # never in parallel. It is therefore left disabled:
      #
      # CrdbDisruption(
      #     name="sigkill",
      #     disruption=lambda c, id: c.kill(f"cockroach{id}"),
      # ),
      CrdbDisruption(name="sigterm", disruption=_sigterm_node),
      CrdbDisruption(name="drain", disruption=_drain_node),
  ]
  def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
      """Perform rolling restarts on a CRDB cluster with CRDB_NODE_COUNT nodes
      and confirm that Mz does not hang for longer than expected.

      The optional positional `disruptions` arguments name the disruptions to
      run; when none are given, every entry of DISRUPTIONS is run. The names are
      resolved through `selected_by_name`, and each resulting disruption is
      handed to `run_disruption` in turn.
      """
      every_name = [disruption.name for disruption in DISRUPTIONS]
      parser.add_argument("disruptions", nargs="*", default=every_name)
      requested = parser.parse_args().disruptions

      for disruption in selected_by_name(requested, DISRUPTIONS):
          run_disruption(c, disruption)
  def run_disruption(c: Composition, d: CrdbDisruption) -> None:
      """Run one disruption scenario against a freshly created CRDB cluster.

      Steps, in order:
        1. Tear the composition down (volumes included) and start all CRDB nodes.
        2. Initialise the CRDB cluster through cockroach0 and create the schemas.
        3. Start Materialize plus a persistent Testdrive and run INIT_SCRIPT.
        4. For every node except node 0: apply `d.disruption`, make sure the
           node is stopped, start it again and run VALIDATE_SCRIPT.

      Every testdrive fragment has to finish within Testdrive's default_timeout,
      which is how the test detects that Mz hung for a prolonged period of time
      as a result of a disruption.
      """
      print(f"--- Running Disruption {d.name} ...")

      node_services = [f"cockroach{index}" for index in range(CRDB_NODE_COUNT)]

      c.down(destroy_volumes=True, sanity_restart_mz=False)
      c.up(*node_services)
      c.exec(
          "cockroach0",
          "cockroach",
          "init",
          "--insecure",
          "--host=localhost:26257",
      )

      setup_statements = (
          "SET CLUSTER SETTING sql.stats.forecasts.enabled = false",
          "CREATE SCHEMA IF NOT EXISTS consensus",
          "CREATE SCHEMA IF NOT EXISTS storage",
          "CREATE SCHEMA IF NOT EXISTS adapter",
          "CREATE SCHEMA IF NOT EXISTS tsoracle",
      )
      for statement in setup_statements:
          c.exec("cockroach0", "cockroach", "sql", "--insecure", "-e", statement)

      c.up("materialized", {"name": "testdrive", "persistent": True})

      # Create the sources that the validation fragment queries later on.
      c.testdrive(input=INIT_SCRIPT)

      # Messing with cockroach node #0 borks the cluster permanently, so we
      # start from node #1.
      for index, service in enumerate(node_services[1:], start=1):
          d.disruption(c, index)

          # Restart the node we just disrupted so that we can safely disrupt
          # another node.
          try:
              # Node may have died already, so we eat any docker-compose
              # exceptions.
              c.kill(service)
          except UIError:
              pass
          c.up(service)

          # Confirm things continue to work after CRDB is back to full
          # complement.
          c.testdrive(input=VALIDATE_SCRIPT)
  159. c.up(f"cockroach{id}")
  160. # Confirm things continue to work after CRDB is back to full complement
  161. c.testdrive(input=VALIDATE_SCRIPT)