123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
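Test Materialize with the grammar-based Random Query Generator
(https://github.com/MaterializeInc/RQG/). On its own it finds query errors and
panics. Workloads with a reference implementation (Postgres or another
Materialize version) additionally compare result sets.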
- """
- import argparse
- from dataclasses import dataclass
- from enum import Enum
- from materialize.mzcompose.composition import (
- Composition,
- Service,
- WorkflowArgumentParser,
- )
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.services.rqg import RQG
- from materialize.version_ancestor_overrides import (
- ANCESTOR_OVERRIDES_FOR_CORRECTNESS_REGRESSIONS,
- )
- from materialize.version_list import resolve_ancestor_image_tag
# Services declared by this composition. run_workload() overrides mz_this,
# mz_other and postgres with per-run settings (ports, images) via c.override().
SERVICES = [
    RQG(),
    *(
        Materialized(name=mz_name, default_replication_factor=2)
        for mz_name in ("mz_this", "mz_other")
    ),
    Postgres(),
]
class Dataset(Enum):
    """Datasets that can be loaded into every participant before a workload runs."""

    SIMPLE = 1
    DBT3 = 2
    STAR_SCHEMA = 3

    def files(self) -> list[str]:
        """Return the files that load this dataset, in the order they must be loaded.

        Paths are resolved inside the rqg container. A fresh list is returned
        on every call.

        Raises:
            RuntimeError: for a member that has no file list.
        """
        if self is Dataset.SIMPLE:
            return ["conf/mz/simple.sql"]
        if self is Dataset.DBT3:
            # With Postgres, CREATE MATERIALIZED VIEW from dbt3-ddl.sql produces
            # an empty view unless REFRESH MATERIALIZED VIEW from
            # dbt3-ddl-refresh-mvs.sql also runs after the data has been
            # loaded by dbt3-s0.0001.dump, hence the order below.
            return [
                "conf/mz/dbt3-ddl.sql",
                "conf/mz/dbt3-s0.0001.dump",
                "conf/mz/dbt3-ddl-refresh-mvs.sql",
            ]
        if self is Dataset.STAR_SCHEMA:
            return ["/workdir/datasets/star_schema.sql"]
        raise RuntimeError(f"Not handled: {self}")
class ReferenceImplementation(Enum):
    """Second database that a workload's queries are also run against."""

    MATERIALIZE = 1
    POSTGRES = 2

    def dsn(self) -> str:
        """Return the DBD::Pg connection parameters for this implementation.

        The result has no ``dbi:Pg:`` prefix; callers prepend it.

        Raises:
            RuntimeError: for a member without connection parameters.
        """
        if self is ReferenceImplementation.MATERIALIZE:
            return "dbname=materialize;host=mz_other;user=materialize;port=6875"
        if self is ReferenceImplementation.POSTGRES:
            return "dbname=postgres;host=postgres;user=postgres;password=postgres"
        raise RuntimeError("Unsupported case")
@dataclass
class Workload:
    """An RQG grammar plus the settings used to run and validate it."""

    name: str
    # Paths are relative to the CWD of the rqg container, which is /RQG and
    # holds a checkout of the MaterializeInc/RQG repository. Files located in
    # test/rqg are reachable as /workdir/file-name-goes-here.yy
    grammar: str
    # Second database to run the queries against, or None for no comparison
    reference_implementation: ReferenceImplementation | None
    dataset: Dataset | None = None
    # Time limit in seconds (30 minutes)
    duration: int = 30 * 60
    queries: int = 100_000_000
    # Disabled workloads are skipped unless requested by name
    disabled: bool = False
    threads: int = 4
    # Comma-separated RQG validators, passed as --validator=...
    validator: str | None = None

    def dataset_files(self) -> list[str]:
        """Return the files of this workload's dataset, or [] if it has none."""
        if self.dataset is None:
            return []
        return self.dataset.files()
# All known workloads. Without positional arguments every non-disabled one
# runs; otherwise only those named on the command line.
WORKLOADS = [
    Workload(
        name="simple-aggregates",
        grammar="conf/mz/simple-aggregates.yy",
        dataset=Dataset.SIMPLE,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
    ),
    Workload(
        name="lateral-joins",
        grammar="conf/mz/lateral-joins.yy",
        dataset=Dataset.SIMPLE,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
    ),
    Workload(
        name="dbt3-joins",
        grammar="conf/mz/dbt3-joins.yy",
        dataset=Dataset.DBT3,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
    ),
    Workload(
        name="subqueries",
        grammar="conf/mz/subqueries.yy",
        dataset=Dataset.SIMPLE,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
    ),
    Workload(
        name="window-functions",
        grammar="conf/mz/window-functions.yy",
        dataset=Dataset.SIMPLE,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
    ),
    Workload(
        name="wmr",
        grammar="conf/mz/with-mutually-recursive.yy",
        validator="ResultsetComparatorSimplify",
        # Postgres does not support WITH MUTUALLY RECURSIVE, so the only
        # possible comparison is against a previous Materialize version,
        # selected via --other-tag=...
        reference_implementation=ReferenceImplementation.MATERIALIZE,
        # See https://github.com/MaterializeInc/database-issues/issues/9439
        threads=1,
    ),
    Workload(
        # Performs DML that preserves the dataset's invariants and also checks
        # that those invariants are never violated.
        name="banking",
        grammar="conf/mz/banking.yy",
        validator="QueryProperties,RepeatableRead",
        reference_implementation=None,
    ),
    # Added as part of MaterializeInc/database-issues#7561.
    Workload(
        name="left-join-stacks",
        grammar="/workdir/grammars/left_join_stacks.yy",
        dataset=Dataset.STAR_SCHEMA,
        validator="ResultsetComparatorSimplify",
        reference_implementation=ReferenceImplementation.POSTGRES,
        # Far fewer queries than the default because the grammar is quite focused.
        queries=1000,
    ),
]
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run the selected RQG workloads, one after another.

    Positional arguments name the workloads to run. Explicitly named workloads
    run even if they are disabled. With no names, every workload that is not
    disabled runs. The options override the corresponding per-workload
    settings (see ``run_workload``).

    Raises:
        RuntimeError: if a requested workload name is unknown, or if no
            workload ends up selected.
    """
    parser.add_argument(
        "--this-tag",
        help="Run Materialize with this git tag on port 16875",
    )
    parser.add_argument(
        "--other-tag",
        action=StoreOtherTag,
        help="Run Materialize with this git tag on port 26875 (for workloads that compare two MZ instances)",
    )
    parser.add_argument(
        "--grammar",
        type=str,
        help="Override the default grammar of the workload",
    )
    parser.add_argument(
        "--dataset",
        type=str,
        action="append",
        help="Override the dataset files for the workload",
    )
    parser.add_argument(
        "--starting-rule",
        type=str,
        help="Override the default starting-rule for the workload",
    )
    parser.add_argument(
        "--duration",
        type=int,
        help="Run the Workload for the specified time in seconds",
    )
    parser.add_argument(
        "--queries",
        type=int,
        help="Run the Workload for the specified number of queries",
    )
    parser.add_argument(
        "--threads",
        type=int,
        help="Run the Workload with the specified number of concurrent threads",
    )
    parser.add_argument(
        "--sqltrace", action="store_true", help="Print all generated SQL statements"
    )
    parser.add_argument(
        "--skip-recursive-rules",
        action="store_true",
        help="Generate simpler queries by avoiding recursive productions",
    )
    parser.add_argument(
        "--seed",
        metavar="SEED",
        type=str,
        help="Random seed to use.",
    )
    parser.add_argument(
        "workloads", nargs="*", default=None, help="Run specified workloads"
    )
    args = parser.parse_args()

    # Resolve the selection before starting any container so that bad input
    # fails fast. Raise explicitly rather than `assert`, which is stripped
    # under `python -O`.
    if args.workloads:
        known_names = {w.name for w in WORKLOADS}
        unknown_names = [name for name in args.workloads if name not in known_names]
        if unknown_names:
            # Previously such typos were ignored silently as long as any
            # other requested name matched.
            raise RuntimeError(
                f"Unknown workloads: {unknown_names} "
                f"(available: {sorted(known_names)})"
            )
        workloads_to_run = [w for w in WORKLOADS if w.name in args.workloads]
    else:
        workloads_to_run = [w for w in WORKLOADS if not w.disabled]

    if not workloads_to_run:
        raise RuntimeError(
            f"No matching workloads found (args was {args.workloads})"
        )

    # The rqg container hosts psql and gentest.pl for every workload below.
    c.up({"name": "rqg", "persistent": True})

    for workload in workloads_to_run:
        print(f"--- Running workload {workload.name}: {workload} ...")
        run_workload(c, args, workload)
- def run_workload(c: Composition, args: argparse.Namespace, workload: Workload) -> None:
- def materialize_image(tag: str | None) -> str | None:
- return f"materialize/materialized:{tag}" if tag else None
- # A list of psql-compatible services participating in the test
- participants: list[Service] = [
- Materialized(
- name="mz_this",
- ports=["16875:6875", "16876:6876", "16877:6877", "16878:6878"],
- image=materialize_image(args.this_tag),
- use_default_volumes=False,
- default_replication_factor=2,
- ),
- ]
- # A list of psql URLs for dataset initialization
- psql_urls = ["postgresql://materialize@mz_this:6875/materialize"]
- # If we have --other-tag, assume we want to run a comparison test against Materialize
- reference_impl = (
- ReferenceImplementation.MATERIALIZE
- if args.other_tag and workload.reference_implementation is not None
- else workload.reference_implementation
- )
- match reference_impl:
- case ReferenceImplementation.MATERIALIZE:
- participants.append(
- Materialized(
- name="mz_other",
- image=materialize_image(args.other_tag),
- ports=["26875:6875", "26876:6876", "26877:6877", "26878:6878"],
- use_default_volumes=False,
- default_replication_factor=2,
- )
- )
- psql_urls.append("postgresql://materialize@mz_other:6875/materialize")
- case ReferenceImplementation.POSTGRES:
- participants.append(Postgres(ports=["15432:5432"]))
- psql_urls.append("postgresql://postgres:postgres@postgres/postgres")
- case None:
- pass
- case _:
- raise RuntimeError(
- f"Unsupported reference implementation: {reference_impl}"
- )
- dsn1 = "dbi:Pg:dbname=materialize;host=mz_this;user=materialize;port=6875"
- dsn2 = f"dbi:Pg:{reference_impl.dsn()}" if reference_impl else None
- dataset = args.dataset if args.dataset is not None else workload.dataset_files()
- grammar = str(args.grammar) if args.grammar is not None else workload.grammar
- queries = int(args.queries) if args.queries is not None else workload.queries
- threads = int(args.threads) if args.threads is not None else workload.threads
- duration = int(args.duration) if args.duration is not None else workload.duration
- with c.override(*participants):
- try:
- c.up(*[p.name for p in participants])
- for file in dataset:
- for psql_url in psql_urls:
- print(f"--- Populating {psql_url} with {file} ...")
- c.exec("rqg", "bash", "-c", f"psql -f {file} {psql_url}")
- c.exec(
- "rqg",
- "perl",
- "gentest.pl",
- f"--seed={args.seed}",
- f"--dsn1={dsn1}",
- f"--dsn2={dsn2}" if dsn2 else "",
- f"--grammar={grammar}",
- f"--queries={queries}",
- f"--threads={threads}",
- f"--duration={duration}",
- f"--validator={workload.validator}" if workload.validator else "",
- f"--starting-rule={args.starting_rule}" if args.starting_rule else "",
- "--sqltrace" if args.sqltrace else "",
- "--skip-recursive-rules" if args.skip_recursive_rules else "",
- )
- finally:
- c.capture_logs()
class StoreOtherTag(argparse.Action):
    """argparse action for ``--other-tag``.

    The special value ``common-ancestor`` is resolved to a concrete image tag
    while the arguments are parsed, using ``resolve_ancestor_image_tag``. Any
    other value is stored unchanged as a string.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if values != "common-ancestor":
            setattr(namespace, self.dest, str(values))
            return

        resolved_tag = resolve_ancestor_image_tag(
            ANCESTOR_OVERRIDES_FOR_CORRECTNESS_REGRESSIONS
        )
        print(f"Resolving --other-tag to {resolved_tag}")
        setattr(namespace, self.dest, resolved_tag)
|