123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Runs the Rust-based unit tests in Debug mode.
- """
- import json
- import multiprocessing
- import os
- import shutil
- import subprocess
- from materialize import MZ_ROOT, buildkite, rustc_flags, spawn, ui
- from materialize.cli.run import SANITIZER_TARGET
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.azurite import Azurite
- from materialize.mzcompose.services.clusterd import Clusterd
- from materialize.mzcompose.services.kafka import Kafka
- from materialize.mzcompose.services.minio import Minio
- from materialize.mzcompose.services.postgres import (
- CockroachOrPostgresMetadata,
- Postgres,
- )
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.zookeeper import Zookeeper
- from materialize.rustc_flags import Sanitizer
- from materialize.util import PropagatingThread
- from materialize.xcompile import Arch, target
- SERVICES = [
- Zookeeper(),
- Kafka(
- # We need a stable port to advertise, so pick one that is unlikely to
- # conflict with a Kafka cluster running on the local machine.
- ports=["30123:30123"],
- allow_host_ports=True,
- environment_extra=[
- "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
- "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
- ],
- ),
- SchemaRegistry(),
- Postgres(image="postgres:14.2"),
- CockroachOrPostgresMetadata(),
- Minio(
- # We need a stable port exposed to the host since we can't pass any arguments
- # to the .pt files used in the tests.
- ports=["40109:9000", "40110:9001"],
- allow_host_ports=True,
- additional_directories=["copytos3"],
- ),
- Azurite(
- ports=["40111:10000"],
- allow_host_ports=True,
- ),
- Clusterd(), # Only to attempt to download the binary
- ]
- def flatten(xss):
- return [x for xs in xss for x in xs]
- def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
- parser.add_argument("--miri-full", action="store_true")
- parser.add_argument("--miri-fast", action="store_true")
- parser.add_argument("args", nargs="*")
- args = parser.parse_args()
- c.up(
- "zookeeper",
- "kafka",
- "schema-registry",
- "postgres",
- c.metadata_store(),
- "minio",
- "azurite",
- )
- # Heads up: this intentionally runs on the host rather than in a Docker
- # image. See database-issues#3739.
- postgres_url = (
- f"postgres://postgres:postgres@localhost:{c.default_port('postgres')}"
- )
- cockroach_url = f"postgres://root@localhost:{c.default_port(c.metadata_store())}"
- env = dict(
- os.environ,
- ZOOKEEPER_ADDR=f"localhost:{c.default_port('zookeeper')}",
- KAFKA_ADDRS="localhost:30123",
- SCHEMA_REGISTRY_URL=f"http://localhost:{c.default_port('schema-registry')}",
- POSTGRES_URL=postgres_url,
- COCKROACH_URL=cockroach_url,
- MZ_SOFT_ASSERTIONS="1",
- MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET="mz-test-persist-1d-lifecycle-delete",
- MZ_S3_UPLOADER_TEST_S3_BUCKET="mz-test-1d-lifecycle-delete",
- MZ_PERSIST_EXTERNAL_STORAGE_TEST_AZURE_CONTAINER="mz-test-azure",
- MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL=cockroach_url,
- )
- coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
- sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
- extra_env = {}
- clusterd_thread: PropagatingThread | None = None
- if coverage:
- # TODO(def-): For coverage inside of clusterd called from unit tests need
- # to set LLVM_PROFILE_FILE in test code invoking clusterd and later
- # aggregate the data.
- (MZ_ROOT / "coverage").mkdir(exist_ok=True)
- env["CARGO_LLVM_COV_SETUP"] = "no"
- # There is no pure build command in cargo-llvm-cov, so run with
- # --version as a workaround.
- spawn.runv(
- [
- "cargo",
- "llvm-cov",
- "run",
- "--bin",
- "clusterd",
- "--release",
- "--no-report",
- "--",
- "--version",
- ],
- env=env,
- )
- cmd = [
- "cargo",
- "llvm-cov",
- "nextest",
- "--release",
- "--no-clean",
- "--workspace",
- "--lcov",
- "--output-path",
- "coverage/cargotest.lcov",
- "--profile=coverage",
- # We still want a coverage report on crash
- "--ignore-run-fail",
- ]
- try:
- spawn.runv(cmd + args.args, env=env)
- finally:
- spawn.runv(["zstd", "coverage/cargotest.lcov"])
- buildkite.upload_artifact("coverage/cargotest.lcov.zst")
- else:
- if args.miri_full:
- spawn.runv(
- [
- "bin/ci-builder",
- "run",
- "nightly",
- "ci/test/cargo-test-miri.sh",
- ],
- env=env,
- )
- elif args.miri_fast:
- spawn.runv(
- [
- "bin/ci-builder",
- "run",
- "nightly",
- "ci/test/cargo-test-miri-fast.sh",
- ],
- env=env,
- )
- else:
- if sanitizer != Sanitizer.none:
- cflags = [
- f"--target={target(Arch.host())}",
- f"--gcc-toolchain=/opt/x-tools/{target(Arch.host())}/",
- f"--sysroot=/opt/x-tools/{target(Arch.host())}/{target(Arch.host())}/sysroot",
- ] + rustc_flags.sanitizer_cflags[sanitizer]
- ldflags = cflags + [
- "-fuse-ld=lld",
- f"-L/opt/x-tools/{target(Arch.host())}/{target(Arch.host())}/lib64",
- ]
- extra_env = {
- "CFLAGS": " ".join(cflags),
- "CXXFLAGS": " ".join(cflags),
- "LDFLAGS": " ".join(ldflags),
- "CXXSTDLIB": "stdc++",
- "CC": "cc",
- "CXX": "c++",
- "CPP": "clang-cpp-18",
- "CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER": "cc",
- "CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER": "cc",
- "PATH": f"/sanshim:/opt/x-tools/{target(Arch.host())}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
- "RUSTFLAGS": (
- env.get("RUSTFLAGS", "")
- + " "
- + " ".join(rustc_flags.sanitizer[sanitizer])
- ),
- "TSAN_OPTIONS": "report_bugs=0", # build-scripts fail
- }
- spawn.runv(
- [
- "bin/ci-builder",
- "run",
- "nightly",
- *flatten(
- [
- ["--env", f"{key}={val}"]
- for key, val in extra_env.items()
- ]
- ),
- "cargo",
- "build",
- "--workspace",
- "--no-default-features",
- "--bin",
- "clusterd",
- "-Zbuild-std",
- "--target",
- SANITIZER_TARGET,
- "--profile=ci",
- ],
- )
- else:
- assert (
- buildkite.get_parallelism_count() <= 2
- ), "Special handling of parallelism, only 1 and 2 supported"
- if (
- buildkite.get_parallelism_count() == 1
- or buildkite.get_parallelism_index() == 0
- ):
- def worker() -> None:
- clusterd = c.compose["services"]["clusterd"]
- try:
- subprocess.run(
- ["docker", "pull", clusterd["image"]],
- check=True,
- capture_output=True,
- )
- container_id = subprocess.check_output(
- ["docker", "create", clusterd["image"]], text=True
- ).strip()
- target_dir = os.getenv("CARGO_TARGET_DIR", "target") + "/ci"
- os.makedirs(target_dir, exist_ok=True)
- subprocess.run(
- [
- "docker",
- "cp",
- f"{container_id}:/usr/local/bin/clusterd",
- target_dir,
- ],
- check=True,
- )
- except subprocess.CalledProcessError as e:
- print(f"Failed to get clusterd image: {e}")
- spawn.runv(
- [
- "cargo",
- "build",
- "--workspace",
- "--bin",
- "clusterd",
- "--profile=ci",
- ],
- env={**env, "CARGO_TARGET_DIR": "target/ci-clusterd"},
- )
- shutil.copy(
- "target/ci-clusterd/ci/clusterd",
- os.getenv("CARGO_TARGET_DIR", "target") + "/ci/",
- )
- clusterd_thread = PropagatingThread(target=worker)
- clusterd_thread.start()
- spawn.runv(
- [
- "cargo",
- "nextest",
- "run",
- "--no-run",
- "--all-features",
- "--cargo-profile=ci",
- "--profile=ci",
- *(
- ["--package=mz-environmentd", "--package=mz-balancerd"]
- if buildkite.get_parallelism_count() == 2
- else ["--workspace"]
- ),
- ],
- env=env,
- )
- clusterd_thread.join()
- metadata = json.loads(
- subprocess.check_output(
- ["cargo", "metadata", "--no-deps", "--format-version=1"]
- )
- )
- if sanitizer != Sanitizer.none:
- # Can't just use --workspace because of https://github.com/rust-lang/cargo/issues/7160
- for pkg in metadata["packages"]:
- try:
- spawn.runv(
- [
- "bin/ci-builder",
- "run",
- "nightly",
- *flatten(
- [
- ["--env", f"{key}={val}"]
- for key, val in extra_env.items()
- ]
- ),
- "cargo",
- "nextest",
- "run",
- "--package",
- pkg["name"],
- "--no-default-features",
- "--profile=sanitizer",
- "--cargo-profile=ci",
- # We want all tests to run
- "--no-fail-fast",
- "-Zbuild-std",
- "--target",
- SANITIZER_TARGET,
- *args.args,
- ],
- env=env,
- )
- except subprocess.CalledProcessError:
- print(f"Test against package {pkg['name']} failed, continuing")
- else:
- pkgs = [
- f"--package={p['name']}"
- for p in metadata["packages"]
- if p["name"] not in ("mz-environmentd", "mz-balancerd")
- ]
- spawn.runv(
- [
- "cargo",
- "nextest",
- "run",
- # We want all tests to run
- "--no-fail-fast",
- "--all-features",
- "--profile=ci",
- "--cargo-profile=ci",
- f"--test-threads={multiprocessing.cpu_count() * 2}",
- *(
- (
- pkgs
- if buildkite.get_parallelism_index() == 1
- else [
- "--package=mz-environmentd",
- "--package=mz-balancerd",
- ]
- )
- if buildkite.get_parallelism_count() == 2
- else ["--workspace"]
- ),
- *args.args,
- ],
- env=env,
- )
|