source_errors.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. @externally_idempotent(False)
  13. class SourceErrors(Check):
  14. def initialize(self) -> Testdrive:
  15. return Testdrive(
  16. dedent(
  17. """
  18. $ postgres-execute connection=postgres://postgres:postgres@postgres
  19. # In order to avoid conflicts, user must be unique
  20. CREATE USER source_errors_user1 WITH SUPERUSER PASSWORD 'postgres';
  21. ALTER USER source_errors_user1 WITH replication;
  22. DROP PUBLICATION IF EXISTS source_errors_publicationA;
  23. DROP PUBLICATION IF EXISTS source_errors_publicationB;
  24. DROP TABLE IF EXISTS source_errors_table;
  25. CREATE TABLE source_errors_table (f1 TEXT);
  26. ALTER TABLE source_errors_table REPLICA IDENTITY FULL;
  27. INSERT INTO source_errors_table VALUES (1);
  28. CREATE PUBLICATION source_errors_publicationA FOR ALL TABLES;
  29. CREATE PUBLICATION source_errors_publicationB FOR ALL TABLES;
  30. > CREATE SECRET source_errors_secret AS 'postgres';
  31. > CREATE CONNECTION source_errors_connection FOR POSTGRES
  32. HOST 'postgres',
  33. DATABASE postgres,
  34. USER source_errors_user1,
  35. PASSWORD SECRET source_errors_secret
  36. > CREATE SOURCE source_errors_sourceA
  37. FROM POSTGRES CONNECTION source_errors_connection
  38. (PUBLICATION 'source_errors_publicationa') /* all lowercase */
  39. > CREATE TABLE source_errors_tableA FROM SOURCE source_errors_sourceA (REFERENCE source_errors_table);
  40. > CREATE SOURCE source_errors_sourceB
  41. FROM POSTGRES CONNECTION source_errors_connection
  42. (PUBLICATION 'source_errors_publicationb') /* all lowercase */
  43. > CREATE TABLE source_errors_tableB FROM SOURCE source_errors_sourceB (REFERENCE source_errors_table);
  44. $ postgres-execute connection=postgres://postgres:postgres@postgres
  45. INSERT INTO source_errors_table VALUES (2);
  46. > SELECT COUNT(*) FROM source_errors_tableA;
  47. 2
  48. > SELECT COUNT(*) FROM source_errors_tableB;
  49. 2
  50. """
  51. )
  52. )
  53. def manipulate(self) -> list[Testdrive]:
  54. return [
  55. Testdrive(dedent(s))
  56. for s in [
  57. """
  58. $ postgres-execute connection=postgres://postgres:postgres@postgres
  59. DROP PUBLICATION IF EXISTS source_errors_publicationA;
  60. INSERT INTO source_errors_table VALUES (3);
  61. # We sleep for a bit here to allow status updates to propagate to the storage controller
  62. # in scenarios where environmentd is killed
  63. $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s
  64. """,
  65. """
  66. $ postgres-execute connection=postgres://postgres:postgres@postgres
  67. DROP PUBLICATION IF EXISTS source_errors_publicationB;
  68. INSERT INTO source_errors_table VALUES (4);
  69. """,
  70. ]
  71. ]
  72. def validate(self) -> Testdrive:
  73. return Testdrive(
  74. dedent(
  75. # We check two things: a) that the expected (sub)sources are
  76. # stalled, and b) that they report the expected error. Only
  77. # checking the error using bool_and wouldn't work because this
  78. # check ignores NULL values, so would succeed if, say, all
  79. # sources are in state 'running'.
  80. #
  81. # TODO(aljoscha): We recently migrated the status history
  82. # collection, so all updates are lost when upgrading. This has
  83. # the consequence that sources report as 'created'. We therefore
  84. # have to accept 'created' below, but should remove this
  85. # relaxation once we have enough new releasees that there won't
  86. # be a migration between tested versions. Plus, because of this
  87. # we have to wrap the error check in a coalesce: when all the
  88. # status updates show 'created', we'll have no error and get a
  89. # NULL result.
  90. #
  91. # Additionally, we also have to accept paused, because platform
  92. # checks might pause replicas, and these paused status updates
  93. # take precedence over errors. To fix this, we might want to
  94. # rewrite this test to look at mz_source_status_history
  95. # instead, which contains the full history.
  96. """
  97. > SELECT
  98. coalesce(bool_and(error ~* 'publication .+ does not exist'), true) as matches,
  99. bool_and(status IN ('stalled', 'created', 'paused')) as is_stalled
  100. FROM mz_internal.mz_source_statuses
  101. WHERE
  102. name IN ('source_errors_sourcea', 'source_errors_sourceb', 'source_errors_tablea', 'source_errors_tableb');
  103. true true
  104. """
  105. )
  106. )