123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from __future__ import annotations
- from typing import TypeVar
- import pandas as pd
- from materialize.scalability.df import df_totals_cols, df_totals_ext_cols
- from materialize.scalability.df.df_wrapper_base import (
- DfWrapperBase,
- concat_df_wrapper_data,
- )
- from materialize.scalability.endpoint.endpoint import Endpoint
- from materialize.scalability.result.scalability_change import ScalabilityChange
- # Type variable restricted to ScalabilityChange and its subclasses. It is used by
- # DfTotalsExtended.to_scalability_change so that the element type of the returned
- # list follows the class passed in as change_type.
- SCALABILITY_CHANGE_TYPE = TypeVar("SCALABILITY_CHANGE_TYPE", bound=ScalabilityChange)
class DfTotalsBase(DfWrapperBase):
    """Wrapper base for totals data frames.

    Offers the concurrency accessors inherited by the totals wrappers
    defined in this module.
    """

    def __init__(self, data: pd.DataFrame | None = None):
        """Wrap ``data``, or a new empty frame when no data is given.

        ``None`` is used as the default instead of ``pd.DataFrame()`` because
        a default value is evaluated only once, at definition time; a frame
        default would be a single object shared by every wrapper that is
        created without data.
        """
        super().__init__(pd.DataFrame() if data is None else data)

    def get_max_concurrency(self) -> int:
        """Return the largest value found in the concurrency column."""
        return self.data[df_totals_cols.CONCURRENCY].max()

    def get_concurrency_values(self) -> list[int]:
        """Return the values of the concurrency column as a list, in row order."""
        return self.data[df_totals_cols.CONCURRENCY].tolist()
class DfTotals(DfTotalsBase):
    """
    Wrapper for totals data frame.
    Columns are specified in df_totals_cols.
    """

    def __init__(self, data: pd.DataFrame | None = None):
        """Wrap ``data``, or a new empty frame when no data is given.

        The default is ``None`` rather than ``pd.DataFrame()`` so that
        wrappers created without data never share one default frame object.
        """
        super().__init__(pd.DataFrame() if data is None else data)

    def get_tps_values(self) -> list[int]:
        """Return the values of the TPS column as a list, in row order."""
        return self.data[df_totals_cols.TPS].tolist()

    def merge(self, other: DfTotals) -> DfTotalsMerged:
        """Join the TPS values of this frame with those of ``other``.

        Rows are matched on count, concurrency and workload. The join uses
        pandas' default ``how`` (an inner join), so key combinations that
        occur in only one of the two frames are not part of the result.

        Args:
            other: totals to compare against; this instance supplies the
                baseline TPS column, ``other`` supplies the other TPS column.

        Returns:
            A DfTotalsMerged holding the key columns plus one TPS column per
            side, named by df_totals_ext_cols.TPS_BASELINE and TPS_OTHER.
        """
        join_keys = [
            df_totals_cols.COUNT,
            df_totals_cols.CONCURRENCY,
            df_totals_cols.WORKLOAD,
        ]
        selected_columns = join_keys + [df_totals_cols.TPS]

        # Both sides carry a TPS column, so pandas disambiguates them with
        # suffixes. They are spelled out here (these are also the pandas
        # defaults) because the rename below depends on them.
        merged = self.data[selected_columns].merge(
            other.data[selected_columns],
            on=join_keys,
            suffixes=("_x", "_y"),
        )
        merged = merged.rename(
            columns={
                f"{df_totals_cols.TPS}_x": df_totals_ext_cols.TPS_BASELINE,
                f"{df_totals_cols.TPS}_y": df_totals_ext_cols.TPS_OTHER,
            }
        )
        return DfTotalsMerged(merged)
def concat_df_totals(entries: list[DfTotals]) -> DfTotals:
    """Combine the given totals wrappers into a single DfTotals.

    The underlying data is combined by concat_df_wrapper_data.
    """
    combined_data = concat_df_wrapper_data(entries)
    return DfTotals(combined_data)
class DfTotalsMerged(DfTotalsBase):
    """
    Wrapper for two totals data frames of different endpoints that were merged.
    It is an intermediate representation and not intended to be used in evaluations and plots.
    """

    def __init__(self, data: pd.DataFrame | None = None):
        """Wrap ``data``, or a new empty frame when no data is given.

        The default is ``None`` rather than ``pd.DataFrame()``: a frame
        default would be one object shared by all wrappers created without
        data, and to_enriched_result_frame writes columns into the wrapped
        frame.
        """
        super().__init__(pd.DataFrame() if data is None else data)

    def to_enriched_result_frame(
        self,
        baseline_version_name: str,
        other_version_name: str,
    ) -> DfTotalsExtended:
        """Add the TPS difference and version info columns.

        Adds the columns TPS_DIFF (other minus baseline), TPS_DIFF_PERC
        (TPS_DIFF divided by the baseline TPS), INFO_BASELINE and INFO_OTHER
        as named in df_totals_ext_cols.

        Note: the columns are assigned on ``self.data`` directly, so, unless
        the base class hands out a copy, this wrapper's own frame is
        extended and shared with the returned wrapper.

        Args:
            baseline_version_name: value stored in every row of INFO_BASELINE.
            other_version_name: value stored in every row of INFO_OTHER.

        Returns:
            A DfTotalsExtended wrapping the enriched frame.
        """
        frame = self.data

        baseline_tps = frame[df_totals_ext_cols.TPS_BASELINE]
        other_tps = frame[df_totals_ext_cols.TPS_OTHER]

        frame[df_totals_ext_cols.TPS_DIFF] = other_tps - baseline_tps
        # Relative change with respect to the baseline (a ratio, not
        # multiplied by 100).
        frame[df_totals_ext_cols.TPS_DIFF_PERC] = (
            frame[df_totals_ext_cols.TPS_DIFF] / baseline_tps
        )
        frame[df_totals_ext_cols.INFO_BASELINE] = baseline_version_name
        frame[df_totals_ext_cols.INFO_OTHER] = other_version_name

        return DfTotalsExtended(frame)
class DfTotalsExtended(DfTotalsBase):
    """
    Wrapper for two totals data frames of different endpoints that were merged and enriched with further data.
    Columns are specified in df_totals_ext_cols.
    """

    def __init__(self, data: pd.DataFrame | None = None):
        """Wrap ``data``, or a new empty frame when no data is given.

        The default is ``None`` rather than ``pd.DataFrame()`` so that
        wrappers created without data never share one default frame object.
        """
        super().__init__(pd.DataFrame() if data is None else data)

    def to_filtered_with_threshold(
        self, max_deviation: float, match_results_better_than_baseline: bool
    ) -> DfTotalsExtended:
        """Keep only the rows whose TPS deviation exceeds a threshold.

        Args:
            max_deviation: threshold compared directly with the values of
                the TPS_DIFF_PERC column.
            match_results_better_than_baseline: when True, keep rows with
                ``TPS_DIFF_PERC > max_deviation``; when False, keep rows
                with ``-TPS_DIFF_PERC > max_deviation``.

        Returns:
            A new DfTotalsExtended containing only the matching rows.
        """
        frame = self.data
        direction = +1 if match_results_better_than_baseline else -1

        # keep entries exceeding the baseline by x% in the requested direction
        exceeds_threshold = (
            frame[df_totals_ext_cols.TPS_DIFF_PERC] * direction > max_deviation
        )
        return DfTotalsExtended(frame.loc[exceeds_threshold])

    def to_scalability_change(
        self,
        change_type: type[SCALABILITY_CHANGE_TYPE],
        workload_name: str,
        other_endpoint: Endpoint,
    ) -> list[SCALABILITY_CHANGE_TYPE]:
        """Create one ``change_type`` object per row of the frame.

        Args:
            change_type: class to instantiate for every row.
            workload_name: passed as first positional argument to
                ``change_type``.
            other_endpoint: passed as ``endpoint`` to ``change_type``.

        Returns:
            The created objects, in row order; empty if the frame has no rows.
        """
        return [
            change_type(
                workload_name,
                # Values taken from a row may be floats; the change object
                # is given plain ints for concurrency and count.
                concurrency=int(row[df_totals_ext_cols.CONCURRENCY]),
                count=int(row[df_totals_ext_cols.COUNT]),
                tps=row[df_totals_ext_cols.TPS_OTHER],
                tps_baseline=row[df_totals_ext_cols.TPS_BASELINE],
                tps_diff=row[df_totals_ext_cols.TPS_DIFF],
                tps_diff_percent=row[df_totals_ext_cols.TPS_DIFF_PERC],
                endpoint=other_endpoint,
            )
            for _, row in self.data.iterrows()
        ]
def concat_df_totals_extended(entries: list[DfTotalsExtended]) -> DfTotalsExtended:
    """Combine the given extended totals wrappers into a single DfTotalsExtended.

    The underlying data is combined by concat_df_wrapper_data.
    """
    combined_data = concat_df_wrapper_data(entries)
    return DfTotalsExtended(combined_data)
|