# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Native Postgres source tests, functional.
  11. """

import glob
import time
from textwrap import dedent

import psycopg
from psycopg import Connection

from materialize import MZ_ROOT, buildkite
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.service import Service, ServiceConfig
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.toxiproxy import Toxiproxy

# Set the max slot WAL keep size to 10MB.
DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"]
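# With this cap, a slot that falls more than ~10MB behind is invalidated
# rather than retaining WAL indefinitely, which keeps runaway WAL growth
# from filling the test container's disk.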


class PostgresRecvlogical(Service):
    """
    Runs pg_recvlogical to start logical replication, consuming the given
    replication slot and publication.
    """

    def __init__(self, replication_slot_name: str, publication_name: str) -> None:
        command: list[str] = [
            "pg_recvlogical",
            "--start",
            "--slot",
            f"{replication_slot_name}",
            "--file",
            "-",
            "--dbname",
            "postgres",
            "--host",
            "postgres",
            "--port",
            "5432",
            "--username",
            "postgres",
            "--no-password",
            "-o",
            "proto_version=1",
            "-o",
            f"publication_names={publication_name}",
        ]
        config: ServiceConfig = {"mzbuild": "postgres"}
        config.update(
            {
                "command": command,
                "allow_host_ports": True,
                "ports": ["5432"],
                "environment": ["PGPASSWORD=postgres"],
            }
        )
        super().__init__(name="pg_recvlogical", config=config)
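
# A minimal usage sketch (mirroring _claim_postgres_replication_slot below);
# the slot name here is hypothetical, standing in for whatever slot
# Materialize created:
#
#   replicator = PostgresRecvlogical(
#       replication_slot_name="materialize_abc123",
#       publication_name="mz_source",
#   )
#   with c.override(replicator):
#       c.up(replicator.name)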


def create_postgres(
    pg_version: str | None, extra_command: list[str] = DEFAULT_PG_EXTRA_COMMAND
) -> Postgres:
    if pg_version is None:
        image = None
    else:
        image = f"postgres:{pg_version}"
    return Postgres(image=image, extra_command=extra_command)


SERVICES = [
    Mz(app_password=""),
    Materialized(
        volumes_extra=["secrets:/share/secrets"],
        additional_system_parameter_defaults={
            "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
        },
        default_replication_factor=2,
    ),
    Testdrive(),
    TestCerts(),
    Toxiproxy(),
    create_postgres(pg_version=None),
    PostgresRecvlogical(
        replication_slot_name="", publication_name=""
    ),  # Overridden below
]
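
# Most workflows below swap out the "postgres" service via c.override() and
# create_postgres() to pin a server version and/or adjust server flags.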


def get_targeted_pg_version(parser: WorkflowArgumentParser) -> str | None:
    parser.add_argument(
        "--pg-version",
        type=str,
    )
    args, _ = parser.parse_known_args()
    pg_version = args.pg_version

    if pg_version is not None:
        print(f"Running with Postgres version {pg_version}")

    return pg_version
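
# For example, assuming the standard mzcompose entrypoint for this
# composition, a run can target a specific version with:
#   ./mzcompose run cdc --pg-version=16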


# TODO: redesign ceased status database-issues#7687
# Test how subsource statuses work across a variety of scenarios
# def workflow_statuses(c: Composition, parser: WorkflowArgumentParser) -> None:
#     c.up("materialized", "postgres", "toxiproxy")
#     c.run_testdrive_files("status/01-setup.td")
#
#     with c.override(Testdrive(no_reset=True)):
#         # Restart mz
#         c.kill("materialized")
#         c.up("materialized")
#
#         c.run_testdrive_files(
#             "status/02-after-mz-restart.td",
#             "status/03-toxiproxy-interrupt.td",
#             "status/04-drop-publication.td",
#         )


def workflow_replication_slots(c: Composition, parser: WorkflowArgumentParser) -> None:
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
        )
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files("override/replication-slots.td")


def workflow_wal_level(c: Composition, parser: WorkflowArgumentParser) -> None:
    pg_version = get_targeted_pg_version(parser)
    for wal_level in ["replica", "minimal"]:
        with c.override(
            create_postgres(
                pg_version=pg_version,
                extra_command=[
                    "-c",
                    "max_wal_senders=0",
                    "-c",
                    f"wal_level={wal_level}",
                ],
            )
        ):
            c.up("materialized", "postgres")
            c.run_testdrive_files("override/insufficient-wal-level.td")
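
# Logical decoding requires wal_level=logical; both "replica" and "minimal"
# are insufficient for a Postgres source, which is exactly the
# misconfiguration override/insufficient-wal-level.td exercises.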


def workflow_replication_disabled(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_wal_senders=0"]
        )
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files("override/replication-disabled.td")
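
# With max_wal_senders=0 the server refuses all replication connections, so
# ingestion cannot even begin; override/replication-disabled.td checks how
# this is reported.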


def workflow_silent_connection_drop(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test that mz can regain a replication slot that is being used by another
    service.
    """
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version,
            # Disable the walsender timeout so the server never terminates
            # replication connections on its own.
            extra_command=[
                "-c",
                "wal_sender_timeout=0",
            ],
        ),
    ):
        c.up("postgres")

        pg_conn = psycopg.connect(
            host="localhost",
            user="postgres",
            password="postgres",
            port=c.default_port("postgres"),
        )
        _verify_exactly_n_replication_slots_exist(pg_conn, n=0)

        c.up("materialized")
        c.run_testdrive_files(
            "--no-reset",
            f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
            "override/silent-connection-drop-part-1.td",
        )
        _verify_exactly_n_replication_slots_exist(pg_conn, n=1)

        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=False,
            error_message="Replication slot is still active",
        )
        _claim_postgres_replication_slot(c, pg_conn)
        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=True,
            error_message="Replication slot has not been claimed",
        )

        c.run_testdrive_files("--no-reset", "override/silent-connection-drop-part-2.td")
        _verify_exactly_n_replication_slots_exist(pg_conn, n=1)


def _await_postgres_replication_slot_state(
    pg_conn: Connection, await_active: bool, error_message: str
) -> None:
    # Poll a few times with a short sleep: slot state changes are not
    # instantaneous after a consumer attaches or detaches.
    for _ in range(1, 5):
        is_active = _is_postgres_replication_slot_active(pg_conn)
        if is_active == await_active:
            return
        time.sleep(1)
    raise RuntimeError(error_message)


def _get_postgres_replication_slot_name(pg_conn: Connection) -> str:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT slot_name FROM pg_replication_slots;")
    return cursor.fetchall()[0][0]


def _claim_postgres_replication_slot(c: Composition, pg_conn: Connection) -> None:
    replicator = PostgresRecvlogical(
        replication_slot_name=_get_postgres_replication_slot_name(pg_conn),
        publication_name="mz_source",
    )
    with c.override(replicator):
        c.up(replicator.name)


def _is_postgres_replication_slot_active(pg_conn: Connection) -> bool:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT active FROM pg_replication_slots;")
    is_active = cursor.fetchall()[0][0]
    return is_active


def _verify_exactly_n_replication_slots_exist(pg_conn: Connection, n: int) -> None:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT count(*) FROM pg_replication_slots;")
    count_slots = cursor.fetchall()[0][0]
    assert (
        count_slots == n
    ), f"Expected {n} replication slot(s) but found {count_slots} slot(s)"


def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
    pg_version = get_targeted_pg_version(parser)
    parser.add_argument(
        "filter",
        nargs="*",
        default=["*.td"],
        help="limit to only the files matching filter",
    )
    args = parser.parse_args()

    matching_files = []
    for file_filter in args.filter:
        matching_files.extend(
            glob.glob(file_filter, root_dir=MZ_ROOT / "test" / "pg-cdc")
        )
    sharded_files: list[str] = buildkite.shard_list(
        sorted(matching_files), lambda file: file
    )
    print(f"Files: {sharded_files}")

    c.up({"name": "test-certs", "persistent": True})

    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
    ssl_wrong_cert = c.run(
        "test-certs", "cat", "/secrets/postgres.crt", capture=True
    ).stdout
    ssl_wrong_key = c.run(
        "test-certs", "cat", "/secrets/postgres.key", capture=True
    ).stdout

    with c.override(create_postgres(pg_version=pg_version)):
        c.up("materialized", "test-certs", "postgres")
        c.test_parts(
            sharded_files,
            lambda file: c.run_testdrive_files(
                f"--var=ssl-ca={ssl_ca}",
                f"--var=ssl-cert={ssl_cert}",
                f"--var=ssl-key={ssl_key}",
                f"--var=ssl-wrong-cert={ssl_wrong_cert}",
                f"--var=ssl-wrong-key={ssl_wrong_key}",
                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                file,
            ),
        )


def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
    """
    Ingest data from a large-scale Postgres instance and verify that the
    ingestion completes quickly and correctly.
    """
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
        )
    ):
        c.up("materialized", "postgres", {"name": "testdrive", "persistent": True})

        # Seed the Postgres server with the initial schema and set up the
        # connection to the Postgres server in Materialize.
        c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                DROP SCHEMA IF EXISTS public CASCADE;
                DROP PUBLICATION IF EXISTS mz_source;
                CREATE SCHEMA public;
                > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
                > CREATE CONNECTION IF NOT EXISTS pg TO POSTGRES (HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass)
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DROP TABLE IF EXISTS products;
                CREATE TABLE products (id int NOT NULL, name varchar(255) DEFAULT NULL, merchant_id int NOT NULL, price int DEFAULT NULL, status int DEFAULT NULL, created_at timestamp NULL, recordSizePayload text, PRIMARY KEY (id));
                ALTER TABLE products REPLICA IDENTITY FULL;
                CREATE PUBLICATION mz_source FOR ALL TABLES;
                > DROP SOURCE IF EXISTS s1 CASCADE;
                """
            )
        )

        def make_inserts(c: Composition, start: int, batch_num: int):
            c.testdrive(
                args=["--no-reset"],
                input=dedent(
                    f"""
                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT {start} + row_number() OVER (), 'name' || ({start} + row_number() OVER ()), ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 10, '2024-12-12'::DATE, repeat('x', 1000000) FROM generate_series(1, {batch_num});
                    """
                ),
            )
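
        # Each row carries ~1MB of text (repeat('x', 1000000)): 100k rows is
        # on the order of 100GB of logical data, though TOAST compresses the
        # constant payload heavily. Batching the INSERTs keeps each statement
        # a manageable size.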
        num_rows = 100_000  # out of memory with 200_000 rows
        batch_size = 10_000
        for i in range(0, num_rows, batch_size):
            batch_num = min(batch_size, num_rows - i)
            make_inserts(c, i, batch_num)

        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                > CREATE SOURCE s1
                  FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
                > CREATE TABLE products FROM SOURCE s1 (REFERENCE products);
                > SELECT COUNT(*) FROM products;
                {num_rows}
                """
            ),
        )

        make_inserts(c, num_rows, 1)
        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                > SELECT COUNT(*) FROM products;
                {num_rows + 1}
                """
            ),
        )


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    def process(name: str) -> None:
        if name in ("default", "large-scale"):
            return

        # TODO: Flaky, reenable when database-issues#7611 is fixed
        if name == "statuses":
            return

        # TODO: Flaky, reenable when database-issues#8447 is fixed
        if name == "silent-connection-drop":
            return

        c.kill("postgres")
        c.rm("postgres")
        c.kill("materialized")
        c.rm("materialized")

        with c.test_case(name):
            c.workflow(name, *parser.args)

    workflows_with_internal_sharding = ["cdc"]
    sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
        [
            w
            for w in c.workflows
            if w not in workflows_with_internal_sharding and w != "migration"
        ],
        lambda w: w,
    )
    print(
        f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
    )
    c.test_parts(sharded_workflows, process)