mkpipeline.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Generator for the test CI pipeline.

This script takes pipeline.template.yml as input, possibly trims out jobs
whose inputs have not changed relative to the code on main, and uploads the
resulting pipeline to the Buildkite job that triggers this script.

On main and tags, all jobs are always run.

For details about how steps are trimmed, see the comment at the top of
pipeline.template.yml and the docstring on `trim_tests_pipeline` below.
"""

import argparse
import copy
import hashlib
import os
import subprocess
import sys
import threading
import traceback
from collections import OrderedDict
from collections.abc import Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import requests
import yaml

from materialize import mzbuild, spawn, ui
from materialize.buildkite_insights.buildkite_api import generic_api
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition
from materialize.rustc_flags import Sanitizer
from materialize.ui import UIError
from materialize.version_list import get_previous_published_version
from materialize.xcompile import Arch

from .deploy.deploy_util import rust_version

# These paths contain "CI glue code", i.e., the code that powers CI itself,
# including this very script! All of CI implicitly depends on this code, so
# whenever it changes, we ought not trim any jobs from the pipeline in order to
# exercise as much of the glue code as possible.
#
# It's tough to track this code with any sort of fine-grained granularity, so we
# err on the side of including too much rather than too little. (For example,
# bin/resync-submodules is not presently used by CI, but it's just not worth
# trying to capture that.)
CI_GLUE_GLOBS = ["bin", "ci", "misc/python/materialize/cli/ci_annotate_errors.py"]

DEFAULT_AGENT = "hetzner-aarch64-4cpu-8gb"


def steps(pipeline: Any) -> Iterator[dict[str, Any]]:
    """Yield every step of the pipeline, including the members of groups."""
    for step in pipeline["steps"]:
        yield step
        if "group" in step:
            yield from step.get("steps", [])
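
# For example (illustrative, not from the repo): given
#   pipeline = {"steps": [{"id": "a"}, {"group": "g", "steps": [{"id": "b"}]}]}
# steps(pipeline) yields the "a" step, then the group step itself, and then
# the nested "b" step, so callers see both groups and their members.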


def get_imported_files(composition: str) -> list[str]:
    """List the Python files (transitively) imported by a composition."""
    return spawn.capture(["bin/ci-python-imports", composition]).splitlines()


def main() -> int:
    parser = argparse.ArgumentParser(
        prog="mkpipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""
mkpipeline creates a Buildkite pipeline based on a template file and uploads it
so it is executed.""",
    )
    parser.add_argument("--coverage", action="store_true")
    parser.add_argument(
        "--sanitizer",
        default=Sanitizer[os.getenv("CI_SANITIZER", "none")],
        type=Sanitizer,
        choices=Sanitizer,
    )
    parser.add_argument(
        "--priority",
        type=int,
        default=os.getenv("CI_PRIORITY", 0),
    )
    parser.add_argument("pipeline", type=str)
    parser.add_argument(
        "--bazel-remote-cache",
        default=os.getenv("CI_BAZEL_REMOTE_CACHE"),
        action="store",
    )
    args = parser.parse_args()

    print(f"Pipeline is: {args.pipeline}")

    with open(Path(__file__).parent / args.pipeline / "pipeline.template.yml") as f:
        raw = f.read()
    raw = raw.replace("$RUST_VERSION", rust_version())

    # On 'main' or tagged branches, we use a separate remote cache that only CI can write to.
    if os.environ["BUILDKITE_BRANCH"] == "main" or os.environ["BUILDKITE_TAG"]:
        bazel_remote_cache = "https://bazel-remote-pa.dev.materialize.com"
    else:
        bazel_remote_cache = "https://bazel-remote.dev.materialize.com"
    raw = raw.replace("$BAZEL_REMOTE_CACHE", bazel_remote_cache)

    pipeline = yaml.safe_load(raw)

    bazel = pipeline.get("env", {}).get("CI_BAZEL_BUILD", 0) == 1
    bazel_lto = (
        pipeline.get("env", {}).get("CI_BAZEL_LTO", 0) == 1
        or bool(os.environ["BUILDKITE_TAG"])
        or ui.env_is_truthy("CI_RELEASE_LTO_BUILD")
    )

    hash_check: dict[Arch, tuple[str, bool]] = {}

    def hash(deps: mzbuild.DependencySet) -> str:
        h = hashlib.sha1()
        for dep in deps:
            h.update(dep.spec().encode())
        return h.hexdigest()

    def get_hashes(arch: Arch) -> tuple[str, bool]:
        repo = mzbuild.Repository(
            Path("."),
            profile=mzbuild.Profile.RELEASE if bazel_lto else mzbuild.Profile.OPTIMIZED,
            arch=arch,
            coverage=args.coverage,
            sanitizer=args.sanitizer,
            bazel=bazel,
            bazel_remote_cache=bazel_remote_cache,
            bazel_lto=bazel_lto,
        )
        deps = repo.resolve_dependencies(image for image in repo if image.publish)
        check = deps.check()
        return (hash(deps), check)

    def fetch_hashes() -> None:
        for arch in [Arch.AARCH64, Arch.X86_64]:
            hash_check[arch] = get_hashes(arch)

    trim_builds_prep_thread = threading.Thread(target=fetch_hashes)
    trim_builds_prep_thread.start()

    # This has to run before other cutting steps because it depends on the id numbers
    if test_selection := os.getenv("CI_TEST_IDS"):
        trim_test_selection_id(pipeline, {int(i) for i in test_selection.split(",")})
    elif test_selection := os.getenv("CI_TEST_SELECTION"):
        trim_test_selection_name(pipeline, set(test_selection.split(",")))

    if args.pipeline == "test" and not os.getenv("CI_TEST_IDS"):
        if args.coverage or args.sanitizer != Sanitizer.none:
            print("Coverage/Sanitizer build, not trimming pipeline")
        elif os.environ["BUILDKITE_BRANCH"] == "main" or os.environ["BUILDKITE_TAG"]:
            print("On main branch or tag, so not trimming pipeline")
        elif have_paths_changed(CI_GLUE_GLOBS):
            # We still execute pipeline trimming on a copy of the pipeline to
            # protect against bugs in the pipeline trimming itself.
            print("[DRY RUN] Trimming unchanged steps from pipeline")
            print(
                "Repository glue code has changed, so the trimmed pipeline below does not apply"
            )
            trim_tests_pipeline(
                copy.deepcopy(pipeline),
                args.coverage,
                args.sanitizer,
                bazel,
                args.bazel_remote_cache,
                bazel_lto,
            )
        else:
            print("Trimming unchanged steps from pipeline")
            trim_tests_pipeline(
                pipeline,
                args.coverage,
                args.sanitizer,
                bazel,
                args.bazel_remote_cache,
                bazel_lto,
            )

    handle_sanitizer_skip(pipeline, args.sanitizer)
    increase_agents_timeouts(pipeline, args.sanitizer, args.coverage)
    prioritize_pipeline(pipeline, args.priority)
    switch_jobs_to_aws(pipeline, args.priority)
    permit_rerunning_successful_steps(pipeline)
    set_retry_on_agent_lost(pipeline)
    set_default_agents_queue(pipeline)
    set_parallelism_name(pipeline)
    check_depends_on(pipeline, args.pipeline)
    add_version_to_preflight_tests(pipeline)
    move_build_to_bazel_lto(pipeline, args.pipeline)
    trim_builds_prep_thread.join()
    trim_builds(pipeline, hash_check)
    add_cargo_test_dependency(
        pipeline,
        args.pipeline,
        args.coverage,
        args.sanitizer,
        args.bazel_remote_cache,
        bazel_lto,
    )
    remove_dependencies_on_prs(pipeline, args.pipeline, hash_check)
    remove_mz_specific_keys(pipeline)

    print("--- Uploading new pipeline:")
    print(yaml.dump(pipeline))
    spawn.runv(
        ["buildkite-agent", "pipeline", "upload"], stdin=yaml.dump(pipeline).encode()
    )

    return 0


class PipelineStep:
    """A step of the pipeline, tracking the inputs used to decide trimming."""

    def __init__(self, id: str):
        self.id = id
        self.extra_inputs: set[str] = set()
        self.image_dependencies: set[mzbuild.ResolvedImage] = set()
        self.step_dependencies: set[str] = set()

    def inputs(self) -> set[str]:
        inputs = set()
        inputs.update(self.extra_inputs)
        for image in self.image_dependencies:
            inputs.update(image.inputs(transitive=True))
        return inputs
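
# For example (illustrative): a step with `inputs: ["ci/test"]` that runs a
# composition using the environmentd mzbuild image reports both the explicit
# glob and every (transitive) input file of that image from inputs().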


def prioritize_pipeline(pipeline: Any, priority: int) -> None:
    """Prioritize builds against main or release branches"""
    tag = os.environ["BUILDKITE_TAG"]
    branch = os.getenv("BUILDKITE_BRANCH")
    build_author = os.getenv("BUILDKITE_BUILD_AUTHOR")

    # use the base priority of the entire pipeline
    priority += pipeline.get("priority", 0)

    # Release results are time sensitive
    if tag.startswith("v"):
        priority += 10

    # main branch is less time sensitive than results on PRs
    if branch == "main":
        priority -= 50

    # Dependabot is less urgent than manual PRs
    if build_author == "Dependabot":
        priority -= 40

    def visit(config: Any) -> None:
        # Increase priority for larger Hetzner-based tests so that they get
        # preferential treatment on the agents which also accept smaller jobs.
        agent_priority = 0
        if "agents" in config:
            agent = config["agents"].get("queue", None)
            if agent == "hetzner-aarch64-8cpu-16gb":
                agent_priority = 1
            if agent == "hetzner-aarch64-16cpu-32gb":
                agent_priority = 2
        config["priority"] = config.get("priority", 0) + priority + agent_priority

    for config in pipeline["steps"]:
        if "trigger" in config or "wait" in config:
            # Trigger and Wait steps do not allow priorities.
            continue
        if "group" in config:
            for inner_config in config.get("steps", []):
                visit(inner_config)
            continue
        visit(config)
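
# Example (illustrative): with no CI_PRIORITY override and no pipeline-level
# priority, a Dependabot PR step on a hetzner-aarch64-16cpu-32gb agent ends up
# at 0 - 40 + 2 = -38, well below a manual PR step's 0, so manual PRs win
# contention for shared agents.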


def handle_sanitizer_skip(pipeline: Any, sanitizer: Sanitizer) -> None:
    if sanitizer != Sanitizer.none:
        pipeline.setdefault("env", {})["CI_SANITIZER"] = sanitizer.value

        def visit(step: dict[str, Any]) -> None:
            if step.get("sanitizer") == "skip":
                step["skip"] = True

    else:

        def visit(step: dict[str, Any]) -> None:
            if step.get("sanitizer") == "only":
                step["skip"] = True

    for step in pipeline["steps"]:
        visit(step)
        if "group" in step:
            for inner_step in step.get("steps", []):
                visit(inner_step)


def increase_agents_timeouts(
    pipeline: Any, sanitizer: Sanitizer, coverage: bool
) -> None:
    if sanitizer != Sanitizer.none or os.getenv("CI_SYSTEM_PARAMETERS", "") == "random":

        def visit(step: dict[str, Any]) -> None:
            # Most sanitizer runs, as well as random permutations of system
            # parameters, are slower and need more memory. The default system
            # parameters in CI are chosen to be efficient for execution, while
            # a random permutation might take way longer and use more memory.
            if "timeout_in_minutes" in step:
                step["timeout_in_minutes"] *= 10
            if "agents" in step:
                agent = step["agents"].get("queue", None)
                if agent == "linux-aarch64-small":
                    agent = "linux-aarch64"
                elif agent == "linux-aarch64":
                    agent = "linux-aarch64-medium"
                elif agent == "linux-aarch64-medium":
                    agent = "linux-aarch64-large"
                elif agent == "linux-aarch64-large":
                    agent = "builder-linux-aarch64-mem"
                elif agent == "linux-x86_64-small":
                    agent = "linux-x86_64"
                elif agent == "linux-x86_64":
                    agent = "linux-x86_64-medium"
                elif agent == "linux-x86_64-medium":
                    agent = "linux-x86_64-large"
                elif agent == "linux-x86_64-large":
                    agent = "builder-linux-x86_64"
                elif agent == "hetzner-aarch64-2cpu-4gb":
                    agent = "hetzner-aarch64-4cpu-8gb"
                elif agent == "hetzner-aarch64-4cpu-8gb":
                    agent = "hetzner-aarch64-8cpu-16gb"
                elif agent == "hetzner-aarch64-8cpu-16gb":
                    agent = "hetzner-aarch64-16cpu-32gb"
                elif agent == "hetzner-x86-64-2cpu-4gb":
                    agent = "hetzner-x86-64-4cpu-8gb"
                elif agent == "hetzner-x86-64-4cpu-8gb":
                    agent = "hetzner-x86-64-8cpu-16gb"
                elif agent == "hetzner-x86-64-8cpu-16gb":
                    agent = "hetzner-x86-64-16cpu-32gb"
                elif agent == "hetzner-x86-64-16cpu-32gb":
                    agent = "hetzner-x86-64-dedi-16cpu-64gb"
  299. elif agent == "hetzner-x86-64-16cpu-64gb":
  300. agent = "hetzner-x86-64-dedi-32cpu-128gb"
  301. elif agent == "hetzner-x86-64-dedi-32cpu-128gb":
  302. agent = "hetzner-x86-64-dedi-48cpu-192gb"
  303. step["agents"] = {"queue": agent}
  304. for step in pipeline["steps"]:
  305. visit(step)
  306. # Groups can't be nested, so handle them explicitly here instead of recursing
  307. if "group" in step:
  308. for inner_step in step.get("steps", []):
  309. visit(inner_step)
  310. if coverage:
  311. pipeline["env"]["CI_BUILDER_SCCACHE"] = 1
  312. pipeline["env"]["CI_COVERAGE_ENABLED"] = 1
  313. for step in steps(pipeline):
  314. # Coverage runs are slower
  315. if "timeout_in_minutes" in step:
  316. step["timeout_in_minutes"] *= 3
  317. if step.get("coverage") == "skip":
  318. step["skip"] = True
  319. if step.get("id") == "build-x86_64":
  320. step["name"] = "Build x86_64 with coverage"
  321. if step.get("id") == "build-aarch":
  322. step["name"] = "Build aarch64 with coverage"
  323. else:
  324. for step in steps(pipeline):
  325. if step.get("coverage") == "only":
  326. step["skip"] = True


def switch_jobs_to_aws(pipeline: Any, priority: int) -> None:
    """Switch jobs to AWS if Hetzner is currently overloaded"""
    # If Hetzner is entirely broken, take these actions to switch everything
    # back to AWS:
    # - Set the CI_FORCE_SWITCH_TO_AWS env variable to 1
    # - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to
    #   linux-aarch64-small in
    #   https://buildkite.com/materialize/test/settings/steps and other
    #   pipelines
    # - Reconfigure the agent from hetzner-aarch64-4cpu-8gb to
    #   linux-aarch64-small in ci/mkpipeline.sh
    stuck: set[str] = set()
    # TODO(def-): Remove me when Hetzner fixes its aarch64 availability
    stuck.update(
        [
            "hetzner-aarch64-16cpu-32gb",
            "hetzner-aarch64-8cpu-16gb",
            "hetzner-aarch64-4cpu-8gb",
            "hetzner-aarch64-2cpu-4gb",
        ]
    )

    if ui.env_is_truthy("CI_FORCE_SWITCH_TO_AWS", "0"):
        stuck = set(
            {
                "hetzner-x86-64-16cpu-32gb",
                "hetzner-x86-64-8cpu-16gb",
                "hetzner-x86-64-4cpu-8gb",
                "hetzner-x86-64-2cpu-4gb",
                "hetzner-aarch64-16cpu-32gb",
                "hetzner-aarch64-8cpu-16gb",
                "hetzner-aarch64-4cpu-8gb",
                "hetzner-aarch64-2cpu-4gb",
                "hetzner-x86-64-dedi-48cpu-192gb",
                "hetzner-x86-64-dedi-32cpu-128gb",
                "hetzner-x86-64-dedi-16cpu-64gb",
                "hetzner-x86-64-dedi-8cpu-32gb",
                "hetzner-x86-64-dedi-4cpu-16gb",
                "hetzner-x86-64-dedi-2cpu-8gb",
            }
        )
    else:
        # TODO(def-): Reenable me when Hetzner fixes its aarch64 availability
        # If priority has manually been set to be low, or on the main branch,
        # we can wait for agents to become available:
        # if branch == "main" or priority < 0:
        #     return

        # Consider Hetzner to be overloaded/broken when an important job has
        # been stuck waiting for an agent for more than 20 minutes.
        try:
            builds = generic_api.get_multiple(
                "builds",
                params={
                    "state[]": [
                        "creating",
                        "scheduled",
                        "running",
                        "failing",
                        "canceling",
                    ],
                },
                max_fetches=None,
            )
            for build in builds:
                for job in build["jobs"]:
                    if "state" not in job:
                        continue
                    if "agent_query_rules" not in job:
                        continue
                    queue = job["agent_query_rules"][0].removeprefix("queue=")
                    if not queue.startswith("hetzner-"):
                        continue
                    if queue in stuck:
                        continue
                    if job.get("state") != "scheduled":
                        continue
                    runnable = job.get("runnable_at")
                    if not runnable or job.get("started_at"):
                        continue
                    if datetime.now(timezone.utc) - datetime.fromisoformat(
                        runnable
                    ) < timedelta(minutes=20):
                        continue
                    print(
                        f"Job {job.get('id')} ({job.get('web_url')}) with priority {priority} is runnable since {runnable} on {queue}, considering {queue} stuck"
                    )
                    stuck.add(queue)
        except Exception:
            print("switch_jobs_to_aws failed, ignoring:")
            traceback.print_exc()
            return

    if not stuck:
        return

    print(f"Queues stuck in Hetzner, switching to AWS or another arch: {stuck}")

    def visit(config: Any) -> None:
        if "agents" not in config:
            return
        agent = config["agents"].get("queue", None)
        if agent not in stuck:
            return
        if agent == "hetzner-aarch64-2cpu-4gb":
            if "hetzner-x86-64-2cpu-4gb" not in stuck:
                config["agents"]["queue"] = "hetzner-x86-64-2cpu-4gb"
                if config.get("depends_on") == "build-aarch64":
                    config["depends_on"] = "build-x86_64"
            else:
                config["agents"]["queue"] = "linux-aarch64"
        elif agent == "hetzner-aarch64-4cpu-8gb":
            if "hetzner-x86-64-4cpu-8gb" not in stuck:
                config["agents"]["queue"] = "hetzner-x86-64-4cpu-8gb"
                if config.get("depends_on") == "build-aarch64":
                    config["depends_on"] = "build-x86_64"
            else:
                config["agents"]["queue"] = "linux-aarch64"
        elif agent == "hetzner-aarch64-8cpu-16gb":
            if "hetzner-x86-64-8cpu-16gb" not in stuck:
                config["agents"]["queue"] = "hetzner-x86-64-8cpu-16gb"
                if config.get("depends_on") == "build-aarch64":
                    config["depends_on"] = "build-x86_64"
            else:
                config["agents"]["queue"] = "linux-aarch64-medium"
        elif agent == "hetzner-aarch64-16cpu-32gb":
            if "hetzner-x86-64-16cpu-32gb" not in stuck:
                config["agents"]["queue"] = "hetzner-x86-64-16cpu-32gb"
                if config.get("depends_on") == "build-aarch64":
                    config["depends_on"] = "build-x86_64"
            else:
                config["agents"]["queue"] = "linux-aarch64-medium"
        elif agent in ("hetzner-x86-64-4cpu-8gb", "hetzner-x86-64-2cpu-4gb"):
            config["agents"]["queue"] = "linux-x86_64"
        elif agent in ("hetzner-x86-64-8cpu-16gb", "hetzner-x86-64-16cpu-32gb"):
            config["agents"]["queue"] = "linux-x86_64-medium"
        elif agent == "hetzner-x86-64-dedi-2cpu-8gb":
            config["agents"]["queue"] = "linux-x86_64"
        elif agent == "hetzner-x86-64-dedi-4cpu-16gb":
            config["agents"]["queue"] = "linux-x86_64-medium"
        elif agent in (
            "hetzner-x86-64-dedi-8cpu-32gb",
            "hetzner-x86-64-dedi-16cpu-64gb",
        ):
            config["agents"]["queue"] = "linux-x86_64-large"
        elif agent in (
            "hetzner-x86-64-dedi-32cpu-128gb",
            "hetzner-x86-64-dedi-48cpu-192gb",
        ):
            config["agents"]["queue"] = "builder-linux-x86_64"

    for config in pipeline["steps"]:
        if "trigger" in config or "wait" in config:
            # Trigger and Wait steps don't have agents
            continue
        if "group" in config:
            for inner_config in config.get("steps", []):
                visit(inner_config)
            continue
        visit(config)


def permit_rerunning_successful_steps(pipeline: Any) -> None:
    def visit(step: Any) -> None:
        step.setdefault("retry", {}).setdefault("manual", {}).setdefault(
            "permit_on_passed", True
        )

    for config in pipeline["steps"]:
        if "trigger" in config or "wait" in config or "block" in config:
            continue
        if "group" in config:
            for inner_config in config.get("steps", []):
                visit(inner_config)
            continue
        visit(config)


def set_retry_on_agent_lost(pipeline: Any) -> None:
    def visit(step: Any) -> None:
        step.setdefault("retry", {}).setdefault("automatic", []).extend(
            [
                {
                    "exit_status": -1,  # Connection to agent lost
                    "signal_reason": "none",
                    "limit": 2,
                },
                {
                    "signal_reason": "agent_stop",  # Stopped by OS
                    "limit": 2,
                },
                {
                    "exit_status": 128,  # Temporary GitHub connection issue
                    "limit": 2,
                },
            ]
        )

    for config in pipeline["steps"]:
        if "trigger" in config or "wait" in config or "block" in config:
            continue
        if "group" in config:
            for inner_config in config.get("steps", []):
                visit(inner_config)
            continue
        visit(config)
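
# Taken together, permit_rerunning_successful_steps and set_retry_on_agent_lost
# leave a typical command step with a retry policy like this (rendered YAML):
#
#   retry:
#     manual:
#       permit_on_passed: true
#     automatic:
#       - exit_status: -1
#         signal_reason: none
#         limit: 2
#       - signal_reason: agent_stop
#         limit: 2
#       - exit_status: 128
#         limit: 2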


def set_default_agents_queue(pipeline: Any) -> None:
    for step in steps(pipeline):
        if (
            "agents" not in step
            and "prompt" not in step
            and "wait" not in step
            and "group" not in step
            and "trigger" not in step
        ):
            step["agents"] = {"queue": DEFAULT_AGENT}


def set_parallelism_name(pipeline: Any) -> None:
    def visit(step: Any) -> None:
        if step.get("parallelism", 1) > 1:
            step["label"] += " %N"

    for config in pipeline["steps"]:
        if "trigger" in config or "wait" in config or "block" in config:
            continue
        if "group" in config:
            for inner_config in config.get("steps", []):
                visit(inner_config)
            continue
        visit(config)


def check_depends_on(pipeline: Any, pipeline_name: str) -> None:
    if pipeline_name not in ("test", "nightly", "release-qualification"):
        return

    for step in steps(pipeline):
        # From the Buildkite documentation:
        # Note that a step with an explicit dependency specified with the
        # depends_on attribute will run immediately after the dependency step
        # has completed, without waiting for block or wait steps unless those
        # are also explicit dependencies.
        if step.get("id") in ("analyze", "deploy", "coverage-pr-analyze"):
            return

        if (
            "depends_on" not in step
            and "prompt" not in step
            and "wait" not in step
            and "group" not in step
        ):
            raise UIError(
                f"Every step should have an explicit depends_on value, missing in: {step}"
            )


def add_version_to_preflight_tests(pipeline: Any) -> None:
    for step in steps(pipeline):
        if step.get("id", "") in (
            "test-preflight-check-rollback",
            "nightly-preflight-check-rollback",
        ):
            current_version = MzVersion.parse_cargo()
            version = get_previous_published_version(
                current_version, previous_minor=True
            )
            step["build"]["commit"] = str(version)
            step["build"]["branch"] = str(version)


def trim_test_selection_id(pipeline: Any, step_ids_to_run: set[int]) -> None:
    for i, step in enumerate(steps(pipeline)):
        ident = step.get("id") or step.get("command")
        if (
            (i not in step_ids_to_run or len(step_ids_to_run) == 0)
            and "prompt" not in step
            and "wait" not in step
            and "group" not in step
            and ident
            not in (
                "coverage-pr-analyze",
                "analyze",
                "build-x86_64",
                "build-aarch64",
                "build-x86_64-lto",
                "build-aarch64-lto",
            )
            and not step.get("async")
        ):
            step["skip"] = True


def trim_test_selection_name(pipeline: Any, steps_to_run: set[str]) -> None:
    for step in steps(pipeline):
        ident = step.get("id") or step.get("command")
        if (
            ident not in steps_to_run
            and "prompt" not in step
            and "wait" not in step
            and "group" not in step
            and ident
            not in (
                "coverage-pr-analyze",
                "analyze",
                "build-x86_64",
                "build-aarch64",
                "build-x86_64-lto",
                "build-aarch64-lto",
            )
            and not step.get("async")
        ):
            step["skip"] = True


def trim_tests_pipeline(
    pipeline: Any,
    coverage: bool,
    sanitizer: Sanitizer,
    bazel: bool,
    bazel_remote_cache: str,
    bazel_lto: bool,
) -> None:
    """Trim pipeline steps whose inputs have not changed in this branch.

    Steps are assigned inputs in two ways:

    1. An explicit glob in the `inputs` key.
    2. An implicit dependency on any number of mzbuild images via the
       mzcompose plugin. Any steps which use the mzcompose plugin will
       have inputs autodiscovered based on the images used in that
       mzcompose configuration.

    A step is trimmed if a) none of its inputs have changed, and b) there are
    no other untrimmed steps that depend on it.
    """
  627. print("--- Resolving dependencies")
  628. repo = mzbuild.Repository(
  629. Path("."),
  630. profile=mzbuild.Profile.RELEASE if bazel_lto else mzbuild.Profile.OPTIMIZED,
  631. coverage=coverage,
  632. sanitizer=sanitizer,
  633. bazel=bazel,
  634. bazel_remote_cache=bazel_remote_cache,
  635. bazel_lto=bazel_lto,
  636. )
  637. deps = repo.resolve_dependencies(image for image in repo)
  638. steps = OrderedDict()
  639. composition_paths: set[str] = set()
  640. for config in pipeline["steps"]:
  641. if "plugins" in config:
  642. for plugin in config["plugins"]:
  643. for plugin_name, plugin_config in plugin.items():
  644. if plugin_name != "./ci/plugins/mzcompose":
  645. continue
  646. name = plugin_config["composition"]
  647. composition_paths.add(str(repo.compositions[name]))
  648. if "group" in config:
  649. for inner_config in config.get("steps", []):
  650. if not "plugins" in inner_config:
  651. continue
  652. for plugin in inner_config["plugins"]:
  653. for plugin_name, plugin_config in plugin.items():
  654. if plugin_name != "./ci/plugins/mzcompose":
  655. continue
  656. name = plugin_config["composition"]
  657. composition_paths.add(str(repo.compositions[name]))
  658. imported_files: dict[str, list[str]] = {}
  659. with ThreadPoolExecutor(max_workers=len(composition_paths)) as executor:
  660. futures = {
  661. executor.submit(get_imported_files, path): path
  662. for path in composition_paths
  663. }
  664. for future in futures:
  665. path = futures[future]
  666. files = future.result()
  667. imported_files[path] = files
  668. compositions: dict[str, Composition] = {}
  669. def to_step(config: dict[str, Any]) -> PipelineStep | None:
  670. if "wait" in config or "group" in config:
  671. return None
  672. step = PipelineStep(config["id"])
  673. if "inputs" in config:
  674. for inp in config["inputs"]:
  675. step.extra_inputs.add(inp)
  676. if "depends_on" in config:
  677. d = config["depends_on"]
  678. if isinstance(d, str):
  679. step.step_dependencies.add(d)
  680. elif isinstance(d, list):
  681. step.step_dependencies.update(d)
  682. else:
  683. raise ValueError(f"unexpected non-str non-list for depends_on: {d}")
  684. if "plugins" in config:
  685. for plugin in config["plugins"]:
  686. for plugin_name, plugin_config in plugin.items():
  687. if plugin_name == "./ci/plugins/mzcompose":
  688. name = plugin_config["composition"]
  689. if name not in compositions:
  690. compositions[name] = Composition(repo, name)
  691. for dep in compositions[name].dependencies:
  692. step.image_dependencies.add(dep)
  693. composition_path = str(repo.compositions[name])
  694. step.extra_inputs.add(composition_path)
  695. # All (transitively) imported python modules are also implicitly dependencies
  696. for file in imported_files[composition_path]:
  697. step.extra_inputs.add(file)
  698. elif plugin_name == "./ci/plugins/cloudtest":
  699. step.image_dependencies.add(deps["environmentd"])
  700. step.image_dependencies.add(deps["clusterd"])
  701. return step
  702. for config in pipeline["steps"]:
  703. if step := to_step(config):
  704. steps[step.id] = step
  705. if "group" in config:
  706. for inner_config in config.get("steps", []):
  707. if inner_step := to_step(inner_config):
  708. steps[inner_step.id] = inner_step
  709. # Find all the steps whose inputs have changed with respect to main.
  710. # We delegate this hard work to Git.
  711. changed = set()
  712. for step in steps.values():
  713. inputs = step.inputs()
  714. if not inputs:
  715. # No inputs means there is no way this step can be considered
  716. # changed, but `git diff` with no pathspecs means "diff everything",
  717. # not "diff nothing", so explicitly skip.
  718. continue
  719. if have_paths_changed(inputs):
  720. changed.add(step.id)
  721. # Then collect all changed steps, and all the steps that those changed steps
  722. # depend on.
  723. needed = set()
  724. def visit(step: PipelineStep) -> None:
  725. if step.id not in needed:
  726. needed.add(step.id)
  727. for d in step.step_dependencies:
  728. visit(steps[d])
  729. for step_id in changed:
  730. visit(steps[step_id])
  731. # Print decisions, for debugging.
  732. for step in steps.values():
  733. print(f'{"✓" if step.id in needed else "✗"} {step.id}')
  734. if step.step_dependencies:
  735. print(" wait:", " ".join(step.step_dependencies))
  736. if step.extra_inputs:
  737. print(" globs:", " ".join(step.extra_inputs))
  738. if step.image_dependencies:
  739. print(
  740. " images:", " ".join(image.name for image in step.image_dependencies)
  741. )
  742. # Restrict the pipeline to the needed steps.
  743. for step in pipeline["steps"]:
  744. if "group" in step:
  745. step["steps"] = [
  746. inner_step
  747. for inner_step in step.get("steps", [])
  748. if inner_step.get("id") in needed
  749. ]
  750. pipeline["steps"] = [
  751. step
  752. for step in pipeline["steps"]
  753. if "wait" in step
  754. or ("group" in step and step["steps"])
  755. or step.get("id") in needed
  756. ]


def add_cargo_test_dependency(
    pipeline: Any,
    pipeline_name: str,
    coverage: bool,
    sanitizer: Sanitizer,
    bazel_remote_cache: str,
    bazel_lto: bool,
) -> None:
    """Make Cargo Test wait for the build only when necessary.

    Cargo Test normally doesn't have to wait for the build to complete, but it
    requires a few images (ubuntu-base, postgres) which rarely change, so only
    add a dependency on the build when those images are not on Docker Hub yet.
    """
    if pipeline_name not in ("test", "nightly"):
        return

    if ui.env_is_truthy("BUILDKITE_PULL_REQUEST") and pipeline_name == "test":
        for step in steps(pipeline):
            if step.get("id") == "cargo-test":
                step["depends_on"] = "build-x86_64"
        return

    repo = mzbuild.Repository(
        Path("."),
        arch=Arch.X86_64,
        profile=mzbuild.Profile.RELEASE if bazel_lto else mzbuild.Profile.OPTIMIZED,
        coverage=coverage,
        sanitizer=sanitizer,
        bazel=True,
        bazel_remote_cache=bazel_remote_cache,
        bazel_lto=bazel_lto,
    )
    composition = Composition(repo, name="cargo-test")
    deps = composition.dependencies
    if deps.check():
        # We already have the dependencies available, no need to add a build dependency.
        return

    for step in steps(pipeline):
        if step.get("id") == "cargo-test":
            step["depends_on"] = "build-x86_64"
        if step.get("id") == "miri-test":
            step["depends_on"] = "build-aarch64"


def remove_dependencies_on_prs(
    pipeline: Any,
    pipeline_name: str,
    hash_check: dict[Arch, tuple[str, bool]],
) -> None:
    """On PRs in the test pipeline, remove dependencies on the build so that
    tests start up immediately; they keep retrying until the Docker image is
    available."""
    if pipeline_name != "test":
        return
    if (
        not ui.env_is_truthy("BUILDKITE_PULL_REQUEST")
        or os.environ["BUILDKITE_TAG"]
        or ui.env_is_truthy("CI_RELEASE_LTO_BUILD")
    ):
        return

    for step in steps(pipeline):
        if step.get("id") in (
            "upload-debug-symbols-x86_64",
            "upload-debug-symbols-aarch64",
        ):
            continue
        if step.get("depends_on") in ("build-x86_64", "build-aarch64"):
            if step["depends_on"] == "build-x86_64" and hash_check[Arch.X86_64][1]:
                continue
            if step["depends_on"] == "build-aarch64" and hash_check[Arch.AARCH64][1]:
                continue
            step.setdefault("env", {})["CI_WAITING_FOR_BUILD"] = step["depends_on"]
            del step["depends_on"]


def move_build_to_bazel_lto(pipeline: Any, pipeline_name: str) -> None:
    if pipeline_name != "test":
        return
    if not os.environ["BUILDKITE_TAG"] and not ui.env_is_truthy("CI_RELEASE_LTO_BUILD"):
        return

    pipeline.setdefault("env", {})["CI_BAZEL_BUILD"] = 1
    pipeline["env"]["CI_BAZEL_LTO"] = 1
    for step in steps(pipeline):
        if step.get("id") == "build-x86_64":
            step["label"] = ":bazel: Build x86_64"
            step["agents"]["queue"] = "builder-linux-x86_64"
        elif step.get("id") == "build-aarch64":
            step["label"] = ":bazel: Build aarch64"
            step["agents"]["queue"] = "builder-linux-aarch64-mem"


def trim_builds(
    pipeline: Any,
    hash_check: dict[Arch, tuple[str, bool]],
) -> None:
    """Trim unnecessary x86-64/aarch64 builds if all artifacts already exist.

    Also mark remaining builds with a unique concurrency group for the code
    state so that the same build doesn't happen multiple times.
    """
    for step in steps(pipeline):
        if step.get("id") == "build-x86_64":
            if hash_check[Arch.X86_64][1]:
                step["skip"] = True
            else:
                step["concurrency"] = 1
                step["concurrency_group"] = f"build-x86_64/{hash_check[Arch.X86_64][0]}"
        elif step.get("id") == "upload-debug-symbols-x86_64":
            if hash_check[Arch.X86_64][1]:
                step["skip"] = True
        elif step.get("id") == "build-aarch64":
            if hash_check[Arch.AARCH64][1]:
                step["skip"] = True
            else:
                step["concurrency"] = 1
                step["concurrency_group"] = (
                    f"build-aarch64/{hash_check[Arch.AARCH64][0]}"
                )
        elif step.get("id") == "upload-debug-symbols-aarch64":
            if hash_check[Arch.AARCH64][1]:
                step["skip"] = True


_github_changed_files: set[str] | None = None


def have_paths_changed(globs: Iterable[str]) -> bool:
    """Reports whether the specified globs have diverged from origin/main."""
    global _github_changed_files
    try:
        if not _github_changed_files:
            head = spawn.capture(["git", "rev-parse", "HEAD"]).strip()
            headers = {"Accept": "application/vnd.github+json"}
            if token := os.getenv("GITHUB_TOKEN"):
                headers["Authorization"] = f"Bearer {token}"
            resp = requests.get(
                f"https://api.github.com/repos/materializeinc/materialize/compare/main...{head}",
                headers=headers,
            )
            resp.raise_for_status()
            _github_changed_files = {
                f["filename"] for f in resp.json().get("files", [])
            }
        for file in spawn.capture(["git", "ls-files", *globs]).splitlines():
            if file in _github_changed_files:
                return True
        return False
    except Exception as e:
        # Fall back to a local Git diff if GitHub is down or, when running
        # locally, the change has not been pushed yet.
        print(f"Failed to get changed files from GitHub, running locally: {e}")
        # Make sure we have an up-to-date view of main.
        spawn.runv(["git", "fetch", "origin", "main"])
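        # `origin/main...` (triple dot) diffs against the merge base of HEAD
        # and origin/main, so only changes made on this branch count, not
        # changes that landed on main after the branch point; `git diff
        # --quiet` exits with 1 exactly when differences are found.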
        diff = subprocess.run(
            ["git", "diff", "--no-patch", "--quiet", "origin/main...", "--", *globs]
        )
        if diff.returncode == 0:
            return False
        elif diff.returncode == 1:
            return True
        else:
            diff.check_returncode()
            raise RuntimeError("unreachable")


def remove_mz_specific_keys(pipeline: Any) -> None:
    """Remove the Materialize-specific keys from the configuration that are only
    used to inform how to trim the pipeline and for coverage runs."""
    for step in steps(pipeline):
        if "inputs" in step:
            del step["inputs"]
        if "coverage" in step:
            del step["coverage"]
        if "sanitizer" in step:
            del step["sanitizer"]
        if (
            "timeout_in_minutes" not in step
            and "prompt" not in step
            and "wait" not in step
            and "group" not in step
            and "trigger" not in step
            and not step.get("async")
        ):
            raise UIError(
                f"Every step should have an explicit timeout_in_minutes value, missing in: {step}"
            )


if __name__ == "__main__":
    sys.exit(main())