run.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # run.py — build and run a core service or test.
  11. import argparse
  12. import atexit
  13. import getpass
  14. import json
  15. import os
  16. import pathlib
  17. import shlex
  18. import shutil
  19. import signal
  20. import subprocess
  21. import sys
  22. import tempfile
  23. import time
  24. import uuid
  25. from datetime import datetime, timedelta
  26. from urllib.parse import parse_qsl, urlparse
  27. import pg8000.exceptions
  28. import pg8000.native
  29. import psutil
  30. from materialize import MZ_ROOT, rustc_flags, spawn, ui
  31. from materialize.bazel import remote_cache_arg
  32. from materialize.build_config import BuildConfig
  33. from materialize.mzcompose import (
  34. bootstrap_cluster_replica_size,
  35. cluster_replica_size_map,
  36. get_default_system_parameters,
  37. )
  38. from materialize.ui import UIError
  39. from materialize.xcompile import Arch
# Programs that can be built and run directly. Together with "test" these are
# the accepted values of the `program` command-line argument.
KNOWN_PROGRAMS = ["environmentd", "sqllogictest"]
# Services that are always built alongside the requested program. Lingering
# processes with these names are also looked for before `environmentd` starts.
REQUIRED_SERVICES = ["clusterd"]
# Target triple passed to cargo via `--target` when building with a sanitizer.
SANITIZER_TARGET = (
    f"{Arch.host()}-unknown-linux-gnu"
    if sys.platform.startswith("linux")
    else f"{Arch.host()}-apple-darwin"
)
# Default for `--postgres` when the MZDEV_POSTGRES environment variable is unset.
DEFAULT_POSTGRES = "postgres://root@localhost:26257/materialize"
# Local data directory used by `environmentd` runs (secrets, prometheus
# service discovery, the environment ID file, and the default blob store).
MZDATA = MZ_ROOT / "mzdata"
# Default for `--blob` when the MZDEV_BLOB environment variable is unset.
DEFAULT_BLOB = f"file://{MZDATA}/persist/blob"
# sets entitlements on the built binary, e.g. environmentd, so you can inspect it with Instruments
MACOS_ENTITLEMENTS_DATA = """
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>com.apple.security.get-task-allow</key>
<true/>
</dict>
</plist>
"""
  61. def main() -> int:
  62. parser = argparse.ArgumentParser(
  63. prog="run",
  64. description="""Build and run a core service or test.
  65. Wraps `cargo run` and `cargo test` with Materialize-specific logic.""",
  66. )
  67. parser.add_argument(
  68. "program",
  69. help="the name of the program to run",
  70. choices=[*KNOWN_PROGRAMS, "test"],
  71. )
  72. parser.add_argument(
  73. "args",
  74. help="Arguments to pass to the program",
  75. nargs="*",
  76. )
  77. parser.add_argument(
  78. "--reset",
  79. help="Delete data from prior runs of the program",
  80. action="store_true",
  81. )
  82. parser.add_argument(
  83. "--postgres",
  84. help="Postgres/CockroachDB connection string",
  85. default=os.getenv("MZDEV_POSTGRES", DEFAULT_POSTGRES),
  86. )
  87. parser.add_argument(
  88. "--blob",
  89. help="Blob storage connection string",
  90. default=os.getenv("MZDEV_BLOB", DEFAULT_BLOB),
  91. )
  92. parser.add_argument(
  93. "--release",
  94. help="Build artifacts in release mode, with optimizations",
  95. action="store_true",
  96. )
  97. parser.add_argument(
  98. "--optimized",
  99. help="Build artifacts in optimized mode, with optimizations (but no LTO and debug symbols)",
  100. action="store_true",
  101. )
  102. parser.add_argument(
  103. "--timings",
  104. help="Output timing information",
  105. action="store_true",
  106. )
  107. parser.add_argument(
  108. "--features",
  109. help="Comma separated list of features to activate",
  110. )
  111. parser.add_argument(
  112. "--no-default-features",
  113. help="Do not activate the `default` feature",
  114. action="store_true",
  115. )
  116. parser.add_argument(
  117. "-p",
  118. "--package",
  119. help="Package to run tests for",
  120. action="append",
  121. default=[],
  122. )
  123. parser.add_argument(
  124. "--test",
  125. help="Test only the specified test target",
  126. action="append",
  127. default=[],
  128. )
  129. parser.add_argument(
  130. "--tokio-console",
  131. help="Activate the Tokio console",
  132. action="store_true",
  133. )
  134. parser.add_argument(
  135. "--build-only",
  136. help="Only build, don't run",
  137. action="store_true",
  138. )
  139. parser.add_argument(
  140. "--enable-mac-codesigning",
  141. help="Enables the limited codesigning we do on macOS to support Instruments",
  142. action="store_true",
  143. )
  144. parser.add_argument(
  145. "--coverage",
  146. help="Build with coverage",
  147. default=False,
  148. action="store_true",
  149. )
  150. parser.add_argument(
  151. "--sanitizer",
  152. help="Build with sanitizer",
  153. type=str,
  154. default="none",
  155. )
  156. parser.add_argument(
  157. "--wrapper",
  158. help="Wrapper command for the program",
  159. )
  160. parser.add_argument(
  161. "--monitoring",
  162. help="Automatically send monitoring data.",
  163. default=False,
  164. action="store_true",
  165. )
  166. parser.add_argument(
  167. "--listeners-config-path",
  168. help="Path to json file with environmentd listeners configuration.",
  169. default=f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth.json",
  170. )
  171. parser.add_argument(
  172. "--bazel",
  173. help="Build with Bazel (not supported by all options)",
  174. default=False,
  175. action="store_true",
  176. )
  177. args = parser.parse_intermixed_args()
  178. # Handle `+toolchain` like rustup.
  179. args.channel = None
  180. if len(args.args) > 0 and args.args[0].startswith("+"):
  181. args.channel = args.args[0]
  182. del args.args[0]
  183. env = dict(os.environ)
  184. if args.program in KNOWN_PROGRAMS:
  185. if args.bazel:
  186. build_func = _bazel_build
  187. else:
  188. build_func = _cargo_build
  189. (build_retcode, built_programs) = build_func(
  190. args, extra_programs=[args.program]
  191. )
  192. if args.build_only:
  193. return build_retcode
  194. if args.enable_mac_codesigning:
  195. for program in built_programs:
  196. if sys.platform == "darwin":
  197. _macos_codesign(program)
  198. if sys.platform != "darwin":
  199. print("Ignoring --enable-mac-codesigning since we're not on macOS")
  200. else:
  201. print("Disabled macOS Codesigning")
  202. if args.wrapper:
  203. command = shlex.split(args.wrapper)
  204. else:
  205. command = []
  206. mzbuild = MZ_ROOT / "mzbuild"
  207. mzbuild.mkdir(exist_ok=True)
  208. # Move all built programs into the same directory to make it easier for
  209. # downstream consumers.
  210. binaries_dir = mzbuild / "bin"
  211. binaries_dir.mkdir(exist_ok=True)
  212. binaries = []
  213. for program in built_programs:
  214. src_path = pathlib.Path(program)
  215. dst_path = binaries_dir / src_path.name
  216. if os.path.lexists(dst_path) and os.path.islink(dst_path):
  217. os.unlink(dst_path)
  218. os.symlink(src_path, dst_path)
  219. binaries.append(str(dst_path))
  220. # HACK(parkmycar): The last program is the one requested by the user.
  221. command.append(binaries[-1])
  222. if args.tokio_console:
  223. command += ["--tokio-console-listen-addr=127.0.0.1:6669"]
  224. if args.program == "environmentd":
  225. _handle_lingering_services(kill=args.reset)
  226. scratch = MZ_ROOT / "scratch"
  227. urlparse(args.postgres).path.removeprefix("/")
  228. dbconn = _connect_sql(args.postgres)
  229. for schema in ["consensus", "tsoracle", "storage"]:
  230. if args.reset:
  231. _run_sql(dbconn, f"DROP SCHEMA IF EXISTS {schema} CASCADE")
  232. _run_sql(dbconn, f"CREATE SCHEMA IF NOT EXISTS {schema}")
  233. # Keep this after clearing out Postgres. Otherwise there is a race
  234. # where a ctrl-c could leave persist with references in Postgres to
  235. # files that have been deleted. There's no race if we reset in the
  236. # opposite order.
  237. if args.reset:
  238. # Remove everything in the `mzdata`` directory *except* for
  239. # the `prometheus` directory and all contents of `tempo`.
  240. paths = list(MZDATA.glob("prometheus/*"))
  241. paths.extend(
  242. p
  243. for p in MZDATA.glob("*")
  244. if p.name != "prometheus" and p.name != "tempo"
  245. )
  246. paths.extend(p for p in scratch.glob("*"))
  247. for path in paths:
  248. print(f"Removing {path}...")
  249. if path.is_dir():
  250. shutil.rmtree(path, ignore_errors=True)
  251. else:
  252. path.unlink()
  253. MZDATA.mkdir(exist_ok=True)
  254. scratch.mkdir(exist_ok=True)
  255. environment_file = MZDATA / "environment-id"
  256. try:
  257. environment_id = environment_file.read_text().rstrip()
  258. except FileNotFoundError:
  259. environment_id = f"local-az1-{uuid.uuid4()}-0"
  260. environment_file.write_text(environment_id)
  261. print(f"persist-blob-url: {args.blob}")
  262. print(f"listeners config path: {args.listeners_config_path}")
  263. command += [
  264. f"--listeners-config-path={args.listeners_config_path}",
  265. "--orchestrator=process",
  266. f"--orchestrator-process-secrets-directory={MZDATA}/secrets",
  267. "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
  268. f"--orchestrator-process-prometheus-service-discovery-directory={MZDATA}/prometheus",
  269. f"--orchestrator-process-scratch-directory={scratch}",
  270. "--secrets-controller=local-file",
  271. f"--persist-consensus-url={args.postgres}?options=--search_path=consensus",
  272. f"--persist-blob-url={args.blob}",
  273. f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
  274. f"--environment-id={environment_id}",
  275. "--bootstrap-role=materialize",
  276. f"--cluster-replica-sizes={json.dumps(cluster_replica_size_map())}",
  277. f"--bootstrap-default-cluster-replica-size={bootstrap_cluster_replica_size()}",
  278. *args.args,
  279. ]
  280. if args.monitoring:
  281. command += ["--opentelemetry-endpoint=http://localhost:4317"]
  282. elif args.program == "sqllogictest":
  283. # sqllogictest creates the scratch directory in a tmpfs mount, which doesn't work well with lgalloc
  284. # https://github.com/MaterializeInc/database-issues/issues/8989
  285. formatted_params = [
  286. f"{key}={value}"
  287. for key, value in get_default_system_parameters().items()
  288. ] + ["enable_lgalloc=false"]
  289. system_parameter_default = ";".join(formatted_params)
  290. # Connect to the database to ensure it exists.
  291. _connect_sql(args.postgres)
  292. command += [
  293. f"--postgres-url={args.postgres}",
  294. f"--system-parameter-default={system_parameter_default}",
  295. *args.args,
  296. ]
  297. elif args.program == "test":
  298. if args.bazel:
  299. raise UIError("testing with Bazel is not yet supported")
  300. (build_retcode, _) = _cargo_build(args)
  301. if args.build_only:
  302. return build_retcode
  303. command = _cargo_command(args, "nextest")
  304. try:
  305. subprocess.check_output(
  306. command + ["--version"], env=env, stderr=subprocess.PIPE
  307. )
  308. except subprocess.CalledProcessError:
  309. raise UIError("cargo nextest not found, run `cargo install cargo-nextest`")
  310. command += ["run"]
  311. for package in args.package:
  312. command += ["--package", package]
  313. for test in args.test:
  314. command += ["--test", test]
  315. command += args.args
  316. env["COCKROACH_URL"] = args.postgres
  317. # some tests run into stack overflows
  318. env["RUST_MIN_STACK"] = "4194304"
  319. dbconn = _connect_sql(args.postgres)
  320. else:
  321. raise UIError(f"unknown program {args.program}")
  322. # We fork off a process that moves to its own process group and then runs
  323. # `command`. This parent process continues running until `command` exits,
  324. # and then kills `command`'s process group. This avoids leaking "grandchild"
  325. # processes--e.g., if we spawn an `environmentd` process that in turn spawns
  326. # `clusterd` processes, and then `environmentd` panics, we want to clean up
  327. # those `clusterd` processes before we exit.
  328. #
  329. # This isn't foolproof. If this script itself crashes, that can leak
  330. # processes. The subprocess can also intentionally daemonize (i.e., move to
  331. # another process group) to evade our detection. But this catches the vast
  332. # majority of cases and is simple to reason about.
  333. child_pid = os.fork()
  334. assert child_pid >= 0
  335. if child_pid > 0:
  336. # This is the parent process, responsible for cleaning up after the
  337. # child.
  338. # First, arrange to terminate all processes in the child's process group
  339. # when we exit.
  340. def _kill_childpg():
  341. try:
  342. os.killpg(child_pid, signal.SIGTERM)
  343. except ProcessLookupError:
  344. pass
  345. atexit.register(_kill_childpg)
  346. # Wait for the child to exit then propagate its exit status.
  347. _, ws = os.waitpid(child_pid, 0)
  348. exit(os.waitstatus_to_exitcode(ws))
  349. # This is the child. First, move to a dedicated process group.
  350. os.setpgid(child_pid, child_pid)
  351. # Then, spawn the desired command.
  352. print(f"$ {' '.join(command)}")
  353. if args.program == "environmentd":
  354. # Automatically restart `environmentd` after it halts, but not more than
  355. # once every 5s to prevent hot loops. This simulates what happens when
  356. # running in Kubernetes, which restarts failed `environmentd` process
  357. # automatically. (We don't restart after a panic, since panics are
  358. # generally unexpected and we don't want to inadvertently hide them
  359. # during local development.)
  360. while True:
  361. last_start_time = datetime.now()
  362. proc = subprocess.run(command, env=env)
  363. if proc.returncode == 166:
  364. wait = max(
  365. timedelta(seconds=5) - (datetime.now() - last_start_time),
  366. timedelta(seconds=0),
  367. )
  368. print(f"environmentd halted; will restart in {wait.total_seconds()}s")
  369. time.sleep(wait.total_seconds())
  370. else:
  371. break
  372. else:
  373. proc = subprocess.run(command, env=env)
  374. exit(proc.returncode)
  375. def _bazel_build(
  376. args: argparse.Namespace, extra_programs: list[str] = []
  377. ) -> tuple[int, list[str]]:
  378. config = BuildConfig.read()
  379. command = _bazel_command(args, config, ["build"])
  380. programs = [*REQUIRED_SERVICES, *extra_programs]
  381. targets = [_bazel_target(program) for program in programs]
  382. command += targets
  383. completed_proc = spawn.runv(command)
  384. artifacts = [str(_bazel_artifact_path(args, t)) for t in targets]
  385. return (completed_proc.returncode, artifacts)
  386. def _bazel_target(program: str) -> str:
  387. if program == "environmentd":
  388. return "//src/environmentd:environmentd"
  389. elif program == "clusterd":
  390. return "//src/clusterd:clusterd"
  391. elif program == "sqllogictest":
  392. return "//src/sqllogictest:sqllogictest"
  393. else:
  394. raise UIError(f"unknown program {program}")
  395. def _bazel_command(
  396. args: argparse.Namespace, config: BuildConfig | None, subcommands: list[str]
  397. ) -> list[str]:
  398. command = ["bazel"] + subcommands
  399. sanitizer = rustc_flags.Sanitizer[args.sanitizer]
  400. if args.release:
  401. command += ["--config=optimized"]
  402. if sanitizer != rustc_flags.Sanitizer.none:
  403. command += sanitizer.bazel_flags()
  404. if config:
  405. command += remote_cache_arg(config)
  406. return command
  407. def _bazel_artifact_path(args: argparse.Namespace, program: str) -> pathlib.Path:
  408. cmd = _bazel_command(args, None, ["cquery", "--output=files"]) + [program]
  409. raw_path = subprocess.check_output(cmd, text=True)
  410. relative_path = pathlib.Path(raw_path.strip())
  411. return MZ_ROOT / relative_path
  412. def _cargo_build(
  413. args: argparse.Namespace, extra_programs: list[str] = []
  414. ) -> tuple[int, list[str]]:
  415. env = dict(os.environ)
  416. command = _cargo_command(args, "build")
  417. features = []
  418. if args.coverage:
  419. env["RUSTFLAGS"] = (
  420. env.get("RUSTFLAGS", "") + " " + " ".join(rustc_flags.coverage)
  421. )
  422. if args.sanitizer != "none":
  423. env["RUSTFLAGS"] = (
  424. env.get("RUSTFLAGS", "")
  425. + " "
  426. + " ".join(rustc_flags.sanitizer[args.sanitizer])
  427. )
  428. env["CFLAGS"] = (
  429. env.get("CFLAGS", "")
  430. + " "
  431. + " ".join(rustc_flags.sanitizer_cflags[args.sanitizer])
  432. )
  433. env["CXXFLAGS"] = (
  434. env.get("CXXFLAGS", "")
  435. + " "
  436. + " ".join(rustc_flags.sanitizer_cflags[args.sanitizer])
  437. )
  438. env["LDFLAGS"] = (
  439. env.get("LDFLAGS", "")
  440. + " "
  441. + " ".join(rustc_flags.sanitizer_cflags[args.sanitizer])
  442. )
  443. if args.features:
  444. features.extend(args.features.split(","))
  445. if features:
  446. command += [f"--features={','.join(features)}"]
  447. programs = [*REQUIRED_SERVICES, *extra_programs]
  448. for program in programs:
  449. command += ["--bin", program]
  450. completed_proc = spawn.runv(
  451. command,
  452. env=env,
  453. cwd=pathlib.Path(
  454. os.path.abspath(os.path.join(os.path.realpath(sys.argv[0]), "../.."))
  455. ),
  456. )
  457. artifacts = [str(_cargo_artifact_path(args, program)) for program in programs]
  458. return (completed_proc.returncode, artifacts)
  459. def _cargo_command(args: argparse.Namespace, subcommand: str) -> list[str]:
  460. command = ["cargo"]
  461. if args.channel:
  462. command += [args.channel]
  463. command += [subcommand]
  464. if args.release:
  465. command += ["--release"]
  466. if args.optimized:
  467. command += ["--profile", "optimized"]
  468. if args.timings:
  469. command += ["--timings"]
  470. if args.no_default_features:
  471. command += ["--no-default-features"]
  472. if args.sanitizer != "none":
  473. command += ["-Zbuild-std", "--target", SANITIZER_TARGET]
  474. return command
  475. def _cargo_artifact_path(args: argparse.Namespace, program: str) -> pathlib.Path:
  476. dir_name = "release" if args.release else "optimized" if args.optimized else "debug"
  477. if args.sanitizer != "none":
  478. artifact_path = MZ_ROOT / "target" / SANITIZER_TARGET / dir_name
  479. else:
  480. artifact_path = MZ_ROOT / "target" / dir_name
  481. return artifact_path / program
  482. def _macos_codesign(path: str) -> None:
  483. env = dict(os.environ)
  484. command = ["codesign"]
  485. command.extend(["-s", "-", "-f", "--entitlements"])
  486. # write our entitlements file to a temp path
  487. temp = tempfile.NamedTemporaryFile()
  488. temp.write(bytes(MACOS_ENTITLEMENTS_DATA, "utf-8"))
  489. temp.flush()
  490. command.append(temp.name)
  491. command.append(path)
  492. spawn.runv(command, env=env)
  493. def _connect_sql(urlstr: str) -> pg8000.native.Connection:
  494. hint = """Have you correctly configured CockroachDB or PostgreSQL?
  495. For CockroachDB:
  496. Follow the instructions in doc/developer/guide.md#CockroachDB
  497. For PostgreSQL:
  498. 1. Install PostgreSQL
  499. 2. Create a database: `createdb materialize`
  500. 3. Set the MZDEV_POSTGRES environment variable accordingly: `export MZDEV_POSTGRES=postgres://localhost/materialize`"""
  501. url = urlparse(urlstr)
  502. database = url.path.removeprefix("/")
  503. try:
  504. dbconn = pg8000.native.Connection(
  505. host=url.hostname or "localhost",
  506. port=url.port or 5432,
  507. user=url.username or getpass.getuser(),
  508. password=url.password,
  509. database=database,
  510. startup_params={key: value for key, value in parse_qsl(url.query)},
  511. )
  512. except pg8000.exceptions.DatabaseError as e:
  513. raise UIError(
  514. f"unable to connect to metadata database: {e.args[0]['M']}",
  515. hint=hint,
  516. )
  517. except pg8000.exceptions.InterfaceError as e:
  518. raise UIError(
  519. f"unable to connect to metadata database: {e}",
  520. hint=hint,
  521. )
  522. # For CockroachDB, after connecting, we can ensure the database exists. For
  523. # PostgreSQL, the database must exist for us to connect to it at all--we
  524. # declare it to be the user's problem to create this database.
  525. if "crdb_version" in dbconn.parameter_statuses:
  526. if not database:
  527. raise UIError(
  528. f"database name is missing in the postgres URL: {urlstr}",
  529. hint="When connecting to CockroachDB, the database name is required.",
  530. )
  531. _run_sql(dbconn, f"CREATE DATABASE IF NOT EXISTS {database}")
  532. return dbconn
  533. def _run_sql(conn: pg8000.native.Connection, sql: str) -> None:
  534. print(f"> {sql}")
  535. conn.run(sql)
  536. def _handle_lingering_services(kill: bool = False) -> None:
  537. uid = os.getuid()
  538. for proc in psutil.process_iter():
  539. try:
  540. if proc.name() in REQUIRED_SERVICES:
  541. if proc.uids().real != uid:
  542. print(
  543. f"Ignoring {proc.name()} process with different UID (PID {proc.pid}, likely running in Docker)"
  544. )
  545. elif kill:
  546. print(f"Killing orphaned {proc.name()} process (PID {proc.pid})")
  547. proc.kill()
  548. else:
  549. ui.warn(
  550. f"Existing {proc.name()} process (PID {proc.pid}) will be reused"
  551. )
  552. except psutil.NoSuchProcess:
  553. continue
  554. if __name__ == "__main__":
  555. with ui.error_handler("run"):
  556. main()