123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import json
- import random
- from typing import Any
- from materialize import buildkite
- from materialize.mzcompose import DEFAULT_MZ_VOLUMES, cluster_replica_size_map
- from materialize.mzcompose.service import (
- Service,
- ServiceConfig,
- )
- from materialize.mzcompose.services.azurite import azure_blob_uri
- from materialize.mzcompose.services.minio import minio_blob_uri
- from materialize.mzcompose.services.postgres import METADATA_STORE
class Testdrive(Service):
    """Compose service definition that runs the `testdrive` binary.

    The constructor translates its keyword arguments into a testdrive command
    line (the service `entrypoint`), an environment list, and a volume list,
    and hands the resulting `ServiceConfig` to `Service.__init__`.
    """

    def __init__(
        self,
        name: str = "testdrive",
        mzbuild: str = "testdrive",
        materialize_url: str = "postgres://materialize@materialized:6875",
        materialize_url_internal: str = "postgres://materialize@materialized:6877",
        materialize_use_https: bool = False,
        materialize_params: dict[str, str] | None = None,
        kafka_url: str = "kafka:9092",
        kafka_default_partitions: int | None = None,
        kafka_args: str | None = None,
        schema_registry_url: str = "http://schema-registry:8081",
        no_reset: bool = False,
        default_timeout: str | None = None,
        seed: int | None = None,
        consistent_seed: bool = False,
        validate_catalog_store: bool = False,
        entrypoint: list[str] | None = None,
        entrypoint_extra: list[str] | None = None,
        environment: list[str] | None = None,
        volumes_extra: list[str] | None = None,
        volume_workdir: str = ".:/workdir",
        propagate_uid_gid: bool = True,
        forward_buildkite_shard: bool = False,
        aws_region: str | None = None,
        aws_endpoint: str | None = "http://minio:9000",
        aws_access_key_id: str | None = "minioadmin",
        aws_secret_access_key: str | None = "minioadmin",
        no_consistency_checks: bool = False,
        external_metadata_store: bool = False,
        external_blob_store: bool | str = False,
        blob_store_is_azure: bool = False,
        fivetran_destination: bool = False,
        fivetran_destination_url: str = "http://fivetran-destination:6874",
        fivetran_destination_files_path: str = "/share/tmp",
        mz_service: str = "materialized",
        metadata_store: str = METADATA_STORE,
        stop_grace_period: str = "120s",
        cluster_replica_size: dict[str, dict[str, Any]] | None = None,
        network_mode: str | None = None,
        set_persist_urls: bool = True,
        backoff_factor: float = 1.0,
    ) -> None:
        """Build the service configuration for a testdrive container.

        Caller-supplied `entrypoint` and `environment` lists are copied before
        being extended, so the same list object can safely be shared between
        several `Testdrive(...)` constructions.

        Args:
            name: Service name passed to `Service.__init__`.
            mzbuild: Value of the `mzbuild` config key.
            materialize_url: `--materialize-url` of the default entrypoint.
            materialize_url_internal: `--materialize-internal-url` of the
                default entrypoint.
            materialize_use_https: Add `--materialize-use-https` to the
                default entrypoint.
            materialize_params: Each item becomes a
                `--materialize-param=KEY=VALUE` flag. `None` means no params.
            kafka_url: `--kafka-addr` of the default entrypoint.
            kafka_default_partitions: When truthy, passed as
                `--kafka-default-partitions`.
            kafka_args: Accepted for interface compatibility; not read by this
                constructor.
            schema_registry_url: `--schema-registry-url` of the default
                entrypoint.
            no_reset: Add `--no-reset`.
            default_timeout: Passed as `--default-timeout`; `None` means
                `"20s"`.
            seed: Explicit `--seed` value. Mutually exclusive with
                `consistent_seed`.
            consistent_seed: Draw one random 32-bit seed at construction time
                and pass it as `--seed`.
            validate_catalog_store: Add `--validate-catalog-store`.
            entrypoint: Base command line. When `None`, a default `testdrive`
                invocation is built from the URL arguments above. All flags
                derived from the other arguments are appended either way.
            entrypoint_extra: Arguments appended after all generated flags.
            environment: Base environment entries. When `None`, a default set
                is used. `CLUSTER_REPLICA_SIZES`, `MZ_CI_LICENSE_KEY` and
                `LD_PRELOAD` entries are appended either way.
            volumes_extra: Additional volume specs appended to the defaults.
            volume_workdir: Volume spec placed first in the volume list.
            propagate_uid_gid: Value of the `propagate_uid_gid` config key.
            forward_buildkite_shard: Add `--shard`/`--shard-count` taken from
                the `buildkite` parallelism helpers.
            aws_region: When set, passed as `--aws-region`; `aws_endpoint` is
                then ignored.
            aws_endpoint: When set and `aws_region` is not, passed as
                `--aws-endpoint` and as the `aws-endpoint` testdrive variable.
            aws_access_key_id: When set, passed as `--aws-access-key-id` and
                as the `aws-access-key-id` testdrive variable.
            aws_secret_access_key: When set, passed as
                `--aws-secret-access-key` and as the `aws-secret-access-key`
                testdrive variable.
            no_consistency_checks: Add `--consistency-checks=disable`.
            external_metadata_store: Point the persist consensus URL at the
                `cockroach` host instead of `mz_service`.
            external_blob_store: Falsy selects the local file blob store.
                `True` selects the default blob store host (`azurite` or
                `minio`); any other truthy value is used as the host address.
            blob_store_is_azure: Build the external blob URL with
                `azure_blob_uri` instead of `minio_blob_uri`.
            fivetran_destination: Add the two `--fivetran-destination-*`
                flags.
            fivetran_destination_url: Value for `--fivetran-destination-url`.
            fivetran_destination_files_path: Value for
                `--fivetran-destination-files-path`.
            mz_service: Host used in the persist consensus URL when
                `external_metadata_store` is false.
            metadata_store: Accepted for interface compatibility; not read by
                this constructor.
            stop_grace_period: Value of the `stop_grace_period` config key.
            cluster_replica_size: Mapping JSON-encoded into the
                `CLUSTER_REPLICA_SIZES` environment entry; `None` means
                `cluster_replica_size_map()`.
            network_mode: When truthy, set as the `network_mode` config key.
            set_persist_urls: Whether to add the `--persist-blob-url` and
                `--persist-consensus-url` flags at all.
            backoff_factor: Passed as `--backoff-factor`.

        Raises:
            RuntimeError: If both `seed` and `consistent_seed` are given.
        """
        if cluster_replica_size is None:
            cluster_replica_size = cluster_replica_size_map()

        if environment is None:
            environment = [
                "TMPDIR=/share/tmp",
                "MZ_SOFT_ASSERTIONS=1",
                # Please think twice before forwarding additional environment
                # variables from the host, as it's easy to write tests that are
                # then accidentally dependent on the state of the host machine.
                #
                # To pass arguments to a testdrive script, use the `--var` CLI
                # option rather than environment variables.
                "MZ_LOG_FILTER",
                "AWS_ACCESS_KEY_ID",
                "AWS_SECRET_ACCESS_KEY",
                "AWS_SESSION_TOKEN",
            ]

        # Build a new list rather than using `+=`, which would extend a
        # caller-supplied list in place.
        environment = [
            *environment,
            f"CLUSTER_REPLICA_SIZES={json.dumps(cluster_replica_size)}",
            "MZ_CI_LICENSE_KEY",
            "LD_PRELOAD=libeatmydata.so",
        ]

        volumes = [
            volume_workdir,
            *(v for v in DEFAULT_MZ_VOLUMES if v.startswith("tmp:")),
        ]
        if volumes_extra:
            volumes.extend(volumes_extra)

        if entrypoint is None:
            entrypoint = [
                "testdrive",
                f"--kafka-addr={kafka_url}",
                f"--schema-registry-url={schema_registry_url}",
                f"--materialize-url={materialize_url}",
                f"--materialize-internal-url={materialize_url_internal}",
                *(["--materialize-use-https"] if materialize_use_https else []),
            ]
        else:
            # Copy so the flags appended below never leak into the caller's
            # list.
            entrypoint = list(entrypoint)

        # Faster retries
        entrypoint.append(f"--backoff-factor={backoff_factor}")

        if aws_region:
            entrypoint.append(f"--aws-region={aws_region}")

        # An explicit region takes precedence over a custom endpoint.
        if aws_endpoint and not aws_region:
            entrypoint.append(f"--aws-endpoint={aws_endpoint}")
            entrypoint.append(f"--var=aws-endpoint={aws_endpoint}")

        if aws_access_key_id:
            entrypoint.append(f"--aws-access-key-id={aws_access_key_id}")
            entrypoint.append(f"--var=aws-access-key-id={aws_access_key_id}")

        if aws_secret_access_key:
            entrypoint.append(f"--aws-secret-access-key={aws_secret_access_key}")
            entrypoint.append(f"--var=aws-secret-access-key={aws_secret_access_key}")

        if validate_catalog_store:
            entrypoint.append("--validate-catalog-store")

        if no_reset:
            entrypoint.append("--no-reset")

        for k, v in (materialize_params or {}).items():
            entrypoint.append(f"--materialize-param={k}={v}")

        if default_timeout is None:
            default_timeout = "20s"
        entrypoint.append(f"--default-timeout={default_timeout}")

        if kafka_default_partitions:
            entrypoint.append(f"--kafka-default-partitions={kafka_default_partitions}")

        if forward_buildkite_shard:
            shard = buildkite.get_parallelism_index()
            shard_count = buildkite.get_parallelism_count()
            entrypoint += [f"--shard={shard}", f"--shard-count={shard_count}"]

        if seed is not None and consistent_seed:
            raise RuntimeError("Can't pass `seed` and `consistent_seed` at same time")
        elif consistent_seed:
            # The seed is drawn once, here, and baked into the entrypoint.
            entrypoint.append(f"--seed={random.getrandbits(32)}")
        elif seed is not None:
            entrypoint.append(f"--seed={seed}")

        if no_consistency_checks:
            entrypoint.append("--consistency-checks=disable")

        if fivetran_destination:
            entrypoint.append(f"--fivetran-destination-url={fivetran_destination_url}")
            entrypoint.append(
                f"--fivetran-destination-files-path={fivetran_destination_files_path}"
            )

        if set_persist_urls:
            if external_blob_store:
                blob_store = "azurite" if blob_store_is_azure else "minio"
                # `== True` is deliberate: a plain `True` selects the default
                # host, while any other truthy value is taken as the address.
                address = (
                    blob_store if external_blob_store == True else external_blob_store
                )
                persist_blob_url = (
                    azure_blob_uri(address)
                    if blob_store_is_azure
                    else minio_blob_uri(address)
                )
                entrypoint.append(f"--persist-blob-url={persist_blob_url}")
            else:
                entrypoint.append("--persist-blob-url=file:///mzdata/persist/blob")

            if external_metadata_store:
                entrypoint.append(
                    "--persist-consensus-url=postgres://root@cockroach:26257?options=--search_path=consensus"
                )
            else:
                entrypoint.append(
                    f"--persist-consensus-url=postgres://root@{mz_service}:26257?options=--search_path=consensus"
                )

        # Caller-provided extras go last, after every generated flag.
        entrypoint.extend(entrypoint_extra or [])

        config: ServiceConfig = {
            "mzbuild": mzbuild,
            "entrypoint": entrypoint,
            "environment": environment,
            "volumes": volumes,
            "propagate_uid_gid": propagate_uid_gid,
            "init": True,
            "stop_grace_period": stop_grace_period,
        }

        if network_mode:
            config["network_mode"] = network_mode

        super().__init__(
            name=name,
            config=config,
        )
|