df_details.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from __future__ import annotations
  10. from collections.abc import Callable
  11. import pandas as pd
  12. from pandas import Series
  13. from pandas.core.groupby.generic import SeriesGroupBy
  14. from materialize.scalability.df import df_details_cols
  15. from materialize.scalability.df.df_wrapper_base import (
  16. DfWrapperBase,
  17. concat_df_wrapper_data,
  18. )
  19. class DfDetails(DfWrapperBase):
  20. """Wrapper for details data frame. Columns are specified in df_details_cols."""
  21. def __init__(self, data: pd.DataFrame = pd.DataFrame()):
  22. super().__init__(data)
  23. def to_filtered_by_concurrency(self, concurrency: int) -> DfDetails:
  24. filtered_data = self.data.loc[
  25. self.data[df_details_cols.CONCURRENCY] == concurrency
  26. ]
  27. return DfDetails(filtered_data)
  28. def get_wallclock_values(self, group_by_transaction: bool = True) -> list[float]:
  29. aggregation = lambda groupby_series: groupby_series.sum()
  30. return self._get_column_values(
  31. df_details_cols.WALLCLOCK, group_by_transaction, aggregation
  32. ).tolist()
  33. def get_concurrency_values(self, group_by_transaction: bool = True) -> list[int]:
  34. aggregation = lambda groupby_series: groupby_series.unique()
  35. return self._get_column_values(
  36. df_details_cols.CONCURRENCY, group_by_transaction, aggregation
  37. ).tolist()
  38. def get_unique_concurrency_values(self) -> list[int]:
  39. return self.data[df_details_cols.CONCURRENCY].unique().tolist()
  40. def _get_column_values(
  41. self,
  42. column_name: str,
  43. group_by_transaction: bool,
  44. aggregation: Callable[[SeriesGroupBy], Series],
  45. ) -> Series:
  46. if group_by_transaction:
  47. groupby_series = self.data.groupby(by=[df_details_cols.TRANSACTION_INDEX])[
  48. column_name
  49. ]
  50. return aggregation(groupby_series)
  51. return self.data[column_name]
  52. def concat_df_details(entries: list[DfDetails]) -> DfDetails:
  53. return DfDetails(concat_df_wrapper_data(entries))