top_k.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. def schema() -> str:
  13. return dedent(
  14. """
  15. $ set schema={
  16. "type" : "record",
  17. "name" : "test",
  18. "fields" : [
  19. {"name":"f1", "type":"string"}
  20. ]
  21. }
  22. """
  23. )
  24. @externally_idempotent(False)
  25. class BasicTopK(Check):
  26. def initialize(self) -> Testdrive:
  27. return Testdrive(
  28. dedent(
  29. """
  30. > CREATE TABLE basic_topk_table (f1 INTEGER);
  31. > INSERT INTO basic_topk_table VALUES (1), (2), (2), (3), (3), (3), (NULL), (NULL), (NULL), (NULL);
  32. """
  33. )
  34. )
  35. def manipulate(self) -> list[Testdrive]:
  36. return [
  37. Testdrive(dedent(s))
  38. for s in [
  39. """
  40. > INSERT INTO basic_topk_table SELECT * FROM basic_topk_table
  41. > CREATE MATERIALIZED VIEW basic_topk_view1 AS SELECT f1, COUNT(f1) FROM basic_topk_table GROUP BY f1 ORDER BY f1 DESC NULLS LAST LIMIT 2;
  42. > INSERT INTO basic_topk_table SELECT * FROM basic_topk_table;
  43. > CREATE VIEW view_with_limit_offset_1a AS SELECT DISTINCT f1 FROM basic_topk_table ORDER BY f1 DESC NULLS LAST LIMIT 2 OFFSET 1;
  44. # offset and limit reordered
  45. > CREATE VIEW view_with_limit_offset_1b AS SELECT DISTINCT f1 FROM basic_topk_table ORDER BY f1 DESC NULLS LAST OFFSET 1 LIMIT 2;
  46. """,
  47. """
  48. > INSERT INTO basic_topk_table SELECT * FROM basic_topk_table;
  49. > CREATE MATERIALIZED VIEW basic_topk_view2 AS SELECT f1, COUNT(f1) FROM basic_topk_table GROUP BY f1 ORDER BY f1 ASC NULLS FIRST LIMIT 2;
  50. > INSERT INTO basic_topk_table SELECT * FROM basic_topk_table;
  51. > CREATE VIEW view_with_limit_offset_2a AS SELECT DISTINCT f1 FROM basic_topk_table ORDER BY f1 DESC NULLS LAST LIMIT 2 OFFSET 1;
  52. # offset and limit reordered
  53. > CREATE VIEW view_with_limit_offset_2b AS SELECT DISTINCT f1 FROM basic_topk_table ORDER BY f1 DESC NULLS LAST OFFSET 1 LIMIT 2;
  54. """,
  55. ]
  56. ]
  57. def validate(self) -> Testdrive:
  58. return Testdrive(
  59. dedent(
  60. """
  61. > SELECT * FROM basic_topk_view1;
  62. 2 32
  63. 3 48
  64. > SELECT * FROM basic_topk_view2;
  65. 1 16
  66. <null> 0
  67. > SELECT * FROM view_with_limit_offset_1a;
  68. 2
  69. 1
  70. > SELECT * FROM view_with_limit_offset_2a;
  71. 2
  72. 1
  73. > SELECT * FROM view_with_limit_offset_1b;
  74. 2
  75. 1
  76. > SELECT * FROM view_with_limit_offset_2b;
  77. 2
  78. 1
  79. """
  80. )
  81. )
  82. @externally_idempotent(False)
  83. class MonotonicTopK(Check):
  84. def initialize(self) -> Testdrive:
  85. return Testdrive(
  86. schema()
  87. + dedent(
  88. """
  89. $ kafka-create-topic topic=monotonic-topk
  90. $ kafka-ingest format=avro topic=monotonic-topk schema=${schema} repeat=1
  91. {"f1": "A"}
  92. > CREATE SOURCE monotonic_topk_source_src
  93. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-monotonic-topk-${testdrive.seed}')
  94. > CREATE TABLE monotonic_topk_source FROM SOURCE monotonic_topk_source_src (REFERENCE "testdrive-monotonic-topk-${testdrive.seed}")
  95. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  96. ENVELOPE NONE
  97. """
  98. )
  99. )
  100. def manipulate(self) -> list[Testdrive]:
  101. return [
  102. Testdrive(schema() + dedent(s))
  103. for s in [
  104. """
  105. $ kafka-ingest format=avro topic=monotonic-topk schema=${schema} repeat=2
  106. {"f1": "B"}
  107. > CREATE MATERIALIZED VIEW monotonic_topk_view1 AS SELECT f1, COUNT(f1) FROM monotonic_topk_source GROUP BY f1 ORDER BY f1 DESC NULLS LAST LIMIT 2;
  108. $ kafka-ingest format=avro topic=monotonic-topk schema=${schema} repeat=3
  109. {"f1": "C"}
  110. """,
  111. """
  112. $ kafka-ingest format=avro topic=monotonic-topk schema=${schema} repeat=4
  113. {"f1": "D"}
  114. > CREATE MATERIALIZED VIEW monotonic_topk_view2 AS SELECT f1, COUNT(f1) FROM monotonic_topk_source GROUP BY f1 ORDER BY f1 ASC NULLS FIRST LIMIT 2;
  115. $ kafka-ingest format=avro topic=monotonic-topk schema=${schema} repeat=5
  116. {"f1": "E"}
  117. """,
  118. ]
  119. ]
  120. def validate(self) -> Testdrive:
  121. return Testdrive(
  122. dedent(
  123. """
  124. > SELECT * FROM monotonic_topk_view1;
  125. E 5
  126. D 4
  127. > SELECT * FROM monotonic_topk_view2;
  128. A 1
  129. B 2
  130. """
  131. )
  132. )
  133. @externally_idempotent(False)
  134. class MonotonicTop1(Check):
  135. def initialize(self) -> Testdrive:
  136. return Testdrive(
  137. schema()
  138. + dedent(
  139. """
  140. $ kafka-create-topic topic=monotonic-top1
  141. $ kafka-ingest format=avro topic=monotonic-top1 schema=${schema} repeat=1
  142. {"f1": "A"}
  143. > CREATE SOURCE monotonic_top1_source_src
  144. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-monotonic-top1-${testdrive.seed}')
  145. > CREATE TABLE monotonic_top1_source FROM SOURCE monotonic_top1_source_src (REFERENCE "testdrive-monotonic-top1-${testdrive.seed}")
  146. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  147. ENVELOPE NONE
  148. """
  149. )
  150. )
  151. def manipulate(self) -> list[Testdrive]:
  152. return [
  153. Testdrive(schema() + dedent(s))
  154. for s in [
  155. """
  156. $ kafka-ingest format=avro topic=monotonic-top1 schema=${schema} repeat=2
  157. {"f1": "B"}
  158. > CREATE MATERIALIZED VIEW monotonic_top1_view1 AS SELECT f1, COUNT(f1) FROM monotonic_top1_source GROUP BY f1 ORDER BY f1 DESC NULLS LAST LIMIT 1;
  159. $ kafka-ingest format=avro topic=monotonic-top1 schema=${schema} repeat=3
  160. {"f1": "C"}
  161. """,
  162. """
  163. $ kafka-ingest format=avro topic=monotonic-top1 schema=${schema} repeat=4
  164. {"f1": "C"}
  165. > CREATE MATERIALIZED VIEW monotonic_top1_view2 AS SELECT f1, COUNT(f1) FROM monotonic_top1_source GROUP BY f1 ORDER BY f1 ASC NULLS FIRST LIMIT 1;
  166. $ kafka-ingest format=avro topic=monotonic-top1 schema=${schema} repeat=5
  167. {"f1": "D"}
  168. """,
  169. ]
  170. ]
  171. def validate(self) -> Testdrive:
  172. return Testdrive(
  173. dedent(
  174. """
  175. > SELECT * FROM monotonic_top1_view1;
  176. D 5
  177. > SELECT * FROM monotonic_top1_view2;
  178. A 1
  179. """
  180. )
  181. )