123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from dataclasses import dataclass
- from typing import TypeVar
- from materialize.scalability.df.df_details import DfDetails
- from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
- from materialize.scalability.result.comparison_outcome import ComparisonOutcome
- from materialize.scalability.result.workload_result import WorkloadResult
- from materialize.scalability.workload.workload import Workload
- from materialize.scalability.workload.workload_markers import WorkloadMarker
- from materialize.scalability.workload.workload_version import WorkloadVersion
- T = TypeVar("T")
- @dataclass
- class BenchmarkResult:
- overall_comparison_outcome: ComparisonOutcome
- df_total_by_endpoint_name_and_workload: dict[str, dict[str, DfTotals]]
- df_details_by_endpoint_name_and_workload: dict[str, dict[str, DfDetails]]
- workload_version_by_name: dict[str, WorkloadVersion]
- workload_group_by_name: dict[str, str]
- def __init__(self):
- self.overall_comparison_outcome = ComparisonOutcome()
- self.df_total_by_endpoint_name_and_workload = dict()
- self.df_details_by_endpoint_name_and_workload = dict()
- self.workload_version_by_name = dict()
- self.workload_group_by_name = dict()
- def add_regression(self, comparison_outcome: ComparisonOutcome | None) -> None:
- if comparison_outcome is not None:
- self.overall_comparison_outcome.merge(comparison_outcome)
- def record_workload_metadata(self, workload: Workload) -> None:
- self.workload_version_by_name[workload.name()] = workload.version()
- assert isinstance(workload, WorkloadMarker)
- self.workload_group_by_name[workload.name()] = workload.group_name()
- def get_endpoint_names(self) -> list[str]:
- return list(self.df_total_by_endpoint_name_and_workload.keys())
- def append_workload_result(
- self, endpoint_version_info: str, result: WorkloadResult
- ) -> None:
- if (
- endpoint_version_info
- not in self.df_total_by_endpoint_name_and_workload.keys()
- ):
- self.df_total_by_endpoint_name_and_workload[endpoint_version_info] = dict()
- self.df_details_by_endpoint_name_and_workload[endpoint_version_info] = (
- dict()
- )
- workload_name = result.workload.name()
- if (
- workload_name
- in self.df_total_by_endpoint_name_and_workload[endpoint_version_info].keys()
- ):
- # Entry already exists, this happens in case of retries
- print(
- f"Replacing result entry for endpoint ({endpoint_version_info}) and workload {workload_name}"
- )
- self.df_total_by_endpoint_name_and_workload[endpoint_version_info][
- workload_name
- ] = result.df_totals
- self.df_details_by_endpoint_name_and_workload[endpoint_version_info][
- workload_name
- ] = result.df_details
- def get_df_total_by_endpoint_name(self, endpoint_name: str) -> DfTotals:
- return concat_df_totals(
- list(self.df_total_by_endpoint_name_and_workload[endpoint_name].values())
- )
- def get_df_total_by_workload_and_endpoint(
- self,
- ) -> dict[str, dict[str, DfTotals]]:
- return self._swap_endpoint_and_workload_grouping(
- self.df_total_by_endpoint_name_and_workload
- )
- def get_df_details_by_workload_and_endpoint(
- self,
- ) -> dict[str, dict[str, DfDetails]]:
- return self._swap_endpoint_and_workload_grouping(
- self.df_details_by_endpoint_name_and_workload
- )
- def _swap_endpoint_and_workload_grouping(
- self, result_by_endpoint_and_workload: dict[str, dict[str, T]]
- ) -> dict[str, dict[str, T]]:
- result: dict[str, dict[str, T]] = dict()
- for (
- endpoint_name,
- data_by_workload,
- ) in result_by_endpoint_and_workload.items():
- for workload_name, data in data_by_workload.items():
- if workload_name not in result.keys():
- result[workload_name] = dict()
- result[workload_name][endpoint_name] = data
- return result
|