continual_task.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. from materialize.checks.executors import Executor
  14. from materialize.mz_version import MzVersion
  15. def schemas() -> str:
  16. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  17. class AuditLogCT(Check):
  18. """Continual Task for audit logging"""
  19. def _can_run(self, e: Executor) -> bool:
  20. return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
  21. def initialize(self) -> Testdrive:
  22. return Testdrive(
  23. schemas()
  24. + dedent(
  25. """
  26. > CREATE TABLE t_input (key INT);
  27. > INSERT INTO t_input VALUES (1);
  28. > CREATE MATERIALIZED VIEW anomalies AS SELECT sum(key)::INT FROM t_input;
  29. > CREATE CONTINUAL TASK audit_log (count INT) ON INPUT anomalies AS (
  30. INSERT INTO audit_log SELECT * FROM anomalies WHERE sum IS NOT NULL;
  31. )
  32. """
  33. )
  34. )
  35. def manipulate(self) -> list[Testdrive]:
  36. return [
  37. Testdrive(schemas() + dedent(s))
  38. for s in [
  39. """
  40. > INSERT INTO t_input VALUES (2), (3);
  41. """,
  42. """
  43. > INSERT INTO t_input VALUES (4), (5), (6);
  44. """,
  45. ]
  46. ]
  47. def validate(self) -> Testdrive:
  48. return Testdrive(
  49. dedent(
  50. """
  51. > SELECT * FROM audit_log
  52. 1
  53. 6
  54. 21
  55. """
  56. )
  57. )
  58. class StreamTableJoinCT(Check):
  59. """Continual Task for stream table join"""
  60. def _can_run(self, e: Executor) -> bool:
  61. return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
  62. def initialize(self) -> Testdrive:
  63. return Testdrive(
  64. schemas()
  65. + dedent(
  66. """
  67. > CREATE TABLE big (key INT);
  68. > CREATE TABLE small (key INT, val STRING);
  69. > INSERT INTO small VALUES (1, 'v1');
  70. > INSERT INTO small VALUES (2, 'v2');
  71. > INSERT INTO small VALUES (3, 'v3');
  72. > INSERT INTO small VALUES (4, 'v4');
  73. > INSERT INTO small VALUES (5, 'v5');
  74. > CREATE CONTINUAL TASK stj (key INT, val STRING) ON INPUT big AS (
  75. INSERT INTO stj SELECT b.key, s.val FROM big b JOIN small s ON b.key = s.key;
  76. )
  77. > INSERT INTO big VALUES (1), (2), (3), (4), (5)
  78. """
  79. )
  80. )
  81. def manipulate(self) -> list[Testdrive]:
  82. return [
  83. Testdrive(schemas() + dedent(s))
  84. for s in [
  85. """
  86. > UPDATE small SET val = 'v' || val;
  87. > INSERT INTO big VALUES (1), (2), (3), (4), (5)
  88. """,
  89. """
  90. > UPDATE small SET val = 'v' || val;
  91. > INSERT INTO big VALUES (1), (2), (3), (4), (5)
  92. """,
  93. ]
  94. ]
  95. def validate(self) -> Testdrive:
  96. return Testdrive(
  97. dedent(
  98. """
  99. > SELECT * FROM stj
  100. 1 v1
  101. 2 v2
  102. 3 v3
  103. 4 v4
  104. 5 v5
  105. 1 vv1
  106. 2 vv2
  107. 3 vv3
  108. 4 vv4
  109. 5 vv5
  110. 1 vvv1
  111. 2 vvv2
  112. 3 vvv3
  113. 4 vvv4
  114. 5 vvv5
  115. """
  116. )
  117. )
  118. class UpsertCT(Check):
  119. """Continual Task for upserts"""
  120. def _can_run(self, e: Executor) -> bool:
  121. return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
  122. def initialize(self) -> Testdrive:
  123. return Testdrive(
  124. schemas()
  125. + dedent(
  126. """
  127. > CREATE TABLE append_only (key INT, val INT);
  128. > CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
  129. DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
  130. INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
  131. )
  132. > INSERT INTO append_only VALUES (1, 2), (1, 1)
  133. """
  134. )
  135. )
  136. def manipulate(self) -> list[Testdrive]:
  137. return [
  138. Testdrive(schemas() + dedent(s))
  139. for s in [
  140. """
  141. > INSERT INTO append_only VALUES (1, 3), (2, 4)
  142. """,
  143. """
  144. > INSERT INTO append_only VALUES (1, 5), (2, 6), (3, 7);
  145. """,
  146. ]
  147. ]
  148. def validate(self) -> Testdrive:
  149. return Testdrive(
  150. dedent(
  151. """
  152. > INSERT INTO append_only VALUES (3, 8);
  153. > SELECT * FROM upsert
  154. 1 5
  155. 2 6
  156. 3 8
  157. """
  158. )
  159. )