builds_api.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from typing import Any
  10. from materialize.buildkite_insights.buildkite_api import generic_api
  11. def get_single_build(pipeline_slug: str, build_number: int) -> list[Any]:
  12. request_path = (
  13. f"organizations/materialize/pipelines/{pipeline_slug}/builds/{build_number}"
  14. )
  15. return generic_api.get(request_path, dict())
  16. def get_builds(
  17. pipeline_slug: str,
  18. max_fetches: int | None,
  19. branch: str | None,
  20. build_states: list[str] | None,
  21. items_per_page: int = 100,
  22. include_retries: bool = True,
  23. first_page: int = 1,
  24. ) -> list[Any]:
  25. request_path = f"organizations/materialize/pipelines/{pipeline_slug}/builds"
  26. params = _get_params(
  27. branch=branch,
  28. build_states=build_states,
  29. items_per_page=items_per_page,
  30. include_retries=include_retries,
  31. )
  32. return generic_api.get_multiple(
  33. request_path, params, max_fetches=max_fetches, first_page=first_page
  34. )
  35. def get_builds_of_all_pipelines(
  36. max_fetches: int | None,
  37. branch: str | None,
  38. build_states: list[str] | None = None,
  39. items_per_page: int = 100,
  40. include_retries: bool = True,
  41. first_page: int = 1,
  42. ) -> list[Any]:
  43. params = _get_params(
  44. branch=branch,
  45. build_states=build_states,
  46. items_per_page=items_per_page,
  47. include_retries=include_retries,
  48. )
  49. return generic_api.get_multiple(
  50. "organizations/materialize/builds",
  51. params,
  52. max_fetches=max_fetches,
  53. first_page=first_page,
  54. )
  55. def get_url_to_build(pipeline_slug: str, build_number: int, job_id: str | None) -> str:
  56. job_id_anchor = f"#{job_id}" if job_id is not None else ""
  57. return f"https://buildkite.com/materialize/{pipeline_slug}/builds/{build_number}{job_id_anchor}"
  58. def _get_params(
  59. branch: str | None,
  60. build_states: list[str] | None,
  61. items_per_page: int = 100,
  62. include_retries: bool = True,
  63. ) -> dict[str, Any]:
  64. params: dict[str, Any] = {
  65. "include_retried_jobs": str(include_retries).lower(),
  66. "per_page": str(items_per_page),
  67. }
  68. if branch is not None:
  69. params["branch"] = branch
  70. if build_states is not None and len(build_states) > 0:
  71. params["state[]"] = build_states
  72. return params