123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
# Testdrive preamble prepended to the scripts built in KafkaFormats.initialize()
# and KafkaFormats.manipulate() (validate() does not need it).
#
# It writes a proto3 schema with two messages, `Key` (key1, key2) and `Value`
# (value1, value2), to test.proto. It then runs `protobuf-compile-descriptors`
# on that file and stores the result in the testdrive variable `test-schema`,
# which the FORMAT PROTOBUF ... USING SCHEMA '${test-schema}' clauses read.
# The compiled output is written to the same path (output=test.proto). That is
# the file the `kafka-ingest` commands later pass as `descriptor-file`, so it
# presumably ends up holding the compiled descriptor set rather than the .proto
# source. NOTE(review): confirm against the testdrive docs.
PROTOBUF = dedent(
    """
    $ file-append path=test.proto
    syntax = "proto3";
    message Key {
    string key1 = 1;
    string key2 = 2;
    }
    message Value {
    string value1 = 1;
    string value2 = 2;
    }
    $ protobuf-compile-descriptors inputs=test.proto output=test.proto set-var=test-schema
    """
)
@externally_idempotent(False)
class KafkaFormats(Check):
    """Kafka sources decoded with the BYTES, TEXT, CSV, REGEX and PROTOBUF formats.

    Two Kafka topics are used:

    * `format-bytes`: records of the form `key<N>A,key<N>B:value<N>A,value<N>B`,
      where the key and value are split on `:` (key-terminator).
    * `format-protobuf`: records encoded with the `Key` / `Value` messages
      declared in PROTOBUF.

    Every table is created with ENVELOPE UPSERT. The check proceeds as follows:

    * initialize() creates the `kafka_formats` cluster and both topics, ingests
      record 1 into each topic, and creates the `*1` sources and tables.
    * manipulate() creates the `*2` sources and tables and ingests record 2
      (first step), then ingests record 3 (second step).
    * validate() checks that every `*1` and `*2` table reflects records 1-3.
      It also checks that SHOW CREATE SOURCE renders `format_bytes1_src` as
      expected for the running version.
    """

    def initialize(self) -> Testdrive:
        """Create the cluster and topics, ingest record 1, create the `*1` tables."""
        return Testdrive(
            PROTOBUF
            + dedent(
                """
                > CREATE CLUSTER kafka_formats REPLICAS (kafka_formats_r1 (SIZE '4'))

                > SET cluster=kafka_formats

                $ kafka-create-topic topic=format-bytes

                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
                key1A,key1B:value1A,value1B

                $ kafka-create-topic topic=format-protobuf partitions=1

                $ kafka-ingest topic=format-protobuf
                  key-format=protobuf key-descriptor-file=test.proto key-message=Key
                  format=protobuf descriptor-file=test.proto message=Value
                {"key1": "key1A", "key2": "key1B"} {"value1": "value1A", "value2": "value1B"}

                > CREATE SOURCE format_bytes1_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_bytes1 FROM SOURCE format_bytes1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT BYTES
                  VALUE FORMAT BYTES
                  ENVELOPE UPSERT

                > CREATE SOURCE format_text1_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_text1 FROM SOURCE format_text1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT TEXT
                  VALUE FORMAT TEXT
                  ENVELOPE UPSERT

                > CREATE SOURCE format_csv1_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_csv1 (key1, key2, value1, value2)
                  FROM SOURCE format_csv1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  ENVELOPE UPSERT

                > CREATE SOURCE format_regex1_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_regex1 (key1, key2, value1, value2)
                  FROM SOURCE format_regex1_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT REGEX '(?P<key1>[^,]+),(?P<key2>\\w+)'
                  VALUE FORMAT REGEX '(?P<value1>[^,]+),(?P<value2>\\w+)'
                  ENVELOPE UPSERT

                > CREATE SOURCE format_protobuf1_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-protobuf-${testdrive.seed}')
                > CREATE TABLE format_protobuf1 FROM SOURCE format_protobuf1_src (REFERENCE "testdrive-format-protobuf-${testdrive.seed}")
                  KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
                  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
                  INCLUDE KEY
                  ENVELOPE UPSERT
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation steps.

        Step 1 creates the `*2` sources and tables on the existing topics and
        ingests record 2. Step 2 ingests record 3. Each step is prefixed with
        PROTOBUF so that test.proto and ${test-schema} are available to it.
        """
        return [
            Testdrive(PROTOBUF + dedent(s))
            for s in [
                """
                > SET cluster=kafka_formats

                > CREATE SOURCE format_bytes2_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_bytes2 FROM SOURCE format_bytes2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT BYTES
                  VALUE FORMAT BYTES
                  ENVELOPE UPSERT

                > CREATE SOURCE format_text2_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_text2 FROM SOURCE format_text2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT TEXT
                  VALUE FORMAT TEXT
                  ENVELOPE UPSERT

                > CREATE SOURCE format_csv2_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_csv2 (key1, key2, value1, value2)
                  FROM SOURCE format_csv2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  VALUE FORMAT CSV WITH 2 COLUMNS DELIMITED BY ','
                  ENVELOPE UPSERT

                > CREATE SOURCE format_regex2_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-bytes-${testdrive.seed}')
                > CREATE TABLE format_regex2
                  FROM SOURCE format_regex2_src (REFERENCE "testdrive-format-bytes-${testdrive.seed}")
                  KEY FORMAT REGEX '(?P<key1>[^,]+),(?P<key2>\\w+)'
                  VALUE FORMAT REGEX '(?P<value1>[^,]+),(?P<value2>\\w+)'
                  ENVELOPE UPSERT

                > CREATE SOURCE format_protobuf2_src
                  IN CLUSTER kafka_formats
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-protobuf-${testdrive.seed}')
                > CREATE TABLE format_protobuf2 FROM SOURCE format_protobuf2_src (REFERENCE "testdrive-format-protobuf-${testdrive.seed}")
                  KEY FORMAT PROTOBUF MESSAGE '.Key' USING SCHEMA '${test-schema}'
                  VALUE FORMAT PROTOBUF MESSAGE '.Value' USING SCHEMA '${test-schema}'
                  INCLUDE KEY
                  ENVELOPE UPSERT

                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
                key2A,key2B:value2A,value2B

                $ kafka-ingest topic=format-protobuf
                  key-format=protobuf key-descriptor-file=test.proto key-message=Key
                  format=protobuf descriptor-file=test.proto message=Value
                {"key1": "key2A", "key2": "key2B"} {"value1": "value2A", "value2": "value2B"}
                """,
                """
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-bytes
                key3A,key3B:value3A,value3B

                $ kafka-ingest topic=format-protobuf
                  key-format=protobuf key-descriptor-file=test.proto key-message=Key
                  format=protobuf descriptor-file=test.proto message=Value
                {"key1": "key3A", "key2": "key3B"} {"value1": "value3A", "value2": "value3B"}
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check that every `*1` and `*2` table reflects the three upserted records."""
        # The BYTES tables are checked by row count only. Both format_bytes1
        # (created in initialize) and format_bytes2 (created in manipulate) are
        # checked, so a regression in either one is caught. Three distinct keys
        # were upserted, so each must hold three rows.
        return Testdrive(
            dedent(
                r"""
                > SELECT COUNT(*) FROM format_bytes1
                3

                > SELECT * FROM format_text1
                key1A,key1B value1A,value1B
                key2A,key2B value2A,value2B
                key3A,key3B value3A,value3B

                > SELECT * FROM format_csv1
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                > SELECT * FROM format_regex1
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                > SELECT * FROM format_protobuf1
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                > SELECT COUNT(*) FROM format_bytes2
                3

                > SELECT * FROM format_text2
                key1A,key1B value1A,value1B
                key2A,key2B value2A,value2B
                key3A,key3B value3A,value3B

                > SELECT * FROM format_csv2
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                > SELECT * FROM format_regex2
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                > SELECT * FROM format_protobuf2
                key1A key1B value1A value1B
                key2A key2B value2A value2B
                key3A key3B value3A value3B

                $ set-regex match=testdrive-format-bytes-\d+ replacement=<TOPIC>

                >[version>=14000] SHOW CREATE SOURCE format_bytes1_src;
                materialize.public.format_bytes1_src "CREATE SOURCE materialize.public.format_bytes1_src\nIN CLUSTER kafka_formats\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = '<TOPIC>')\nEXPOSE PROGRESS AS materialize.public.format_bytes1_src_progress;"

                >[version<14000] SHOW CREATE SOURCE format_bytes1_src;
                materialize.public.format_bytes1_src "CREATE SOURCE \"materialize\".\"public\".\"format_bytes1_src\" IN CLUSTER \"kafka_formats\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = '<TOPIC>') EXPOSE PROGRESS AS \"materialize\".\"public\".\"format_bytes1_src_progress\""
                """
            )
        )
|