mzcompose.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Native Postgres source tests, functional.
  11. """
  12. import glob
  13. import time
  14. import pg8000
  15. from pg8000 import Connection
  16. from materialize import MZ_ROOT, buildkite
  17. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  18. from materialize.mzcompose.service import Service, ServiceConfig
  19. from materialize.mzcompose.services.materialized import Materialized
  20. from materialize.mzcompose.services.minio import Minio
  21. from materialize.mzcompose.services.mz import Mz
  22. from materialize.mzcompose.services.postgres import (
  23. METADATA_STORE,
  24. CockroachOrPostgresMetadata,
  25. Postgres,
  26. )
  27. from materialize.mzcompose.services.test_certs import TestCerts
  28. from materialize.mzcompose.services.testdrive import Testdrive
  29. from materialize.mzcompose.services.toxiproxy import Toxiproxy
  30. from materialize.source_table_migration import (
  31. get_new_image_for_source_table_migration_test,
  32. get_old_image_for_source_table_migration_test,
  33. verify_sources_after_source_table_migration,
  34. )
# Keep at most 10MB of WAL for replication slots so tests can quickly force
# slot invalidation (a bare integer for max_slot_wal_keep_size is in MB).
DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"]
  37. class PostgresRecvlogical(Service):
  38. """
  39. Command to start a replication.
  40. """
  41. def __init__(self, replication_slot_name: str, publication_name: str) -> None:
  42. command: list[str] = [
  43. "pg_recvlogical",
  44. "--start",
  45. "--slot",
  46. f"{replication_slot_name}",
  47. "--file",
  48. "-",
  49. "--dbname",
  50. "postgres",
  51. "--host",
  52. "postgres",
  53. "--port",
  54. "5432",
  55. "--username",
  56. "postgres",
  57. "--no-password",
  58. "-o",
  59. "proto_version=1",
  60. "-o",
  61. f"publication_names={publication_name}",
  62. ]
  63. config: ServiceConfig = {"mzbuild": "postgres"}
  64. config.update(
  65. {
  66. "command": command,
  67. "allow_host_ports": True,
  68. "ports": ["5432"],
  69. "environment": ["PGPASSWORD=postgres"],
  70. }
  71. )
  72. super().__init__(name="pg_recvlogical", config=config)
  73. def create_postgres(
  74. pg_version: str | None, extra_command: list[str] = DEFAULT_PG_EXTRA_COMMAND
  75. ) -> Postgres:
  76. if pg_version is None:
  77. image = None
  78. else:
  79. image = f"postgres:{pg_version}"
  80. return Postgres(image=image, extra_command=extra_command)
# Service catalog for this composition. Several entries are placeholders that
# individual workflows replace via c.override() with workflow-specific settings.
SERVICES = [
    Mz(app_password=""),
    Materialized(
        volumes_extra=["secrets:/share/secrets"],
        additional_system_parameter_defaults={
            "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
        },
        external_blob_store=True,
        default_replication_factor=2,
    ),
    Testdrive(),
    CockroachOrPostgresMetadata(),
    Minio(setup_materialize=True),
    TestCerts(),
    Toxiproxy(),
    create_postgres(pg_version=None),
    PostgresRecvlogical(
        replication_slot_name="", publication_name=""
    ),  # Overridden with a real slot/publication before use
]
  101. def get_targeted_pg_version(parser: WorkflowArgumentParser) -> str | None:
  102. parser.add_argument(
  103. "--pg-version",
  104. type=str,
  105. )
  106. args, _ = parser.parse_known_args()
  107. pg_version = args.pg_version
  108. if pg_version is not None:
  109. print(f"Running with Postgres version {pg_version}")
  110. return pg_version
  111. # TODO: redesign ceased status database-issues#7687
# Test how subsource statuses work across a variety of scenarios
  113. # def workflow_statuses(c: Composition, parser: WorkflowArgumentParser) -> None:
  114. # c.up("materialized", "postgres", "toxiproxy")
  115. # c.run_testdrive_files("status/01-setup.td")
  116. # with c.override(Testdrive(no_reset=True)):
  117. # # Restart mz
  118. # c.kill("materialized")
  119. # c.up("materialized")
  120. # c.run_testdrive_files(
  121. # "status/02-after-mz-restart.td",
  122. # "status/03-toxiproxy-interrupt.td",
  123. # "status/04-drop-publication.td",
  124. # )
  125. def workflow_replication_slots(c: Composition, parser: WorkflowArgumentParser) -> None:
  126. pg_version = get_targeted_pg_version(parser)
  127. with c.override(
  128. create_postgres(
  129. pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
  130. )
  131. ):
  132. c.up("materialized", "postgres")
  133. c.run_testdrive_files("override/replication-slots.td")
  134. def workflow_wal_level(c: Composition, parser: WorkflowArgumentParser) -> None:
  135. pg_version = get_targeted_pg_version(parser)
  136. for wal_level in ["replica", "minimal"]:
  137. with c.override(
  138. create_postgres(
  139. pg_version=pg_version,
  140. extra_command=[
  141. "-c",
  142. "max_wal_senders=0",
  143. "-c",
  144. f"wal_level={wal_level}",
  145. ],
  146. )
  147. ):
  148. c.up("materialized", "postgres")
  149. c.run_testdrive_files("override/insufficient-wal-level.td")
  150. def workflow_replication_disabled(
  151. c: Composition, parser: WorkflowArgumentParser
  152. ) -> None:
  153. pg_version = get_targeted_pg_version(parser)
  154. with c.override(
  155. create_postgres(
  156. pg_version=pg_version, extra_command=["-c", "max_wal_senders=0"]
  157. )
  158. ):
  159. c.up("materialized", "postgres")
  160. c.run_testdrive_files("override/replication-disabled.td")
def workflow_silent_connection_drop(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test that mz can regain a replication slot that is used by another service.
    """
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version,
            # Disable the WAL sender timeout so a silently dropped connection
            # keeps occupying the slot instead of being reaped by Postgres.
            extra_command=[
                "-c",
                "wal_sender_timeout=0",
            ],
        ),
    ):
        c.up("postgres")
        pg_conn = pg8000.connect(
            host="localhost",
            user="postgres",
            password="postgres",
            port=c.default_port("postgres"),
        )
        # Fresh Postgres: no replication slots may exist yet.
        _verify_exactly_n_replication_slots_exist(pg_conn, n=0)
        c.up("materialized")
        c.run_testdrive_files(
            "--no-reset",
            f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
            "override/silent-connection-drop-part-1.td",
        )
        # After part 1 exactly one slot exists; wait until it is released
        # (inactive) so another client can grab it.
        _verify_exactly_n_replication_slots_exist(pg_conn, n=1)
        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=False,
            error_message="Replication slot is still active",
        )
        # Occupy mz's slot with pg_recvlogical, then wait until Postgres
        # reports the slot as active again (i.e. claimed by the interloper).
        _claim_postgres_replication_slot(c, pg_conn)
        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=True,
            error_message="Replication slot has not been claimed",
        )
        # Part 2 asserts mz regains the slot; the slot count must not grow.
        c.run_testdrive_files("--no-reset", "override/silent-connection-drop-part-2.td")
        _verify_exactly_n_replication_slots_exist(pg_conn, n=1)
  205. def _await_postgres_replication_slot_state(
  206. pg_conn: Connection, await_active: bool, error_message: str
  207. ) -> None:
  208. for i in range(1, 5):
  209. is_active = _is_postgres_activation_slot_active(pg_conn)
  210. if is_active == await_active:
  211. return
  212. else:
  213. time.sleep(1)
  214. raise RuntimeError(error_message)
  215. def _get_postgres_replication_slot_name(pg_conn: Connection) -> str:
  216. cursor = pg_conn.cursor()
  217. cursor.execute("SELECT slot_name FROM pg_replication_slots;")
  218. return cursor.fetchall()[0][0]
  219. def _claim_postgres_replication_slot(c: Composition, pg_conn: Connection) -> None:
  220. replicator = PostgresRecvlogical(
  221. replication_slot_name=_get_postgres_replication_slot_name(pg_conn),
  222. publication_name="mz_source",
  223. )
  224. with c.override(replicator):
  225. c.up(replicator.name)
  226. def _is_postgres_activation_slot_active(pg_conn: Connection) -> bool:
  227. cursor = pg_conn.cursor()
  228. cursor.execute("SELECT active FROM pg_replication_slots;")
  229. is_active = cursor.fetchall()[0][0]
  230. return is_active
  231. def _verify_exactly_n_replication_slots_exist(pg_conn: Connection, n: int) -> None:
  232. cursor = pg_conn.cursor()
  233. cursor.execute("SELECT count(*) FROM pg_replication_slots;")
  234. count_slots = cursor.fetchall()[0][0]
  235. assert (
  236. count_slots == n
  237. ), f"Expected {n} replication slot(s) but found {count_slots} slot(s)"
  238. def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
  239. pg_version = get_targeted_pg_version(parser)
  240. parser.add_argument(
  241. "filter",
  242. nargs="*",
  243. default=["*.td"],
  244. help="limit to only the files matching filter",
  245. )
  246. args = parser.parse_args()
  247. matching_files = []
  248. for filter in args.filter:
  249. matching_files.extend(
  250. glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax")
  251. )
  252. sharded_files: list[str] = buildkite.shard_list(
  253. sorted(matching_files), lambda file: file
  254. )
  255. print(f"Files: {sharded_files}")
  256. c.up({"name": "test-certs", "persistent": True})
  257. ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
  258. ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
  259. ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
  260. ssl_wrong_cert = c.run(
  261. "test-certs", "cat", "/secrets/postgres.crt", capture=True
  262. ).stdout
  263. ssl_wrong_key = c.run(
  264. "test-certs", "cat", "/secrets/postgres.key", capture=True
  265. ).stdout
  266. with c.override(create_postgres(pg_version=pg_version)):
  267. c.up("materialized", "test-certs", "postgres")
  268. c.test_parts(
  269. sharded_files,
  270. lambda file: c.run_testdrive_files(
  271. f"--var=ssl-ca={ssl_ca}",
  272. f"--var=ssl-cert={ssl_cert}",
  273. f"--var=ssl-key={ssl_key}",
  274. f"--var=ssl-wrong-cert={ssl_wrong_cert}",
  275. f"--var=ssl-wrong-key={ssl_wrong_key}",
  276. f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
  277. f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
  278. file,
  279. ),
  280. )
  281. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  282. def process(name: str) -> None:
  283. if name in ("default", "migration"):
  284. return
  285. # TODO: Flaky, reenable when database-issues#7611 is fixed
  286. if name == "statuses":
  287. return
  288. # TODO: Flaky, reenable when database-issues#8447 is fixed
  289. if name == "silent-connection-drop":
  290. return
  291. c.kill("postgres")
  292. c.rm("postgres")
  293. c.kill("materialized")
  294. c.rm("materialized")
  295. with c.test_case(name):
  296. c.workflow(name, *parser.args)
  297. workflows_with_internal_sharding = ["cdc"]
  298. sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
  299. [
  300. w
  301. for w in c.workflows
  302. if w not in workflows_with_internal_sharding and w != "migration"
  303. ],
  304. lambda w: w,
  305. )
  306. print(
  307. f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
  308. )
  309. c.test_parts(sharded_workflows, process)
  310. def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
  311. parser.add_argument(
  312. "filter",
  313. nargs="*",
  314. default=["*.td"],
  315. help="limit to only the files matching filter",
  316. )
  317. args = parser.parse_args()
  318. matching_files = []
  319. for filter in args.filter:
  320. matching_files.extend(
  321. glob.glob(filter, root_dir=MZ_ROOT / "test" / "pg-cdc-old-syntax")
  322. )
  323. sharded_files: list[str] = buildkite.shard_list(
  324. sorted(matching_files), lambda file: file
  325. )
  326. print(f"Files: {sharded_files}")
  327. c.up({"name": "test-certs", "persistent": True})
  328. ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
  329. ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
  330. ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
  331. ssl_wrong_cert = c.run(
  332. "test-certs", "cat", "/secrets/postgres.crt", capture=True
  333. ).stdout
  334. ssl_wrong_key = c.run(
  335. "test-certs", "cat", "/secrets/postgres.key", capture=True
  336. ).stdout
  337. pg_version = get_targeted_pg_version(parser)
  338. for file in sharded_files:
  339. mz_old = Materialized(
  340. name="materialized",
  341. image=get_old_image_for_source_table_migration_test(),
  342. volumes_extra=["secrets:/share/secrets"],
  343. external_metadata_store=True,
  344. external_blob_store=True,
  345. additional_system_parameter_defaults={
  346. "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
  347. },
  348. default_replication_factor=2,
  349. )
  350. mz_new = Materialized(
  351. name="materialized",
  352. image=get_new_image_for_source_table_migration_test(),
  353. volumes_extra=["secrets:/share/secrets"],
  354. external_metadata_store=True,
  355. external_blob_store=True,
  356. additional_system_parameter_defaults={
  357. "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error",
  358. "force_source_table_syntax": "true",
  359. },
  360. default_replication_factor=2,
  361. )
  362. with c.override(mz_old, create_postgres(pg_version=pg_version)):
  363. c.up("materialized", "test-certs", "postgres")
  364. print(f"Running {file} with mz_old")
  365. c.run_testdrive_files(
  366. f"--var=ssl-ca={ssl_ca}",
  367. f"--var=ssl-cert={ssl_cert}",
  368. f"--var=ssl-key={ssl_key}",
  369. f"--var=ssl-wrong-cert={ssl_wrong_cert}",
  370. f"--var=ssl-wrong-key={ssl_wrong_key}",
  371. f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
  372. f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
  373. "--no-reset",
  374. file,
  375. )
  376. c.kill("materialized", wait=True)
  377. with c.override(mz_new):
  378. c.up("materialized")
  379. print("Running mz_new")
  380. verify_sources_after_source_table_migration(c, file)
  381. c.kill("materialized", wait=True)
  382. c.kill("postgres", wait=True)
  383. c.kill(METADATA_STORE, wait=True)
  384. c.rm("materialized")
  385. c.rm(METADATA_STORE)
  386. c.rm("postgres")
  387. c.rm_volumes("mzdata")