# upsert.py (8.6 KB)
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. def schemas() -> str:
  14. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  15. class UpsertInsert(Check):
  16. """Test that repeated inserts of the same record are properly handled"""
  17. def initialize(self) -> Testdrive:
  18. return Testdrive(
  19. schemas()
  20. + dedent(
  21. """
  22. $ kafka-create-topic topic=upsert-insert
  23. $ kafka-ingest format=avro key-format=avro topic=upsert-insert key-schema=${keyschema} schema=${schema} repeat=10000
  24. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  25. > CREATE SOURCE upsert_insert_src
  26. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-insert-${testdrive.seed}')
  27. > CREATE TABLE upsert_insert FROM SOURCE upsert_insert_src (REFERENCE "testdrive-upsert-insert-${testdrive.seed}")
  28. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  29. ENVELOPE UPSERT
  30. > CREATE MATERIALIZED VIEW upsert_insert_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert;
  31. """
  32. )
  33. )
  34. def manipulate(self) -> list[Testdrive]:
  35. return [
  36. Testdrive(schemas() + dedent(s))
  37. for s in [
  38. """
  39. $ kafka-ingest format=avro key-format=avro topic=upsert-insert key-schema=${keyschema} schema=${schema} repeat=10000
  40. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  41. """,
  42. """
  43. $ kafka-ingest format=avro key-format=avro topic=upsert-insert key-schema=${keyschema} schema=${schema} repeat=10000
  44. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  45. """,
  46. ]
  47. ]
  48. def validate(self) -> Testdrive:
  49. return Testdrive(
  50. dedent(
  51. """
  52. > SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert
  53. 10000 10000 10000
  54. > SELECT * FROM upsert_insert_view;
  55. 10000
  56. """
  57. )
  58. )
  59. class UpsertUpdate(Check):
  60. def initialize(self) -> Testdrive:
  61. return Testdrive(
  62. schemas()
  63. + dedent(
  64. """
  65. $ kafka-create-topic topic=upsert-update
  66. $ kafka-ingest format=avro key-format=avro topic=upsert-update key-schema=${keyschema} schema=${schema} repeat=10000
  67. {"key1": "${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  68. > CREATE SOURCE upsert_update_src
  69. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-update-${testdrive.seed}')
  70. > CREATE TABLE upsert_update FROM SOURCE upsert_update_src (REFERENCE "testdrive-upsert-update-${testdrive.seed}")
  71. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  72. ENVELOPE UPSERT
  73. > CREATE MATERIALIZED VIEW upsert_update_view AS SELECT LEFT(f1, 1), COUNT(*) AS c1, COUNT(DISTINCT key1) AS c2, COUNT(DISTINCT f1) AS c3 FROM upsert_update GROUP BY LEFT(f1, 1);
  74. """
  75. )
  76. )
  77. def manipulate(self) -> list[Testdrive]:
  78. return [
  79. Testdrive(schemas() + dedent(s))
  80. for s in [
  81. """
  82. $ kafka-ingest format=avro key-format=avro topic=upsert-update key-schema=${keyschema} schema=${schema} repeat=10000
  83. {"key1": "${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
  84. """,
  85. """
  86. $ kafka-ingest format=avro key-format=avro topic=upsert-update key-schema=${keyschema} schema=${schema} repeat=10000
  87. {"key1": "${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
  88. """,
  89. ]
  90. ]
  91. def validate(self) -> Testdrive:
  92. return Testdrive(
  93. dedent(
  94. """
  95. > SELECT * FROM upsert_update_view;
  96. C 10000 10000 10000
  97. """
  98. )
  99. )
  100. class UpsertDelete(Check):
  101. def initialize(self) -> Testdrive:
  102. return Testdrive(
  103. schemas()
  104. + dedent(
  105. """
  106. $ kafka-create-topic topic=upsert-delete
  107. $ kafka-ingest format=avro key-format=avro topic=upsert-delete key-schema=${keyschema} schema=${schema} repeat=30000
  108. {"key1": "${kafka-ingest.iteration}"} {"f1": "${kafka-ingest.iteration}"}
  109. > CREATE SOURCE upsert_delete_src
  110. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-delete-${testdrive.seed}')
  111. > CREATE TABLE upsert_delete FROM SOURCE upsert_delete_src (REFERENCE "testdrive-upsert-delete-${testdrive.seed}")
  112. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  113. ENVELOPE UPSERT
  114. > CREATE MATERIALIZED VIEW upsert_delete_view AS SELECT COUNT(*), MIN(key1), MAX(key1) FROM upsert_delete;
  115. """
  116. )
  117. )
  118. def manipulate(self) -> list[Testdrive]:
  119. return [
  120. Testdrive(schemas() + dedent(s))
  121. for s in [
  122. """
  123. $ kafka-ingest format=avro key-format=avro topic=upsert-delete key-schema=${keyschema} schema=${schema} repeat=10000
  124. {"key1": "${kafka-ingest.iteration}"}
  125. """,
  126. """
  127. $ kafka-ingest format=avro key-format=avro topic=upsert-delete key-schema=${keyschema} schema=${schema} start-iteration=20000 repeat=10000
  128. {"key1": "${kafka-ingest.iteration}"}
  129. """,
  130. ]
  131. ]
  132. def validate(self) -> Testdrive:
  133. return Testdrive(
  134. dedent(
  135. """
  136. > SELECT * FROM upsert_delete_view;
  137. 10000 10000 19999
  138. """
  139. )
  140. )
  141. class UpsertLegacy(Check):
  142. """
  143. An upsert source test that uses the legacy syntax to create the source
  144. on all versions to ensure the source is properly migrated with the
  145. ActivateSourceVersioningMigration scenario
  146. """
  147. def initialize(self) -> Testdrive:
  148. return Testdrive(
  149. schemas()
  150. + dedent(
  151. """
  152. $ kafka-create-topic topic=upsert-legacy-syntax
  153. $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
  154. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  155. > CREATE SOURCE upsert_insert_legacy
  156. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
  157. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  158. ENVELOPE UPSERT
  159. > CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
  160. """
  161. )
  162. )
  163. def manipulate(self) -> list[Testdrive]:
  164. return [
  165. Testdrive(schemas() + dedent(s))
  166. for s in [
  167. """
  168. $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
  169. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  170. """,
  171. """
  172. $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
  173. {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
  174. """,
  175. ]
  176. ]
  177. def validate(self) -> Testdrive:
  178. return Testdrive(
  179. dedent(
  180. """
  181. > SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
  182. 10000 10000 10000
  183. > SELECT * FROM upsert_insert_legacy_view;
  184. 10000
  185. """
  186. )
  187. )