test_antiaffinity.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import pytest
  10. from materialize.cloudtest.app.materialize_application import MaterializeApplication
  11. from materialize.cloudtest.util.cluster import cluster_pod_name
  12. from materialize.cloudtest.util.wait import wait
  13. def zones_used(
  14. mz: MaterializeApplication,
  15. replica_names: list[str] | None = None,
  16. cluster_name: str = "antiaffinity_cluster1",
  17. ) -> int:
  18. if replica_names is None:
  19. replica_names = [
  20. "antiaffinity_replica1",
  21. "antiaffinity_replica2",
  22. "antiaffinity_replica3",
  23. ]
  24. nodes = {}
  25. for replica_name in replica_names:
  26. cluster_id = mz.environmentd.sql_query(
  27. f"SELECT id FROM mz_clusters WHERE name = '{cluster_name}'"
  28. )[0][0]
  29. assert cluster_id is not None
  30. replica_id = mz.environmentd.sql_query(
  31. "SELECT id FROM mz_cluster_replicas "
  32. f"WHERE cluster_id = '{cluster_id}' AND name = '{replica_name}'"
  33. )[0][0]
  34. assert replica_id is not None
  35. cluster_pod = cluster_pod_name(cluster_id, replica_id)
  36. wait(condition="condition=Ready", resource=cluster_pod)
  37. compute_pod = mz.environmentd.api().read_namespaced_pod(
  38. cluster_pod.removeprefix("pod/"),
  39. mz.environmentd.namespace(),
  40. )
  41. spec = compute_pod.spec
  42. assert spec is not None
  43. assert spec.node_name is not None
  44. node = mz.environmentd.api().read_node(spec.node_name)
  45. assert node is not None
  46. assert node.metadata is not None
  47. assert isinstance(node.metadata.labels, dict)
  48. zone = node.metadata.labels["materialize.cloud/availability-zone"]
  49. nodes[zone] = 1
  50. return len(nodes.keys())
  51. def test_create_cluster_antiaffinity(mz: MaterializeApplication) -> None:
  52. """Test that multiple replicas as defined in CREATE CLUSTER are placed in different availability zones."""
  53. mz.environmentd.sql(
  54. """
  55. CREATE CLUSTER antiaffinity_cluster1 REPLICAS (
  56. antiaffinity_replica1 (SIZE '1'),
  57. antiaffinity_replica2 (SIZE '1'),
  58. antiaffinity_replica3 (SIZE '1')
  59. )"""
  60. )
  61. assert zones_used(mz) == 3
  62. mz.environmentd.sql("DROP CLUSTER antiaffinity_cluster1 CASCADE")
  63. def test_create_cluster_replica_antiaffinity(mz: MaterializeApplication) -> None:
  64. """Test that multiple replicas as created with CREATE CLUSTER REPLICA are placed in different availability zones."""
  65. mz.environmentd.sql(
  66. """
  67. CREATE CLUSTER antiaffinity_cluster1 REPLICAS ();
  68. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica1 SIZE '1';
  69. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica2 SIZE '1';
  70. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica3 SIZE '1';
  71. """
  72. )
  73. assert zones_used(mz) == 3
  74. mz.environmentd.sql("DROP CLUSTER antiaffinity_cluster1 CASCADE")
  75. def test_create_cluster_replica_zone_specified(mz: MaterializeApplication) -> None:
  76. """Test that the AVAILABILITY ZONE argument to CREATE CLUSTER REPLICA is observed."""
  77. mz.environmentd.sql(
  78. """
  79. CREATE CLUSTER antiaffinity_cluster1 REPLICAS ();
  80. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica1 SIZE '1' , AVAILABILITY ZONE '3';
  81. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica2 SIZE '1' , AVAILABILITY ZONE '3';
  82. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica3 SIZE '1' , AVAILABILITY ZONE '3';
  83. """
  84. )
  85. assert zones_used(mz) == 1
  86. mz.environmentd.sql("DROP CLUSTER antiaffinity_cluster1 CASCADE")
  87. def test_create_cluster_replica_zone_mixed(mz: MaterializeApplication) -> None:
  88. """Test that the AVAILABILITY ZONE argument to CREATE CLUSTER REPLICA is observed."""
  89. mz.environmentd.sql(
  90. """
  91. CREATE CLUSTER antiaffinity_cluster1 REPLICAS ();
  92. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica1 SIZE '1' , AVAILABILITY ZONE '3';
  93. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica2 SIZE '1' , AVAILABILITY ZONE '3';
  94. CREATE CLUSTER REPLICA antiaffinity_cluster1.antiaffinity_replica3 SIZE '1';
  95. """
  96. )
  97. assert zones_used(mz) == 2
  98. mz.environmentd.sql("DROP CLUSTER antiaffinity_cluster1 CASCADE")
  99. def test_managed_set_azs(mz: MaterializeApplication) -> None:
  100. """Test that the AVAILABILITY ZONE argument to CREATE CLUSTER REPLICA is observed."""
  101. mz.environmentd.sql(
  102. "ALTER SYSTEM SET enable_managed_cluster_availability_zones = true",
  103. port="internal",
  104. user="mz_system",
  105. )
  106. mz.environmentd.sql(
  107. """
  108. CREATE CLUSTER antiaffinity_cluster1 SIZE '1', REPLICATION FACTOR 3, AVAILABILITY ZONES ('1', '3')
  109. """
  110. )
  111. assert (
  112. zones_used(
  113. mz,
  114. replica_names=[
  115. "r1",
  116. "r2",
  117. "r3",
  118. ],
  119. )
  120. == 2
  121. )
  122. mz.environmentd.sql("DROP CLUSTER antiaffinity_cluster1 CASCADE")
  123. @pytest.mark.skip(reason="Not currently guaranteed by implementation.")
  124. def test_create_clusters_antiaffinity(mz: MaterializeApplication) -> None:
  125. """Test that multiple independent clusters are spread out to different availability zones."""
  126. mz.environmentd.sql(
  127. """
  128. CREATE CLUSTER antiaffinity_cluster1 REPLICAS (
  129. antiaffinity_replica1 (SIZE '1')
  130. );
  131. CREATE CLUSTER antiaffinity_cluster2 REPLICAS (
  132. antiaffinity_replica2 (SIZE '1')
  133. );
  134. CREATE CLUSTER antiaffinity_cluster3 REPLICAS (
  135. antiaffinity_replica3 (SIZE '1')
  136. );
  137. """
  138. )
  139. assert zones_used(mz) == 3
  140. mz.environmentd.sql(
  141. """
  142. DROP CLUSTER antiaffinity_cluster1 CASCADE;
  143. DROP CLUSTER antiaffinity_cluster2 CASCADE;
  144. DROP CLUSTER antiaffinity_cluster3 CASCADE;
  145. """
  146. )