mzcompose.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Runs the Rust-based unit tests in Debug mode.
  11. """
  12. import json
  13. import multiprocessing
  14. import os
  15. import shutil
  16. import subprocess
  17. from materialize import MZ_ROOT, buildkite, rustc_flags, spawn, ui
  18. from materialize.cli.run import SANITIZER_TARGET
  19. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  20. from materialize.mzcompose.services.azurite import Azurite
  21. from materialize.mzcompose.services.clusterd import Clusterd
  22. from materialize.mzcompose.services.kafka import Kafka
  23. from materialize.mzcompose.services.minio import Minio
  24. from materialize.mzcompose.services.postgres import (
  25. CockroachOrPostgresMetadata,
  26. Postgres,
  27. )
  28. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  29. from materialize.mzcompose.services.zookeeper import Zookeeper
  30. from materialize.rustc_flags import Sanitizer
  31. from materialize.util import PropagatingThread
  32. from materialize.xcompile import Arch, target
  33. SERVICES = [
  34. Zookeeper(),
  35. Kafka(
  36. # We need a stable port to advertise, so pick one that is unlikely to
  37. # conflict with a Kafka cluster running on the local machine.
  38. ports=["30123:30123"],
  39. allow_host_ports=True,
  40. environment_extra=[
  41. "KAFKA_ADVERTISED_LISTENERS=HOST://localhost:30123,PLAINTEXT://kafka:9092",
  42. "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=HOST:PLAINTEXT,PLAINTEXT:PLAINTEXT",
  43. ],
  44. ),
  45. SchemaRegistry(),
  46. Postgres(image="postgres:14.2"),
  47. CockroachOrPostgresMetadata(),
  48. Minio(
  49. # We need a stable port exposed to the host since we can't pass any arguments
  50. # to the .pt files used in the tests.
  51. ports=["40109:9000", "40110:9001"],
  52. allow_host_ports=True,
  53. additional_directories=["copytos3"],
  54. ),
  55. Azurite(
  56. ports=["40111:10000"],
  57. allow_host_ports=True,
  58. ),
  59. Clusterd(), # Only to attempt to download the binary
  60. ]
  61. def flatten(xss):
  62. return [x for xs in xss for x in xs]
  63. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  64. parser.add_argument("--miri-full", action="store_true")
  65. parser.add_argument("--miri-fast", action="store_true")
  66. parser.add_argument("args", nargs="*")
  67. args = parser.parse_args()
  68. c.up(
  69. "zookeeper",
  70. "kafka",
  71. "schema-registry",
  72. "postgres",
  73. c.metadata_store(),
  74. "minio",
  75. "azurite",
  76. )
  77. # Heads up: this intentionally runs on the host rather than in a Docker
  78. # image. See database-issues#3739.
  79. postgres_url = (
  80. f"postgres://postgres:postgres@localhost:{c.default_port('postgres')}"
  81. )
  82. cockroach_url = f"postgres://root@localhost:{c.default_port(c.metadata_store())}"
  83. env = dict(
  84. os.environ,
  85. ZOOKEEPER_ADDR=f"localhost:{c.default_port('zookeeper')}",
  86. KAFKA_ADDRS="localhost:30123",
  87. SCHEMA_REGISTRY_URL=f"http://localhost:{c.default_port('schema-registry')}",
  88. POSTGRES_URL=postgres_url,
  89. COCKROACH_URL=cockroach_url,
  90. MZ_SOFT_ASSERTIONS="1",
  91. MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET="mz-test-persist-1d-lifecycle-delete",
  92. MZ_S3_UPLOADER_TEST_S3_BUCKET="mz-test-1d-lifecycle-delete",
  93. MZ_PERSIST_EXTERNAL_STORAGE_TEST_AZURE_CONTAINER="mz-test-azure",
  94. MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL=cockroach_url,
  95. )
  96. coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
  97. sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
  98. extra_env = {}
  99. clusterd_thread: PropagatingThread | None = None
  100. if coverage:
  101. # TODO(def-): For coverage inside of clusterd called from unit tests need
  102. # to set LLVM_PROFILE_FILE in test code invoking clusterd and later
  103. # aggregate the data.
  104. (MZ_ROOT / "coverage").mkdir(exist_ok=True)
  105. env["CARGO_LLVM_COV_SETUP"] = "no"
  106. # There is no pure build command in cargo-llvm-cov, so run with
  107. # --version as a workaround.
  108. spawn.runv(
  109. [
  110. "cargo",
  111. "llvm-cov",
  112. "run",
  113. "--bin",
  114. "clusterd",
  115. "--release",
  116. "--no-report",
  117. "--",
  118. "--version",
  119. ],
  120. env=env,
  121. )
  122. cmd = [
  123. "cargo",
  124. "llvm-cov",
  125. "nextest",
  126. "--release",
  127. "--no-clean",
  128. "--workspace",
  129. "--lcov",
  130. "--output-path",
  131. "coverage/cargotest.lcov",
  132. "--profile=coverage",
  133. # We still want a coverage report on crash
  134. "--ignore-run-fail",
  135. ]
  136. try:
  137. spawn.runv(cmd + args.args, env=env)
  138. finally:
  139. spawn.runv(["zstd", "coverage/cargotest.lcov"])
  140. buildkite.upload_artifact("coverage/cargotest.lcov.zst")
  141. else:
  142. if args.miri_full:
  143. spawn.runv(
  144. [
  145. "bin/ci-builder",
  146. "run",
  147. "nightly",
  148. "ci/test/cargo-test-miri.sh",
  149. ],
  150. env=env,
  151. )
  152. elif args.miri_fast:
  153. spawn.runv(
  154. [
  155. "bin/ci-builder",
  156. "run",
  157. "nightly",
  158. "ci/test/cargo-test-miri-fast.sh",
  159. ],
  160. env=env,
  161. )
  162. else:
  163. if sanitizer != Sanitizer.none:
  164. cflags = [
  165. f"--target={target(Arch.host())}",
  166. f"--gcc-toolchain=/opt/x-tools/{target(Arch.host())}/",
  167. f"--sysroot=/opt/x-tools/{target(Arch.host())}/{target(Arch.host())}/sysroot",
  168. ] + rustc_flags.sanitizer_cflags[sanitizer]
  169. ldflags = cflags + [
  170. "-fuse-ld=lld",
  171. f"-L/opt/x-tools/{target(Arch.host())}/{target(Arch.host())}/lib64",
  172. ]
  173. extra_env = {
  174. "CFLAGS": " ".join(cflags),
  175. "CXXFLAGS": " ".join(cflags),
  176. "LDFLAGS": " ".join(ldflags),
  177. "CXXSTDLIB": "stdc++",
  178. "CC": "cc",
  179. "CXX": "c++",
  180. "CPP": "clang-cpp-18",
  181. "CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER": "cc",
  182. "CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER": "cc",
  183. "PATH": f"/sanshim:/opt/x-tools/{target(Arch.host())}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
  184. "RUSTFLAGS": (
  185. env.get("RUSTFLAGS", "")
  186. + " "
  187. + " ".join(rustc_flags.sanitizer[sanitizer])
  188. ),
  189. "TSAN_OPTIONS": "report_bugs=0", # build-scripts fail
  190. }
  191. spawn.runv(
  192. [
  193. "bin/ci-builder",
  194. "run",
  195. "nightly",
  196. *flatten(
  197. [
  198. ["--env", f"{key}={val}"]
  199. for key, val in extra_env.items()
  200. ]
  201. ),
  202. "cargo",
  203. "build",
  204. "--workspace",
  205. "--no-default-features",
  206. "--bin",
  207. "clusterd",
  208. "-Zbuild-std",
  209. "--target",
  210. SANITIZER_TARGET,
  211. "--profile=ci",
  212. ],
  213. )
  214. else:
  215. assert (
  216. buildkite.get_parallelism_count() <= 2
  217. ), "Special handling of parallelism, only 1 and 2 supported"
  218. if (
  219. buildkite.get_parallelism_count() == 1
  220. or buildkite.get_parallelism_index() == 0
  221. ):
  222. def worker() -> None:
  223. clusterd = c.compose["services"]["clusterd"]
  224. try:
  225. subprocess.run(
  226. ["docker", "pull", clusterd["image"]],
  227. check=True,
  228. capture_output=True,
  229. )
  230. container_id = subprocess.check_output(
  231. ["docker", "create", clusterd["image"]], text=True
  232. ).strip()
  233. target_dir = os.getenv("CARGO_TARGET_DIR", "target") + "/ci"
  234. os.makedirs(target_dir, exist_ok=True)
  235. subprocess.run(
  236. [
  237. "docker",
  238. "cp",
  239. f"{container_id}:/usr/local/bin/clusterd",
  240. target_dir,
  241. ],
  242. check=True,
  243. )
  244. except subprocess.CalledProcessError as e:
  245. print(f"Failed to get clusterd image: {e}")
  246. spawn.runv(
  247. [
  248. "cargo",
  249. "build",
  250. "--workspace",
  251. "--bin",
  252. "clusterd",
  253. "--profile=ci",
  254. ],
  255. env={**env, "CARGO_TARGET_DIR": "target/ci-clusterd"},
  256. )
  257. shutil.copy(
  258. "target/ci-clusterd/ci/clusterd",
  259. os.getenv("CARGO_TARGET_DIR", "target") + "/ci/",
  260. )
  261. clusterd_thread = PropagatingThread(target=worker)
  262. clusterd_thread.start()
  263. spawn.runv(
  264. [
  265. "cargo",
  266. "nextest",
  267. "run",
  268. "--no-run",
  269. "--all-features",
  270. "--cargo-profile=ci",
  271. "--profile=ci",
  272. *(
  273. ["--package=mz-environmentd", "--package=mz-balancerd"]
  274. if buildkite.get_parallelism_count() == 2
  275. else ["--workspace"]
  276. ),
  277. ],
  278. env=env,
  279. )
  280. clusterd_thread.join()
  281. metadata = json.loads(
  282. subprocess.check_output(
  283. ["cargo", "metadata", "--no-deps", "--format-version=1"]
  284. )
  285. )
  286. if sanitizer != Sanitizer.none:
  287. # Can't just use --workspace because of https://github.com/rust-lang/cargo/issues/7160
  288. for pkg in metadata["packages"]:
  289. try:
  290. spawn.runv(
  291. [
  292. "bin/ci-builder",
  293. "run",
  294. "nightly",
  295. *flatten(
  296. [
  297. ["--env", f"{key}={val}"]
  298. for key, val in extra_env.items()
  299. ]
  300. ),
  301. "cargo",
  302. "nextest",
  303. "run",
  304. "--package",
  305. pkg["name"],
  306. "--no-default-features",
  307. "--profile=sanitizer",
  308. "--cargo-profile=ci",
  309. # We want all tests to run
  310. "--no-fail-fast",
  311. "-Zbuild-std",
  312. "--target",
  313. SANITIZER_TARGET,
  314. *args.args,
  315. ],
  316. env=env,
  317. )
  318. except subprocess.CalledProcessError:
  319. print(f"Test against package {pkg['name']} failed, continuing")
  320. else:
  321. pkgs = [
  322. f"--package={p['name']}"
  323. for p in metadata["packages"]
  324. if p["name"] not in ("mz-environmentd", "mz-balancerd")
  325. ]
  326. spawn.runv(
  327. [
  328. "cargo",
  329. "nextest",
  330. "run",
  331. # We want all tests to run
  332. "--no-fail-fast",
  333. "--all-features",
  334. "--profile=ci",
  335. "--cargo-profile=ci",
  336. f"--test-threads={multiprocessing.cpu_count() * 2}",
  337. *(
  338. (
  339. pkgs
  340. if buildkite.get_parallelism_index() == 1
  341. else [
  342. "--package=mz-environmentd",
  343. "--package=mz-balancerd",
  344. ]
  345. )
  346. if buildkite.get_parallelism_count() == 2
  347. else ["--workspace"]
  348. ),
  349. *args.args,
  350. ],
  351. env=env,
  352. )