123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from datetime import datetime
- from typing import Any
- from materialize import buildkite
- from materialize.buildkite_insights.buildkite_api.buildkite_constants import (
- BUILDKITE_COMPLETED_BUILD_STEP_STATES,
- )
- from materialize.buildkite_insights.data.build_step import (
- BuildJobOutcome,
- BuildStepMatcher,
- BuildStepOutcome,
- )
def extract_build_step_outcomes(
    builds_data: list[Any],
    selected_build_steps: list[BuildStepMatcher],
    build_step_states: list[str],
) -> list[BuildStepOutcome]:
    """Collect step outcomes across all builds.

    Flattens the per-build outcomes produced by
    _extract_build_step_data_from_build into a single list, preserving
    build order.
    """
    return [
        outcome
        for build in builds_data
        for outcome in _extract_build_step_data_from_build(
            build, selected_build_steps, build_step_states
        )
    ]
def _extract_build_step_data_from_build(
    build_data: Any,
    selected_build_steps: list[BuildStepMatcher],
    build_step_states: list[str],
) -> list[BuildStepOutcome]:
    """Extract one BuildStepOutcome per matching job of a single build.

    Jobs without a step key, jobs rejected by selected_build_steps, and
    jobs whose state is not in build_step_states (when that filter is
    non-empty) are skipped. Retried executions are inserted before earlier
    executions of the same step so the latest retry comes first.
    """
    collected_steps: list[BuildStepOutcome] = []

    # Build-level attributes are identical for every job; hoist them out of
    # the loop. (Renamed from `id` to avoid shadowing the builtin.)
    build_id = build_data["id"]
    build_number = build_data["number"]
    commit_hash = build_data["commit"]
    build_web_url = build_data["web_url"]

    for job in build_data["jobs"]:
        if not job.get("step_key"):
            # Jobs without a step key (e.g. wait/trigger steps) carry no
            # outcome of interest.
            continue

        if not _shall_include_build_step(job, selected_build_steps):
            continue

        build_job_state = job["state"]
        if len(build_step_states) > 0 and build_job_state not in build_step_states:
            continue

        created_at = datetime.fromisoformat(job["created_at"])
        build_step_key = job["step_key"]
        # Index within a parallel group; None when the step is not sharded.
        parallel_job_index = job.get("parallel_group_index")

        if job.get("started_at") and job.get("finished_at"):
            started_at = datetime.fromisoformat(job["started_at"])
            finished_at = datetime.fromisoformat(job["finished_at"])
            duration_in_min = (finished_at - started_at).total_seconds() / 60
        else:
            # Jobs that never ran (or are still running) have no duration.
            duration_in_min = None

        job_passed = build_job_state == "passed"
        job_completed = build_job_state in BUILDKITE_COMPLETED_BUILD_STEP_STATES
        exit_status = job.get("exit_status")
        # "retries_count" may be absent or an explicit None; both mean 0.
        retry_count = job.get("retries_count") or 0

        assert (
            not job_passed or duration_in_min is not None
        ), "Duration must be available for passed step"

        step_data = BuildStepOutcome(
            id=build_id,
            step_key=build_step_key,
            parallel_job_index=parallel_job_index,
            build_number=build_number,
            commit_hash=commit_hash,
            created_at=created_at,
            duration_in_min=duration_in_min,
            passed=job_passed,
            completed=job_completed,
            exit_status=exit_status,
            retry_count=retry_count,
            web_url_to_job=buildkite.get_job_url_from_build_url(
                build_web_url, job["id"]
            ),
        )

        if retry_count == 0:
            collected_steps.append(step_data)
        else:
            # Place the latest retry before earlier retries and the original
            # execution of the same step.
            insertion_index = find_index_of_first_step_instance(
                collected_steps, build_number, build_step_key, parallel_job_index
            )
            collected_steps.insert(insertion_index, step_data)

    return collected_steps
def find_index_of_first_step_instance(
    steps: list[BuildStepOutcome],
    build_number: int,
    build_step_key: str,
    parallel_job_index: int | None,
) -> int:
    """Return the index of the first entry of the trailing run of steps
    that all refer to the given build, step key and parallel shard.

    If the last entry does not match, returns len(steps); only the
    contiguous run of matching entries at the end of the list is
    considered.
    """
    boundary = len(steps)

    # Walk backwards while the preceding entry still belongs to the same
    # build/step/shard; stop at the first mismatch.
    for candidate in reversed(steps):
        same_instance = (
            candidate.build_number == build_number
            and candidate.step_key == build_step_key
            and candidate.parallel_job_index == parallel_job_index
        )
        if not same_instance:
            break
        boundary -= 1

    return boundary
def _shall_include_build_step(
    job: Any, selected_build_steps: list[BuildStepMatcher]
) -> bool:
    """Decide whether a job passes the step filter.

    An empty filter selects every job; otherwise at least one matcher must
    accept the job's step key and parallel group index.
    """
    if not selected_build_steps:
        return True

    step_key = job["step_key"]
    parallel_index = job.get("parallel_group_index")

    return any(
        matcher.matches(step_key, parallel_index)
        for matcher in selected_build_steps
    )
def step_outcomes_to_job_outcomes(
    step_infos: list[BuildStepOutcome],
) -> list[BuildJobOutcome]:
    """
    This merges sharded executions of the same build and step.
    This may still produce multiple entries per step key in case of retries.
    """
    # Group by a tuple instead of a "."-joined string: step keys may contain
    # dots, which would make a string key ambiguous across groups.
    outcomes_by_build_and_step_key_and_retry: dict[
        tuple[Any, Any, Any], list[BuildStepOutcome]
    ] = dict()

    for step_info in step_infos:
        group_key = (
            step_info.build_number,
            step_info.step_key,
            step_info.retry_count,
        )
        outcomes_by_build_and_step_key_and_retry.setdefault(group_key, []).append(
            step_info
        )

    # Merge each group of sharded executions into a single job outcome;
    # dict insertion order preserves the first-seen order of groups.
    return [
        _step_outcomes_to_job_outcome(outcomes_of_same_step)
        for outcomes_of_same_step in outcomes_by_build_and_step_key_and_retry.values()
    ]
def _step_outcomes_to_job_outcome(
    outcomes_of_same_step: list[BuildStepOutcome],
) -> BuildJobOutcome:
    """Merge the sharded executions of one build step into one job outcome.

    All entries must belong to the same build number and step key (asserted
    below). Durations are summed over the shards that have one; the merged
    job counts as passed/completed only when every shard did.
    """
    any_execution = outcomes_of_same_step[0]
    for outcome in outcomes_of_same_step:
        assert outcome.build_number == any_execution.build_number
        assert outcome.step_key == any_execution.step_key

    ids = [s.id for s in outcomes_of_same_step]
    min_created_at = min(s.created_at for s in outcomes_of_same_step)
    durations = [
        s.duration_in_min
        for s in outcomes_of_same_step
        if s.duration_in_min is not None
    ]
    # None (rather than 0) when no shard has a duration, so "no data" stays
    # distinguishable from "took no time".
    sum_duration_in_min = sum(durations) if durations else None
    all_passed = all(s.passed for s in outcomes_of_same_step)
    all_completed = all(s.completed for s in outcomes_of_same_step)
    # All entries of a group share the same retry count by construction of
    # the grouping key.
    max_retry_count = any_execution.retry_count
    count_shards = len(outcomes_of_same_step)
    web_url_without_job_id = any_execution.web_url_to_build()

    return BuildJobOutcome(
        ids=ids,
        step_key=any_execution.step_key,
        build_number=any_execution.build_number,
        commit_hash=any_execution.commit_hash,
        created_at=min_created_at,
        duration_in_min=sum_duration_in_min,
        passed=all_passed,
        completed=all_completed,
        retry_count=max_retry_count,
        web_url_to_build=web_url_without_job_id,
        count_items=count_shards,
    )
def extract_build_step_names_by_job_id(
    build_data: Any,
) -> dict[str, str]:
    """Map each job id of a build to the job's display name."""
    names_by_job_id = _extract_build_step_infos_by_job_id(build_data, "name")
    return names_by_job_id
def extract_build_steps_by_job_id(
    build_data: Any,
) -> dict[str, str]:
    """Map each job id of a build to the job's step key."""
    step_keys_by_job_id = _extract_build_step_infos_by_job_id(build_data, "step_key")
    return step_keys_by_job_id
- def _extract_build_step_infos_by_job_id(
- build_data: Any, field_name: str
- ) -> dict[str, str]:
- build_job_info_by_job_id: dict[str, str] = dict()
- for job in build_data["jobs"]:
- build_job_id = job["id"]
- build_job_info = job.get(field_name, None)
- if build_job_info is not None:
- build_job_info_by_job_id[build_job_id] = build_job_info
- return build_job_info_by_job_id
|