generic_api.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import os
  10. from typing import Any
  11. import requests
  12. from requests import Response
  13. BUILDKITE_API_URL = "https://api.buildkite.com/v2"
  14. STATUS_CODE_RATE_LIMIT_EXCEEDED = 429
  15. class RateLimitExceeded(Exception):
  16. def __init__(self, partial_result: list[Any]):
  17. self.partial_result = partial_result
  18. def get(request_path: str, params: dict[str, Any], as_json: bool = True) -> Any:
  19. response = _perform_get_request(request_path, params)
  20. if as_json:
  21. return response.json()
  22. else:
  23. return response.text
  24. def get_multiple(
  25. request_path: str,
  26. params: dict[str, Any],
  27. max_fetches: int | None,
  28. first_page: int = 1,
  29. ) -> list[Any]:
  30. results = []
  31. print(f"Starting to fetch data from Buildkite: {request_path}")
  32. params["per_page"] = 100
  33. params["page"] = str(first_page)
  34. fetch_count = 0
  35. while True:
  36. try:
  37. result = get(request_path, params)
  38. except RateLimitExceeded:
  39. raise RateLimitExceeded(partial_result=results)
  40. fetch_count += 1
  41. if not result:
  42. print("No further results.")
  43. break
  44. if isinstance(result, dict) and result.get("message"):
  45. raise RuntimeError(f"Something went wrong! ({result['message']})")
  46. params["page"] = str(int(params["page"]) + 1)
  47. entry_count = len(result)
  48. created_at = result[-1]["created_at"]
  49. print(f"Fetched {entry_count} entries, created at {created_at}.")
  50. results.extend(result)
  51. if max_fetches is not None and fetch_count >= max_fetches:
  52. print("Max fetches reached.")
  53. break
  54. return results
  55. def get_and_download_to_file(
  56. request_path: str, params: dict[str, Any], file_path: str
  57. ) -> Any:
  58. response = _perform_get_request(request_path, params)
  59. with open(file_path, "wb") as f:
  60. f.write(response.content)
  61. def _perform_get_request(request_path: str, params: dict[str, Any]) -> Response:
  62. headers = {}
  63. token = os.getenv("BUILDKITE_CI_API_KEY") or os.getenv("BUILDKITE_TOKEN")
  64. if token is not None and len(token) > 0:
  65. headers["Authorization"] = f"Bearer {token}"
  66. else:
  67. print("Authentication token is not specified or empty!")
  68. url = f"{BUILDKITE_API_URL}/{request_path}"
  69. response = requests.get(headers=headers, url=url, params=params)
  70. if response.status_code == STATUS_CODE_RATE_LIMIT_EXCEEDED:
  71. raise RateLimitExceeded([])
  72. if response.status_code != 200:
  73. raise RuntimeError(
  74. f"Status code for request {request_path} was {response.status_code}"
  75. )
  76. return response