mzcompose.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Postgres source tests with interruptions, test that Materialize can recover.
  11. """
  12. import time
  13. from materialize import buildkite
  14. from materialize.mzcompose.composition import Composition
  15. from materialize.mzcompose.services.alpine import Alpine
  16. from materialize.mzcompose.services.materialized import Materialized
  17. from materialize.mzcompose.services.mz import Mz
  18. from materialize.mzcompose.services.postgres import Postgres
  19. from materialize.mzcompose.services.testdrive import Testdrive
  20. from materialize.mzcompose.services.toxiproxy import Toxiproxy
# Services participating in these workflows. Testdrive is configured with
# no_reset=True so state carries across .td files within a scenario, and a
# generous default timeout because recovery after a disruption can be slow.
SERVICES = [
    Alpine(),  # helper container; backup_restore_pg uses it to manipulate shared volumes
    Mz(app_password=""),
    Materialized(default_replication_factor=2),
    Postgres(),
    Toxiproxy(),  # network proxy used to inject connection failures between Mz and pg
    Testdrive(no_reset=True, default_timeout="300s"),
]
  29. def workflow_default(c: Composition) -> None:
  30. def process(name: str) -> None:
  31. if name == "default":
  32. return
  33. # clear to avoid issues
  34. c.kill("postgres")
  35. c.rm("postgres")
  36. with c.test_case(name):
  37. c.workflow(name)
  38. c.test_parts(list(c.workflows.keys()), process)
  39. def workflow_disruptions(c: Composition) -> None:
  40. """Test Postgres direct replication's failure handling by
  41. disrupting replication at various stages using Toxiproxy or service restarts
  42. """
  43. # TODO: most of these should likely be converted to cluster tests
  44. scenarios = [
  45. pg_out_of_disk_space,
  46. disconnect_pg_during_snapshot,
  47. disconnect_pg_during_replication,
  48. restart_pg_during_snapshot,
  49. restart_mz_during_snapshot,
  50. restart_pg_during_replication,
  51. restart_mz_during_replication,
  52. fix_pg_schema_while_mz_restarts,
  53. verify_no_snapshot_reingestion,
  54. ]
  55. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  56. print(
  57. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  58. )
  59. for scenario in scenarios:
  60. overrides = (
  61. [Postgres(volumes=["sourcedata_512Mb:/var/lib/postgresql/data"])]
  62. if scenario == pg_out_of_disk_space
  63. else []
  64. )
  65. with c.override(*overrides):
  66. print(
  67. f"--- Running scenario {scenario.__name__} with overrides: {overrides}"
  68. )
  69. c.override_current_testcase_name(
  70. f"Scenario '{scenario.__name__}' of workflow_disruptions"
  71. )
  72. initialize(c)
  73. scenario(c)
  74. end(c)
  75. def workflow_backup_restore(c: Composition) -> None:
  76. scenarios = [
  77. backup_restore_pg,
  78. ]
  79. scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
  80. print(
  81. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {[s.__name__ for s in scenarios]}"
  82. )
  83. with c.override(
  84. Materialized(sanity_restart=False, default_replication_factor=2),
  85. Alpine(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
  86. Postgres(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
  87. ):
  88. for scenario in scenarios:
  89. print(f"--- Running scenario {scenario.__name__}")
  90. initialize(c)
  91. scenario(c)
  92. # No end confirmation here, since we expect the source to be in a bad state
  93. def initialize(c: Composition) -> None:
  94. c.down(destroy_volumes=True)
  95. c.up("materialized", "postgres", "toxiproxy")
  96. c.run_testdrive_files(
  97. "configure-toxiproxy.td",
  98. "populate-tables.td",
  99. "configure-postgres.td",
  100. "configure-materialize.td",
  101. )
  102. def restart_pg(c: Composition) -> None:
  103. c.kill("postgres")
  104. c.up("postgres")
  105. def restart_mz(c: Composition) -> None:
  106. c.kill("materialized")
  107. c.up("materialized")
  108. def end(c: Composition) -> None:
  109. """Validate the data at the end."""
  110. c.run_testdrive_files("verify-data.td", "cleanup.td")
  111. def disconnect_pg_during_snapshot(c: Composition) -> None:
  112. c.run_testdrive_files(
  113. "toxiproxy-close-connection.td",
  114. "toxiproxy-restore-connection.td",
  115. "delete-rows-t1.td",
  116. "delete-rows-t2.td",
  117. "alter-table.td",
  118. "alter-mz.td",
  119. )
  120. def restart_pg_during_snapshot(c: Composition) -> None:
  121. restart_pg(c)
  122. c.run_testdrive_files(
  123. "delete-rows-t1.td",
  124. "delete-rows-t2.td",
  125. "alter-table.td",
  126. "alter-mz.td",
  127. )
  128. def restart_mz_during_snapshot(c: Composition) -> None:
  129. c.run_testdrive_files("alter-mz.td")
  130. restart_mz(c)
  131. c.run_testdrive_files("delete-rows-t1.td", "delete-rows-t2.td", "alter-table.td")
  132. def disconnect_pg_during_replication(c: Composition) -> None:
  133. c.run_testdrive_files(
  134. "wait-for-snapshot.td",
  135. "delete-rows-t1.td",
  136. "delete-rows-t2.td",
  137. "alter-table.td",
  138. "alter-mz.td",
  139. "toxiproxy-close-connection.td",
  140. "toxiproxy-restore-connection.td",
  141. )
  142. def restart_pg_during_replication(c: Composition) -> None:
  143. c.run_testdrive_files(
  144. "wait-for-snapshot.td",
  145. "delete-rows-t1.td",
  146. "alter-table.td",
  147. "alter-mz.td",
  148. )
  149. restart_pg(c)
  150. c.run_testdrive_files("delete-rows-t2.td")
  151. def restart_mz_during_replication(c: Composition) -> None:
  152. c.run_testdrive_files(
  153. "wait-for-snapshot.td",
  154. "delete-rows-t1.td",
  155. "alter-table.td",
  156. "alter-mz.td",
  157. )
  158. restart_mz(c)
  159. c.run_testdrive_files("delete-rows-t2.td")
  160. def fix_pg_schema_while_mz_restarts(c: Composition) -> None:
  161. c.run_testdrive_files(
  162. "delete-rows-t1.td",
  163. "delete-rows-t2.td",
  164. "alter-table.td",
  165. "alter-mz.td",
  166. "verify-data.td",
  167. "alter-table-fix.td",
  168. )
  169. restart_mz(c)
  170. def verify_no_snapshot_reingestion(c: Composition) -> None:
  171. """Confirm that Mz does not reingest the entire snapshot on restart by
  172. revoking its SELECT privileges
  173. """
  174. c.run_testdrive_files(
  175. "wait-for-snapshot.td", "postgres-disable-select-permission.td"
  176. )
  177. restart_mz(c)
  178. c.run_testdrive_files(
  179. "delete-rows-t1.td",
  180. "delete-rows-t2.td",
  181. "alter-table.td",
  182. "alter-mz.td",
  183. )
  184. def pg_out_of_disk_space(c: Composition) -> None:
  185. c.run_testdrive_files(
  186. "wait-for-snapshot.td",
  187. "delete-rows-t1.td",
  188. )
  189. fill_file = "/var/lib/postgresql/data/fill_file"
  190. c.exec(
  191. "postgres",
  192. "bash",
  193. "-c",
  194. f"dd if=/dev/zero of={fill_file} bs=1024 count=$[1024*512] || true",
  195. )
  196. print("Sleeping for 30 seconds ...")
  197. time.sleep(30)
  198. c.exec("postgres", "bash", "-c", f"rm {fill_file}")
  199. c.run_testdrive_files("delete-rows-t2.td", "alter-table.td", "alter-mz.td")
def backup_restore_pg(c: Composition) -> None:
    """Take a base backup of postgres, mutate the data, then restore the
    backup (rewinding postgres to an earlier state) and check how the
    Materialize source reacts.

    The workflow expects the source to end up failed (see
    verify-source-failed.td) rather than silently serving rewound data.
    """
    # Permit local replication connections so pg_basebackup can run.
    # Note: the \n below is interpreted by *Python*, so the appended
    # pg_hba.conf entry is followed by a real newline.
    c.exec(
        "postgres",
        "bash",
        "-c",
        "echo 'local replication all trust\n' >> /share/conf/pg_hba.conf",
    )
    # Tell postgres to reload config
    c.kill("postgres", signal="HUP", wait=False)
    # Backup postgres, wait for completion
    backup_dir = "/scratch/backup"
    # pg_basebackup runs as the postgres user, so it must own the target dir.
    c.exec(
        "postgres",
        "bash",
        "-c",
        f"mkdir {backup_dir} && chown -R postgres:postgres {backup_dir}",
    )
    # -X stream: include WAL in the backup; -c fast: take the checkpoint
    # immediately instead of waiting for the next scheduled one.
    c.exec(
        "postgres",
        "pg_basebackup",
        "-U",
        "postgres",
        "-X",
        "stream",
        "-c",
        "fast",
        "-D",
        backup_dir,
    )
    # Mutate data *after* the backup, so the restore rewinds these changes.
    c.run_testdrive_files("delete-rows-t1.td")
    # Stop postgres service
    c.stop("postgres")
    # Perform a point-in-time recovery from the backup to postgres
    # (the alpine container shares the pgdata and /scratch volumes).
    c.run("alpine", "/bin/sh", "-c", "rm -rf /var/lib/postgresql/data/*")
    c.run("alpine", "/bin/sh", "-c", f"cp -r {backup_dir}/* /var/lib/postgresql/data/")
    # recovery.signal makes postgres start in targeted-recovery mode.
    c.run("alpine", "/bin/sh", "-c", "touch /var/lib/postgresql/data/recovery.signal")
    # NOTE(review): postgresql.conf accepts "name value" without "=", so the
    # missing equal sign here is valid; the \n is again a Python-level newline.
    c.run(
        "alpine",
        "/bin/sh",
        "-c",
        "echo \"restore_command 'cp /var/lib/postgresql/data/pg_wal/%f %p'\n\" >> /var/lib/postgresql/data/postgresql.conf",
    )
    # Wait for postgres to become usable again
    c.up("postgres")
    c.run_testdrive_files("verify-postgres-select.td")
    # Check state of the postgres source
    c.run_testdrive_files("verify-source-failed.td")