- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import threading
- import time
- from io import StringIO
- from textwrap import dedent
- import pytest
- from pg8000 import Connection
- from materialize.cloudtest.app.materialize_application import MaterializeApplication
- def query(conn: Connection, sql: str) -> None:
- # Wrap all exceptions so that when the connection is closed from the other
- # thread we don't panic the test.
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql)
- except:
- pass
- def copy(conn: Connection, sql: str) -> None:
- try:
- conn.run(sql, stream=StringIO())
- except:
- pass
- # Returns and consumes notices on conn until one contains `contains`.
- def assert_notice(conn: Connection, contains: bytes) -> None:
- while True:
- try:
- notice = conn.notices.pop()
- if contains in notice[b"M"]:
- return
- except IndexError:
- pass
- time.sleep(0.2)
# Test that an OOMing cluster replica generates expected entries in
# `mz_cluster_replica_statuses`
@pytest.mark.skip(reason="Now fails after a Buildkite upgrade database-issues#6307")
def test_oom_clusterd(mz: MaterializeApplication) -> None:
    """Provoke an OOM kill of a cluster replica and verify it is reported.

    Builds an index on a view whose dataflow cannot fit in a 2Gb replica,
    then watches `mz_cluster_replica_statuses` until the replica reports
    status "offline" with reason "oom-killed".
    """

    def verify_cluster_oomed() -> None:
        # SUBSCRIBE to status changes for replicas of the 'oom' cluster and
        # block until one shows the expected OOM-kill status.
        with mz.environmentd.sql_cursor(autocommit=False) as cur:
            cur.execute(
                dedent(
                    """
                    SET CLUSTER=mz_catalog_server;
                    DECLARE c CURSOR FOR SUBSCRIBE TO (
                    SELECT status, reason
                    FROM mz_internal.mz_cluster_replica_statuses mcrs
                    JOIN mz_cluster_replicas mcr ON mcrs.replica_id = mcr.id
                    JOIN mz_clusters mc ON mcr.cluster_id = mc.id
                    WHERE mc.name = 'oom'
                    )
                    """
                )
            )
            while True:
                cur.execute("FETCH ALL c")
                for _, diff, status, reason in cur.fetchall():
                    # Skip retractions; only insertions (diff >= 1) describe
                    # the replica's new status.
                    if diff < 1:
                        continue
                    if status == "offline" and reason == "oom-killed":
                        return

    # Once we create an index on this view in a cluster limited to 2Gb, it is
    # practically guaranteed to OOM
    mz.environmentd.sql(
        dedent(
            """
            CREATE CLUSTER oom REPLICAS (oom (size 'mem-2'));
            SET cluster=oom;
            CREATE VIEW oom AS
            SELECT repeat('abc' || x || y, 1000000) FROM
            (SELECT * FROM generate_series(1, 1000000)) a(x),
            (SELECT * FROM generate_series(1, 1000000)) b(y);
            CREATE DEFAULT INDEX oom_idx ON oom
            """
        )
    )

    # Wait for the cluster pod to OOM
    verify_cluster_oomed()

    # Clean up so later tests start from a fresh catalog.
    mz.environmentd.sql("DROP CLUSTER oom CASCADE; DROP VIEW oom CASCADE")
# Test that a crashed (and restarted) cluster replica generates expected notice
# events.
def test_crash_clusterd(mz: MaterializeApplication) -> None:
    """Kill clusterd pods and assert that every open query context receives
    an "offline" status notice on its connection."""
    mz.environmentd.sql("DROP TABLE IF EXISTS t1 CASCADE")
    mz.environmentd.sql("CREATE TABLE t1 (f1 TEXT)")

    # For various query contexts, create a connection, run a query that'll never
    # finish in another thread, and examine its notices from this thread since
    # the queries block forever. The contexts here (SELECT stuck in pending,
    # direct SUBSCRIBE, SUBSCRIBE via COPY) are all separately implemented, so
    # need to be separately tested.
    c_select = mz.environmentd.sql_conn()
    t_select = threading.Thread(
        target=query,
        args=(
            c_select,
            # AS OF the maximal timestamp: this SELECT can never complete.
            "SELECT * FROM t1 AS OF 18446744073709551615",
        ),
    )
    t_select.start()

    c_subscribe = mz.environmentd.sql_conn()
    t_subscribe = threading.Thread(
        target=query,
        args=(
            c_subscribe,
            "SUBSCRIBE t1",
        ),
    )
    t_subscribe.start()

    c_copy = mz.environmentd.sql_conn()
    t_copy = threading.Thread(
        target=copy,
        args=(
            c_copy,
            "COPY (SUBSCRIBE t1) TO STDOUT",
        ),
    )
    t_copy.start()

    # Wait a teeny bit for the queries to be receiving notices.
    time.sleep(1)
    # Discard anything received so far; only notices caused by the crash
    # below should satisfy the assertions.
    c_select.notices.clear()
    c_subscribe.notices.clear()
    c_copy.notices.clear()

    # Simulate an unexpected clusterd crash by deleting every cluster pod.
    pods = mz.kubectl("get", "pods", "-o", "custom-columns=:metadata.name")
    podcount = 0
    for pod in pods.splitlines():
        if "cluster" in pod:
            try:
                mz.kubectl("delete", "pod", pod)
                podcount += 1
            except Exception:
                # It's OK if the pod delete fails --
                # it probably means we raced with a previous test that
                # dropped resources.
                pass
    assert podcount > 0

    # Wait for expected notices on all connections.
    msg = b'cluster replica quickstart.r1 changed status to "offline"'
    assert_notice(c_select, msg)
    assert_notice(c_subscribe, msg)
    assert_notice(c_copy, msg)

    # Cleanup for other tests.
    mz.environmentd.sql("DROP TABLE t1")

    # We need all the above threads to finish for the test to succeed. Close
    # the connections from this thread because pg8000 doesn't support
    # cancellation and dropping the table in mz doesn't complete the queries
    # either.
    c_select.close()
    c_subscribe.close()
    c_copy.close()
|