mzcompose.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Postgres source tests with interruptions, test that Materialize can recover.
  11. """
  12. import re
  13. import time
  14. import pg8000
  15. from pg8000 import Connection
  16. from pg8000.dbapi import ProgrammingError
  17. from materialize import buildkite
  18. from materialize.mzcompose.composition import Composition
  19. from materialize.mzcompose.services.alpine import Alpine
  20. from materialize.mzcompose.services.materialized import Materialized
  21. from materialize.mzcompose.services.mz import Mz
  22. from materialize.mzcompose.services.postgres import Postgres
  23. from materialize.mzcompose.services.testdrive import Testdrive
  24. from materialize.mzcompose.services.toxiproxy import Toxiproxy
# Service topology shared by every workflow in this composition.
SERVICES = [
    Alpine(),  # scratch container; backup_restore_pg uses it to shuffle files on shared volumes
    Mz(app_password=""),
    Materialized(default_replication_factor=2),
    Postgres(),
    Toxiproxy(),  # fault injection between Materialize and Postgres (toxiproxy-*.td)
    # no_reset so state survives across the multiple testdrive invocations of a scenario
    Testdrive(no_reset=True, default_timeout="300s"),
]
  33. def workflow_default(c: Composition) -> None:
  34. def process(name: str) -> None:
  35. if name == "default":
  36. return
  37. # clear to avoid issues
  38. c.kill("postgres")
  39. c.rm("postgres")
  40. with c.test_case(name):
  41. c.workflow(name)
  42. c.test_parts(list(c.workflows.keys()), process)
  43. def workflow_disruptions(c: Composition) -> None:
  44. """Test Postgres direct replication's failure handling by
  45. disrupting replication at various stages using Toxiproxy or service restarts
  46. """
  47. # TODO: most of these should likely be converted to cluster tests
  48. scenarios = [
  49. pg_out_of_disk_space,
  50. disconnect_pg_during_snapshot,
  51. disconnect_pg_during_replication,
  52. restart_pg_during_snapshot,
  53. restart_mz_during_snapshot,
  54. restart_pg_during_replication,
  55. restart_mz_during_replication,
  56. fix_pg_schema_while_mz_restarts,
  57. verify_no_snapshot_reingestion,
  58. restart_mz_after_initial_snapshot,
  59. restart_mz_while_cdc_changes,
  60. drop_replication_slot_when_mz_is_on,
  61. ]
  62. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  63. print(
  64. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  65. )
  66. for scenario in scenarios:
  67. overrides = (
  68. [Postgres(volumes=["sourcedata_512Mb:/var/lib/postgresql/data"])]
  69. if scenario == pg_out_of_disk_space
  70. else []
  71. )
  72. with c.override(*overrides):
  73. print(
  74. f"--- Running scenario {scenario.__name__} with overrides: {overrides}"
  75. )
  76. c.override_current_testcase_name(
  77. f"Scenario '{scenario.__name__}' of workflow_disruptions"
  78. )
  79. initialize(c)
  80. scenario(c)
  81. end(c)
  82. def workflow_backup_restore(c: Composition) -> None:
  83. scenarios = [
  84. backup_restore_pg,
  85. ]
  86. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  87. print(
  88. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  89. )
  90. with c.override(
  91. Materialized(sanity_restart=False, default_replication_factor=2),
  92. Alpine(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
  93. Postgres(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
  94. ):
  95. for scenario in scenarios:
  96. print(f"--- Running scenario {scenario.__name__}")
  97. initialize(c)
  98. scenario(c)
  99. # No end confirmation here, since we expect the source to be in a bad state
  100. def initialize(c: Composition) -> None:
  101. c.down(destroy_volumes=True)
  102. c.up("materialized", "postgres", "toxiproxy")
  103. c.run_testdrive_files(
  104. "configure-toxiproxy.td",
  105. "populate-tables.td",
  106. "configure-postgres.td",
  107. "configure-materialize.td",
  108. )
  109. def restart_pg(c: Composition) -> None:
  110. c.kill("postgres")
  111. c.up("postgres")
  112. def restart_mz(c: Composition) -> None:
  113. c.kill("materialized")
  114. c.up("materialized")
  115. def end(c: Composition) -> None:
  116. """Validate the data at the end."""
  117. c.run_testdrive_files("verify-data.td", "cleanup.td")
  118. def disconnect_pg_during_snapshot(c: Composition) -> None:
  119. c.run_testdrive_files(
  120. "toxiproxy-close-connection.td",
  121. "toxiproxy-restore-connection.td",
  122. "delete-rows-t1.td",
  123. "delete-rows-t2.td",
  124. "alter-table.td",
  125. "alter-mz.td",
  126. )
  127. def restart_pg_during_snapshot(c: Composition) -> None:
  128. restart_pg(c)
  129. c.run_testdrive_files(
  130. "delete-rows-t1.td",
  131. "delete-rows-t2.td",
  132. "alter-table.td",
  133. "alter-mz.td",
  134. )
  135. def restart_mz_during_snapshot(c: Composition) -> None:
  136. c.run_testdrive_files("alter-mz.td")
  137. restart_mz(c)
  138. c.run_testdrive_files("delete-rows-t1.td", "delete-rows-t2.td", "alter-table.td")
  139. def restart_mz_after_initial_snapshot(c: Composition) -> None:
  140. c.run_testdrive_files(
  141. "wait-for-snapshot.td",
  142. "delete-rows-t1.td",
  143. )
  144. restart_mz(c)
  145. c.run_testdrive_files(
  146. "delete-rows-t2.td",
  147. "alter-table.td",
  148. "alter-mz.td",
  149. "verify-data.td",
  150. "alter-table-fix.td",
  151. )
  152. def restart_mz_while_cdc_changes(c: Composition) -> None:
  153. c.run_testdrive_files(
  154. "wait-for-snapshot.td",
  155. "delete-rows-t1.td",
  156. )
  157. c.kill("materialized")
  158. # run delete-rows-t2.td in pg
  159. pg_conn = _create_pg_connection(c)
  160. cursor = pg_conn.cursor()
  161. cursor.execute("DELETE FROM t2 WHERE f1 % 2 = 1;")
  162. c.up("materialized")
  163. c.run_testdrive_files(
  164. "alter-table.td",
  165. "alter-mz.td",
  166. "verify-data.td",
  167. "alter-table-fix.td",
  168. )
  169. def disconnect_pg_during_replication(c: Composition) -> None:
  170. c.run_testdrive_files(
  171. "wait-for-snapshot.td",
  172. "delete-rows-t1.td",
  173. "delete-rows-t2.td",
  174. "alter-table.td",
  175. "alter-mz.td",
  176. "toxiproxy-close-connection.td",
  177. "toxiproxy-restore-connection.td",
  178. )
  179. def restart_pg_during_replication(c: Composition) -> None:
  180. c.run_testdrive_files(
  181. "wait-for-snapshot.td",
  182. "delete-rows-t1.td",
  183. "alter-table.td",
  184. "alter-mz.td",
  185. )
  186. restart_pg(c)
  187. c.run_testdrive_files("delete-rows-t2.td")
  188. def restart_mz_during_replication(c: Composition) -> None:
  189. c.run_testdrive_files(
  190. "wait-for-snapshot.td",
  191. "delete-rows-t1.td",
  192. "alter-table.td",
  193. "alter-mz.td",
  194. )
  195. restart_mz(c)
  196. c.run_testdrive_files("delete-rows-t2.td")
  197. def fix_pg_schema_while_mz_restarts(c: Composition) -> None:
  198. c.run_testdrive_files(
  199. "delete-rows-t1.td",
  200. "delete-rows-t2.td",
  201. "alter-table.td",
  202. "alter-mz.td",
  203. "verify-data.td",
  204. "alter-table-fix.td",
  205. )
  206. restart_mz(c)
  207. def verify_no_snapshot_reingestion(c: Composition) -> None:
  208. """Confirm that Mz does not reingest the entire snapshot on restart by
  209. revoking its SELECT privileges
  210. """
  211. c.run_testdrive_files(
  212. "wait-for-snapshot.td", "postgres-disable-select-permission.td"
  213. )
  214. restart_mz(c)
  215. c.run_testdrive_files(
  216. "delete-rows-t1.td",
  217. "delete-rows-t2.td",
  218. "alter-table.td",
  219. "alter-mz.td",
  220. )
  221. def pg_out_of_disk_space(c: Composition) -> None:
  222. c.run_testdrive_files(
  223. "wait-for-snapshot.td",
  224. "delete-rows-t1.td",
  225. )
  226. fill_file = "/var/lib/postgresql/data/fill_file"
  227. c.exec(
  228. "postgres",
  229. "bash",
  230. "-c",
  231. f"dd if=/dev/zero of={fill_file} bs=1024 count=$[1024*512] || true",
  232. )
  233. print("Sleeping for 30 seconds ...")
  234. time.sleep(30)
  235. c.exec("postgres", "bash", "-c", f"rm {fill_file}")
  236. c.run_testdrive_files("delete-rows-t2.td", "alter-table.td", "alter-mz.td")
  237. def drop_replication_slot_when_mz_is_on(c: Composition) -> None:
  238. c.run_testdrive_files(
  239. "wait-for-snapshot.td",
  240. "delete-rows-t1.td",
  241. )
  242. pg_conn = _create_pg_connection(c)
  243. slot_names = _get_all_pg_replication_slots(pg_conn)
  244. try:
  245. _drop_pg_replication_slots(pg_conn, slot_names)
  246. assert False, "active replication slot is not expected to allow drop action"
  247. except ProgrammingError as e:
  248. assert re.search(
  249. 'replication slot "materialize_[a-f0-9]+" is active', (str(e))
  250. ), f"Got: {str(e)}"
  251. c.run_testdrive_files(
  252. "delete-rows-t2.td",
  253. "alter-table.td",
  254. "alter-mz.td",
  255. )
  256. def _create_pg_connection(c: Composition) -> Connection:
  257. connection = pg8000.connect(
  258. host="localhost",
  259. user="postgres",
  260. password="postgres",
  261. port=c.default_port("postgres"),
  262. )
  263. connection.autocommit = True
  264. return connection
  265. def _get_all_pg_replication_slots(pg_conn: Connection) -> list[str]:
  266. cursor = pg_conn.cursor()
  267. cursor.execute("SELECT slot_name FROM pg_replication_slots;")
  268. slot_names = []
  269. for row in cursor.fetchall():
  270. slot_names.append(row[0])
  271. return slot_names
def _drop_pg_replication_slots(pg_conn: Connection, slot_names: list[str]) -> None:
    """Attempt to drop each named replication slot.

    Postgres raises an error for slots that are still active; callers
    (drop_replication_slot_when_mz_is_on) rely on that error escaping.
    """
    cursor = pg_conn.cursor()
    for slot_name in slot_names:
        print(f"Dropping replication slot {slot_name}")
        # NOTE(review): slot_name is interpolated straight into the SQL.
        # Acceptable here since names come from pg_replication_slots, but
        # not safe for arbitrary input.
        cursor.execute(f"SELECT pg_drop_replication_slot('{slot_name}');")
def backup_restore_pg(c: Composition) -> None:
    """Take a base backup of Postgres, make further changes, restore the
    backup, and check that the source reports failure
    (verify-source-failed.td) — the restored Postgres no longer matches
    what Materialize already ingested.
    """
    # Allow local replication connections so pg_basebackup can connect.
    c.exec(
        "postgres",
        "bash",
        "-c",
        "echo 'local replication all trust\n' >> /share/conf/pg_hba.conf",
    )
    # Tell postgres to reload config
    c.kill("postgres", signal="HUP", wait=False)
    # Backup postgres, wait for completion
    backup_dir = "/scratch/backup"
    c.exec(
        "postgres",
        "bash",
        "-c",
        f"mkdir {backup_dir} && chown -R postgres:postgres {backup_dir}",
    )
    # -X stream: include required WAL in the backup;
    # -c fast: checkpoint immediately instead of waiting for a spread one.
    c.exec(
        "postgres",
        "pg_basebackup",
        "-U",
        "postgres",
        "-X",
        "stream",
        "-c",
        "fast",
        "-D",
        backup_dir,
    )
    # Changes made after the backup — lost again by the restore below.
    c.run_testdrive_files("delete-rows-t1.td")
    # Stop postgres service
    c.stop("postgres")
    # Perform a point-in-time recovery from the backup to postgres.
    # The alpine container shares the pgdata and tmp volumes with
    # postgres, so it can rewrite the data directory while postgres is
    # stopped. recovery.signal makes Postgres start up in recovery mode.
    c.run("alpine", "/bin/sh", "-c", "rm -rf /var/lib/postgresql/data/*")
    c.run("alpine", "/bin/sh", "-c", f"cp -r {backup_dir}/* /var/lib/postgresql/data/")
    c.run("alpine", "/bin/sh", "-c", "touch /var/lib/postgresql/data/recovery.signal")
    c.run(
        "alpine",
        "/bin/sh",
        "-c",
        "echo \"restore_command 'cp /var/lib/postgresql/data/pg_wal/%f %p'\n\" >> /var/lib/postgresql/data/postgresql.conf",
    )
    # Wait for postgres to become usable again
    c.up("postgres")
    c.run_testdrive_files("verify-postgres-select.td")
    # Check state of the postgres source
    c.run_testdrive_files("verify-source-failed.td")