# build_step_utils.py
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from datetime import datetime
  10. from typing import Any
  11. from materialize import buildkite
  12. from materialize.buildkite_insights.buildkite_api.buildkite_constants import (
  13. BUILDKITE_COMPLETED_BUILD_STEP_STATES,
  14. )
  15. from materialize.buildkite_insights.data.build_step import (
  16. BuildJobOutcome,
  17. BuildStepMatcher,
  18. BuildStepOutcome,
  19. )
  20. def extract_build_step_outcomes(
  21. builds_data: list[Any],
  22. selected_build_steps: list[BuildStepMatcher],
  23. build_step_states: list[str],
  24. ) -> list[BuildStepOutcome]:
  25. result = []
  26. for build in builds_data:
  27. step_infos = _extract_build_step_data_from_build(
  28. build, selected_build_steps, build_step_states
  29. )
  30. result.extend(step_infos)
  31. return result
  32. def _extract_build_step_data_from_build(
  33. build_data: Any,
  34. selected_build_steps: list[BuildStepMatcher],
  35. build_step_states: list[str],
  36. ) -> list[BuildStepOutcome]:
  37. collected_steps = []
  38. for job in build_data["jobs"]:
  39. if not job.get("step_key"):
  40. continue
  41. if not _shall_include_build_step(job, selected_build_steps):
  42. continue
  43. build_job_state = job["state"]
  44. if len(build_step_states) > 0 and build_job_state not in build_step_states:
  45. continue
  46. id = build_data["id"]
  47. build_number = build_data["number"]
  48. commit_hash = build_data["commit"]
  49. created_at = datetime.fromisoformat(job["created_at"])
  50. build_step_key = job["step_key"]
  51. parallel_job_index = job.get("parallel_group_index")
  52. if job.get("started_at") and job.get("finished_at"):
  53. started_at = datetime.fromisoformat(job["started_at"])
  54. finished_at = datetime.fromisoformat(job["finished_at"])
  55. duration_in_min = (finished_at - started_at).total_seconds() / 60
  56. else:
  57. duration_in_min = None
  58. job_passed = build_job_state == "passed"
  59. job_completed = build_job_state in BUILDKITE_COMPLETED_BUILD_STEP_STATES
  60. exit_status = job.get("exit_status")
  61. retry_count = job.get("retries_count") or 0
  62. assert (
  63. not job_passed or duration_in_min is not None
  64. ), "Duration must be available for passed step"
  65. step_data = BuildStepOutcome(
  66. id=id,
  67. step_key=build_step_key,
  68. parallel_job_index=parallel_job_index,
  69. build_number=build_number,
  70. commit_hash=commit_hash,
  71. created_at=created_at,
  72. duration_in_min=duration_in_min,
  73. passed=job_passed,
  74. completed=job_completed,
  75. exit_status=exit_status,
  76. retry_count=retry_count,
  77. web_url_to_job=buildkite.get_job_url_from_build_url(
  78. build_data["web_url"], job["id"]
  79. ),
  80. )
  81. if retry_count == 0:
  82. collected_steps.append(step_data)
  83. else:
  84. # latest retry before other retries and original execution
  85. insertion_index = find_index_of_first_step_instance(
  86. collected_steps, build_number, build_step_key, parallel_job_index
  87. )
  88. collected_steps.insert(insertion_index, step_data)
  89. return collected_steps
  90. def find_index_of_first_step_instance(
  91. steps: list[BuildStepOutcome],
  92. build_number: int,
  93. build_step_key: str,
  94. parallel_job_index: int | None,
  95. ) -> int:
  96. index = len(steps)
  97. while index > 0:
  98. prev_index = index - 1
  99. prev_step = steps[prev_index]
  100. if (
  101. prev_step.build_number == build_number
  102. and prev_step.step_key == build_step_key
  103. and prev_step.parallel_job_index == parallel_job_index
  104. ):
  105. index = prev_index
  106. else:
  107. break
  108. return index
  109. def _shall_include_build_step(
  110. job: Any, selected_build_steps: list[BuildStepMatcher]
  111. ) -> bool:
  112. if len(selected_build_steps) == 0:
  113. return True
  114. job_step_key = job["step_key"]
  115. job_parallel_index = job.get("parallel_group_index")
  116. for build_step_matcher in selected_build_steps:
  117. if build_step_matcher.matches(job_step_key, job_parallel_index):
  118. return True
  119. return False
  120. def step_outcomes_to_job_outcomes(
  121. step_infos: list[BuildStepOutcome],
  122. ) -> list[BuildJobOutcome]:
  123. """
  124. This merges sharded executions of the same build and step.
  125. This may still produce multiple entries per step key in case of retries.
  126. """
  127. outcomes_by_build_and_step_key_and_retry: dict[str, list[BuildStepOutcome]] = dict()
  128. for step_info in step_infos:
  129. group_key = (
  130. f"{step_info.build_number}.{step_info.step_key}.{step_info.retry_count}"
  131. )
  132. outcomes_to_merge = (
  133. outcomes_by_build_and_step_key_and_retry.get(group_key) or []
  134. )
  135. outcomes_to_merge.append(step_info)
  136. outcomes_by_build_and_step_key_and_retry[group_key] = outcomes_to_merge
  137. result = []
  138. for _, outcomes_of_same_step in outcomes_by_build_and_step_key_and_retry.items():
  139. result.append(_step_outcomes_to_job_outcome(outcomes_of_same_step))
  140. return result
  141. def _step_outcomes_to_job_outcome(
  142. outcomes_of_same_step: list[BuildStepOutcome],
  143. ) -> BuildJobOutcome:
  144. any_execution = outcomes_of_same_step[0]
  145. for outcome in outcomes_of_same_step:
  146. assert outcome.build_number == any_execution.build_number
  147. assert outcome.step_key == any_execution.step_key
  148. ids = [s.id for s in outcomes_of_same_step]
  149. min_created_at = min([s.created_at for s in outcomes_of_same_step])
  150. durations = [
  151. s.duration_in_min
  152. for s in outcomes_of_same_step
  153. if s.duration_in_min is not None
  154. ]
  155. sum_duration_in_min = sum(durations) if len(durations) > 0 else None
  156. all_passed = len([1 for s in outcomes_of_same_step if not s.passed]) == 0
  157. all_completed = len([1 for s in outcomes_of_same_step if not s.completed]) == 0
  158. max_retry_count = any_execution.retry_count
  159. count_shards = len(outcomes_of_same_step)
  160. web_url_without_job_id = any_execution.web_url_to_build()
  161. return BuildJobOutcome(
  162. ids=ids,
  163. step_key=any_execution.step_key,
  164. build_number=any_execution.build_number,
  165. commit_hash=any_execution.commit_hash,
  166. created_at=min_created_at,
  167. duration_in_min=sum_duration_in_min,
  168. passed=all_passed,
  169. completed=all_completed,
  170. retry_count=max_retry_count,
  171. web_url_to_build=web_url_without_job_id,
  172. count_items=count_shards,
  173. )
  174. def extract_build_step_names_by_job_id(
  175. build_data: Any,
  176. ) -> dict[str, str]:
  177. return _extract_build_step_infos_by_job_id(build_data, "name")
  178. def extract_build_steps_by_job_id(
  179. build_data: Any,
  180. ) -> dict[str, str]:
  181. return _extract_build_step_infos_by_job_id(build_data, "step_key")
  182. def _extract_build_step_infos_by_job_id(
  183. build_data: Any, field_name: str
  184. ) -> dict[str, str]:
  185. build_job_info_by_job_id: dict[str, str] = dict()
  186. for job in build_data["jobs"]:
  187. build_job_id = job["id"]
  188. build_job_info = job.get(field_name, None)
  189. if build_job_info is not None:
  190. build_job_info_by_job_id[build_job_id] = build_job_info
  191. return build_job_info_by_job_id