# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Native Postgres source tests, functional.
"""

import glob
import time
from textwrap import dedent

import psycopg
from psycopg import Connection

from materialize import MZ_ROOT, buildkite
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.service import Service, ServiceConfig
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.test_certs import TestCerts
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.toxiproxy import Toxiproxy

# Set the max slot WAL keep size to 10MB
DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"]
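# (Postgres may invalidate replication slots that fall further behind than this.)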


class PostgresRecvlogical(Service):
    """
    Runs pg_recvlogical against the Postgres service to claim the given
    replication slot and stream changes from the given publication.
    """

    def __init__(self, replication_slot_name: str, publication_name: str) -> None:
        command: list[str] = [
            "pg_recvlogical",
            "--start",
            "--slot",
            replication_slot_name,
            "--file",
            "-",
            "--dbname",
            "postgres",
            "--host",
            "postgres",
            "--port",
            "5432",
            "--username",
            "postgres",
            "--no-password",
            "-o",
            "proto_version=1",
            "-o",
            f"publication_names={publication_name}",
        ]
        config: ServiceConfig = {"mzbuild": "postgres"}
        config.update(
            {
                "command": command,
                "allow_host_ports": True,
                "ports": ["5432"],
                "environment": ["PGPASSWORD=postgres"],
            }
        )
        super().__init__(name="pg_recvlogical", config=config)


def create_postgres(
    pg_version: str | None, extra_command: list[str] = DEFAULT_PG_EXTRA_COMMAND
) -> Postgres:
    # A pg_version of None uses the composition's default Postgres image.
    if pg_version is None:
        image = None
    else:
        image = f"postgres:{pg_version}"
    return Postgres(image=image, extra_command=extra_command)


SERVICES = [
    Mz(app_password=""),
    Materialized(
        volumes_extra=["secrets:/share/secrets"],
        additional_system_parameter_defaults={
            "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
        },
        default_replication_factor=2,
    ),
    Testdrive(),
    TestCerts(),
    Toxiproxy(),
    create_postgres(pg_version=None),
    PostgresRecvlogical(
        replication_slot_name="", publication_name=""
    ),  # Overridden below
]


def get_targeted_pg_version(parser: WorkflowArgumentParser) -> str | None:
    """Return the Postgres version requested via --pg-version, or None for the default."""
    parser.add_argument(
        "--pg-version",
        type=str,
    )
    args, _ = parser.parse_known_args()
    pg_version = args.pg_version

    if pg_version is not None:
        print(f"Running with Postgres version {pg_version}")

    return pg_version


# TODO: redesign ceased status database-issues#7687
# Test how subsource statuses work across a variety of scenarios
# def workflow_statuses(c: Composition, parser: WorkflowArgumentParser) -> None:
#     c.up("materialized", "postgres", "toxiproxy")
#     c.run_testdrive_files("status/01-setup.td")
#     with c.override(Testdrive(no_reset=True)):
#         # Restart mz
#         c.kill("materialized")
#         c.up("materialized")
#         c.run_testdrive_files(
#             "status/02-after-mz-restart.td",
#             "status/03-toxiproxy-interrupt.td",
#             "status/04-drop-publication.td",
#         )


def workflow_replication_slots(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run replication-slots.td against a server limited to three replication slots."""
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
        )
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files("override/replication-slots.td")


def workflow_wal_level(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Verify the error behavior when the server's wal_level is insufficient."""
    pg_version = get_targeted_pg_version(parser)
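    # Logical replication requires wal_level=logical, so creating a source
    # should fail under both "replica" and "minimal".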
    for wal_level in ["replica", "minimal"]:
        with c.override(
            create_postgres(
                pg_version=pg_version,
                extra_command=[
                    "-c",
                    "max_wal_senders=0",
                    "-c",
                    f"wal_level={wal_level}",
                ],
            )
        ):
            c.up("materialized", "postgres")
            c.run_testdrive_files("override/insufficient-wal-level.td")


def workflow_replication_disabled(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """Verify the error behavior when replication is disabled (max_wal_senders=0)."""
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_wal_senders=0"]
        )
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files("override/replication-disabled.td")


def workflow_silent_connection_drop(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test that mz can regain its replication slot after another service has
    claimed it.
    """
    pg_version = get_targeted_pg_version(parser)
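    # wal_sender_timeout=0 disables the server-side timeout that would
    # otherwise terminate replication connections that have gone silent.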
    with c.override(
        create_postgres(
            pg_version=pg_version,
            extra_command=[
                "-c",
                "wal_sender_timeout=0",
            ],
        ),
    ):
        c.up("postgres")

        pg_conn = psycopg.connect(
            host="localhost",
            user="postgres",
            password="postgres",
            port=c.default_port("postgres"),
        )

        _verify_exactly_n_replication_slots_exist(pg_conn, n=0)

        c.up("materialized")
        c.run_testdrive_files(
            "--no-reset",
            f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
            "override/silent-connection-drop-part-1.td",
        )

        _verify_exactly_n_replication_slots_exist(pg_conn, n=1)
        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=False,
            error_message="Replication slot is still active",
        )
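
        # With the slot idle, have pg_recvlogical claim it, simulating another
        # service holding mz's slot.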
        _claim_postgres_replication_slot(c, pg_conn)
        _await_postgres_replication_slot_state(
            pg_conn,
            await_active=True,
            error_message="Replication slot has not been claimed",
        )
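
        # Part 2 verifies that mz regains the slot from the competing consumer.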
- c.run_testdrive_files("--no-reset", "override/silent-connection-drop-part-2.td")
- _verify_exactly_n_replication_slots_exist(pg_conn, n=1)


def _await_postgres_replication_slot_state(
    pg_conn: Connection, await_active: bool, error_message: str
) -> None:
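    # Poll up to four times, sleeping one second between checks.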
    for _ in range(4):
        is_active = _is_postgres_replication_slot_active(pg_conn)

        if is_active == await_active:
            return
        else:
            time.sleep(1)

    raise RuntimeError(error_message)


def _get_postgres_replication_slot_name(pg_conn: Connection) -> str:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT slot_name FROM pg_replication_slots;")
    return cursor.fetchall()[0][0]


def _claim_postgres_replication_slot(c: Composition, pg_conn: Connection) -> None:
    replicator = PostgresRecvlogical(
        replication_slot_name=_get_postgres_replication_slot_name(pg_conn),
        publication_name="mz_source",
    )

    with c.override(replicator):
        c.up(replicator.name)


def _is_postgres_replication_slot_active(pg_conn: Connection) -> bool:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT active FROM pg_replication_slots;")
    is_active = cursor.fetchall()[0][0]
    return is_active


def _verify_exactly_n_replication_slots_exist(pg_conn: Connection, n: int) -> None:
    cursor = pg_conn.cursor()
    cursor.execute("SELECT count(*) FROM pg_replication_slots;")
    count_slots = cursor.fetchall()[0][0]
    assert (
        count_slots == n
    ), f"Expected {n} replication slot(s) but found {count_slots} slot(s)"


def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
    pg_version = get_targeted_pg_version(parser)

    parser.add_argument(
        "filter",
        nargs="*",
        default=["*.td"],
        help="limit to only the files matching filter",
    )
    args = parser.parse_args()

    matching_files = []
    for file_filter in args.filter:
        matching_files.extend(
            glob.glob(file_filter, root_dir=MZ_ROOT / "test" / "pg-cdc")
        )
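    # Distribute the matching testdrive files across Buildkite shards.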
    sharded_files: list[str] = buildkite.shard_list(
        sorted(matching_files), lambda file: file
    )
    print(f"Files: {sharded_files}")

    c.up({"name": "test-certs", "persistent": True})
    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
    ssl_wrong_cert = c.run(
        "test-certs", "cat", "/secrets/postgres.crt", capture=True
    ).stdout
    ssl_wrong_key = c.run(
        "test-certs", "cat", "/secrets/postgres.key", capture=True
    ).stdout

    with c.override(create_postgres(pg_version=pg_version)):
        c.up("materialized", "test-certs", "postgres")

        c.test_parts(
            sharded_files,
            lambda file: c.run_testdrive_files(
                f"--var=ssl-ca={ssl_ca}",
                f"--var=ssl-cert={ssl_cert}",
                f"--var=ssl-key={ssl_key}",
                f"--var=ssl-wrong-cert={ssl_wrong_cert}",
                f"--var=ssl-wrong-key={ssl_wrong_key}",
                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
                file,
            ),
        )


def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
    """
    Ingest data from a large-scale Postgres instance and verify that the
    ingestion completes correctly and quickly.
    """
    pg_version = get_targeted_pg_version(parser)
    with c.override(
        create_postgres(
            pg_version=pg_version, extra_command=["-c", "max_replication_slots=3"]
        )
    ):
        c.up("materialized", "postgres", {"name": "testdrive", "persistent": True})

        # Set up the Postgres server with the initial records, set up the
        # connection to the Postgres server in Materialize.
        c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                ALTER USER postgres WITH replication;
                DROP SCHEMA IF EXISTS public CASCADE;
                DROP PUBLICATION IF EXISTS mz_source;
                CREATE SCHEMA public;
                > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
                > CREATE CONNECTION IF NOT EXISTS pg TO POSTGRES (HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass)
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DROP TABLE IF EXISTS products;
                CREATE TABLE products (id int NOT NULL, name varchar(255) DEFAULT NULL, merchant_id int NOT NULL, price int DEFAULT NULL, status int DEFAULT NULL, created_at timestamp NULL, recordSizePayload text, PRIMARY KEY (id));
                ALTER TABLE products REPLICA IDENTITY FULL;
                CREATE PUBLICATION mz_source FOR ALL TABLES;
                > DROP SOURCE IF EXISTS s1 CASCADE;
                """
            )
        )

    def make_inserts(c: Composition, start: int, batch_num: int):
        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT {start} + row_number() OVER (), 'name' || ({start} + row_number() OVER ()), ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 10, '2024-12-12'::DATE, repeat('x', 1000000) FROM generate_series(1, {batch_num});
                """
            ),
        )
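
    # Each row carries a repeat('x', 1000000) payload, i.e. roughly 1 MB of
    # text, so rows are inserted in batches of 10,000 per statement.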
    num_rows = 100_000  # out of memory with 200_000 rows
    batch_size = 10_000
    for i in range(0, num_rows, batch_size):
        batch_num = min(batch_size, num_rows - i)
        make_inserts(c, i, batch_num)

    c.testdrive(
        args=["--no-reset"],
        input=dedent(
            f"""
            > CREATE SOURCE s1
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
            > CREATE TABLE products FROM SOURCE s1 (REFERENCE products);
            > SELECT COUNT(*) FROM products;
            {num_rows}
            """
        ),
    )
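
    # This final row arrives via the replication stream rather than the
    # initial snapshot.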
    make_inserts(c, num_rows, 1)

    c.testdrive(
        args=["--no-reset"],
        input=dedent(
            f"""
            > SELECT COUNT(*) FROM products;
            {num_rows + 1}
            """
        ),
    )


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    def process(name: str) -> None:
        if name in ("default", "large-scale"):
            return

        # TODO: Flaky, reenable when database-issues#7611 is fixed
        if name == "statuses":
            return

        # TODO: Flaky, reenable when database-issues#8447 is fixed
        if name == "silent-connection-drop":
            return

        # Start each workflow from a clean slate.
        c.kill("postgres")
        c.rm("postgres")
        c.kill("materialized")
        c.rm("materialized")

        with c.test_case(name):
            c.workflow(name, *parser.args)
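
    # "cdc" shards its own testdrive files internally (see workflow_cdc), so it
    # runs in every shard; the remaining workflows are split across shards.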
    workflows_with_internal_sharding = ["cdc"]
    sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
        [
            w
            for w in c.workflows
            if w not in workflows_with_internal_sharding and w != "migration"
        ],
        lambda w: w,
    )
    print(
        f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
    )
    c.test_parts(sharded_workflows, process)
|