test_compute_shared_fate.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import subprocess
import time
from textwrap import dedent

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.util.cluster import cluster_pod_name

LOGGER = logging.getLogger(__name__)

CLUSTER_SIZE = 8


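# populate() creates the shared_fate cluster and loads it with a table, a Kafka source,
# indexes, and a materialized view. Each test calls it with a distinct testdrive seed
# before disrupting the replica's clusterd processes (indexed 0..CLUSTER_SIZE-1).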
def populate(mz: MaterializeApplication, seed: int) -> None:
    mz.testdrive.run(
        input=dedent(
            f"""
            > CREATE CLUSTER shared_fate REPLICAS (shared_fate_replica (SIZE '{CLUSTER_SIZE}-1'));
            > SET cluster = shared_fate;
            > CREATE TABLE t1 (f1 INTEGER);
            > INSERT INTO t1 SELECT 123000 + generate_series FROM generate_series(1, 1000);
            > CREATE DEFAULT INDEX ON t1;
            > INSERT INTO t1 SELECT 234000 + generate_series FROM generate_series(1, 1000);
            > CREATE CONNECTION kafka TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT)
            $ kafka-create-topic topic=shared-fate partitions={CLUSTER_SIZE}
            > CREATE SOURCE s1
              FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-shared-fate-${{testdrive.seed}}');
            > CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-shared-fate-${{testdrive.seed}}")
              FORMAT BYTES
              ENVELOPE NONE;
            $ kafka-ingest format=bytes topic=shared-fate repeat=1000
            CDE${{kafka-ingest.iteration}}
            > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) FROM t1 UNION ALL SELECT COUNT(*) FROM s1_tbl;
            $ kafka-ingest format=bytes topic=shared-fate repeat=1000
            DEF${{kafka-ingest.iteration}}
            > CREATE DEFAULT INDEX ON v1;
            > SELECT COUNT(*) > 0 FROM s1_tbl;
            true
            """
        ),
        seed=seed,
    )


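# validate() re-checks the objects created by populate() under the same seed; no_reset=True
# keeps testdrive from resetting the state that populate() set up.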
def validate(mz: MaterializeApplication, seed: int) -> None:
    mz.testdrive.run(
        input=dedent(
            """
            > SET cluster = shared_fate;
            > INSERT INTO t1 SELECT 345000 + generate_series FROM generate_series(1, 1000);
            $ kafka-ingest format=bytes topic=shared-fate repeat=1000
            EFG${kafka-ingest.iteration}
            > SELECT COUNT(*) FROM t1;
            3000
            > SELECT COUNT(*) FROM s1_tbl;
            3000
            > SELECT * FROM v1;
            3000
            3000
            > DROP CLUSTER shared_fate CASCADE;
            """
        ),
        no_reset=True,
        seed=seed,
    )


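# Resolves the pod that hosts the clusterd process with the given index in the
# shared_fate replica and sends it a signal via `kubectl exec`.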
def kill_clusterd(
    mz: MaterializeApplication, compute_id: int, signal: str = "SIGKILL"
) -> None:
    cluster_id, replica_id = mz.environmentd.sql_query(
        "SELECT cluster_id, id FROM mz_cluster_replicas WHERE name = 'shared_fate_replica'"
    )[0]
    pod_name = cluster_pod_name(cluster_id, replica_id, compute_id)
    LOGGER.info(f"sending signal {signal} to pod {pod_name}...")
    try:
        mz.kubectl(
            "exec", pod_name, "--", "bash", "-c", f"kill -{signal} `pidof clusterd`"
        )
    except subprocess.CalledProcessError:
        # The clusterd process or container has most likely stopped already or is on its way down.
        pass


def test_kill_all_clusterds(mz: MaterializeApplication) -> None:
    """Kill all clusterds"""
    populate(mz, 1)
    for compute_id in range(0, CLUSTER_SIZE):
        kill_clusterd(mz, compute_id)
    validate(mz, 1)


def test_kill_one_clusterd(mz: MaterializeApplication) -> None:
    """Kill one clusterd out of $CLUSTER_SIZE"""
    populate(mz, 2)
    kill_clusterd(mz, round(CLUSTER_SIZE / 2))
    validate(mz, 2)


def test_kill_first_clusterd(mz: MaterializeApplication) -> None:
    """Kill the first clusterd out of $CLUSTER_SIZE"""
    populate(mz, 3)
    kill_clusterd(mz, 0)
    validate(mz, 3)


def test_kill_all_but_one_clusterd(mz: MaterializeApplication) -> None:
    """Kill all clusterds except one"""
    populate(mz, 4)
    for compute_id in list(range(0, 2)) + list(range(3, CLUSTER_SIZE)):
        kill_clusterd(mz, compute_id)
    validate(mz, 4)


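# The remaining tests also suspend clusterd processes with SIGSTOP and resume them with
# SIGCONT around the kills, instead of only terminating processes.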
def test_kill_while_suspended(mz: MaterializeApplication) -> None:
    """Suspend a clusterd and resume it after the rest of the cluster went down."""
    populate(mz, 5)
    kill_clusterd(mz, 2, signal="SIGSTOP")
    time.sleep(1)
    kill_clusterd(mz, 4)
    time.sleep(10)
    kill_clusterd(mz, 2, signal="SIGCONT")
    validate(mz, 5)


def test_suspend_while_killing(mz: MaterializeApplication) -> None:
    """Suspend a clusterd while the cluster is going down and resume it after."""
    populate(mz, 6)
    kill_clusterd(mz, 4)
    kill_clusterd(mz, 2, signal="SIGSTOP")
    time.sleep(10)
    kill_clusterd(mz, 2, signal="SIGCONT")
    validate(mz, 6)


def test_suspend_all_but_one(mz: MaterializeApplication) -> None:
    """Suspend all clusterds while killing one."""
    populate(mz, 7)
    for compute_id in range(0, CLUSTER_SIZE):
        if compute_id != 4:
            kill_clusterd(mz, compute_id, signal="SIGSTOP")
    kill_clusterd(mz, 4)
    time.sleep(10)
    for compute_id in range(0, CLUSTER_SIZE):
        if compute_id != 4:
            kill_clusterd(mz, compute_id, signal="SIGCONT")
    validate(mz, 7)