ssh.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. def schemas() -> str:
  14. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  15. @externally_idempotent(False)
  16. class SshPg(Check):
  17. """
  18. Testing Postgres CDC source with SSH tunnel
  19. """
  20. def initialize(self) -> Testdrive:
  21. return Testdrive(
  22. schemas()
  23. + dedent(
  24. """
  25. > CREATE SECRET pgpass AS 'postgres'
  26. > CREATE CONNECTION pg_ssh1 TO POSTGRES (
  27. HOST postgres,
  28. DATABASE postgres,
  29. USER postgres,
  30. PASSWORD SECRET pgpass,
  31. SSL MODE require,
  32. SSH TUNNEL ssh_tunnel_0);
  33. $ postgres-execute connection=postgres://postgres:postgres@postgres
  34. ALTER USER postgres WITH replication;
  35. CREATE TABLE t_ssh1 (f1 INTEGER);
  36. ALTER TABLE t_ssh1 REPLICA IDENTITY FULL;
  37. CREATE TABLE t_ssh2 (f1 INTEGER);
  38. ALTER TABLE t_ssh2 REPLICA IDENTITY FULL;
  39. CREATE TABLE t_ssh3 (f1 INTEGER);
  40. ALTER TABLE t_ssh3 REPLICA IDENTITY FULL;
  41. CREATE PUBLICATION mz_source_ssh FOR ALL TABLES;
  42. INSERT INTO t_ssh1 VALUES (1), (2), (3), (4), (5);
  43. > CREATE SOURCE mz_source_ssh1
  44. FROM POSTGRES CONNECTION pg_ssh1
  45. (PUBLICATION 'mz_source_ssh')
  46. > CREATE TABLE t_ssh1 FROM SOURCE mz_source_ssh1 (REFERENCE t_ssh1);
  47. """
  48. )
  49. )
  50. def manipulate(self) -> list[Testdrive]:
  51. return [
  52. Testdrive(schemas() + dedent(s))
  53. for s in [
  54. """
  55. > CREATE CONNECTION pg_ssh2 TO POSTGRES (
  56. HOST postgres,
  57. DATABASE postgres,
  58. USER postgres,
  59. PASSWORD SECRET pgpass,
  60. SSL MODE require,
  61. SSH TUNNEL ssh_tunnel_0);
  62. > CREATE SOURCE mz_source_ssh2
  63. FROM POSTGRES CONNECTION pg_ssh2
  64. (PUBLICATION 'mz_source_ssh');
  65. > CREATE TABLE t_ssh2 FROM SOURCE mz_source_ssh2 (REFERENCE t_ssh2);
  66. $ postgres-execute connection=postgres://postgres:postgres@postgres
  67. INSERT INTO t_ssh1 VALUES (6), (7), (8), (9), (10);
  68. $ postgres-execute connection=postgres://postgres:postgres@postgres
  69. INSERT INTO t_ssh2 VALUES (6), (7), (8), (9), (10);
  70. """,
  71. """
  72. > CREATE CONNECTION pg_ssh3 TO POSTGRES (
  73. HOST postgres,
  74. DATABASE postgres,
  75. USER postgres,
  76. PASSWORD SECRET pgpass,
  77. SSL MODE require,
  78. SSH TUNNEL ssh_tunnel_0);
  79. > CREATE SOURCE mz_source_ssh3
  80. FROM POSTGRES CONNECTION pg_ssh3
  81. (PUBLICATION 'mz_source_ssh');
  82. > CREATE TABLE t_ssh3 FROM SOURCE mz_source_ssh3 (REFERENCE t_ssh3);
  83. $ postgres-execute connection=postgres://postgres:postgres@postgres
  84. INSERT INTO t_ssh1 VALUES (11), (12), (13), (14), (15);
  85. $ postgres-execute connection=postgres://postgres:postgres@postgres
  86. INSERT INTO t_ssh2 VALUES (11), (12), (13), (14), (15);
  87. $ postgres-execute connection=postgres://postgres:postgres@postgres
  88. INSERT INTO t_ssh3 VALUES (11), (12), (13), (14), (15);
  89. """,
  90. ]
  91. ]
  92. def validate(self) -> Testdrive:
  93. return Testdrive(
  94. dedent(
  95. """
  96. > SELECT COUNT(*) FROM t_ssh1;
  97. 15
  98. > SELECT COUNT(*) FROM t_ssh2;
  99. 10
  100. > SELECT COUNT(*) FROM t_ssh3;
  101. 5
  102. """
  103. )
  104. )
  105. @externally_idempotent(False)
  106. class SshKafka(Check):
  107. """
  108. Testing Kafka source with SSH tunnel
  109. """
  110. def initialize(self) -> Testdrive:
  111. return Testdrive(
  112. schemas()
  113. + dedent(
  114. """
  115. $ kafka-create-topic topic=ssh1
  116. $ kafka-create-topic topic=ssh2
  117. $ kafka-create-topic topic=ssh3
  118. $ kafka-ingest topic=ssh1 format=bytes
  119. one
  120. > CREATE CONNECTION kafka_conn_ssh1
  121. TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
  122. > CREATE SOURCE ssh1_src
  123. FROM KAFKA CONNECTION kafka_conn_ssh1 (TOPIC 'testdrive-ssh1-${testdrive.seed}');
  124. > CREATE TABLE ssh1 FROM SOURCE ssh1_src (REFERENCE "testdrive-ssh1-${testdrive.seed}")
  125. FORMAT TEXT
  126. ENVELOPE NONE;
  127. """
  128. )
  129. )
  130. def manipulate(self) -> list[Testdrive]:
  131. return [
  132. Testdrive(schemas() + dedent(s))
  133. for s in [
  134. """
  135. > CREATE CONNECTION kafka_conn_ssh2
  136. TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
  137. > CREATE SOURCE ssh2_src
  138. FROM KAFKA CONNECTION kafka_conn_ssh2 (TOPIC 'testdrive-ssh2-${testdrive.seed}');
  139. > CREATE TABLE ssh2 FROM SOURCE ssh2_src (REFERENCE "testdrive-ssh2-${testdrive.seed}")
  140. FORMAT TEXT
  141. ENVELOPE NONE;
  142. $ kafka-ingest topic=ssh1 format=bytes
  143. two
  144. $ kafka-ingest topic=ssh2 format=bytes
  145. two
  146. """,
  147. """
  148. > CREATE CONNECTION kafka_conn_ssh3
  149. TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL ssh_tunnel_0, SECURITY PROTOCOL PLAINTEXT);
  150. > CREATE SOURCE ssh3_src
  151. FROM KAFKA CONNECTION kafka_conn_ssh3 (TOPIC 'testdrive-ssh3-${testdrive.seed}');
  152. > CREATE TABLE ssh3 FROM SOURCE ssh3_src (REFERENCE "testdrive-ssh3-${testdrive.seed}")
  153. FORMAT TEXT
  154. ENVELOPE NONE;
  155. $ kafka-ingest topic=ssh1 format=bytes
  156. three
  157. $ kafka-ingest topic=ssh2 format=bytes
  158. three
  159. $ kafka-ingest topic=ssh3 format=bytes
  160. three
  161. """,
  162. ]
  163. ]
  164. def validate(self) -> Testdrive:
  165. return Testdrive(
  166. dedent(
  167. """
  168. > SELECT * FROM ssh1;
  169. one
  170. two
  171. three
  172. > SELECT * FROM ssh2;
  173. two
  174. three
  175. > SELECT * FROM ssh3;
  176. three
  177. """
  178. )
  179. )