# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Verify how much storage is being used, based on mz_storage_usage and
mz_recent_storage_usage.
"""

import time
from dataclasses import dataclass
from textwrap import dedent

from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.testdrive import Testdrive

COLLECTION_INTERVAL_SECS = 5
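
# The storage usage collector runs every MZ_STORAGE_USAGE_COLLECTION_INTERVAL
# (see the Materialized service definition below). The workflow sleeps for one
# interval plus a second so that at least one collection has happened before
# the reported sizes are checked.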

PG_CDC_SETUP = dedent(
    """
    > CREATE SECRET pgpass AS 'postgres'
    > CREATE CONNECTION pg TO POSTGRES (
        HOST postgres,
        DATABASE postgres,
        USER postgres,
        PASSWORD SECRET pgpass
      )

    $ postgres-execute connection=postgres://postgres:postgres@postgres
    ALTER USER postgres WITH replication;
    DROP SCHEMA IF EXISTS public CASCADE;
    CREATE SCHEMA public;
    DROP PUBLICATION IF EXISTS mz_source;
    CREATE PUBLICATION mz_source FOR ALL TABLES;
    """
)

KAFKA_SETUP = dedent(
    """
    > CREATE CONNECTION IF NOT EXISTS kafka_conn
      TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

    > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
        URL '${testdrive.schema-registry-url}'
      );

    $ set key-schema={"type": "string"}
    $ set value-schema={"type": "record", "name": "r", "fields": [{"name": "a", "type": "string"}]}
    """
)
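
# NOTE: KAFKA_SETUP is currently only referenced by the commented-out
# upsert_update scenario further down.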

SERVICES = [
    Redpanda(),
    Postgres(),
    Materialized(
        environment_extra=[
            f"MZ_STORAGE_USAGE_COLLECTION_INTERVAL={COLLECTION_INTERVAL_SECS}s"
        ],
        additional_system_parameter_defaults={"persist_rollup_threshold": "20"},
    ),
    Testdrive(default_timeout="120s", no_reset=True),
]


@dataclass
class DatabaseObject:
    name: str
    testdrive: str
    expected_size: int
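    # expected_size is not exact: workflow_default accepts any reported size
    # between expected_size // 3 and expected_size * 3.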


database_objects = [
    DatabaseObject(
        name="table_insert_unique_rows",
        testdrive=dedent(
            """
            > CREATE TABLE obj (f1 TEXT)
            > INSERT INTO obj SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
            """
        ),
        expected_size=1024 * 1024,
    ),
    # Identical rows should cause a diff > 1 and not be stored individually.
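    # Persist consolidates identical updates into a single row with a larger
    # diff, so the 1024 copies of a ~1 MiB value below should take roughly one
    # value's worth of storage (~1 MiB) rather than ~1 GiB.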
    DatabaseObject(
        name="table_insert_identical_rows",
        testdrive=dedent(
            """
            > CREATE TABLE obj (f1 TEXT)
            > INSERT INTO obj SELECT REPEAT('x', 1024 * 1024) FROM generate_series(1, 1024)
            """
        ),
        expected_size=1024 * 1024,
    ),
    # Deleted/updated rows should be garbage-collected
    # https://github.com/MaterializeInc/database-issues/issues/4313
    # DatabaseObject(
    #     name="table_delete",
    #     testdrive=dedent(
    #         f"""
    #         > CREATE TABLE obj (f1 TEXT)
    #         > INSERT INTO obj SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
    #         > SELECT mz_unsafe.mz_sleep({COLLECTION_INTERVAL_SECS} + 1)
    #         <null>
    #         > DELETE FROM obj;
    #         """
    #     ),
    #     expected_size=???,
    # ),
    # DatabaseObject(
    #     name="upsert_update",
    #     testdrive=KAFKA_SETUP + dedent(
    #         f"""
    #         $ kafka-create-topic topic=upsert-update
    #
    #         $ kafka-ingest format=avro topic=upsert-update key-format=avro key-schema=${{key-schema}} schema=${{value-schema}}
    #         "${{kafka-ingest.iteration}}" {{"a": "0"}}
    #
    #         > CREATE SOURCE obj
    #           FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-update-${{testdrive.seed}}')
    #
    #         > CREATE TABLE obj_tbl
    #           FROM SOURCE obj (REFERENCE "testdrive-upsert-update-${{testdrive.seed}}")
    #           FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
    #           ENVELOPE UPSERT
    #         """
    #     ) + "\n".join([dedent(
    #         f"""
    #         $ kafka-ingest format=avro topic=upsert-update key-format=avro key-schema=${{key-schema}} schema=${{value-schema}} repeat=5000000
    #         "${{kafka-ingest.iteration}}" {{"a": "{i}"}}
    #         """
    #     ) for i in range(1, 11)]) + dedent(
    #         """
    #         > SELECT COUNT(*) FROM obj_tbl WHERE a::integer = 10;
    #         5000000
    #         """
    #     ),
    #     expected_size=???,
    # ),
    DatabaseObject(
        name="materialized_view_constant",
        testdrive=dedent(
            """
            > CREATE MATERIALIZED VIEW obj AS SELECT generate_series::text, REPEAT('x', 1024) FROM generate_series(1, 1024)
            """
        ),
        # Dictionary encoding in Persist greatly reduces the size of repeated characters.
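        # The raw output is 1024 rows of ~1 KiB of repeated 'x' (~1 MiB), but
        # such runs of identical characters compress very well, so only about
        # 10 KiB of storage is expected.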
        expected_size=1024 * 10,
    ),
    # If a materialized view returns a small number of rows,
    # it should not require storage proportional to its input.
    DatabaseObject(
        name="materialized_view_small_output",
        testdrive=dedent(
            """
            > CREATE TABLE t1 (f1 TEXT)
            > INSERT INTO t1 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024)
            > CREATE MATERIALIZED VIEW obj AS SELECT COUNT(*) FROM t1;
            """
        ),
        expected_size=4 * 1024,
    ),
    # The pg-cdc source itself is expected to be empty; the data lives in the sub-source.
    DatabaseObject(
        name="pg_cdc_source",
        testdrive=PG_CDC_SETUP
        + dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE TABLE pg_table (f1 TEXT);
            INSERT INTO pg_table SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
            ALTER TABLE pg_table REPLICA IDENTITY FULL;

            > CREATE SOURCE obj
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');

            > CREATE TABLE pg_table FROM SOURCE obj (REFERENCE pg_table);
            """
        ),
        expected_size=4 * 1024,
    ),
    # The pg-cdc data is expected to be in the sub-source,
    # unaffected by the presence of other tables.
    DatabaseObject(
        name="pg_cdc_subsource",
        testdrive=PG_CDC_SETUP
        + dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE TABLE pg_table1 (f1 TEXT);
            INSERT INTO pg_table1 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
            ALTER TABLE pg_table1 REPLICA IDENTITY FULL;
            CREATE TABLE pg_table2 (f1 TEXT);
            INSERT INTO pg_table2 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
            ALTER TABLE pg_table2 REPLICA IDENTITY FULL;
            CREATE TABLE pg_table3 (f1 TEXT);
            INSERT INTO pg_table3 SELECT generate_series::text || REPEAT('x', 1024) FROM generate_series(1, 1024);
            ALTER TABLE pg_table3 REPLICA IDENTITY FULL;

            > CREATE SOURCE pg_source
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');

            > CREATE TABLE obj FROM SOURCE pg_source (REFERENCE pg_table1);

            > SELECT COUNT(*) FROM obj;
            1024
            """
        ),
        expected_size=1024 * 1024,
    ),
]


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Create various database objects and confirm that their storage usage,
    as reported in mz_storage_usage, is as expected.
    """
    parser.add_argument("tests", nargs="*", default=None, help="run specified tests")
    args = parser.parse_args()

    c.up(
        "redpanda",
        "postgres",
        "materialized",
        {"name": "testdrive", "persistent": True},
    )

    for database_object in database_objects:
        if (
            args.tests is not None
            and len(args.tests) > 0
            and database_object.name not in args.tests
        ):
            continue

        print(f"Running scenario {database_object.name} ...")
        c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                DROP SCHEMA IF EXISTS public CASCADE;
                CREATE SCHEMA public;
                GRANT ALL PRIVILEGES ON SCHEMA public TO materialize;
                """
            )
        )

        c.testdrive(database_object.testdrive)

        # Make sure the storage is fully accounted for.
        print(
            f"Sleeping for {COLLECTION_INTERVAL_SECS + 1} seconds so that collection kicks in ..."
        )
        time.sleep(COLLECTION_INTERVAL_SECS + 1)

        c.testdrive(
            dedent(
                f"""
                $ set-regex match=\\d+ replacement=<SIZE>

                # Select the raw size as well so that, if this check fails in testdrive, it is easier to debug.
                > SELECT size_bytes, size_bytes BETWEEN {database_object.expected_size // 3} AND {database_object.expected_size * 3}
                  FROM mz_storage_usage
                  WHERE collection_timestamp = ( SELECT MAX(collection_timestamp) FROM mz_storage_usage )
                  AND object_id = ( SELECT id FROM mz_objects WHERE name = 'obj' );
                <SIZE> true

                > SELECT size_bytes, size_bytes BETWEEN {database_object.expected_size // 3} AND {database_object.expected_size * 3}
                  FROM mz_recent_storage_usage
                  WHERE object_id = ( SELECT id FROM mz_objects WHERE name = 'obj' );
                <SIZE> true
                """
            )
        )