123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
- @externally_idempotent(False)
- class SourceErrors(Check):
- def initialize(self) -> Testdrive:
- return Testdrive(
- dedent(
- """
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- # In order to avoid conflicts, user must be unique
- CREATE USER source_errors_user1 WITH SUPERUSER PASSWORD 'postgres';
- ALTER USER source_errors_user1 WITH replication;
- DROP PUBLICATION IF EXISTS source_errors_publicationA;
- DROP PUBLICATION IF EXISTS source_errors_publicationB;
- DROP TABLE IF EXISTS source_errors_table;
- CREATE TABLE source_errors_table (f1 TEXT);
- ALTER TABLE source_errors_table REPLICA IDENTITY FULL;
- INSERT INTO source_errors_table VALUES (1);
- CREATE PUBLICATION source_errors_publicationA FOR ALL TABLES;
- CREATE PUBLICATION source_errors_publicationB FOR ALL TABLES;
- > CREATE SECRET source_errors_secret AS 'postgres';
- > CREATE CONNECTION source_errors_connection FOR POSTGRES
- HOST 'postgres',
- DATABASE postgres,
- USER source_errors_user1,
- PASSWORD SECRET source_errors_secret
- > CREATE SOURCE source_errors_sourceA
- FROM POSTGRES CONNECTION source_errors_connection
- (PUBLICATION 'source_errors_publicationa') /* all lowercase */
- > CREATE TABLE source_errors_tableA FROM SOURCE source_errors_sourceA (REFERENCE source_errors_table);
- > CREATE SOURCE source_errors_sourceB
- FROM POSTGRES CONNECTION source_errors_connection
- (PUBLICATION 'source_errors_publicationb') /* all lowercase */
- > CREATE TABLE source_errors_tableB FROM SOURCE source_errors_sourceB (REFERENCE source_errors_table);
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- INSERT INTO source_errors_table VALUES (2);
- > SELECT COUNT(*) FROM source_errors_tableA;
- 2
- > SELECT COUNT(*) FROM source_errors_tableB;
- 2
- """
- )
- )
- def manipulate(self) -> list[Testdrive]:
- return [
- Testdrive(dedent(s))
- for s in [
- """
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- DROP PUBLICATION IF EXISTS source_errors_publicationA;
- INSERT INTO source_errors_table VALUES (3);
- # We sleep for a bit here to allow status updates to propagate to the storage controller
- # in scenarios where environmentd is killed
- $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s
- """,
- """
- $ postgres-execute connection=postgres://postgres:postgres@postgres
- DROP PUBLICATION IF EXISTS source_errors_publicationB;
- INSERT INTO source_errors_table VALUES (4);
- """,
- ]
- ]
- def validate(self) -> Testdrive:
- return Testdrive(
- dedent(
- # We check two things: a) that the expected (sub)sources are
- # stalled, and b) that they report the expected error. Only
- # checking the error using bool_and wouldn't work because this
- # check ignores NULL values, so would succeed if, say, all
- # sources are in state 'running'.
- #
- # TODO(aljoscha): We recently migrated the status history
- # collection, so all updates are lost when upgrading. This has
- # the consequence that sources report as 'created'. We therefore
- # have to accept 'created' below, but should remove this
- # relaxation once we have enough new releasees that there won't
- # be a migration between tested versions. Plus, because of this
- # we have to wrap the error check in a coalesce: when all the
- # status updates show 'created', we'll have no error and get a
- # NULL result.
- #
- # Additionally, we also have to accept paused, because platform
- # checks might pause replicas, and these paused status updates
- # take precedence over errors. To fix this, we might want to
- # rewrite this test to look at mz_source_status_history
- # instead, which contains the full history.
- """
- > SELECT
- coalesce(bool_and(error ~* 'publication .+ does not exist'), true) as matches,
- bool_and(status IN ('stalled', 'created', 'paused')) as is_stalled
- FROM mz_internal.mz_source_statuses
- WHERE
- name IN ('source_errors_sourcea', 'source_errors_sourceb', 'source_errors_tablea', 'source_errors_tableb');
- true true
- """
- )
- )
|