mzcompose.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Test sources with SSH connections using an SSH bastion host.
  11. """
  12. from prettytable import PrettyTable
  13. from materialize import buildkite
  14. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  15. from materialize.mzcompose.services.kafka import Kafka
  16. from materialize.mzcompose.services.materialized import Materialized
  17. from materialize.mzcompose.services.mysql import MySql
  18. from materialize.mzcompose.services.mz import Mz
  19. from materialize.mzcompose.services.postgres import Postgres
  20. from materialize.mzcompose.services.redpanda import Redpanda
  21. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  22. from materialize.mzcompose.services.ssh_bastion_host import SshBastionHost
  23. from materialize.mzcompose.services.test_certs import TestCerts
  24. from materialize.mzcompose.services.testdrive import Testdrive
  25. from materialize.mzcompose.services.zookeeper import Zookeeper
# Service definitions available to the workflows below. Each workflow brings
# up only the subset it needs via `c.up(...)`, and some override settings
# (e.g. the bastion's `max_startups`) with `c.override(...)`.
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Materialized(),
    Testdrive(consistent_seed=True),
    SshBastionHost(),
    Postgres(),
    TestCerts(),
    Redpanda(),
    MySql(),
    Mz(app_password=""),
]
  39. def restart_mz(c: Composition) -> None:
  40. c.kill("materialized")
  41. c.up("materialized")
  42. # restart the bastion, wiping its keys in the process
  43. def restart_bastion(c: Composition) -> None:
  44. c.kill("ssh-bastion-host")
  45. c.rm("ssh-bastion-host")
  46. c.up("ssh-bastion-host")
  47. def workflow_basic_ssh_features(c: Composition, redpanda: bool = False) -> None:
  48. c.down()
  49. dependencies = ["materialized", "ssh-bastion-host"]
  50. if redpanda:
  51. dependencies += ["redpanda"]
  52. else:
  53. dependencies += ["zookeeper", "kafka", "schema-registry"]
  54. c.up(*dependencies)
  55. c.run_testdrive_files("ssh-connections.td")
  56. # Check that objects can be restored correctly
  57. restart_mz(c)
  58. def workflow_validate_connection(c: Composition) -> None:
  59. c.up("materialized", "ssh-bastion-host", "postgres")
  60. c.run_testdrive_files("setup.td")
  61. public_key = c.sql_query(
  62. """
  63. select public_key_1 from mz_ssh_tunnel_connections ssh \
  64. join mz_connections c on c.id = ssh.id
  65. where c.name = 'thancred';
  66. """
  67. )[0][0]
  68. c.run_testdrive_files("--no-reset", "validate-failures.td")
  69. c.exec(
  70. "ssh-bastion-host",
  71. "bash",
  72. "-c",
  73. f"echo '{public_key}' > /etc/authorized_keys/mz",
  74. )
  75. c.run_testdrive_files("--no-reset", "validate-success.td")
  76. def workflow_pg(c: Composition) -> None:
  77. c.up("materialized", "ssh-bastion-host", "postgres")
  78. c.run_testdrive_files("setup.td")
  79. public_key = c.sql_query(
  80. """
  81. select public_key_1 from mz_ssh_tunnel_connections ssh \
  82. join mz_connections c on c.id = ssh.id
  83. where c.name = 'thancred';
  84. """
  85. )[0][0]
  86. c.exec(
  87. "ssh-bastion-host",
  88. "bash",
  89. "-c",
  90. f"echo '{public_key}' > /etc/authorized_keys/mz",
  91. )
  92. c.run_testdrive_files("--no-reset", "pg-source.td")
  93. c.kill("ssh-bastion-host")
  94. c.run_testdrive_files("--no-reset", "pg-source-after-ssh-failure.td")
  95. c.up("ssh-bastion-host")
  96. c.run_testdrive_files("--no-reset", "pg-source-after-ssh-restart.td")
  97. def workflow_mysql(c: Composition) -> None:
  98. c.up("materialized", "ssh-bastion-host", "mysql")
  99. c.run_testdrive_files("setup.td")
  100. public_key = c.sql_query(
  101. """
  102. select public_key_1 from mz_ssh_tunnel_connections ssh \
  103. join mz_connections c on c.id = ssh.id
  104. where c.name = 'thancred';
  105. """
  106. )[0][0]
  107. c.exec(
  108. "ssh-bastion-host",
  109. "bash",
  110. "-c",
  111. f"echo '{public_key}' > /etc/authorized_keys/mz",
  112. )
  113. # Basic validation
  114. c.run_testdrive_files(
  115. "--no-reset",
  116. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  117. "mysql-source.td",
  118. )
  119. # Validate SSH bastion host failure & recovery scenario
  120. c.kill("ssh-bastion-host")
  121. c.run_testdrive_files("--no-reset", "mysql-source-after-ssh-failure.td")
  122. c.up("ssh-bastion-host")
  123. c.run_testdrive_files(
  124. "--no-reset",
  125. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  126. "mysql-source-after-ssh-restart.td",
  127. )
  128. # MySQL generates self-signed certificates for SSL connections on startup,
  129. # for both the server and client:
  130. # https://dev.mysql.com/doc/refman/8.3/en/creating-ssl-rsa-files-using-mysql.html
  131. # Grab the correct Server CA and Client Key and Cert from the MySQL container
  132. # (and strip the trailing null byte):
  133. ssl_ca = c.exec("mysql", "cat", "/var/lib/mysql/ca.pem", capture=True).stdout.split(
  134. "\x00", 1
  135. )[0]
  136. ssl_client_cert = c.exec(
  137. "mysql", "cat", "/var/lib/mysql/client-cert.pem", capture=True
  138. ).stdout.split("\x00", 1)[0]
  139. ssl_client_key = c.exec(
  140. "mysql", "cat", "/var/lib/mysql/client-key.pem", capture=True
  141. ).stdout.split("\x00", 1)[0]
  142. # Validate SSL/TLS connections over SSH tunnel
  143. c.run_testdrive_files(
  144. "--no-reset",
  145. f"--var=ssl-ca={ssl_ca}",
  146. f"--var=ssl-client-cert={ssl_client_cert}",
  147. f"--var=ssl-client-key={ssl_client_key}",
  148. f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
  149. "mysql-source-ssl.td",
  150. )
  151. def workflow_kafka(c: Composition, redpanda: bool = False) -> None:
  152. c.down()
  153. # Configure the SSH bastion host to allow only two connections to be
  154. # initiated simultaneously. This is enough to establish *one* Kafka SSH
  155. # tunnel and *one* Confluent Schema Registry tunnel simultaneously.
  156. # Combined with using a large cluster in kafka-source.td, this ensures that
  157. # we only create one SSH tunnel per Kafka broker, rather than one SSH tunnel
  158. # per Kafka broker per worker.
  159. with c.override(SshBastionHost(max_startups="2")):
  160. dependencies = ["materialized", "ssh-bastion-host"]
  161. if redpanda:
  162. dependencies += ["redpanda"]
  163. else:
  164. dependencies += ["zookeeper", "kafka", "schema-registry"]
  165. c.up(*dependencies)
  166. c.run_testdrive_files("setup.td")
  167. public_key = c.sql_query(
  168. """
  169. select public_key_1 from mz_ssh_tunnel_connections ssh \
  170. join mz_connections c on c.id = ssh.id
  171. where c.name = 'thancred';
  172. """
  173. )[0][0]
  174. c.exec(
  175. "ssh-bastion-host",
  176. "bash",
  177. "-c",
  178. f"echo '{public_key}' > /etc/authorized_keys/mz",
  179. )
  180. c.run_testdrive_files("--no-reset", "kafka-source.td")
  181. c.kill("ssh-bastion-host")
  182. c.run_testdrive_files("--no-reset", "kafka-source-after-ssh-failure.td")
  183. c.up("ssh-bastion-host")
  184. c.run_testdrive_files("--no-reset", "kafka-source-after-ssh-restart.td")
  185. def workflow_kafka_restart_replica(c: Composition, redpanda: bool = False) -> None:
  186. c.down()
  187. # Configure the SSH bastion host to allow only two connections to be
  188. # initiated simultaneously. This is enough to establish *one* Kafka SSH
  189. # tunnel and *one* Confluent Schema Registry tunnel simultaneously.
  190. # Combined with using a large cluster in kafka-source.td, this ensures that
  191. # we only create one SSH tunnel per Kafka broker, rather than one SSH tunnel
  192. # per Kafka broker per worker.
  193. with c.override(SshBastionHost(max_startups="2")):
  194. dependencies = ["materialized", "ssh-bastion-host"]
  195. if redpanda:
  196. dependencies += ["redpanda"]
  197. else:
  198. dependencies += ["zookeeper", "kafka", "schema-registry"]
  199. c.up(*dependencies)
  200. c.run_testdrive_files("setup.td")
  201. public_key = c.sql_query(
  202. """
  203. select public_key_1 from mz_ssh_tunnel_connections ssh \
  204. join mz_connections c on c.id = ssh.id
  205. where c.name = 'thancred';
  206. """
  207. )[0][0]
  208. c.exec(
  209. "ssh-bastion-host",
  210. "bash",
  211. "-c",
  212. f"echo '{public_key}' > /etc/authorized_keys/mz",
  213. )
  214. c.run_testdrive_files("--no-reset", "kafka-source.td")
  215. c.kill("ssh-bastion-host")
  216. c.run_testdrive_files(
  217. "--no-reset",
  218. "kafka-source-after-ssh-failure-restart-replica.td",
  219. )
  220. c.up("ssh-bastion-host")
  221. c.run_testdrive_files("--no-reset", "kafka-source-after-ssh-restart.td")
  222. def workflow_kafka_sink(c: Composition, redpanda: bool = False) -> None:
  223. c.down()
  224. # Configure the SSH bastion host to allow only two connections to be
  225. # initiated simultaneously. This is enough to establish *one* Kafka SSH
  226. # tunnel and *one* Confluent Schema Registry tunnel simultaneously.
  227. # Combined with using a large cluster in kafka-source.td, this ensures that
  228. # we only create one SSH tunnel per Kafka broker, rather than one SSH tunnel
  229. # per Kafka broker per worker.
  230. with c.override(SshBastionHost(max_startups="2")):
  231. dependencies = ["materialized", "ssh-bastion-host"]
  232. if redpanda:
  233. dependencies += ["redpanda"]
  234. else:
  235. dependencies += ["zookeeper", "kafka", "schema-registry"]
  236. c.up(*dependencies)
  237. c.run_testdrive_files("setup.td")
  238. public_key = c.sql_query(
  239. """
  240. select public_key_1 from mz_ssh_tunnel_connections ssh \
  241. join mz_connections c on c.id = ssh.id
  242. where c.name = 'thancred';
  243. """
  244. )[0][0]
  245. c.exec(
  246. "ssh-bastion-host",
  247. "bash",
  248. "-c",
  249. f"echo '{public_key}' > /etc/authorized_keys/mz",
  250. )
  251. c.run_testdrive_files("--no-reset", "kafka-sink.td")
  252. c.kill("ssh-bastion-host")
  253. c.run_testdrive_files("--no-reset", "kafka-sink-after-ssh-failure.td")
  254. c.up("ssh-bastion-host")
  255. c.run_testdrive_files("--no-reset", "kafka-sink-after-ssh-restart.td")
  256. def workflow_hidden_hosts(c: Composition, redpanda: bool = False) -> None:
  257. c.down()
  258. dependencies = ["materialized", "ssh-bastion-host"]
  259. if redpanda:
  260. dependencies += ["redpanda"]
  261. else:
  262. dependencies += ["zookeeper", "kafka", "schema-registry"]
  263. c.up(*dependencies)
  264. c.run_testdrive_files("setup.td")
  265. public_key = c.sql_query(
  266. """
  267. select public_key_1 from mz_ssh_tunnel_connections ssh \
  268. join mz_connections c on c.id = ssh.id
  269. where c.name = 'thancred';
  270. """
  271. )[0][0]
  272. c.exec(
  273. "ssh-bastion-host",
  274. "bash",
  275. "-c",
  276. f"echo '{public_key}' > /etc/authorized_keys/mz",
  277. )
  278. def add_hidden_host(container: str) -> None:
  279. ip = c.exec(
  280. "ssh-bastion-host", "getent", "hosts", container, capture=True
  281. ).stdout.split(" ")[0]
  282. c.exec(
  283. "ssh-bastion-host",
  284. "bash",
  285. "-c",
  286. f"echo '{ip} hidden-{container}' >> /etc/hosts",
  287. )
  288. add_hidden_host("kafka")
  289. add_hidden_host("schema-registry")
  290. c.run_testdrive_files("--no-reset", "hidden-hosts.td")
  291. # Test that if we restart the bastion AND change its server keys(s), we can
  292. # still reconnect in the replication stream.
  293. def workflow_pg_restart_bastion(c: Composition) -> None:
  294. c.up("materialized", "ssh-bastion-host", "postgres")
  295. c.run_testdrive_files("setup.td")
  296. public_key = c.sql_query(
  297. """
  298. select public_key_1 from mz_ssh_tunnel_connections ssh \
  299. join mz_connections c on c.id = ssh.id
  300. where c.name = 'thancred';
  301. """
  302. )[0][0]
  303. c.exec(
  304. "ssh-bastion-host",
  305. "bash",
  306. "-c",
  307. f"echo '{public_key}' > /etc/authorized_keys/mz",
  308. )
  309. first_fingerprint = c.exec(
  310. "ssh-bastion-host",
  311. "bash",
  312. "-c",
  313. "cat /etc/ssh/keys/ssh_host_ed25519_key.pub",
  314. capture=True,
  315. ).stdout.strip()
  316. c.run_testdrive_files("--no-reset", "pg-source.td")
  317. restart_bastion(c)
  318. c.exec(
  319. "ssh-bastion-host",
  320. "bash",
  321. "-c",
  322. f"echo '{public_key}' > /etc/authorized_keys/mz",
  323. )
  324. c.run_testdrive_files("--no-reset", "pg-source-ingest-more.td")
  325. # we do this after we assert that we re-connnected
  326. # with the passing td file, to ensure that the
  327. # docker image was setup before we actually start reading
  328. # stuff from it
  329. second_fingerprint = c.exec(
  330. "ssh-bastion-host",
  331. "bash",
  332. "-c",
  333. "cat /etc/ssh/keys/ssh_host_ed25519_key.pub",
  334. capture=True,
  335. ).stdout.strip()
  336. assert (
  337. first_fingerprint != second_fingerprint
  338. ), "this test requires that the ssh server fingerprint changes"
def workflow_pg_restart_postgres(c: Composition) -> None:
    """Restart the upstream Postgres mid-replication and check that the
    source resumes ingestion; dumps extensive replication-state debugging
    output along the way (see database-issues#8905)."""
    c.up("materialized", "ssh-bastion-host", "postgres")
    c.run_testdrive_files("setup.td")
    # Fetch the tunnel's public key so we can authorize it on the bastion.
    public_key = c.sql_query(
        """
        select public_key_1 from mz_ssh_tunnel_connections ssh \
        join mz_connections c on c.id = ssh.id
        where c.name = 'thancred';
        """
    )[0][0]
    c.exec(
        "ssh-bastion-host",
        "bash",
        "-c",
        f"echo '{public_key}' > /etc/authorized_keys/mz",
    )
    # debugging output for https://github.com/MaterializeInc/database-issues/issues/8905
    # in some cases, materialize never sees the data ingested by pg-source-ingest-more
    # this captures the replication stream, which will be printed out for debugging purposes
    c.sql(
        service="postgres",
        user="postgres",
        password="postgres",
        database="postgres",
        sql="select pg_create_logical_replication_slot('spy', 'test_decoding', false, true);",
        print_statement=False,
    )
    c.run_testdrive_files("--no-reset", "pg-source.td")
    # Bounce the upstream database, then ingest more data through the source.
    c.kill("postgres")
    c.up("postgres")
    c.run_testdrive_files("--no-reset", "pg-source-ingest-more.td")
    # Helper SQL function: turn a pg_lsn ("X/Y" hex pair) into a numeric.
    c.sql(
        service="postgres",
        user="postgres",
        password="postgres",
        database="postgres",
        sql="""
        create function lsn_to_numeric(pg_lsn) returns numeric
        as $$
        select ('0x' || split_part($1::text, '/', 1))::numeric * '0x10000000'::numeric + ( '0x' || split_part($1::text, '/', 2))::numeric;
        $$ language sql immutable strict;
        """,
        print_statement=False,
    )
    # Drain the 'spy' slot to capture what the replication stream contained.
    rows = c.sql_query(
        service="postgres",
        user="postgres",
        password="postgres",
        database="postgres",
        # converting pgLsn to u64 for easier comparison to mz_source_postgres_lsn
        sql="""
        select lsn_to_numeric(lsn::pg_lsn) as lsn, xid, data
        from pg_logical_slot_get_changes('spy', null, null);
        """,
    )
    c.sql(
        service="postgres",
        user="postgres",
        password="postgres",
        database="postgres",
        sql="select pg_drop_replication_slot('spy');",
        print_statement=False,
    )
    table = PrettyTable()
    table.field_names = ["lsn", "xid", "data"]
    table.add_rows(rows)
    print(f"== logical replication stream ==\n{table}")
    # Dump the state of all replication slots on the Postgres side.
    rows = c.sql_query(
        service="postgres",
        user="postgres",
        password="postgres",
        database="postgres",
        sql="""
        select
        lsn_to_numeric(pg_current_wal_lsn()) as pg_current_wal_lsn, slot_name,
        active, active_pid, lsn_to_numeric(confirmed_flush_lsn) as confirmed_flush_lsn,
        inactive_since
        from pg_replication_slots;
        """,
    )
    table = PrettyTable()
    table.field_names = [
        "pg_current_wal_lsn",
        "slot_name",
        "active",
        "active_pid",
        "confirmed_flush_lsn",
        "inactive_since",
    ]
    table.add_rows(rows)
    print(f"== pg_replication_slots ==\n{table}")
    # Dump read/write frontiers of user objects on the Materialize side.
    rows = c.sql_query(
        user="mz_system",
        port=6877,
        sql="""
        select coalesce(mzs.name, mzt.name) as name, read_frontier, write_frontier
        from mz_internal.mz_frontiers mzf
        left join mz_sources mzs on (mzf.object_id = mzs.id)
        left join mz_tables mzt on (mzf.object_id = mzt.id)
        where object_id ~ 'u.*';
        """,
    )
    table = PrettyTable()
    table.field_names = ["name", "read_frontier", "write_frontier"]
    table.add_rows(rows)
    print(f"== mz_frontiers ==\n{table}")
    # Dump source statuses for debugging.
    rows = c.sql_query(
        user="mz_system",
        port=6877,
        sql="""
        select id, name, last_status_change_at, status, error, details
        from mz_internal.mz_source_statuses;
        """,
    )
    table = PrettyTable()
    table.field_names = [
        "id",
        "name",
        "last_status_change_at",
        "status",
        "error",
        "details",
    ]
    table.add_rows(rows)
    print(f"== mz_source_statuses ==\n{table}")
    # table name "mz_source_progress" is dependent on setup.td
    mz_source_progress_lsn = c.sql_query("select lsn from mz_source_progress;")[0][0]
    print(f"== mz_source_progress_lsn ==\n{mz_source_progress_lsn}")
  467. def workflow_pg_via_ssh_tunnel_with_ssl(c: Composition) -> None:
  468. c.up("materialized", "ssh-bastion-host", "postgres")
  469. c.run_testdrive_files("setup.td")
  470. public_key = c.sql_query(
  471. """
  472. select public_key_1 from mz_ssh_tunnel_connections ssh \
  473. join mz_connections c on c.id = ssh.id
  474. where c.name = 'thancred';
  475. """
  476. )[0][0]
  477. c.exec(
  478. "ssh-bastion-host",
  479. "bash",
  480. "-c",
  481. f"echo '{public_key}' > /etc/authorized_keys/mz",
  482. )
  483. c.run_testdrive_files("--no-reset", "pg-source-ssl.td")
  484. def workflow_ssh_key_after_restart(c: Composition) -> None:
  485. c.up("materialized")
  486. c.run_testdrive_files("setup.td")
  487. (primary, secondary) = c.sql_query(
  488. "SELECT public_key_1, public_key_2 FROM mz_ssh_tunnel_connections;"
  489. )[0]
  490. restart_mz(c)
  491. (restart_primary, restart_secondary) = c.sql_query(
  492. "SELECT public_key_1, public_key_2 FROM mz_ssh_tunnel_connections;"
  493. )[0]
  494. if (primary, secondary) != (restart_primary, restart_secondary):
  495. print("initial public keys: ", (primary, secondary))
  496. print("public keys after restart:", (restart_primary, restart_secondary))
  497. raise Exception("public key not equal after restart")
  498. c.sql("DROP CONNECTION thancred;")
  499. num_connections = c.sql_query("SELECT count(*) FROM mz_ssh_tunnel_connections;")[0][
  500. 0
  501. ]
  502. if num_connections != 1:
  503. connections = c.sql_query("SELECT * FROM mz_ssh_tunnel_connections;")
  504. print("Found connections in mz_ssh_tunnel_connections: ", connections)
  505. raise Exception(
  506. "ssh tunnel connection not properly removed from mz_ssh_tunnel_connections"
  507. )
  508. def workflow_rotated_ssh_key_after_restart(c: Composition) -> None:
  509. c.up("materialized")
  510. c.run_testdrive_files("setup.td")
  511. secondary_public_key = c.sql_query(
  512. """
  513. select public_key_2 from mz_ssh_tunnel_connections ssh \
  514. join mz_connections c on c.id = ssh.id
  515. where c.name = 'thancred';
  516. """
  517. )[0][0]
  518. c.sql("ALTER CONNECTION thancred ROTATE KEYS;")
  519. restart_mz(c)
  520. primary_public_key_after_restart = c.sql_query(
  521. """
  522. select public_key_1 from mz_ssh_tunnel_connections ssh \
  523. join mz_connections c on c.id = ssh.id
  524. where c.name = 'thancred';
  525. """
  526. )[0][0]
  527. if secondary_public_key != primary_public_key_after_restart:
  528. print("initial secondary key:", secondary_public_key)
  529. print(
  530. "primary public key after rotation + restart:",
  531. primary_public_key_after_restart,
  532. )
  533. raise Exception("public keys don't match")
  534. c.sql("DROP CONNECTION thancred;")
  535. num_connections = c.sql_query("SELECT count(*) FROM mz_ssh_tunnel_connections;")[0][
  536. 0
  537. ]
  538. if num_connections != 1:
  539. connections = c.sql_query("SELECT * FROM mz_ssh_tunnel_connections;")
  540. print("Found connections in mz_ssh_tunnel_connections: ", connections)
  541. raise Exception(
  542. "ssh tunnel connection not properly removed from mz_ssh_tunnel_connections after key rotation"
  543. )
  544. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  545. parser.add_argument(
  546. "--extended",
  547. action="store_true",
  548. help="run additional tests",
  549. )
  550. args = parser.parse_args()
  551. # Test against both standard schema registry
  552. # and kafka implementations, if --extended is passed
  553. workflows = [
  554. # These tests core functionality related to kafka with ssh and error reporting.
  555. (workflow_kafka, (False,), True),
  556. (workflow_hidden_hosts, (False,), True),
  557. # These tests core functionality related to pg with ssh and error reporting.
  558. (workflow_basic_ssh_features, (), False),
  559. (workflow_pg, (), True),
  560. (workflow_kafka_restart_replica, (), True),
  561. (workflow_kafka_sink, (), True),
  562. ]
  563. if args.extended:
  564. workflows.extend(
  565. [
  566. (workflow_kafka, (True,), True),
  567. (workflow_hidden_hosts, (True,), True),
  568. # Various special cases related to ssh
  569. (workflow_ssh_key_after_restart, (), False),
  570. (workflow_rotated_ssh_key_after_restart, (), False),
  571. (workflow_validate_connection, (), True),
  572. (workflow_pg_via_ssh_tunnel_with_ssl, (), True),
  573. (workflow_pg_restart_bastion, (), True),
  574. (workflow_pg_restart_postgres, (), True),
  575. ]
  576. )
  577. def process(p) -> None:
  578. workflow, args, validate_success = p
  579. workflow(c, *args)
  580. c.sanity_restart_mz()
  581. if validate_success:
  582. c.run_testdrive_files("--no-reset", "validate-success.td")
  583. sharded_workflows = buildkite.shard_list(workflows, lambda w: w[0].__name__)
  584. c.test_parts(sharded_workflows, process)