kafka_formats.py 10 KB


# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. PROTOBUF = dedent(
  13. """
  14. $ file-append path=test.proto
  15. syntax = "proto3";
  16. message Key {
  17. string key1 = 1;
  18. string key2 = 2;
  19. }
  20. message Value {
  21. string value1 = 1;
  22. string value2 = 2;
  23. }
  24. $ protobuf-compile-descriptors inputs=test.proto output=test.proto set-var=test-schema
  25. """
  26. )
  27. @externally_idempotent(False)
  28. class KafkaFormats(Check):
  29. def initialize(self) -> Testdrive:
  30. return Testdrive(
  31. PROTOBUF
  32. + dedent(
  33. """
  34. > CREATE CLUSTER kafka_formats REPLICAS (kafka_formats_r1 (SIZE '4'))
  35. > SET cluster=kafka_formats
  36. $ kafka-create-topic topic=format-bytes
  37. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
  38. key1A,key1B:value1A,value1B
  39. $ kafka-create-topic topic=format-protobuf partitions=1
  40. $ kafka-ingest topic=format-protobuf
  41. key-format=protobuf key-descriptor-file=test.proto key-message=Key
  42. format=protobuf descriptor-file=test.proto message=Value
  43. {"key1": "key1A", "key2": "key1B"} {"value1": "value1A", "value2": "value1B"}
  44. > CREATE SOURCE format_bytes1_src
  45. IN CLUSTER kafka_formats
  46. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  47. > CREATE TABLE format_bytes1 FROM SOURCE format_bytes1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  48. KEY FORMAT BYTES
  49. VALUE FORMAT BYTES
  50. ENVELOPE UPSERT
  51. > CREATE SOURCE format_text1_src
  52. IN CLUSTER kafka_formats
  53. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  54. > CREATE TABLE format_text1 FROM SOURCE format_text1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  55. KEY FORMAT TEXT
  56. VALUE FORMAT TEXT
  57. ENVELOPE UPSERT
  58. > CREATE SOURCE format_csv1_src
  59. IN CLUSTER kafka_formats
  60. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  61. > CREATE TABLE format_csv1 (key1, key2, value1, value2)
  62. FROM SOURCE format_csv1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  63. KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
  64. VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
  65. ENVELOPE UPSERT
  66. > CREATE SOURCE format_regex1_src
  67. IN CLUSTER kafka_formats
  68. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  69. > CREATE TABLE format_regex1 (key1, key2, value1, value2)
  70. FROM SOURCE format_regex1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  71. KEY FORMAT REGEX '(?P<key1>[^,]+),(?P<key2>\\w+)'
  72. VALUE FORMAT REGEX '(?P<value1>[^,]+),(?P<value2>\\w+)'
  73. ENVELOPE UPSERT
  74. > CREATE SOURCE format_protobuf1_src
  75. IN CLUSTER kafka_formats
  76. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-protobuf-${testdrive.seed}')
  77. > CREATE TABLE format_protobuf1 FROM SOURCE format_protobuf1_src (REFERENCE "testdrive-format-protobuf-${testdrive.seed}")
  78. KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
  79. VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  80. INCLUDE KEY
  81. ENVELOPE UPSERT
  82. """
  83. )
  84. )
  85. def manipulate(self) -> list[Testdrive]:
  86. return [
  87. Testdrive(PROTOBUF + dedent(s))
  88. for s in [
  89. """
  90. > SET cluster=kafka_formats
  91. > CREATE SOURCE format_bytes2_src
  92. IN CLUSTER kafka_formats
  93. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  94. > CREATE TABLE format_bytes2 FROM SOURCE format_bytes2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  95. KEY FORMAT BYTES
  96. VALUE FORMAT BYTES
  97. ENVELOPE UPSERT
  98. > CREATE SOURCE format_text2_src
  99. IN CLUSTER kafka_formats
  100. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  101. > CREATE TABLE format_text2 FROM SOURCE format_text2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  102. KEY FORMAT TEXT
  103. VALUE FORMAT TEXT
  104. ENVELOPE UPSERT
  105. > CREATE SOURCE format_csv2_src
  106. IN CLUSTER kafka_formats
  107. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  108. > CREATE TABLE format_csv2 (key1, key2, value1, value2)
  109. FROM SOURCE format_csv2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  110. KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
  111. VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
  112. ENVELOPE UPSERT
  113. > CREATE SOURCE format_regex2_src
  114. IN CLUSTER kafka_formats
  115. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
  116. > CREATE TABLE format_regex2
  117. FROM SOURCE format_regex2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
  118. KEY FORMAT REGEX '(?P<key1>[^,]+),(?P<key2>\\w+)'
  119. VALUE FORMAT REGEX '(?P<value1>[^,]+),(?P<value2>\\w+)'
  120. ENVELOPE UPSERT
  121. > CREATE SOURCE format_protobuf2_src
  122. IN CLUSTER kafka_formats
  123. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-protobuf-${testdrive.seed}')
  124. > CREATE TABLE format_protobuf2 FROM SOURCE format_protobuf2_src (REFERENCE "testdrive-format-protobuf-${testdrive.seed}")
  125. KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
  126. VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
  127. INCLUDE KEY
  128. ENVELOPE UPSERT
  129. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
  130. key2A,key2B:value2A,value2B
  131. $ kafka-ingest topic=format-protobuf
  132. key-format=protobuf key-descriptor-file=test.proto key-message=Key
  133. format=protobuf descriptor-file=test.proto message=Value
  134. {"key1": "key2A", "key2": "key2B"} {"value1": "value2A", "value2": "value2B"}
  135. """,
  136. """
  137. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
  138. key3A,key3B:value3A,value3B
  139. $ kafka-ingest topic=format-protobuf
  140. key-format=protobuf key-descriptor-file=test.proto key-message=Key
  141. format=protobuf descriptor-file=test.proto message=Value
  142. {"key1": "key3A", "key2": "key3B"} {"value1": "value3A", "value2": "value3B"}
  143. """,
  144. ]
  145. ]
  146. def validate(self) -> Testdrive:
  147. return Testdrive(
  148. dedent(
  149. r"""
  150. > SELECT COUNT(*) FROM format_bytes1
  151. 3
  152. > SELECT * FROM format_text1
  153. key1A,key1B value1A,value1B
  154. key2A,key2B value2A,value2B
  155. key3A,key3B value3A,value3B
  156. > SELECT * FROM format_csv1
  157. key1A key1B value1A value1B
  158. key2A key2B value2A value2B
  159. key3A key3B value3A value3B
  160. > SELECT * FROM format_regex1
  161. key1A key1B value1A value1B
  162. key2A key2B value2A value2B
  163. key3A key3B value3A value3B
  164. > SELECT * FROM format_protobuf1
  165. key1A key1B value1A value1B
  166. key2A key2B value2A value2B
  167. key3A key3B value3A value3B
  168. > SELECT * FROM format_text2
  169. key1A,key1B value1A,value1B
  170. key2A,key2B value2A,value2B
  171. key3A,key3B value3A,value3B
  172. > SELECT * FROM format_csv2
  173. key1A key1B value1A value1B
  174. key2A key2B value2A value2B
  175. key3A key3B value3A value3B
  176. > SELECT * FROM format_regex2
  177. key1A key1B value1A value1B
  178. key2A key2B value2A value2B
  179. key3A key3B value3A value3B
  180. > SELECT * FROM format_protobuf2
  181. key1A key1B value1A value1B
  182. key2A key2B value2A value2B
  183. key3A key3B value3A value3B
  184. $ set-regex match=testdrive-format-bytes-\d+ replacement=<TOPIC>
  185. >[version>=14000] SHOW CREATE SOURCE format_bytes1_src;
  186. materialize.public.format_bytes1_src "CREATE SOURCE materialize.public.format_bytes1_src\nIN CLUSTER kafka_formats\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = '<TOPIC>')\nEXPOSE PROGRESS AS materialize.public.format_bytes1_src_progress;"
  187. >[version<14000] SHOW CREATE SOURCE format_bytes1_src;
  188. materialize.public.format_bytes1_src "CREATE SOURCE \"materialize\".\"public\".\"format_bytes1_src\" IN CLUSTER \"kafka_formats\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = '<TOPIC>') EXPOSE PROGRESS AS \"materialize\".\"public\".\"format_bytes1_src_progress\""
  189. """
  190. )
  191. )