# identifiers.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from random import Random
from textwrap import dedent
from typing import Any

from pg8000.converters import literal  # type: ignore

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.features import Features
from materialize.mz_version import MzVersion
from materialize.util import naughty_strings
  19. def dq(ident: str) -> str:
  20. ident = ident.replace('"', '""')
  21. return f'"{ident}"'
  22. def dq_print(ident: str) -> str:
  23. ident = ident.replace("\\", "\\\\")
  24. ident = ident.replace('"', '\\"')
  25. return f'"{ident}"'
def sq(ident: str) -> Any:
    """Render *ident* as a SQL string literal.

    Delegates escaping entirely to pg8000's ``literal`` converter; the
    return type is whatever ``literal`` produces, hence ``Any``.
    """
    return literal(ident)
def schemas() -> str:
    """Return the dedented Kafka Avro key/value schema definitions used by
    the testdrive scripts below (a single-string-field schema pair)."""
    return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  30. def cluster() -> str:
  31. return "> CREATE CLUSTER identifiers SIZE '4'\n"
class Identifiers(Check):
    """Check that "naughty" strings survive as identifiers across versions.

    One naughty string is sampled per entry in IDENT_KEYS and used to name a
    database, schema, type, table, column, connections, materialized views,
    sinks, a role, a secret, and comments. ``initialize`` creates the
    objects, ``manipulate`` adds more on later versions, and ``validate``
    re-reads everything and checks names, values, and comments round-trip.
    """

    # Keys of self.ident; each gets one sampled naughty string as its value.
    # Commented-out keys are overridden with plain names in __init__ (see
    # the notes there).
    IDENT_KEYS = [
        "db",
        "schema",
        "type",
        "table",
        "column",
        "value1",
        "value2",
        # "source",
        "source_view",
        "kafka_conn",
        "csr_conn",
        "secret",
        # "secret_value",
        "mv0",
        "mv1",
        "mv2",
        "sink0",
        "sink1",
        "sink2",
        "alias",
        "role",
        "comment_table",
        "comment_column",
    ]

    def __init__(
        self, base_version: MzVersion, rng: Random | None, features: Features | None
    ) -> None:
        """Sample one naughty string per IDENT_KEYS entry into self.ident."""
        strings = naughty_strings()
        # Fall back to a fixed-seed Random so runs without an rng are
        # deterministic.
        values = (rng or Random(0)).sample(strings, len(self.IDENT_KEYS))
        self.ident = {
            # Truncate to 255 bytes, dropping any multi-byte sequence the
            # cut splits ("ignore" discards undecodable trailing bytes).
            key: value.encode()[:255].decode("utf-8", "ignore")
            for key, value in zip(self.IDENT_KEYS, values)
        }
        # ERROR: invalid input syntax for type bytea: invalid escape sequence
        self.ident["secret_value"] = "secret_value"
        # https://github.com/MaterializeInc/database-issues/issues/6813
        self.ident["source"] = "source"
        super().__init__(base_version, rng, features)

    def initialize(self) -> Testdrive:
        """Create every object under test, each named with a naughty string."""
        cmds = f"""
            > SET cluster=identifiers;
            > CREATE ROLE {dq(self.ident["role"])};
            > CREATE DATABASE {dq(self.ident["db"])};
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE SCHEMA {dq(self.ident["schema"])};
            > CREATE TYPE {dq(self.ident["type"])} AS LIST (ELEMENT TYPE = text);
            > CREATE TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} ({dq(self.ident["column"])} TEXT, c2 {dq(self.ident["type"])});
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv0"])} IN CLUSTER {self._default_cluster()} AS
              SELECT COUNT({dq(self.ident["column"])}) FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
            $ kafka-create-topic topic=sink-source-ident
            $ kafka-ingest format=avro key-format=avro topic=sink-source-ident key-schema=${{keyschema}} schema=${{schema}} repeat=1000
            {{"key1": "U2${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["kafka_conn"])} FOR KAFKA {self._kafka_broker()};
            > CREATE CONNECTION IF NOT EXISTS {dq(self.ident["csr_conn"])} FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}';
            > CREATE SOURCE {dq(self.ident["source"] + "_src")}
              IN CLUSTER identifiers
              FROM KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'testdrive-sink-source-ident-${{testdrive.seed}}');
            > CREATE TABLE {dq(self.ident["source"])} FROM SOURCE {dq(self.ident["source"] + "_src")} (REFERENCE "testdrive-sink-source-ident-${{testdrive.seed}}")
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE UPSERT;
            > CREATE MATERIALIZED VIEW {dq(self.ident["source_view"])} IN CLUSTER {self._default_cluster()} AS
              SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM {dq(self.ident["source"])} GROUP BY LEFT(key1, 2), LEFT(f1, 1);
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink0"])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident0')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            > CREATE SECRET {dq(self.ident["secret"])} as {sq(self.ident["secret_value"])};
            > COMMENT ON TABLE {dq(self.ident["schema"])}.{dq(self.ident["table"])} IS {sq(self.ident["comment_table"])};
            > COMMENT ON COLUMN {dq(self.ident["schema"])}.{dq(self.ident["table"])}.{dq(self.ident["column"])} IS {sq(self.ident["comment_column"])};
            """
        return Testdrive(schemas() + cluster() + dedent(cmds))

    def manipulate(self) -> list[Testdrive]:
        """Create mv1/sink1 and mv2/sink2 plus one more row, one Testdrive
        fragment per iteration (run on successive versions)."""
        cmds = [
            f"""
            > SET CLUSTER=identifiers;
            > SET DATABASE={dq(self.ident["db"])};
            > CREATE MATERIALIZED VIEW {dq(self.ident["schema"])}.{dq(self.ident["mv" + i])} IN CLUSTER {self._default_cluster()} AS
              SELECT {dq(self.ident["column"])}, c2 as {dq(self.ident["alias"])} FROM {dq(self.ident["schema"])}.{dq(self.ident["table"])};
            > INSERT INTO {dq(self.ident["schema"])}.{dq(self.ident["table"])} VALUES ({sq(self.ident["value1"])}, LIST[{sq(self.ident["value2"])}]::{dq(self.ident["type"])});
            > CREATE SINK {dq(self.ident["schema"])}.{dq(self.ident["sink" + i])}
              IN CLUSTER identifiers
              FROM {dq(self.ident["source_view"])}
              INTO KAFKA CONNECTION {dq(self.ident["kafka_conn"])} (TOPIC 'sink-sink-ident')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION {dq(self.ident["csr_conn"])}
              ENVELOPE DEBEZIUM;
            """
            for i in ["1", "2"]
        ]
        return [Testdrive(dedent(s)) for s in cmds]

    def validate(self) -> Testdrive:
        """Re-read all objects and assert names, values, and comments
        round-tripped; expected-output identifiers use dq_print escaping."""
        cmds = f"""
            > SHOW DATABASES WHERE name NOT LIKE 'to_be_created%' AND name NOT LIKE 'owner_db%' AND name NOT LIKE 'privilege_db%' AND name <> 'defpriv_db';
            materialize ""
            {dq_print(self.ident["db"])} ""
            > SET DATABASE={dq(self.ident["db"])};
            > SELECT name FROM mz_roles WHERE name = {sq(self.ident["role"])}
            {dq_print(self.ident["role"])}
            > SHOW TYPES;
            {dq_print(self.ident["type"])} ""
            > SHOW SCHEMAS FROM {dq(self.ident["db"])};
            public ""
            information_schema ""
            mz_catalog ""
            mz_catalog_unstable ""
            mz_unsafe ""
            mz_internal ""
            mz_introspection ""
            pg_catalog ""
            {dq_print(self.ident["schema"])} ""
            > SHOW SINKS FROM {dq(self.ident["schema"])};
            {dq_print(self.ident["sink0"])} kafka identifiers ""
            {dq_print(self.ident["sink1"])} kafka identifiers ""
            {dq_print(self.ident["sink2"])} kafka identifiers ""
            > SELECT * FROM {dq(self.ident["schema"])}.{dq(self.ident["mv0"])};
            3
            > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv1"])};
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            > SELECT {dq(self.ident["column"])}, {dq(self.ident["alias"])}[1] FROM {dq(self.ident["schema"])}.{dq(self.ident["mv2"])};
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            {dq_print(self.ident["value1"])} {dq_print(self.ident["value2"])}
            > SELECT * FROM {dq(self.ident["source_view"])};
            U2 A 1000
            > SELECT object_sub_id, comment FROM mz_internal.mz_comments JOIN mz_tables ON mz_internal.mz_comments.id = mz_tables.id WHERE name = {sq(self.ident["table"])};
            <null> {dq_print(self.ident["comment_table"])}
            1 {dq_print(self.ident["comment_column"])}
            > SHOW SECRETS;
            {dq_print(self.ident["secret"])} ""
            """
        return Testdrive(dedent(cmds))