# test_k8s_node_recovery.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import time
from dataclasses import dataclass
from textwrap import dedent

import pytest

from materialize.cloudtest.app.materialize_application import MaterializeApplication
  14. TD_TIMEOUT_SHORT = 10
  15. TD_TIMEOUT_FULL_RECOVERY = 660
  16. @dataclass
  17. class ReplicaDefinition:
  18. cluster_name: str
  19. index: int
  20. availability_zone: str
  21. def get_name(self) -> str:
  22. return f"{self.cluster_name}_r_{self.index}"
  23. @dataclass
  24. class ClusterDefinition:
  25. name: str
  26. replica_definitions: list[ReplicaDefinition]
  27. def create_replica_definitions_sql(self) -> str:
  28. replica_definitions = []
  29. for replica in self.replica_definitions:
  30. replica_definitions.append(
  31. f"{replica.get_name()} (SIZE = '1', AVAILABILITY ZONE '{replica.availability_zone}')"
  32. )
  33. return ", ".join(replica_definitions)
  34. def default_compute_cluster() -> ClusterDefinition:
  35. """Single cluster in availability zone 1."""
  36. compute_cluster = ClusterDefinition("c_compute", [])
  37. compute_cluster.replica_definitions.append(
  38. ReplicaDefinition(compute_cluster.name, index=1, availability_zone="1")
  39. )
  40. return compute_cluster
  41. def default_storage_cluster() -> ClusterDefinition:
  42. """Single cluster in availability zone 2."""
  43. storage_cluster = ClusterDefinition("c_storage", [])
  44. storage_cluster.replica_definitions.append(
  45. ReplicaDefinition(storage_cluster.name, index=1, availability_zone="2")
  46. )
  47. return storage_cluster
  48. REQUIRED_PROGRESS = 5
  49. def populate(
  50. mz: MaterializeApplication,
  51. compute_cluster: ClusterDefinition,
  52. storage_cluster: ClusterDefinition,
  53. ) -> None:
  54. # Make sure the `quickstart` cluster replica gets is scheduled on its own
  55. # node, so queries still work when a node running a compute/storage replica
  56. # is suspended.
  57. mz.environmentd.sql(
  58. "ALTER CLUSTER quickstart SET (AVAILABILITY ZONES ('quickstart'))",
  59. port="internal",
  60. user="mz_system",
  61. )
  62. all_clusters = [compute_cluster, storage_cluster]
  63. drop_cluster_statements = [
  64. f"> DROP CLUSTER IF EXISTS {cluster.name};" for cluster in all_clusters
  65. ]
  66. # string needs the same indentation as testdrive script below
  67. drop_cluster_statement_sql = "\n".join(drop_cluster_statements)
  68. create_cluster_statements = [
  69. f"> CREATE CLUSTER {cluster.name} REPLICAS ({cluster.create_replica_definitions_sql()});"
  70. for cluster in all_clusters
  71. ]
  72. # string needs the same indentation as testdrive script below
  73. create_cluster_statement_sql = "\n".join(create_cluster_statements)
  74. mz.testdrive.run(
  75. input=dedent(
  76. """
  77. > DROP MATERIALIZED VIEW IF EXISTS mv;
  78. > DROP SOURCE IF EXISTS source CASCADE;
  79. """
  80. )
  81. + dedent(drop_cluster_statement_sql)
  82. + "\n"
  83. + dedent(create_cluster_statement_sql)
  84. + "\n"
  85. + dedent(
  86. f"""
  87. > SELECT name FROM mz_clusters WHERE name IN ('{compute_cluster.name}', '{storage_cluster.name}');
  88. {storage_cluster.name}
  89. {compute_cluster.name}
  90. > CREATE SOURCE source IN CLUSTER {storage_cluster.name}
  91. FROM LOAD GENERATOR COUNTER
  92. (TICK INTERVAL '500ms');
  93. > SELECT COUNT(*) FROM (SHOW SOURCES) WHERE name = 'source';
  94. 1
  95. > CREATE MATERIALIZED VIEW mv (f1) IN CLUSTER {compute_cluster.name} AS SELECT counter + 1 FROM source;
  96. > CREATE DEFAULT INDEX IN CLUSTER {compute_cluster.name} ON mv;
  97. > SELECT COUNT(*) > 0 from mv;
  98. true
  99. """
  100. ),
  101. no_reset=True,
  102. )
  103. def validate_state(
  104. mz: MaterializeApplication,
  105. reached_index: int,
  106. must_exceed_reached_index: bool,
  107. timeout_in_sec: int,
  108. expected_state: str,
  109. isolation_level: str = "STRICT SERIALIZABLE",
  110. ) -> None:
  111. comparison_operator = ">" if must_exceed_reached_index else ">="
  112. print(f"Expect '{expected_state}' within timeout of {timeout_in_sec}s")
  113. testdrive_run_timeout_in_sec = 10
  114. validation_succeeded = False
  115. last_error_message = None
  116. start_time = time.time()
  117. # re-run testdrive to make sure it connects to the most recent envd
  118. max_run_count = int(timeout_in_sec / testdrive_run_timeout_in_sec)
  119. max_run_count = 1 if max_run_count < 1 else max_run_count
  120. for run in range(0, max_run_count):
  121. is_last_run = run + 1 == max_run_count
  122. try:
  123. mz.testdrive.run(
  124. input=dedent(
  125. f"""
  126. > SET TRANSACTION_ISOLATION TO '{isolation_level}';
  127. > SELECT COUNT(*) {comparison_operator} {reached_index} FROM source; -- validate source with isolation {isolation_level}
  128. true
  129. > SELECT COUNT(*) {comparison_operator} {reached_index} FROM mv; -- validate mv with isolation {isolation_level}
  130. true
  131. """
  132. ),
  133. default_timeout=f"{testdrive_run_timeout_in_sec}s",
  134. no_reset=True,
  135. suppress_command_error_output=not is_last_run,
  136. )
  137. validation_succeeded = True
  138. break
  139. except Exception as e:
  140. try_info = f"{run + 1}/{max_run_count} with isolation {isolation_level}"
  141. # arbitrary error can occur if envd is not yet ready after restart
  142. if is_last_run:
  143. print(f"Validation failed in try {try_info}, aborting!")
  144. if last_error_message is not None:
  145. print(f"Last error message was: {last_error_message}")
  146. else:
  147. print(f"Validation failed in try {try_info}, retrying.")
  148. last_error_message = str(e)
  149. end_time = time.time()
  150. if not validation_succeeded:
  151. # do not raise an FailedTestExecutionError because we are not in mzcompose
  152. # do not use fail because it comes with a verbose stacktrace
  153. assert (
  154. False
  155. ), f"Failed to achieve '{expected_state}' using '{isolation_level}' within {timeout_in_sec}s!"
  156. duration = round(end_time - start_time, 1)
  157. print(
  158. f"Succeeded to achieve '{expected_state}' within {duration} seconds (limit: {timeout_in_sec}s)"
  159. )
  160. def get_current_counter_index(mz: MaterializeApplication) -> int:
  161. """
  162. This query has no timeout. Only use it if is expected to deliver.
  163. """
  164. reached_value: int = mz.environmentd.sql_query("SELECT COUNT(*) FROM source")[0][0]
  165. return reached_value
  166. def suspend_node_of_replica(
  167. mz: MaterializeApplication, cluster: ClusterDefinition
  168. ) -> str:
  169. node_names = mz.get_cluster_node_names(cluster.name)
  170. assert len(node_names) > 0
  171. print(f"Cluster {cluster.name} uses nodes {node_names}")
  172. suspended_node_name = node_names[0]
  173. mz.suspend_k8s_node(suspended_node_name)
  174. return suspended_node_name
  175. @pytest.mark.node_recovery
  176. def test_unreplicated_storage_cluster_on_failing_node(
  177. mz: MaterializeApplication,
  178. ) -> None:
  179. """
  180. An unreplicated storage cluster is on the failed node. Queries of a downstream index in serializable mode should
  181. continue to work but return stale data. Staleness should resolve within a minute or two.
  182. """
  183. compute_cluster = default_compute_cluster()
  184. storage_cluster = default_storage_cluster()
  185. populate(mz, compute_cluster, storage_cluster)
  186. reached_index = get_current_counter_index(mz)
  187. suspended_node_name = suspend_node_of_replica(mz, storage_cluster)
  188. # with SERIALIZABLE
  189. validate_state(
  190. mz,
  191. reached_index,
  192. must_exceed_reached_index=False,
  193. timeout_in_sec=TD_TIMEOUT_SHORT,
  194. expected_state="stale data being delivered timely",
  195. isolation_level="SERIALIZABLE",
  196. )
  197. # with STRICT SERIALIZABLE
  198. validate_state(
  199. mz,
  200. reached_index,
  201. must_exceed_reached_index=False,
  202. timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
  203. expected_state="data being delivered",
  204. isolation_level="STRICT SERIALIZABLE",
  205. )
  206. # only request this index because the previous validation succeeded / did not block
  207. stalled_index = get_current_counter_index(mz)
  208. # expect live data to be delivered at most after two minutes in production (or longer in k8s)
  209. validate_state(
  210. mz,
  211. stalled_index,
  212. must_exceed_reached_index=True,
  213. timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
  214. expected_state="live data after to node recovery",
  215. )
  216. recovered_index = get_current_counter_index(mz)
  217. mz.revive_suspended_k8s_node(suspended_node_name)
  218. validate_state(
  219. mz,
  220. recovered_index + REQUIRED_PROGRESS,
  221. must_exceed_reached_index=True,
  222. timeout_in_sec=TD_TIMEOUT_SHORT,
  223. expected_state="no issues after node recovery",
  224. )
  225. @pytest.mark.node_recovery
  226. def test_unreplicated_compute_cluster_on_failing_node(
  227. mz: MaterializeApplication,
  228. ) -> None:
  229. """
  230. An unreplicated compute cluster is on the failed node. Queries of indexes on the compute cluster should fail, but
  231. resolve within a minute or two.
  232. """
  233. compute_cluster = default_compute_cluster()
  234. storage_cluster = default_storage_cluster()
  235. populate(mz, compute_cluster, storage_cluster)
  236. reached_index = get_current_counter_index(mz)
  237. suspended_node_name = suspend_node_of_replica(
  238. mz,
  239. compute_cluster,
  240. )
  241. # expect (live) data to be delivered after at most after two minutes in production (or longer in k8s)
  242. validate_state(
  243. mz,
  244. reached_index + REQUIRED_PROGRESS,
  245. must_exceed_reached_index=True,
  246. timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
  247. expected_state="node recovery and live data",
  248. )
  249. recovered_index = get_current_counter_index(mz)
  250. mz.revive_suspended_k8s_node(suspended_node_name)
  251. validate_state(
  252. mz,
  253. recovered_index + REQUIRED_PROGRESS,
  254. must_exceed_reached_index=True,
  255. timeout_in_sec=TD_TIMEOUT_SHORT,
  256. expected_state="no issues after node recovery",
  257. )
  258. @pytest.mark.node_recovery
  259. def test_replicated_compute_cluster_on_failing_node(mz: MaterializeApplication) -> None:
  260. """
  261. A replicated compute cluster is on the failed node. Queries of indexes on the compute cluster should experience no
  262. disruption in latency, thanks to the second replica.
  263. """
  264. compute_cluster = default_compute_cluster()
  265. compute_cluster.replica_definitions.append(
  266. ReplicaDefinition(compute_cluster.name, index=2, availability_zone="3")
  267. )
  268. assert (
  269. compute_cluster.replica_definitions[0].availability_zone
  270. != compute_cluster.replica_definitions[1].availability_zone
  271. ), "Test configuration error"
  272. storage_cluster = default_storage_cluster()
  273. populate(mz, compute_cluster, storage_cluster)
  274. reached_index = get_current_counter_index(mz)
  275. nodes_with_compute_clusters = set(mz.get_cluster_node_names(compute_cluster.name))
  276. nodes_with_storage_clusters = set(mz.get_cluster_node_names(storage_cluster.name))
  277. nodes_with_only_compute_clusters = (
  278. nodes_with_compute_clusters - nodes_with_storage_clusters
  279. )
  280. assert (
  281. len(nodes_with_only_compute_clusters) > 0
  282. ), "No nodes that do not contain both compute and storage clusters"
  283. suspended_node_name = next(iter(nodes_with_only_compute_clusters))
  284. print(
  285. f"Compute clusters on nodes {nodes_with_compute_clusters}, storage clusters on nodes {nodes_with_storage_clusters}"
  286. )
  287. print(f"Suspending {suspended_node_name}")
  288. mz.suspend_k8s_node(suspended_node_name)
  289. validate_state(
  290. mz,
  291. reached_index + REQUIRED_PROGRESS,
  292. must_exceed_reached_index=True,
  293. timeout_in_sec=TD_TIMEOUT_SHORT,
  294. expected_state="live data without disruption in latency",
  295. isolation_level="SERIALIZABLE",
  296. )
  297. reached_index = get_current_counter_index(mz)
  298. mz.revive_suspended_k8s_node(suspended_node_name)
  299. validate_state(
  300. mz,
  301. reached_index + REQUIRED_PROGRESS,
  302. must_exceed_reached_index=True,
  303. timeout_in_sec=TD_TIMEOUT_SHORT,
  304. expected_state="no issues after node recovery",
  305. )
  306. @pytest.mark.node_recovery
  307. def test_envd_on_failing_node(mz: MaterializeApplication) -> None:
  308. """
  309. environmentd is on the failed node. All connections should fail, but resolve within a minute or two.
  310. """
  311. compute_cluster = default_compute_cluster()
  312. storage_cluster = default_storage_cluster()
  313. populate(mz, compute_cluster, storage_cluster)
  314. reached_index = get_current_counter_index(mz)
  315. envd_node_name = mz.get_k8s_value(
  316. "app=environmentd", "{.items[*].spec.nodeName}", remove_quotes=True
  317. )
  318. mz.suspend_k8s_node(envd_node_name)
  319. print("Expecting connection timeout...")
  320. # all connections / queries should fail initially
  321. try:
  322. mz.testdrive.run(
  323. input=dedent(
  324. """
  325. > SELECT COUNT(*) > 0 FROM mz_tables;
  326. true
  327. """
  328. ),
  329. default_timeout=f"{TD_TIMEOUT_SHORT}s",
  330. no_reset=True,
  331. suppress_command_error_output=True,
  332. )
  333. raise RuntimeError("Expected timeout")
  334. except Exception:
  335. # OK
  336. print("Timeout is expected")
  337. print("Survived connection timeout.")
  338. # expect (live) data to be delivered after at most after two minutes in production (or longer in k8s)
  339. validate_state(
  340. mz,
  341. reached_index + REQUIRED_PROGRESS,
  342. must_exceed_reached_index=True,
  343. timeout_in_sec=TD_TIMEOUT_FULL_RECOVERY,
  344. expected_state="node recovery and live data",
  345. )
  346. recovered_index = get_current_counter_index(mz)
  347. mz.revive_suspended_k8s_node(envd_node_name)
  348. validate_state(
  349. mz,
  350. recovered_index + REQUIRED_PROGRESS,
  351. must_exceed_reached_index=True,
  352. timeout_in_sec=TD_TIMEOUT_SHORT,
  353. expected_state="no issues after node recovery",
  354. )