123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Testdrive is the basic framework and language for defining product tests under
- the expected-result/actual-result (aka golden testing) paradigm. A query is
- retried until it produces the desired result.
- """
- import glob
- import os
- from materialize import MZ_ROOT, buildkite, ci_util
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.azurite import Azurite
- from materialize.mzcompose.services.fivetran_destination import FivetranDestination
- from materialize.mzcompose.services.kafka import Kafka
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.minio import Minio
- from materialize.mzcompose.services.mysql import MySql
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.services.redpanda import Redpanda
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.zookeeper import Zookeeper
# Service definitions for this composition. `workflow_default` below brings up
# a subset of them by name and overrides the `Materialized` and `Testdrive`
# entries with instances configured from the command-line arguments.
SERVICES = [
    # Zookeeper, Kafka and SchemaRegistry are started together unless
    # `--redpanda` is passed, in which case Redpanda is started instead.
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Redpanda(),
    Postgres(),
    MySql(),
    # Not in the dependency list of `workflow_default`; presumably only
    # relevant when `--azurite` selects it as the blob store -- TODO confirm.
    Azurite(),
    Mz(app_password=""),
    Minio(setup_materialize=True, additional_directories=["copytos3"]),
    # Placeholder definitions; replaced via `c.override(...)` at run time.
    Materialized(external_blob_store=True),
    FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
    Testdrive(external_blob_store=True),
]
def _parse_system_params(groups: list[list[str]] | None) -> dict[str, str]:
    """Flatten the parsed `--system-param` values into a `{key: value}` dict.

    The option is declared with `action="append"` and `nargs="*"`, so argparse
    yields one list per occurrence of the flag. Every value of every
    occurrence is honoured; a bare `--system-param` with no values contributes
    nothing. Later values win over earlier ones for the same key.

    Raises:
        AssertionError: if a value does not contain `=`.
    """
    params: dict[str, str] = {}
    for group in groups or []:
        for item in group:
            key, separator, value = item.partition("=")
            assert (
                separator
            ), f"--system-param '{item}' should be the format <key>=<val>"
            params[key] = value
    return params


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run testdrive.

    Parses the workflow arguments, starts the required services with
    `Materialized` and `Testdrive` overridden according to those arguments,
    optionally recreates the `quickstart` cluster with several replicas, and
    then runs every selected `.td` file (sharded via `buildkite.shard_list`).
    Arguments the parser does not recognise are passed through to testdrive.
    """
    parser.add_argument(
        "--slow",
        action="store_true",
        help="include slow tests (usually only in Nightly)",
    )
    parser.add_argument(
        "--redpanda",
        action="store_true",
        help="run against Redpanda instead of the Confluent Platform",
    )
    parser.add_argument(
        "--aws-region",
        help="run against the specified AWS region instead of localstack",
    )
    parser.add_argument(
        "--kafka-default-partitions",
        type=int,
        metavar="N",
        help="set the default number of kafka partitions per topic",
    )
    parser.add_argument(
        "--default-size",
        type=int,
        default=Materialized.Size.DEFAULT_SIZE,
        help="Use SIZE 'N-N' for replicas and SIZE 'N' for sources",
    )
    parser.add_argument(
        "--system-param",
        type=str,
        action="append",
        nargs="*",
        help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
    )
    parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
    parser.add_argument(
        "--default-timeout",
        type=str,
        help="set the default timeout for Testdrive",
    )
    parser.add_argument(
        "--rewrite-results",
        action="store_true",
        help="Rewrite results, disables junit reports",
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    parser.add_argument(
        "files",
        nargs="*",
        default=["*.td"],
        help="run against the specified files",
    )
    (args, passthrough_args) = parser.parse_known_args()

    dependencies = [
        "fivetran-destination",
        "materialized",
        "postgres",
        "mysql",
        "minio",
    ]
    if args.redpanda:
        dependencies += ["redpanda"]
    else:
        dependencies += ["zookeeper", "kafka", "schema-registry"]

    # User-supplied parameters are layered on top of the built-in default, so
    # `--system-param default_cluster_replication_factor=N` can override it.
    additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"}
    additional_system_parameter_defaults.update(
        _parse_system_params(args.system_param)
    )

    materialized = Materialized(
        default_size=args.default_size,
        external_blob_store=True,
        blob_store_is_azure=args.azurite,
        additional_system_parameter_defaults=additional_system_parameter_defaults,
        default_replication_factor=1,
    )

    testdrive = Testdrive(
        kafka_default_partitions=args.kafka_default_partitions,
        aws_region=args.aws_region,
        validate_catalog_store=True,
        default_timeout=args.default_timeout,
        volumes_extra=["mzdata:/mzdata"],
        external_blob_store=True,
        blob_store_is_azure=args.azurite,
        fivetran_destination=True,
        fivetran_destination_files_path="/share/tmp",
        entrypoint_extra=[
            f"--var=uses-redpanda={args.redpanda}",
        ],
    )

    with c.override(testdrive, materialized):
        c.up(*dependencies, {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET max_clusters = 50;",
            port=6877,
            user="mz_system",
        )

        non_default_testdrive_vars = []

        if args.replicas > 1:
            c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
            # Make sure a replica named 'r1' always exists
            replica_names = [
                "r1" if replica_id == 0 else f"replica{replica_id}"
                for replica_id in range(0, args.replicas)
            ]
            replica_string = ",".join(
                f"{replica_name} (SIZE '{materialized.default_replica_size}')"
                for replica_name in replica_names
            )
            c.sql(
                f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
                user="mz_system",
                port=6877,
            )

            # Note that any command that outputs SHOW CLUSTERS will have output
            # that depends on the number of replicas testdrive has. This means
            # it might be easier to skip certain tests if the number of replicas
            # is > 1.
            c.sql(
                f"""
                CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{materialized.default_replica_size}';
                GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize;
                """,
                user="mz_system",
                port=6877,
            )

            non_default_testdrive_vars.append(f"--var=replicas={args.replicas}")
            non_default_testdrive_vars.append(
                "--var=single-replica-cluster=testdrive_single_replica_cluster"
            )

        if args.default_size != 1:
            non_default_testdrive_vars.append(
                f"--var=default-replica-size={materialized.default_replica_size}"
            )
            non_default_testdrive_vars.append(
                f"--var=default-storage-size={materialized.default_storage_size}"
            )

        print(f"Passing through arguments to testdrive {passthrough_args}\n")
        # Do not set default args here; they should be set in the td file using
        # set-arg-default to ease execution without mzcompose.

        def process(file: str) -> None:
            """Run one testdrive file, skipping known-slow files unless --slow."""
            if not args.slow and file in (
                "fivetran-destination.td",
                "materialized-view-refresh-options.td",
                "upsert-source-race.td",
            ):
                return

            junit_report = ci_util.junit_report_filename(f"{c.name}_{file}")
            c.run_testdrive_files(
                (
                    "--rewrite-results"
                    if args.rewrite_results
                    else f"--junit-report={junit_report}"
                ),
                *non_default_testdrive_vars,
                *passthrough_args,
                file,
            )
            # Uploading successful junit files wastes time and contains no useful information.
            # With --rewrite-results no junit report is requested, so there is
            # nothing to delete and attempting it would raise FileNotFoundError.
            if not args.rewrite_results:
                os.remove(f"test/testdrive/{junit_report}")

        files = buildkite.shard_list(
            sorted(
                [
                    file
                    for pattern in args.files
                    for file in glob.glob(
                        pattern, root_dir=MZ_ROOT / "test" / "testdrive"
                    )
                ]
            ),
            lambda file: file,
        )
        c.test_parts(files, process)
        c.sanity_restart_mz()
|