123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
class ParseError(Check):
    """Verify that a failing STRING -> INTEGER cast inside a materialized view
    is reported as an error when the view is queried.

    The table starts with one parseable value. Each manipulation step inserts
    one unparseable value ('abc', then 'klm') next to a parseable one.
    """

    def initialize(self) -> Testdrive:
        setup_script = dedent(
            """
            > CREATE TABLE parse_error_table (f1 STRING);
            > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
            > INSERT INTO parse_error_table VALUES ('123');
            """
        )
        return Testdrive(setup_script)

    def manipulate(self) -> list[Testdrive]:
        # One testdrive step per batch; every batch mixes a bad and a good value.
        batches = (
            "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
            "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
        )
        return list(map(Testdrive, batches))

    def validate(self) -> Testdrive:
        # `!` expects the query to fail with the given message.
        expected_failure = dedent(
            """
            ! SELECT * FROM parse_error_view;
            contains: invalid input syntax for type integer
            """
        )
        return Testdrive(expected_failure)
class ParseHexError(Check):
    """Verify that ``decode(..., 'hex')`` on invalid hex input inside a
    materialized view is reported as an error when the view is queried.

    The table starts with one valid hex string. Each manipulation step inserts
    one invalid value ('xx', then 'yy') next to a valid one.
    """

    def initialize(self) -> Testdrive:
        setup_script = dedent(
            """
            > CREATE TABLE parse_hex_error_table (f1 STRING);
            > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
            > INSERT INTO parse_hex_error_table VALUES ('aa');
            """
        )
        return Testdrive(setup_script)

    def manipulate(self) -> list[Testdrive]:
        # One testdrive step per batch; every batch mixes a bad and a good value.
        batches = (
            "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
            "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
        )
        return list(map(Testdrive, batches))

    def validate(self) -> Testdrive:
        # `!` expects the query to fail with the given message.
        expected_failure = dedent(
            """
            ! SELECT * FROM parse_hex_error_view;
            contains: invalid hexadecimal digit
            """
        )
        return Testdrive(expected_failure)
class DataflowErrorRetraction(Check):
    """Verify that deleting the rows that caused a dataflow error clears it.

    ``initialize`` inserts two unparseable values ('abc', 'klm') and asserts
    that the view errors. ``manipulate`` deletes them, one per step.
    ``validate`` expects only the parseable values to remain.
    """

    def initialize(self) -> Testdrive:
        setup_script = dedent(
            """
            > CREATE TABLE dataflow_error_retraction_table (f1 STRING);
            > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
            > INSERT INTO dataflow_error_retraction_table VALUES ('123');
            > INSERT INTO dataflow_error_retraction_table VALUES ('abc');
            > INSERT INTO dataflow_error_retraction_table VALUES ('klm');
            > INSERT INTO dataflow_error_retraction_table VALUES ('234');
            ! SELECT * FROM dataflow_error_retraction_view;
            contains: invalid input syntax for type integer
            """
        )
        return Testdrive(setup_script)

    def manipulate(self) -> list[Testdrive]:
        # Retract each offending row in its own step.
        return [
            Testdrive(
                dedent(
                    f"""
                    > DELETE FROM dataflow_error_retraction_table WHERE f1 = '{bad_value}'
                    """
                )
            )
            for bad_value in ("abc", "klm")
        ]

    def validate(self) -> Testdrive:
        # With both bad rows gone, the view should return the good rows again.
        expected_rows = dedent(
            """
            > SELECT * FROM dataflow_error_retraction_view;
            123
            234
            """
        )
        return Testdrive(expected_rows)
def schemas() -> str:
    """Return testdrive ``$ set`` commands that define two Avro record schemas.

    ``schema-f1`` is a record named ``test`` with a single string field ``f1``.
    ``schema-f2`` is a record named ``test`` with a single int field ``f2``.
    """

    def _record_schema(field: str, avro_type: str) -> str:
        # Doubled braces are literal braces in the f-string.
        return dedent(
            f"""\
            $ set schema-{field}={{
            "type" : "record",
            "name" : "test",
            "fields" : [
            {{"name":"{field}", "type":"{avro_type}"}}
            ]
            }}
            """
        )

    # The leading newline mirrors the opening line of a triple-quoted literal.
    return "\n" + "".join(
        _record_schema(field, avro_type)
        for field, avro_type in (("f1", "string"), ("f2", "int"))
    )
@externally_idempotent(False)
class DecodeError(Check):
    """Verify that undecodable Kafka records surface as a "Decode error".

    The topic starts with one valid Avro record written with ``schema-f1``.
    Each manipulation step appends a raw byte payload to the topic.
    ``validate`` expects querying the table to fail with "Decode error".
    """

    def initialize(self) -> Testdrive:
        # NOTE(review): continuation lines of the multi-line SQL commands are
        # indented by two spaces. This follows testdrive's convention that
        # indented lines are folded into the preceding statement; un-indented
        # lines would be read as expected-result rows. Confirm against the
        # testdrive docs.
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=decode-error
                $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
                {"f1": "A"}
                > CREATE SOURCE decode_error_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
                > CREATE TABLE decode_error FROM SOURCE decode_error_src (REFERENCE "testdrive-decode-error-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                # NOTE(review): this literal is not a raw string. Only the
                # leading \\x00 reaches testdrive as the four characters "\x00".
                # The following \x.. escapes are turned into actual control /
                # non-ASCII characters by Python before testdrive sees them.
                # Confirm this mix is intended for the wire-format payload.
                """
                # {"f2": 123456789}, no publish
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
                """,
                """
                $ kafka-ingest format=bytes topic=decode-error repeat=1
                ABCD
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        # `!` expects the query to fail with the given message.
        return Testdrive(
            dedent(
                """
                ! SELECT * FROM decode_error
                contains: Decode error
                """
            )
        )
class DecodeErrorUpsertValue(Check):
    """Verify that undecodable values in an UPSERT Kafka source surface as
    "avro deserialization error", and that the source recovers once the bad
    keys are retracted.

    Keys key0..key2 start with valid Avro values. The garbage written to key1,
    key2 and key3 across initialize/manipulate is retracted in ``validate``,
    leaving only key0 (f1 = 1).
    """

    def initialize(self) -> Testdrive:
        # NOTE(review): continuation lines of the multi-line SQL commands are
        # indented by two spaces. This follows testdrive's convention that
        # indented lines are folded into the preceding statement; un-indented
        # lines would be read as expected-result rows. Confirm against the
        # testdrive docs.
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-value
                $ set schema={
                "type" : "record",
                "name" : "test",
                "fields" : [
                {"name":"f1", "type":"int"}
                ]
                }
                $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
                key0: {"f1": 1}
                key1: {"f1": 2}
                key2: {"f1": 3}
                > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';
                > CREATE SOURCE decode_error_upsert_value_src
                  IN CLUSTER decode_error_upsert_value_cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
                > CREATE TABLE decode_error_upsert_value FROM SOURCE decode_error_upsert_value_src (REFERENCE "testdrive-decode-error-upsert-value-${testdrive.seed}")
                  KEY FORMAT TEXT
                  VALUE FORMAT AVRO USING SCHEMA '${schema}'
                  ENVELOPE UPSERT
                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1: garbage
                ! SELECT * FROM decode_error_upsert_value
                contains: avro deserialization error
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            # Step 1: overwrite key2 with non-Avro bytes.
            Testdrive(
                dedent(
                    """
                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                    key2: garbage2
                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """
                )
            ),
            # Step 2: write key3 as valid Avro, but with a string-typed f1.
            Testdrive(
                dedent(
                    """
                    # Ingest valid avro, but with an incompatible schema
                    $ set schema-string={
                    "type" : "record",
                    "name" : "test",
                    "fields" : [
                    {"name":"f1", "type":"string"}
                    ]
                    }
                    $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
                    key3: {"f1": "garbage3"}
                    ! SELECT * FROM decode_error_upsert_value
                    contains: avro deserialization error
                    """
                )
            ),
        ]

    def validate(self) -> Testdrive:
        # Ingests empty values for the garbage keys, then expects only key0's row.
        return Testdrive(
            dedent(
                """
                # Retract all the garbage and confirm the source is now operational
                $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
                key1:
                key2:
                key3:
                > SELECT f1 FROM decode_error_upsert_value
                1
                """
            )
        )
class DecodeErrorUpsertKey(Check):
    """Verify that undecodable keys in an UPSERT Kafka source surface as
    "avro deserialization error", and that the source recovers once the bad
    keys are retracted.

    Two valid Avro keys (f1 = 1, 2) are ingested first. The garbage keys
    introduced across the phases are retracted again. ``validate`` expects the
    two valid keys back.
    """

    def initialize(self) -> Testdrive:
        # NOTE(review): continuation lines of the multi-line SQL commands are
        # indented by two spaces. This follows testdrive's convention that
        # indented lines are folded into the preceding statement; un-indented
        # lines would be read as expected-result rows. Confirm against the
        # testdrive docs.
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=decode-error-upsert-key
                $ set key-schema={
                "type" : "record",
                "name" : "test",
                "fields" : [
                {"name":"f1", "type":"int"}
                ]
                }
                $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
                {"f1": 1} value1
                {"f1": 2} value2
                > CREATE SOURCE decode_error_upsert_key_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
                > CREATE TABLE decode_error_upsert_key FROM SOURCE decode_error_upsert_key_src (REFERENCE "testdrive-decode-error-upsert-key-${testdrive.seed}")
                  KEY FORMAT AVRO USING SCHEMA '${key-schema}'
                  VALUE FORMAT BYTES
                  ENVELOPE UPSERT
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
                garbage1: value3
                ! SELECT * FROM decode_error_upsert_key
                contains: avro deserialization error
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            # Step 1: retract garbage1, then add a string-typed Avro key.
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage1:
                    # And introduce a new one -- valid avro, but with an incompatible schema
                    $ set key-schema-string={
                    "type" : "record",
                    "name" : "test",
                    "fields" : [
                    {"name":"f1", "type":"string"}
                    ]
                    }
                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"} value4
                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """
                )
            ),
            # Step 2: add a raw-bytes garbage key, then ingest the string-typed
            # key again without a value.
            Testdrive(
                dedent(
                    """
                    # Retract existing garbage and introduce a new one
                    $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                    garbage3: value4
                    $ set key-schema-string={
                    "type" : "record",
                    "name" : "test",
                    "fields" : [
                    {"name":"f1", "type":"string"}
                    ]
                    }
                    $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
                    {"f1": "garbage2"}
                    ! SELECT * FROM decode_error_upsert_key
                    contains: avro deserialization error
                    """
                )
            ),
        ]

    def validate(self) -> Testdrive:
        # Retracts garbage3, then expects the two valid keys' f1 values.
        return Testdrive(
            dedent(
                """
                # Retract any remaining garbage
                $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
                garbage3:
                # Source should return to operational status
                > SELECT f1 FROM decode_error_upsert_key
                1
                2
                """
            )
        )
|