# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import subprocess
from textwrap import dedent

from kubernetes.client import V1Pod, V1StatefulSet
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.app.materialize_application import MaterializeApplication
from materialize.cloudtest.util.cluster import cluster_pod_name
from materialize.cloudtest.util.wait import wait
  16. def populate(mz: MaterializeApplication, seed: int) -> None:
  17. mz.testdrive.run(
  18. input=dedent(
  19. """
  20. > CREATE TABLE t1 (f1 INTEGER);
  21. > INSERT INTO t1 VALUES (123);
  22. > CREATE DEFAULT INDEX ON t1;
  23. > INSERT INTO t1 VALUES (234);
  24. > CREATE CONNECTION kafka TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  25. $ kafka-create-topic topic=crash
  26. > CREATE SOURCE s1
  27. FROM KAFKA CONNECTION kafka
  28. (TOPIC 'testdrive-crash-${testdrive.seed}');
  29. > CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-crash-${testdrive.seed}")
  30. FORMAT BYTES
  31. ENVELOPE NONE;
  32. $ kafka-ingest format=bytes topic=crash
  33. CDE
  34. > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) FROM t1 UNION ALL SELECT COUNT(*) FROM s1_tbl;
  35. $ kafka-ingest format=bytes topic=crash
  36. DEF
  37. > CREATE DEFAULT INDEX ON v1;
  38. > SELECT COUNT(*) > 0 FROM s1_tbl;
  39. true
  40. """
  41. ),
  42. seed=seed,
  43. )
  44. def validate(mz: MaterializeApplication, seed: int) -> None:
  45. mz.testdrive.run(
  46. input=dedent(
  47. """
  48. > INSERT INTO t1 VALUES (345);
  49. $ kafka-ingest format=bytes topic=crash
  50. EFG
  51. > SELECT COUNT(*) FROM t1;
  52. 3
  53. > SELECT COUNT(*) FROM s1_tbl;
  54. 3
  55. > SELECT * FROM v1;
  56. 3
  57. 3
  58. """
  59. ),
  60. no_reset=True,
  61. seed=seed,
  62. )
  63. def test_crash_storage(mz: MaterializeApplication) -> None:
  64. populate(mz, 1)
  65. [cluster_id, replica_id] = mz.environmentd.sql_query(
  66. "SELECT s.cluster_id, r.id FROM mz_sources s JOIN mz_cluster_replicas r ON r.cluster_id = s.cluster_id WHERE s.name = 's1'"
  67. )[0]
  68. pod_name = cluster_pod_name(cluster_id, replica_id)
  69. wait(condition="jsonpath={.status.phase}=Running", resource=pod_name)
  70. try:
  71. mz.kubectl("exec", pod_name, "--", "bash", "-c", "kill -9 `pidof clusterd`")
  72. except subprocess.CalledProcessError as e:
  73. # Killing the entrypoint via kubectl may result in kubectl exiting with code 137
  74. assert e.returncode == 137
  75. wait(condition="jsonpath={.status.phase}=Running", resource=pod_name)
  76. validate(mz, 1)
  77. def test_crash_environmentd(mz: MaterializeApplication) -> None:
  78. def restarts(p: V1Pod) -> int:
  79. assert p.status is not None
  80. assert p.status.container_statuses is not None
  81. return p.status.container_statuses[0].restart_count
  82. def get_replica() -> tuple[V1Pod, V1StatefulSet]:
  83. """Find the stateful set for the replica of the default cluster"""
  84. compute_pod_name = "cluster-u1-replica-u1-gen-0-0"
  85. ss_name = "cluster-u1-replica-u1-gen-0"
  86. compute_pod = mz.environmentd.api().read_namespaced_pod(
  87. compute_pod_name, mz.environmentd.namespace()
  88. )
  89. for ss in (
  90. mz.environmentd.apps_api().list_stateful_set_for_all_namespaces().items
  91. ):
  92. assert ss.metadata is not None
  93. if ss.metadata.name == ss_name:
  94. return (compute_pod, ss)
  95. raise RuntimeError(f"No data found for {ss_name}")
  96. populate(mz, 2)
  97. before = get_replica()
  98. try:
  99. mz.environmentd.sql("SELECT mz_unsafe.mz_panic('forced panic')")
  100. except InterfaceError:
  101. pass
  102. validate(mz, 2)
  103. after = get_replica()
  104. # A environmentd crash must not restart other nodes
  105. assert restarts(before[0]) == restarts(after[0])
  106. def test_crash_clusterd(mz: MaterializeApplication) -> None:
  107. populate(mz, 3)
  108. mz.testdrive.run(
  109. input=dedent(
  110. """
  111. $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  112. ALTER SYSTEM SET unsafe_enable_unstable_dependencies = true;
  113. """
  114. ),
  115. no_reset=True,
  116. )
  117. mz.environmentd.sql("CREATE TABLE crash_table (f1 TEXT)")
  118. mz.environmentd.sql(
  119. "CREATE MATERIALIZED VIEW crash_view AS SELECT mz_unsafe.mz_panic(f1) FROM crash_table"
  120. )
  121. mz.environmentd.sql("INSERT INTO crash_table VALUES ('forced panic')")
  122. mz.testdrive.run(
  123. input=dedent(
  124. """
  125. > DROP MATERIALIZED VIEW crash_view
  126. """
  127. ),
  128. no_reset=True,
  129. seed=3,
  130. )
  131. validate(mz, 3)