# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
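
# Shared-fate tests for storage clusters: all sources and sinks running on a
# cluster replica should recover together after individual clusterd processes
# are killed or suspended.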
import logging
import subprocess
import time
from textwrap import dedent

import pytest

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.util.cluster import cluster_pod_name

LOGGER = logging.getLogger(__name__)

CLUSTER_SIZE = 8  # number of clusterd processes in the storage cluster replica
NUM_SOURCES = 4  # number of source/sink pairs created on the cluster


def populate(mz: MaterializeApplication, seed: int) -> None:
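    """Create sources, tables, materialized views, and sinks on the
    storage_shared_fate cluster, ingest initial data, and verify that the
    expected counts flow end to end."""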
    create_sources_sinks = "\n".join(
        f"""
            > CREATE SOURCE source{i}
              IN CLUSTER storage_shared_fate
              FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-storage-shared-fate-${{testdrive.seed}}');

            > CREATE TABLE source{i}_tbl FROM SOURCE source{i} (REFERENCE "testdrive-storage-shared-fate-${{testdrive.seed}}")
              FORMAT BYTES
              ENVELOPE NONE;

            > CREATE MATERIALIZED VIEW v{i} AS SELECT COUNT(*) FROM source{i}_tbl;

            > CREATE TABLE t{i} (f1 INTEGER);

            > INSERT INTO t{i} SELECT 123000 + generate_series FROM generate_series(1, 1000);

            $ kafka-create-topic topic=storage-shared-fate-sink{i} partitions={CLUSTER_SIZE*4}

            > CREATE SINK sink{i}
              IN CLUSTER storage_shared_fate FROM t{i}
              INTO KAFKA CONNECTION kafka (TOPIC 'testdrive-storage-shared-fate-sink{i}-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM;

            $ kafka-verify-topic sink=materialize.public.sink{i} await-value-schema=true

            > CREATE SOURCE sink{i}_check
              IN CLUSTER storage_shared_fate
              FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-storage-shared-fate-sink{i}-${{testdrive.seed}}');

            > CREATE TABLE sink{i}_check_tbl FROM SOURCE sink{i}_check (REFERENCE "testdrive-storage-shared-fate-sink{i}-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE NONE;
            """
        for i in range(NUM_SOURCES)
    )
    check_counts = "\n".join(
        f"""
            > SELECT COUNT(*) FROM source{i}_tbl;
            2000

            > SELECT COUNT(*) FROM sink{i}_check_tbl;
            1000
            """
        for i in range(NUM_SOURCES)
    )
    mz.testdrive.run(
        input=dedent(
            f"""
            > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                URL '${{testdrive.schema-registry-url}}'
              );

            > CREATE CLUSTER storage_shared_fate REPLICAS (storage_shared_fate_replica (SIZE '{CLUSTER_SIZE}-1'));

            > CREATE CONNECTION kafka TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            $ kafka-create-topic topic=storage-shared-fate partitions={CLUSTER_SIZE*4}

            $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=storage-shared-fate repeat=1000
            CDE${{kafka-ingest.iteration}}:CDE${{kafka-ingest.iteration}}

            {create_sources_sinks}

            $ kafka-ingest key-format=bytes format=bytes key-terminator=: topic=storage-shared-fate repeat=1000
            DEF${{kafka-ingest.iteration}}:DEF${{kafka-ingest.iteration}}

            {check_counts}
            """
        ),
        seed=seed,
    )


def validate(mz: MaterializeApplication, seed: int) -> None:
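    """Verify that all sources, materialized views, and sinks resume making
    progress after the disruption, then drop the cluster."""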
    validations = "\n".join(
        f"""
            > INSERT INTO t{i} SELECT 234000 + generate_series FROM generate_series(1, 1000);

            > SELECT COUNT(*) FROM source{i}_tbl;
            3000

            > SELECT * FROM v{i};
            3000

            > SELECT COUNT(*) FROM sink{i}_check_tbl;
            2000
            """
        for i in range(NUM_SOURCES)
    )

    mz.testdrive.run(
        input=dedent(
            f"""
            $ kafka-ingest key-format=bytes format=bytes key-terminator=: topic=storage-shared-fate repeat=1000
            EFG${{kafka-ingest.iteration}}:EFG${{kafka-ingest.iteration}}

            {validations}

            > DROP CLUSTER storage_shared_fate CASCADE;
            """
        ),
        no_reset=True,
        seed=seed,
    )


def kill_clusterd(
    mz: MaterializeApplication, compute_id: int, signal: str = "SIGKILL"
) -> None:
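    """Send ``signal`` (SIGKILL by default) to the clusterd process running in
    the replica pod selected by ``compute_id``."""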
    cluster_id, replica_id = mz.environmentd.sql_query(
        "SELECT cluster_id, id FROM mz_cluster_replicas WHERE name = 'storage_shared_fate_replica'"
    )[0]
    pod_name = cluster_pod_name(cluster_id, replica_id, compute_id)
    LOGGER.info(f"sending signal {signal} to pod {pod_name}...")
    try:
        mz.kubectl(
            "exec", pod_name, "--", "bash", "-c", f"kill -{signal} `pidof clusterd`"
        )
    except subprocess.CalledProcessError:
        # The clusterd process or container has most likely stopped already
        # or is on its way down.
        pass


@pytest.mark.long
def test_kill_all_storage_clusterds(mz: MaterializeApplication) -> None:
    """Kill all clusterds"""
    populate(mz, 1)
    for compute_id in range(0, CLUSTER_SIZE):
        kill_clusterd(mz, compute_id)
    validate(mz, 1)


@pytest.mark.long
def test_kill_one_storage_clusterd(mz: MaterializeApplication) -> None:
    """Kill one clusterd out of $CLUSTER_SIZE"""
    populate(mz, 2)
    kill_clusterd(mz, round(CLUSTER_SIZE / 2))
    validate(mz, 2)


@pytest.mark.long
def test_kill_first_storage_clusterd(mz: MaterializeApplication) -> None:
    """Kill the first clusterd out of $CLUSTER_SIZE"""
    populate(mz, 3)
    kill_clusterd(mz, 0)
    validate(mz, 3)


@pytest.mark.long
def test_kill_all_but_one_storage_clusterd(mz: MaterializeApplication) -> None:
    """Kill all clusterds except one"""
    populate(mz, 4)
    # Kill every process except compute_id 2.
    for compute_id in list(range(0, 2)) + list(range(3, CLUSTER_SIZE)):
        kill_clusterd(mz, compute_id)
    validate(mz, 4)


@pytest.mark.long
def test_kill_storage_while_suspended(mz: MaterializeApplication) -> None:
    """Suspend a clusterd and resume it after the rest of the cluster went down."""
    populate(mz, 5)
    kill_clusterd(mz, 2, signal="SIGSTOP")
    time.sleep(1)
    kill_clusterd(mz, 4)
    time.sleep(10)
    kill_clusterd(mz, 2, signal="SIGCONT")
    validate(mz, 5)


@pytest.mark.long
def test_suspend_while_killing_storage(mz: MaterializeApplication) -> None:
    """Suspend a clusterd while the cluster is going down and resume it after."""
    populate(mz, 6)
    kill_clusterd(mz, 4)
    kill_clusterd(mz, 2, signal="SIGSTOP")
    time.sleep(10)
    kill_clusterd(mz, 2, signal="SIGCONT")
    validate(mz, 6)


@pytest.mark.long
def test_suspend_all_but_one_storage(mz: MaterializeApplication) -> None:
    """Suspend all clusterds while killing one."""
    populate(mz, 7)
    for compute_id in range(0, CLUSTER_SIZE):
        if compute_id != 4:
            kill_clusterd(mz, compute_id, signal="SIGSTOP")
    kill_clusterd(mz, 4)
    time.sleep(10)
    for compute_id in range(0, CLUSTER_SIZE):
        if compute_id != 4:
            kill_clusterd(mz, compute_id, signal="SIGCONT")
    validate(mz, 7)