123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from textwrap import dedent
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check, externally_idempotent
@externally_idempotent(False)
class JsonSource(Check):
    """Test CREATE SOURCE ... FORMAT JSON

    Two Kafka sources (and one table per source) are created over the same
    ``format-json`` topic with JSON key and value formats and
    ``ENVELOPE UPSERT``.  Records are ingested before and during the
    manipulation phases, and validation expects both tables to contain the
    same five keys.

    Formatting of the embedded testdrive scripts is significant: the
    continuation lines of a multi-line SQL command are indented by two
    spaces so they are folded into the command instead of being read as
    expected result rows, and commands are separated by blank lines.
    """

    def initialize(self) -> Testdrive:
        """Create the topic, ingest the first record, and create the objects.

        Returns:
            A single testdrive script that creates the ``format-json`` topic,
            ingests the ``"object"`` record, creates the
            ``single_replica_cluster`` cluster, and creates the
            ``format_jsonA`` / ``format_jsonB`` sources and tables.
        """
        return Testdrive(
            dedent(
                """
                $ kafka-create-topic topic=format-json partitions=1

                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
                "object":{"a":"b","c":"d"}

                > CREATE CLUSTER single_replica_cluster SIZE '1';

                > CREATE SOURCE format_jsonA_src
                  IN CLUSTER single_replica_cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')

                > CREATE TABLE format_jsonA FROM SOURCE format_jsonA_src (REFERENCE "testdrive-format-json-${testdrive.seed}")
                  KEY FORMAT JSON
                  VALUE FORMAT JSON
                  ENVELOPE UPSERT

                > CREATE SOURCE format_jsonB_src
                  IN CLUSTER single_replica_cluster
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')

                > CREATE TABLE format_jsonB FROM SOURCE format_jsonB_src (REFERENCE "testdrive-format-json-${testdrive.seed}")
                  KEY FORMAT JSON
                  VALUE FORMAT JSON
                  ENVELOPE UPSERT
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Ingest four more keyed records, split across two scripts.

        Returns:
            Two testdrive scripts: the first ingests the ``"float"`` and
            ``"str"`` records, the second the ``"array"`` and ``"int"``
            records.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                """
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
                "float":1.23
                "str":"hello"
                """,
                """
                $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
                "array":[1,2,3]
                "int":1
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check the contents of both tables and the source definition.

        Returns:
            A testdrive script that selects every row of ``format_jsonA`` and
            ``format_jsonB`` (five keys each) and compares the output of
            ``SHOW CREATE SOURCE format_jsonb_src`` with a version-gated
            expectation.
        """
        # The first literal is a regular string, so each `\\"` below reaches
        # testdrive as `\"`.  The second literal is raw, so its `\n` and `\"`
        # sequences reach testdrive verbatim.  dedent() runs on the
        # concatenation, so both parts must share the same indentation.
        #
        # NOTE(review): the `version<14000` expectation hard-codes the topic
        # suffix `-1` instead of `${testdrive.seed}` -- presumably the seed is
        # pinned to 1 for these checks; confirm before changing it.
        return Testdrive(
            dedent(
                """
                > SELECT * FROM format_jsonA ORDER BY key
                "\\"array\\"" [1,2,3]
                "\\"float\\"" 1.23
                "\\"int\\"" 1
                "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
                "\\"str\\"" "\\"hello\\""

                > SELECT * FROM format_jsonB ORDER BY key
                "\\"array\\"" [1,2,3]
                "\\"float\\"" 1.23
                "\\"int\\"" 1
                "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
                "\\"str\\"" "\\"hello\\""
                """
                + r"""
                >[version>=14000] SHOW CREATE SOURCE format_jsonb_src;
                materialize.public.format_jsonb_src "CREATE SOURCE materialize.public.format_jsonb_src\nIN CLUSTER single_replica_cluster\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'testdrive-format-json-${testdrive.seed}')\nEXPOSE PROGRESS AS materialize.public.format_jsonb_src_progress;"

                >[version<14000] SHOW CREATE SOURCE format_jsonb_src;
                materialize.public.format_jsonb_src "CREATE SOURCE \"materialize\".\"public\".\"format_jsonb_src\" IN CLUSTER \"single_replica_cluster\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'testdrive-format-json-1') EXPOSE PROGRESS AS \"materialize\".\"public\".\"format_jsonb_src_progress\""
                """
            )
        )
|