# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD


def schemas() -> str:
    # Provides the `$ set keyschema=...` / `$ set schema=...` testdrive definitions
    # (a single string key `key1` and a single string value field `f1`) that the
    # ${keyschema} and ${schema} references in the kafka-ingest commands below use.
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)


class ShrinkGrow:
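    """Ingest 10,000 keyed Avro records, then repeatedly overwrite them with
    values of a different size (per pads()), and confirm via a materialized
    view that only the final generation of values remains after the upserts.
    """
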
    def initialize(self) -> Testdrive:
        name = self.name()
        pads = self.pads()
        return Testdrive(
            schemas()
            + dedent(
                f"""
                $ kafka-create-topic topic=upsert-update-{name}

                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}{pads[0]}A"}}

                > CREATE SOURCE upsert_update_{name}_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-update-{name}-${{testdrive.seed}}')

                > CREATE TABLE upsert_update_{name} FROM SOURCE upsert_update_{name}_src (REFERENCE "testdrive-upsert-update-{name}-${{testdrive.seed}}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW upsert_update_{name}_view AS
                  SELECT LEFT(f1, 1), RIGHT(f1, 1),
                  COUNT(*) AS c1, COUNT(DISTINCT key1) AS c2, COUNT(DISTINCT f1) AS c3,
                  MIN(LENGTH(f1)) AS l1, MAX(LENGTH(f1)) AS l2
                  FROM upsert_update_{name}
                  GROUP BY LEFT(f1, 1), RIGHT(f1, 1);
                """
            )
        )
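
    # Overwrite every key twice: first with values padded to pads[1], then to
    # pads[2], so the upsert state handles value sizes that shrink or grow.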
    def manipulate(self) -> list[Testdrive]:
        name = self.name()
        pads = self.pads()
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                f"""
                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "B${{kafka-ingest.iteration}}{pads[1]}B"}}
                """,
                f"""
                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "C${{kafka-ingest.iteration}}{pads[2]}C"}}
                """,
            ]
        ]
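
    # Only the final ("C") generation should remain. Each value has the form
    # C<iteration><pad>C with iterations 0..9999, so observed lengths range from
    # len(pad) + 3 (one-digit iteration) to len(pad) + 6 (four-digit iteration).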
    def validate(self) -> Testdrive:
        name = self.name()
        last_pad_length = len(self.pads()[-1])
        return Testdrive(
            dedent(
                f"""
                > SELECT * FROM upsert_update_{name}_view;
                C C 10000 10000 10000 {last_pad_length+3} {last_pad_length+6}
                """
            )
        )

    def name(self) -> str:
        raise NotImplementedError

    def pads(self) -> list[str]:
        raise NotImplementedError


class UpsertUpdateShrink(ShrinkGrow, Check):
    """Upserts where the data length shrinks"""

    def name(self) -> str:
        return "shrink"

    def pads(self) -> list[str]:
        return ["x" * 1024, "x" * 512, "x" * 256]


class UpsertUpdateGrow(ShrinkGrow, Check):
    """Upserts where the data length grows"""

    def name(self) -> str:
        return "grow"

    def pads(self) -> list[str]:
        return ["x" * 256, "x" * 512, "x" * 1024]