test_replica_restart.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import threading
import time
from io import StringIO
from textwrap import dedent

import pytest
from pg8000 import Connection

from materialize.cloudtest.app.materialize_application import MaterializeApplication

def query(conn: Connection, sql: str) -> None:
    # Wrap all exceptions so that when the connection is closed from the other
    # thread we don't panic the test.
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
    except:
        pass


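# pg8000's `run` writes the output of `COPY ... TO STDOUT` into the object
# passed as `stream`; we sink it into a throwaway in-memory buffer because only
# the notices delivered on the connection matter for these tests.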
def copy(conn: Connection, sql: str) -> None:
    try:
        conn.run(sql, stream=StringIO())
    except:
        pass


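# pg8000 exposes asynchronous NoticeResponse messages on `conn.notices`; each
# entry maps the protocol's single-byte field codes to their values, so b"M"
# holds the human-readable message text that we match against below.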
# Consumes notices on conn until one of them contains `contains`.
def assert_notice(conn: Connection, contains: bytes) -> None:
    while True:
        try:
            notice = conn.notices.pop()
            if contains in notice[b"M"]:
                return
        except IndexError:
            pass
        time.sleep(0.2)


# Test that an OOMing cluster replica generates expected entries in
# `mz_cluster_replica_statuses`.
@pytest.mark.skip(reason="Now fails after a Buildkite upgrade database-issues#6307")
def test_oom_clusterd(mz: MaterializeApplication) -> None:
    def verify_cluster_oomed() -> None:
        with mz.environmentd.sql_cursor(autocommit=False) as cur:
            cur.execute(
                dedent(
                    """
                    SET CLUSTER=mz_catalog_server;
                    DECLARE c CURSOR FOR SUBSCRIBE TO (
                        SELECT status, reason
                        FROM mz_internal.mz_cluster_replica_statuses mcrs
                        JOIN mz_cluster_replicas mcr ON mcrs.replica_id = mcr.id
                        JOIN mz_clusters mc ON mcr.cluster_id = mc.id
                        WHERE mc.name = 'oom'
                    )
                    """
                )
            )
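            # SUBSCRIBE emits each update as (mz_timestamp, mz_diff, <selected
            # columns>); a non-positive diff is a retraction, so only
            # insertions are considered, and we return once the replica is
            # reported offline because it was OOM-killed.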
            while True:
                cur.execute("FETCH ALL c")
                for _, diff, status, reason in cur.fetchall():
                    if diff < 1:
                        continue
                    if status == "offline" and reason == "oom-killed":
                        return

    # Once we create an index on this view in a cluster limited to 2 GB of
    # memory, it is practically guaranteed to OOM.
    mz.environmentd.sql(
        dedent(
            """
            CREATE CLUSTER oom REPLICAS (oom (size 'mem-2'));
            SET cluster=oom;
            CREATE VIEW oom AS
                SELECT repeat('abc' || x || y, 1000000) FROM
                (SELECT * FROM generate_series(1, 1000000)) a(x),
                (SELECT * FROM generate_series(1, 1000000)) b(y);
            CREATE DEFAULT INDEX oom_idx ON oom
            """
        )
    )

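    # The cross join alone yields 10^12 rows, each carrying a multi-megabyte
    # repeated string, so maintaining the index in memory blows well past the
    # replica's limit long before the dataflow completes.
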
    # Wait for the cluster pod to OOM.
    verify_cluster_oomed()

    mz.environmentd.sql("DROP CLUSTER oom CASCADE; DROP VIEW oom CASCADE")


# Test that a crashed (and restarted) cluster replica generates the expected
# notice events.
def test_crash_clusterd(mz: MaterializeApplication) -> None:
    mz.environmentd.sql("DROP TABLE IF EXISTS t1 CASCADE")
    mz.environmentd.sql("CREATE TABLE t1 (f1 TEXT)")

    # For various query contexts, create a connection, run a query that will
    # never finish in another thread, and examine its notices from this thread,
    # since the queries block forever. The contexts here (SELECT stuck in
    # pending, direct SUBSCRIBE, SUBSCRIBE via COPY) are all implemented
    # separately, so they need to be tested separately.
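    # The AS OF timestamp below is u64::MAX, a time the table's frontier can
    # never reach, so the SELECT should stay pending indefinitely rather than
    # returning.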
    c_select = mz.environmentd.sql_conn()
    t_select = threading.Thread(
        target=query,
        args=(
            c_select,
            "SELECT * FROM t1 AS OF 18446744073709551615",
        ),
    )
    t_select.start()

    c_subscribe = mz.environmentd.sql_conn()
    t_subscribe = threading.Thread(
        target=query,
        args=(
            c_subscribe,
            "SUBSCRIBE t1",
        ),
    )
    t_subscribe.start()

    c_copy = mz.environmentd.sql_conn()
    t_copy = threading.Thread(
        target=copy,
        args=(
            c_copy,
            "COPY (SUBSCRIBE t1) TO STDOUT",
        ),
    )
    t_copy.start()

    # Wait a little for the queries to start receiving notices.
    time.sleep(1)
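    # Discard anything raised during setup so that assert_notice below only
    # sees notices produced after the crash.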
    c_select.notices.clear()
    c_subscribe.notices.clear()
    c_copy.notices.clear()

    # Simulate an unexpected clusterd crash.
    pods = mz.kubectl("get", "pods", "-o", "custom-columns=:metadata.name")
    podcount = 0
    for pod in pods.splitlines():
        if "cluster" in pod:
            try:
                mz.kubectl("delete", "pod", pod)
                podcount += 1
            except:
                # It's OK if the pod delete fails -- it probably means we raced
                # with a previous test that dropped resources.
                pass
    assert podcount > 0

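    # The queries above run on the default `quickstart` cluster, whose single
    # replica is `r1`; deleting its clusterd pod should cause environmentd to
    # mark the replica offline and notify every connection with an active query
    # on it.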
    # Wait for the expected notice on all connections.
    msg = b'cluster replica quickstart.r1 changed status to "offline"'
    assert_notice(c_select, msg)
    assert_notice(c_subscribe, msg)
    assert_notice(c_copy, msg)

    # Cleanup for other tests.
    mz.environmentd.sql("DROP TABLE t1")

    # We need all of the above threads to finish for the test to succeed. Close
    # the connections from this thread, because pg8000 doesn't support
    # cancellation and dropping the table in mz doesn't complete the queries
    # either.
    c_select.close()
    c_subscribe.close()
    c_copy.close()