123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from random import Random
- from textwrap import dedent
- from typing import Any
- from pg8000.converters import literal # type: ignore
- from materialize.checks.actions import Testdrive
- from materialize.checks.checks import Check
- from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
- from materialize.checks.features import Features
- from materialize.mz_version import MzVersion
- from materialize.util import naughty_strings
def dq(ident: str) -> str:
    """Quote ``ident`` as a SQL delimited identifier.

    Every embedded double quote is doubled, then the result is wrapped in
    double quotes, e.g. ``a"b`` becomes ``"a""b"``.
    """
    escaped = '""'.join(ident.split('"'))
    return '"' + escaped + '"'
def dq_print(ident: str) -> str:
    """Render ``ident`` double-quoted with backslash escapes.

    Backslashes become ``\\\\`` and double quotes become ``\\"``; the escaped
    text is then wrapped in double quotes. Used for cells in testdrive
    expected output.
    """
    # One pass over the characters: each backslash/quote is escaped exactly
    # once, matching "escape backslashes first, then quotes".
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"'})
    return f'"{ident.translate(escapes)}"'
def sq(ident: str) -> Any:
    """Render ``ident`` as a SQL string literal.

    Quoting and escaping are delegated entirely to pg8000's
    ``converters.literal``.
    """
    rendered = literal(ident)
    return rendered
def schemas() -> str:
    """Return ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` with its common indentation removed.

    Presumably this preamble defines the ``keyschema``/``schema`` testdrive
    variables that ``$ kafka-ingest`` refers to — confirm against the constant.
    """
    preamble = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(preamble)
def cluster() -> str:
    """Return the newline-terminated testdrive command creating the ``identifiers`` cluster."""
    statement = "CREATE CLUSTER identifiers SIZE '4'"
    return "> " + statement + "\n"
class Identifiers(Check):
    """Create and validate catalog objects whose names/values are naughty strings.

    Each key in ``IDENT_KEYS`` is paired with a string sampled from
    ``naughty_strings()`` and used as an identifier (database, schema, type,
    table, column, materialized views, connections, sinks, secret, alias,
    role) or as a value/comment. ``initialize`` and ``manipulate`` create the
    objects through testdrive; ``validate`` checks they are still listed and
    queryable.

    NOTE(review): every method dedent()s its script *after* f-string
    interpolation, so a sampled value containing a newline would change the
    margin dedent computes; presumably naughty_strings() yields none — confirm.
    """

    # Keys that receive a sampled naughty string in __init__. The commented-out
    # keys ("source", "secret_value") are assigned fixed values there instead.
    IDENT_KEYS = [
        "db",
        "schema",
        "type",
        "table",
        "column",
        "value1",
        "value2",
        # "source",
        "source_view",
        "kafka_conn",
        "csr_conn",
        "secret",
        # "secret_value",
        "mv0",
        "mv1",
        "mv2",
        "sink0",
        "sink1",
        "sink2",
        "alias",
        "role",
        "comment_table",
        "comment_column",
    ]

    def __init__(
        self, base_version: MzVersion, rng: Random | None, features: Features | None
    ) -> None:
        """Pick one naughty string per key in ``IDENT_KEYS`` and store them in ``self.ident``.

        Args:
            base_version: Forwarded to ``Check.__init__``.
            rng: Source of randomness for the sample; ``Random(0)`` is used
                when None, so the default selection is deterministic.
            features: Forwarded to ``Check.__init__``.
        """
        strings = naughty_strings()
        # sample() picks distinct positions of the list, one per key (and
        # raises ValueError if there are fewer strings than keys).
        values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
        self.ident = {
            # Cap each value at 255 UTF-8 bytes (presumably the identifier
            # length limit — confirm); "ignore" drops a multi-byte character
            # that the byte slice cut in half.
            key: value.encode()[:255].decode("utf-8", "ignore")
            for key, value in zip(self.IDENT_KEYS, values)
        }
        # Fixed value: naughty secret values hit
        # ERROR: invalid input syntax for type bytea: invalid escape sequence
        self.ident["secret_value"] = "secret_value"
        # Fixed name; see https://github.com/MaterializeInc/database-issues/issues/6813
        self.ident["source"] = "source"
        super().__init__(base_version, rng, features)

    def initialize(self) -> Testdrive:
        """Create the base objects.

        The script creates, in order: a role, database, schema, list type, and
        table (with one inserted row); the ``mv0`` count view; a Kafka topic
        ingested with 1000 records; Kafka and schema-registry connections; an
        upsert source plus its table; the grouped ``source_view``; ``sink0``;
        a secret; and comments on the table and its first column.

        Returns:
            A Testdrive of ``schemas()`` + ``cluster()`` + the dedented script.
        """
        cmds = f"""
        > SET cluster=identifiers;
        > CREATE ROLE {dq(self.ident["role"])};
        > CREATE DATABASE {dq(self.ident["db"])};
        > SET DATABASE={dq(self.ident["db"])};
        > CREATE SCHEMA {dq(self.ident["schema"])};
        > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
        > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
        > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
        > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
          SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
        $ kafka-create-topic topic=sink-source-ident
        $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
        {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
        > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
        > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
        > CREATE SOURCE {dq(self.ident["source"] + "_src")}
          IN CLUSTER identifiers
          FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}');
        > CREATE TABLE {dq(self.ident["source"])} FROM SOURCE {dq(self.ident["source"] + "_src")} (REFERENCE "testdrive-sink-source-ident-${{testdrive.seed}}")
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
          ENVELOPE UPSERT;
        > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
          SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
        > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
          IN CLUSTER identifiers
          FROM {dq(self.ident["source_view"])}
          INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
          FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
          ENVELOPE DEBEZIUM;
        > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
        > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
        > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
        """
        return Testdrive(schemas() + cluster() + dedent(cmds))

    def manipulate(self) -> list[Testdrive]:
        """Return two testdrive fragments, one for each of i = "1" and i = "2".

        Each fragment:
        - creates materialized view ``mv<i>``, which selects the table's column
          and ``c2`` under the naughty alias;
        - inserts one more row into the table;
        - creates sink ``sink<i>`` from ``source_view``.

        NOTE(review): sink1 and sink2 both write to topic 'sink-sink-ident',
        while sink0 uses 'sink-sink-ident0'; confirm that sharing is intended.
        """
        cmds = [
            f"""
            > SET CLUSTER=identifiers;
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
              SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
            for i in ["1", "2"]
        ]
        return [Testdrive(dedent(s)) for s in cmds]

    def validate(self) -> Testdrive:
        """Check that every object created above is listed and returns the expected data.

        Expected values:
        - At validation time the table holds three rows: one from
          ``initialize`` and one from each ``manipulate`` fragment. So ``mv0``
          reports 3, and ``mv1`` and ``mv2`` each list three identical rows.
        - The Kafka topic was ingested with 1000 records whose keys start with
          ``U2`` and whose values start with ``A``. So the grouped
          ``source_view`` has the single row ``U2 A 1000``.
        - In ``mz_comments``, the table comment has a NULL ``object_sub_id``
          and the first column's comment has 1.
        - Expected output cells are rendered with ``dq_print``.

        The SHOW DATABASES filter excludes databases matching other name
        patterns, presumably created by other checks in the same run — confirm.
        """
        cmds = f"""
        > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
        materialize ""
        {dq_print(self.ident["db"])} ""
        > SET DATABASE={dq(self.ident["db"])};
        > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
        {dq_print(self.ident["role"])}
        > SHOW TYPES;
        {dq_print(self.ident["type"])} ""
        > SHOW SCHEMAS FROM {dq(self.ident["db"])};
        public ""
        information_schema ""
        mz_catalog ""
        mz_catalog_unstable ""
        mz_unsafe ""
        mz_internal ""
        mz_introspection ""
        pg_catalog ""
        {dq_print(self.ident["schema"])} ""
        > SHOW SINKS FROM {dq(self.ident["schema"])};
        {dq_print(self.ident["sink0"])} kafka identifiers ""
        {dq_print(self.ident["sink1"])} kafka identifiers ""
        {dq_print(self.ident["sink2"])} kafka identifiers ""
        > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
        3
        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
        > SELECT * FROM {dq(self.ident["source_view"])};
        U2 A 1000
        > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
        <null> {dq_print(self.ident["comment_table"])}
        1 {dq_print(self.ident["comment_column"])}
        > SHOW SECRETS;
        {dq_print(self.ident["secret"])} ""
        """
        return Testdrive(dedent(cmds))
|