scalability_result.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from dataclasses import dataclass
  10. from typing import TypeVar
  11. from materialize.scalability.df.df_details import DfDetails
  12. from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
  13. from materialize.scalability.result.comparison_outcome import ComparisonOutcome
  14. from materialize.scalability.result.workload_result import WorkloadResult
  15. from materialize.scalability.workload.workload import Workload
  16. from materialize.scalability.workload.workload_markers import WorkloadMarker
  17. from materialize.scalability.workload.workload_version import WorkloadVersion
  18. T = TypeVar("T")
  19. @dataclass
  20. class BenchmarkResult:
  21. overall_comparison_outcome: ComparisonOutcome
  22. df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]]
  23. df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]]
  24. workload_version_by_name: dict[str, WorkloadVersion]
  25. workload_group_by_name: dict[str, str]
  26. def __init__(self):
  27. self.overall_comparison_outcome = ComparisonOutcome()
  28. self.df_total_by_endpoint_name_and_workload = dict()
  29. self.df_details_by_endpoint_name_and_workload = dict()
  30. self.workload_version_by_name = dict()
  31. self.workload_group_by_name = dict()
  32. def add_regression(self, comparison_outcome: ComparisonOutcome | None) -> None:
  33. if comparison_outcome is not None:
  34. self.overall_comparison_outcome.merge(comparison_outcome)
  35. def record_workload_metadata(self, workload: Workload) -> None:
  36. self.workload_version_by_name[workload.name()] = workload.version()
  37. assert isinstance(workload, WorkloadMarker)
  38. self.workload_group_by_name[workload.name()] = workload.group_name()
  39. def get_endpoint_names(self) -> list[str]:
  40. return list(self.df_total_by_endpoint_name_and_workload.keys())
  41. def append_workload_result(
  42. self, endpoint_version_info: str, result: WorkloadResult
  43. ) -> None:
  44. if (
  45. endpoint_version_info
  46. not in self.df_total_by_endpoint_name_and_workload.keys()
  47. ):
  48. self.df_total_by_endpoint_name_and_workload[endpoint_version_info] = dict()
  49. self.df_details_by_endpoint_name_and_workload[endpoint_version_info] = (
  50. dict()
  51. )
  52. workload_name = result.workload.name()
  53. if (
  54. workload_name
  55. in self.df_total_by_endpoint_name_and_workload[endpoint_version_info].keys()
  56. ):
  57. # Entry already exists, this happens in case of retries
  58. print(
  59. f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
  60. )
  61. self.df_total_by_endpoint_name_and_workload[endpoint_version_info][
  62. workload_name
  63. ] = result.df_totals
  64. self.df_details_by_endpoint_name_and_workload[endpoint_version_info][
  65. workload_name
  66. ] = result.df_details
  67. def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
  68. return concat_df_totals(
  69. list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
  70. )
  71. def get_df_total_by_workload_and_endpoint(
  72. self,
  73. ) -> dict[str, dict[str, DfTotals]]:
  74. return self._swap_endpoint_and_workload_grouping(
  75. self.df_total_by_endpoint_name_and_workload
  76. )
  77. def get_df_details_by_workload_and_endpoint(
  78. self,
  79. ) -> dict[str, dict[str, DfDetails]]:
  80. return self._swap_endpoint_and_workload_grouping(
  81. self.df_details_by_endpoint_name_and_workload
  82. )
  83. def _swap_endpoint_and_workload_grouping(
  84. self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
  85. ) -> dict[str, dict[str, T]]:
  86. result: dict[str, dict[str, T]] = dict()
  87. for (
  88. endpoint_name,
  89. data_by_workload,
  90. ) in result_by_endpoint_and_workload.items():
  91. for workload_name, data in data_by_workload.items():
  92. if workload_name not in result.keys():
  93. result[workload_name] = dict()
  94. result[workload_name][endpoint_name] = data
  95. return result