alter_connection.py 11 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from __future__ import annotations
  10. from enum import Enum
  11. from random import Random
  12. from textwrap import dedent
  13. from materialize.checks.actions import Testdrive
  14. from materialize.checks.checks import Check, externally_idempotent
  15. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  16. from materialize.checks.executors import Executor
  17. from materialize.checks.features import Features
  18. from materialize.mz_version import MzVersion
  19. def schema() -> str:
  20. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  21. class SshChange(Enum):
  22. ADD_SSH = 1
  23. DROP_SSH = 2
  24. CHANGE_SSH_HOST = 3
  25. # {i} will be replaced later
  26. SHOW_CONNECTION_SSH_TUNNEL = """materialize.public.ssh_tunnel_{i} "CREATE CONNECTION \\"materialize\\".\\"public\\".\\"ssh_tunnel_{i}\\" TO SSH TUNNEL (HOST = 'ssh-bastion-host', PORT = 22, USER = 'mz')" """
  27. WITH_SSH_SUFFIX = "USING SSH TUNNEL ssh_tunnel_{i}"
  28. class AlterConnectionSshChangeBase(Check):
  29. def __init__(
  30. self,
  31. ssh_change: SshChange,
  32. index: int,
  33. base_version: MzVersion,
  34. rng: Random | None,
  35. features: Features | None,
  36. ):
  37. super().__init__(base_version, rng, features)
  38. self.ssh_change = ssh_change
  39. self.index = index
  40. def initialize(self) -> Testdrive:
  41. i = self.index
  42. return Testdrive(
  43. schema()
  44. + dedent(
  45. f"""
  46. $ postgres-execute connection=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
  47. ALTER SYSTEM SET enable_connection_validation_syntax = true
  48. $ kafka-create-topic topic=alter-connection-{i}a
  49. $ kafka-ingest topic=alter-connection-{i}a format=bytes
  50. one
  51. > CREATE CONNECTION kafka_conn_alter_connection_{i}a
  52. TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {WITH_SSH_SUFFIX.replace('{i}', str(i)) if self.ssh_change in {SshChange.DROP_SSH, SshChange.CHANGE_SSH_HOST} else ''});
  53. > CREATE SOURCE alter_connection_source_{i}a_src
  54. FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-{i}a-${{testdrive.seed}}');
  55. > CREATE TABLE alter_connection_source_{i}a FROM SOURCE alter_connection_source_{i}a_src (REFERENCE "testdrive-alter-connection-{i}a-${{testdrive.seed}}")
  56. FORMAT TEXT
  57. ENVELOPE NONE;
  58. > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
  59. {'false' if self.ssh_change == SshChange.ADD_SSH else 'true'}
  60. > SELECT count(regexp_match(create_sql, 'ssh-bastion-host')) > 0 FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
  61. true
  62. > CREATE TABLE alter_connection_table_{i} (f1 INTEGER, PRIMARY KEY (f1));
  63. > INSERT INTO alter_connection_table_{i} VALUES (1);
  64. > CREATE MATERIALIZED VIEW mv_alter_connection_{i} AS SELECT f1 FROM alter_connection_table_{i};
  65. > CREATE SINK alter_connection_sink_{i} FROM mv_alter_connection_{i}
  66. INTO KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}')
  67. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  68. ENVELOPE DEBEZIUM;
  69. $ kafka-verify-topic sink=materialize.public.alter_connection_sink_{i} await-value-schema=true
  70. """
  71. )
  72. )
  73. def manipulate(self) -> list[Testdrive]:
  74. i = self.index
  75. return [
  76. Testdrive(schema() + dedent(s))
  77. for s in [
  78. f"""
  79. $ kafka-ingest topic=alter-connection-{i}a format=bytes
  80. two
  81. > ALTER CONNECTION kafka_conn_alter_connection_{i}a SET (BROKER '${{testdrive.kafka-addr}}' {WITH_SSH_SUFFIX.replace('{i}', str(i)) if self.ssh_change in {SshChange.ADD_SSH, SshChange.CHANGE_SSH_HOST} else ''});
  82. $ kafka-ingest topic=alter-connection-{i}a format=bytes
  83. three
  84. $ kafka-create-topic topic=alter-connection-{i}b
  85. $ kafka-ingest topic=alter-connection-{i}b format=bytes
  86. ten
  87. > CREATE CONNECTION kafka_conn_alter_connection_{i}b
  88. TO KAFKA (SECURITY PROTOCOL = "plaintext", BROKER '${{testdrive.kafka-addr}}' {WITH_SSH_SUFFIX.replace('{i}', str(i)) if self.ssh_change in {SshChange.DROP_SSH, SshChange.CHANGE_SSH_HOST} else ''});
  89. > CREATE SOURCE alter_connection_source_{i}b_src
  90. FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}b (TOPIC 'testdrive-alter-connection-{i}b-${{testdrive.seed}}');
  91. > CREATE TABLE alter_connection_source_{i}b FROM SOURCE alter_connection_source_{i}b_src (REFERENCE "testdrive-alter-connection-{i}b-${{testdrive.seed}}")
  92. FORMAT TEXT
  93. ENVELOPE NONE;
  94. $ kafka-ingest topic=alter-connection-{i}b format=bytes
  95. twenty
  96. {f"> ALTER CONNECTION ssh_tunnel_{i} SET (HOST = 'other_ssh_bastion') WITH (VALIDATE = true);" if self.ssh_change == SshChange.CHANGE_SSH_HOST else "$ nop"}
  97. > INSERT INTO alter_connection_table_{i} VALUES (2);
  98. """,
  99. f"""
  100. $ kafka-ingest topic=alter-connection-{i}a format=bytes
  101. four
  102. $ kafka-ingest topic=alter-connection-{i}b format=bytes
  103. thirty
  104. > ALTER CONNECTION kafka_conn_alter_connection_{i}b SET (BROKER '${{testdrive.kafka-addr}}' {WITH_SSH_SUFFIX.replace('{i}', str(i)) if self.ssh_change in {SshChange.ADD_SSH, SshChange.CHANGE_SSH_HOST} else ''});
  105. $ kafka-ingest topic=alter-connection-{i}b format=bytes
  106. fourty
  107. > INSERT INTO alter_connection_table_{i} VALUES (3);
  108. """,
  109. ]
  110. ]
  111. def validate(self) -> Testdrive:
  112. i = self.index
  113. return Testdrive(
  114. dedent(
  115. f"""
  116. > SELECT regexp_match(create_sql, '(ssh-bastion-host|other_ssh_bastion)') FROM (SHOW CREATE CONNECTION ssh_tunnel_{i});
  117. {"{other_ssh_bastion}" if self.ssh_change == SshChange.CHANGE_SSH_HOST else "{ssh-bastion-host}"}
  118. > SELECT count(regexp_match(create_sql, 'USING SSH TUNNEL')) > 0 FROM (SHOW CREATE CONNECTION kafka_conn_alter_connection_{i}a);
  119. {'true' if self.ssh_change in {SshChange.ADD_SSH, SshChange.CHANGE_SSH_HOST} else 'false'}
  120. > SELECT * FROM alter_connection_source_{i}a;
  121. one
  122. two
  123. three
  124. four
  125. > SELECT * FROM alter_connection_source_{i}b;
  126. ten
  127. twenty
  128. thirty
  129. fourty
  130. > CREATE SOURCE alter_connection_sink_source_{i}_src
  131. FROM KAFKA CONNECTION kafka_conn_alter_connection_{i}a (TOPIC 'testdrive-alter-connection-sink-{i}-${{testdrive.seed}}');
  132. > CREATE TABLE alter_connection_sink_source_{i} FROM SOURCE alter_connection_sink_source_{i}_src (REFERENCE "testdrive-alter-connection-sink-{i}-${{testdrive.seed}}")
  133. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  134. ENVELOPE NONE;
  135. # Table has expected data
  136. > SELECT * FROM alter_connection_table_{i};
  137. 1
  138. 2
  139. 3
  140. # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
  141. # Sink Kafka topic has expected data
  142. # $ kafka-verify-data format=avro sink=materialize.public.alter_connection_sink_{i} sort-messages=true
  143. # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
  144. # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
  145. # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}
  146. # Source based on sink topic has ingested data; data must be text-formatted because records are not
  147. # supported in testdrive
  148. > SELECT before::text, after::text FROM alter_connection_sink_source_{i};
  149. <null> (1)
  150. <null> (2)
  151. <null> (3)
  152. > DROP SOURCE IF EXISTS alter_connection_sink_source_{i}_src CASCADE
  153. """
  154. )
  155. )
  156. @externally_idempotent(False)
  157. class AlterConnectionToSsh(AlterConnectionSshChangeBase):
  158. def __init__(
  159. self, base_version: MzVersion, rng: Random | None, features: Features | None
  160. ):
  161. super().__init__(SshChange.ADD_SSH, 1, base_version, rng, features)
  162. @externally_idempotent(False)
  163. class AlterConnectionToNonSsh(AlterConnectionSshChangeBase):
  164. def __init__(
  165. self, base_version: MzVersion, rng: Random | None, features: Features | None
  166. ):
  167. super().__init__(SshChange.DROP_SSH, 2, base_version, rng, features)
  168. @externally_idempotent(False)
  169. class AlterConnectionHost(AlterConnectionSshChangeBase):
  170. def __init__(
  171. self, base_version: MzVersion, rng: Random | None, features: Features | None
  172. ):
  173. super().__init__(SshChange.CHANGE_SSH_HOST, 3, base_version, rng, features)
  174. class AlterConnectionDependencyOrder(Check):
  175. """
  176. Ensure that ALTER-ing a CONNECTION to reference one with a greater ID, does not panic.
  177. """
  178. def __init__(
  179. self,
  180. base_version: MzVersion,
  181. rng: Random | None,
  182. features: Features | None,
  183. ):
  184. super().__init__(base_version, rng, features)
  185. def _can_run(self, e: Executor) -> bool:
  186. return self.base_version >= MzVersion.parse_mz("v0.144.0-dev")
  187. def initialize(self) -> Testdrive:
  188. return Testdrive(
  189. dedent(
  190. """
  191. > CREATE CONNECTION my_kafka_alter_conn TO KAFKA (BROKER 'localhost:32816') WITH (VALIDATE = false);
  192. > CREATE CONNECTION other_ssh TO SSH TUNNEL (host 'foo', user 'bar', port 42) WITH (VALIDATE = false);
  193. """
  194. )
  195. )
  196. def manipulate(self) -> list[Testdrive]:
  197. return [
  198. Testdrive(dedent(s))
  199. for s in [
  200. """
  201. > ALTER CONNECTION my_kafka_alter_conn SET (BROKER 'localhost:32816' USING SSH TUNNEL other_ssh) WITH (VALIDATE = false);
  202. """,
  203. """
  204. > CREATE CONNECTION another_kafka_conn TO KAFKA (BROKER 'localhost:32816') WITH (VALIDATE = false);
  205. """,
  206. ]
  207. ]
  208. def validate(self) -> Testdrive:
  209. return Testdrive(
  210. dedent(
  211. """
  212. $ set-from-sql var=other_ssh_id
  213. SELECT id FROM mz_connections WHERE name = 'other_ssh';
  214. > SELECT name FROM mz_connections WHERE create_sql LIKE '%[${other_ssh_id} AS %';
  215. my_kafka_alter_conn
  216. """
  217. )
  218. )