generic_cache.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from collections.abc import Callable
  10. from dataclasses import dataclass
  11. from typing import Any
  12. from materialize import MZ_ROOT
  13. from materialize.buildkite_insights.cache.cache_constants import (
  14. FetchMode,
  15. )
  16. from materialize.buildkite_insights.util.data_io import (
  17. FilePath,
  18. exists_file,
  19. exists_file_with_recent_data,
  20. get_last_modification_date,
  21. read_results_from_file,
  22. write_results_to_file,
  23. )
  24. from materialize.util import ensure_dir_exists
# Root directory for cache files, located under the repository root (MZ_ROOT).
# CacheFilePath places each cache item type in its own sub-directory below it.
PATH_TO_CACHE_DIR = MZ_ROOT / "temp"
  26. @dataclass
  27. class CacheFilePath(FilePath):
  28. cache_item_type: str
  29. pipeline_slug: str
  30. params_hash: str
  31. file_extension: str = "json"
  32. def get_path_to_directory(self):
  33. return f"{PATH_TO_CACHE_DIR}/{self.cache_item_type}"
  34. def get(self) -> str:
  35. return f"{self.get_path_to_directory()}/{self.pipeline_slug}-params-{self.params_hash}.{self.file_extension}"
  36. def get_or_query_data(
  37. cache_file_path: CacheFilePath,
  38. fetch_action: Callable[[], Any],
  39. fetch_mode: FetchMode,
  40. max_allowed_cache_age_in_hours: int | None = 12,
  41. add_to_cache_if_not_present: bool = True,
  42. quiet_mode: bool = False,
  43. ) -> Any:
  44. ensure_dir_exists(cache_file_path.get_path_to_directory())
  45. no_fetch = fetch_mode == FetchMode.NEVER
  46. if no_fetch and not exists_file(cache_file_path):
  47. raise RuntimeError(f"File missing: {cache_file_path}")
  48. if fetch_mode == FetchMode.AVOID and exists_file(cache_file_path):
  49. no_fetch = True
  50. elif fetch_mode == FetchMode.AUTO and exists_file_with_recent_data(
  51. cache_file_path, max_allowed_cache_age_in_hours
  52. ):
  53. no_fetch = True
  54. if no_fetch:
  55. if not quiet_mode:
  56. last_modified_date = get_last_modification_date(cache_file_path)
  57. print(
  58. f"Using existing data: {cache_file_path} (downloaded: {last_modified_date})"
  59. )
  60. return read_results_from_file(cache_file_path, quiet_mode=quiet_mode)
  61. fetched_data = fetch_action()
  62. if add_to_cache_if_not_present:
  63. write_results_to_file(fetched_data, cache_file_path, quiet_mode=quiet_mode)
  64. return fetched_data