# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import copy
import json
import logging
import time

import pytest
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.k8s.environmentd import EnvironmentdStatefulSet
from materialize.cloudtest.util.cluster import cluster_pod_name, cluster_service_name
from materialize.cloudtest.util.exists import exists, not_exists
from materialize.cloudtest.util.wait import wait

LOGGER = logging.getLogger(__name__)


def test_cluster_sizing(mz: MaterializeApplication) -> None:
    """Test that a SIZE N cluster indeed creates N clusterd instances."""
    SIZE = 2

    mz.environmentd.sql(
        f"CREATE CLUSTER sized1 REPLICAS (sized_replica1 (SIZE '{SIZE}-1'))"
    )
    cluster_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_clusters WHERE name = 'sized1'"
    )[0][0]
    assert cluster_id is not None

    replica_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_cluster_replicas WHERE name = 'sized_replica1'"
    )[0][0]
    assert replica_id is not None
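
    # A replica of size '2-1' consists of SIZE clusterd processes, each
    # running in its own pod.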
    for compute_id in range(0, SIZE):
        compute_pod = cluster_pod_name(cluster_id, replica_id, compute_id)
        wait(condition="condition=Ready", resource=compute_pod)

    mz.environmentd.sql("DROP CLUSTER sized1 CASCADE")


@pytest.mark.parametrize(
    "failpoint",
  41. ["", "after_catalog_drop_replica=panic", "after_sequencer_drop_replica=panic"],
  42. )
  43. @pytest.mark.skip(reason="Failpoints mess up the Mz instance database-issues#5263")
  44. def test_cluster_shutdown(mz: MaterializeApplication, failpoint: str) -> None:
  45. """Test that dropping a cluster or replica causes the associated clusterds to shut down."""
  46. LOGGER.info(f"Testing cluster shutdown with failpoint={failpoint}")
  47. mz.set_environmentd_failpoints(failpoint)
  48. def sql_expect_crash(sql: str) -> None:
  49. # We expect executing `sql` will crash environmentd. To ensure it is actually `sql`
  50. # wait until the SQL interface is available.
  51. mz.wait_for_sql()
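        # With an empty failpoint the statement simply succeeds; otherwise
        # environmentd panics and the connection is dropped.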
        try:
            mz.environmentd.sql(sql)
        except InterfaceError as e:
            LOGGER.error(f"Expected SQL error: {e}")
    mz.environmentd.sql(
        "CREATE CLUSTER shutdown1 REPLICAS (shutdown_replica1 (SIZE '1'), shutdown_replica2 (SIZE '1'))"
    )
    cluster_id = mz.environmentd.sql_query(
        "SELECT id FROM mz_clusters WHERE name = 'shutdown1'"
    )[0][0]
    assert cluster_id is not None

    compute_pods = {}
    compute_svcs = {}
    for replica_name in ["shutdown_replica1", "shutdown_replica2"]:
        replica_id = mz.environmentd.sql_query(
            f"SELECT id FROM mz_cluster_replicas WHERE name = '{replica_name}'"
        )[0][0]
        assert replica_id is not None

        compute_pod = cluster_pod_name(cluster_id, replica_id)
        compute_pods[replica_name] = compute_pod
        wait(condition="condition=Ready", resource=compute_pod)

        compute_svc = cluster_service_name(cluster_id, replica_id)
        compute_svcs[replica_name] = compute_svc
        exists(resource=compute_svc)

    sql_expect_crash("DROP CLUSTER REPLICA shutdown1.shutdown_replica1")
    wait(condition="delete", resource=compute_pods["shutdown_replica1"])
    not_exists(resource=compute_svcs["shutdown_replica1"])

    sql_expect_crash("DROP CLUSTER shutdown1 CASCADE")
    wait(condition="delete", resource=compute_pods["shutdown_replica2"])
    not_exists(resource=compute_svcs["shutdown_replica2"])

    mz.set_environmentd_failpoints("")


def get_value_from_label(
    mz: MaterializeApplication, cluster_id: str, replica_id: str, jsonpath: str
) -> str:
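    """Fetch `jsonpath` from the pods of the given cluster replica.

    The pods are selected via the cluster-id/replica-id labels that
    environmentd attaches to them. Note that the kubectl output comes back
    wrapped in literal single quotes.
    """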
    return mz.kubectl(
        "get",
        "pods",
        f"--selector=cluster.environmentd.materialize.cloud/cluster-id={cluster_id},cluster.environmentd.materialize.cloud/replica-id={replica_id}",
        "-o",
        "jsonpath='{.items[*]." + jsonpath + "}'",
    )


def get_node_selector(
    mz: MaterializeApplication, cluster_id: str, replica_id: str
) -> str:
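    """Return the node selector of the replica's pods, as single-quoted JSON."""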
    return get_value_from_label(mz, cluster_id, replica_id, "spec.nodeSelector")


def test_disk_label(mz: MaterializeApplication) -> None:
    """Test that cluster replicas have the correct materialize.cloud/disk labels."""
    # If cluster_always_use_disk is set to true, it will take precedence over
    # the DISK keyword in CREATE CLUSTER.
    mz.environmentd.sql(
        "ALTER SYSTEM SET cluster_always_use_disk = false;",
        port="internal",
        user="mz_system",
    )

    for value in ("true", "false"):
        mz.environmentd.sql(
            f"CREATE CLUSTER disk_{value} MANAGED, SIZE = '2-no-disk', DISK = {value}"
        )
        (cluster_id, replica_id) = mz.environmentd.sql_query(
            f"SELECT mz_clusters.id, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'disk_{value}'"
        )[0]
        assert cluster_id is not None
        assert replica_id is not None

        node_selectors = get_node_selector(mz, cluster_id, replica_id)
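        # get_value_from_label wraps the jsonpath argument in literal single
        # quotes, so the expected values below include those quotes as well.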
        if value == "true":
            assert (
                node_selectors == '\'{"materialize.cloud/disk":"true"}\''
            ), node_selectors
        else:
            assert node_selectors == "''"

        mz.environmentd.sql(f"DROP CLUSTER disk_{value} CASCADE")

    # Reset
    mz.environmentd.sql(
        "ALTER SYSTEM SET cluster_always_use_disk = true;",
        port="internal",
        user="mz_system",
    )


@pytest.mark.skip(reason="Keeps flaking, see database-issues#8299")
def test_cluster_replica_sizes(mz: MaterializeApplication) -> None:
    """Test that the --cluster-replica-sizes mapping is respected."""
    # Give drops of existing clusters some time to complete so we don't try to
    # spin them up again.
    time.sleep(5)
    cluster_replica_size_map = {
        "small": {
            "scale": 1,
            "workers": 1,
            "cpu_limit": None,
            "memory_limit": None,
            "disk_limit": None,
            "credits_per_hour": "1",
            "disabled": False,
            "selectors": {"key1": "value1"},
        },
        "medium": {
            "scale": 1,
            "workers": 1,
            "cpu_limit": None,
            "memory_limit": None,
            "disk_limit": None,
            "credits_per_hour": "1",
            "disabled": False,
            "selectors": {"key2": "value2", "key3": "value3"},
        },
        # for existing clusters
        "1": {
            "scale": 1,
            "workers": 1,
            "cpu_limit": None,
            "memory_limit": None,
            "disk_limit": None,
            "disabled": False,
            "credits_per_hour": "1",
        },
        "2-1": {
            "scale": 2,
            "workers": 2,
            "cpu_limit": None,
            "memory_limit": None,
            "disk_limit": None,
            "disabled": False,
            "credits_per_hour": "2",
        },
    }
    stateful_set = [
        resource
        for resource in mz.resources
        if type(resource) == EnvironmentdStatefulSet
    ]
    assert len(stateful_set) == 1
    stateful_set = copy.deepcopy(stateful_set[0])

    stateful_set.env["MZ_CLUSTER_REPLICA_SIZES"] = json.dumps(cluster_replica_size_map)
    stateful_set.extra_args.append("--bootstrap-default-cluster-replica-size=1")
    stateful_set.replace()
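    # Replacing the stateful set restarts environmentd with the new size map,
    # so wait for the SQL interface to come back before proceeding.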
    mz.wait_for_sql()

    for key, value in {
        "small": cluster_replica_size_map["small"],
        "medium": cluster_replica_size_map["medium"],
    }.items():
        mz.environmentd.sql(f"CREATE CLUSTER scale_{key} MANAGED, SIZE = '{key}'")
        (cluster_id, replica_id) = mz.environmentd.sql_query(
            f"SELECT mz_clusters.id, mz_cluster_replicas.id FROM mz_cluster_replicas JOIN mz_clusters ON mz_cluster_replicas.cluster_id = mz_clusters.id WHERE mz_clusters.name = 'scale_{key}'"
        )[0]
        assert cluster_id is not None
        assert replica_id is not None
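        # In addition to the selectors from the size map, the replicas are
        # expected to land on disk-backed nodes, hence the
        # materialize.cloud/disk label.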
        expected = value.get("selectors", {}) | {"materialize.cloud/disk": "true"}

        node_selectors_raw = ""
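        # The pods may not have been scheduled yet, so poll until a node
        # selector shows up.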
        for _ in range(1, 10):
            node_selectors_raw = get_node_selector(mz, cluster_id, replica_id)
            if node_selectors_raw:
                break
            LOGGER.info("No node selectors available yet, sleeping")
            time.sleep(5)
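        # Strip the literal single quotes wrapped around the kubectl output
        # before parsing the JSON.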
        node_selectors = json.loads(node_selectors_raw[1:-1])
        assert (
            node_selectors == expected
        ), f"actual: {node_selectors}, but expected {expected}"

        mz.environmentd.sql(f"DROP CLUSTER scale_{key} CASCADE")
    # Cleanup
    stateful_set = [
        resource
        for resource in mz.resources
        if type(resource) == EnvironmentdStatefulSet
    ]
    assert len(stateful_set) == 1
    stateful_set = stateful_set[0]
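    # Re-applying the unmodified stateful set definition restores the default
    # replica sizes.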
    stateful_set.replace()
    mz.wait_for_sql()