df_totals.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from __future__ import annotations
  10. from typing import TypeVar
  11. import pandas as pd
  12. from materialize.scalability.df import df_totals_cols, df_totals_ext_cols
  13. from materialize.scalability.df.df_wrapper_base import (
  14. DfWrapperBase,
  15. concat_df_wrapper_data,
  16. )
  17. from materialize.scalability.endpoint.endpoint import Endpoint
  18. from materialize.scalability.result.scalability_change import ScalabilityChange
# Upper-bounded type variable: lets to_scalability_change() return a list of
# the concrete ScalabilityChange subclass the caller passes as change_type.
SCALABILITY_CHANGE_TYPE = TypeVar("SCALABILITY_CHANGE_TYPE", bound=ScalabilityChange)
  20. class DfTotalsBase(DfWrapperBase):
  21. """Wrapper base for totals data frame."""
  22. def __init__(self, data: pd.DataFrame = pd.DataFrame()):
  23. super().__init__(data)
  24. def get_max_concurrency(self) -> int:
  25. return self.data[df_totals_cols.CONCURRENCY].max()
  26. def get_concurrency_values(self) -> list[int]:
  27. return self.data[df_totals_cols.CONCURRENCY].tolist()
  28. class DfTotals(DfTotalsBase):
  29. """
  30. Wrapper for totals data frame.
  31. Columns are specified in df_totals_cols.
  32. """
  33. def __init__(self, data: pd.DataFrame = pd.DataFrame()):
  34. super().__init__(data)
  35. def get_tps_values(self) -> list[int]:
  36. return self.data[df_totals_cols.TPS].tolist()
  37. def merge(self, other: DfTotals) -> DfTotalsMerged:
  38. merge_columns = [
  39. df_totals_cols.COUNT,
  40. df_totals_cols.CONCURRENCY,
  41. df_totals_cols.WORKLOAD,
  42. ]
  43. columns_to_keep = merge_columns + [df_totals_cols.TPS]
  44. tps_per_endpoint = self.data[columns_to_keep].merge(
  45. other.data[columns_to_keep], on=merge_columns
  46. )
  47. tps_per_endpoint.rename(
  48. columns={
  49. f"{df_totals_cols.TPS}_x": df_totals_ext_cols.TPS_BASELINE,
  50. f"{df_totals_cols.TPS}_y": df_totals_ext_cols.TPS_OTHER,
  51. },
  52. inplace=True,
  53. )
  54. return DfTotalsMerged(tps_per_endpoint)
  55. def concat_df_totals(entries: list[DfTotals]) -> DfTotals:
  56. return DfTotals(concat_df_wrapper_data(entries))
  57. class DfTotalsMerged(DfTotalsBase):
  58. """
  59. Wrapper for two totals data frame of different endpoints that were merged.
  60. It is an intermediate representation and not intended to be used in evaluations and plots.
  61. """
  62. def __init__(self, data: pd.DataFrame = pd.DataFrame()):
  63. super().__init__(data)
  64. def to_enriched_result_frame(
  65. self,
  66. baseline_version_name: str,
  67. other_version_name: str,
  68. ) -> DfTotalsExtended:
  69. tps_per_endpoint = self.data
  70. tps_per_endpoint[df_totals_ext_cols.TPS_DIFF] = (
  71. tps_per_endpoint[df_totals_ext_cols.TPS_OTHER]
  72. - tps_per_endpoint[df_totals_ext_cols.TPS_BASELINE]
  73. )
  74. tps_per_endpoint[df_totals_ext_cols.TPS_DIFF_PERC] = (
  75. tps_per_endpoint[df_totals_ext_cols.TPS_DIFF]
  76. / tps_per_endpoint[df_totals_ext_cols.TPS_BASELINE]
  77. )
  78. tps_per_endpoint[df_totals_ext_cols.INFO_BASELINE] = baseline_version_name
  79. tps_per_endpoint[df_totals_ext_cols.INFO_OTHER] = other_version_name
  80. return DfTotalsExtended(tps_per_endpoint)
  81. class DfTotalsExtended(DfTotalsBase):
  82. """
  83. Wrapper for two totals data frame of different endpoints that were merged and enriched with further data.
  84. Columns are specified in df_totals_ext_cols.
  85. """
  86. def __init__(self, data: pd.DataFrame = pd.DataFrame()):
  87. super().__init__(data)
  88. def to_filtered_with_threshold(
  89. self, max_deviation: float, match_results_better_than_baseline: bool
  90. ) -> DfTotalsExtended:
  91. tps_per_endpoint = self.data
  92. filtered_data = tps_per_endpoint.loc[
  93. # keep entries exceeding the baseline by x%
  94. tps_per_endpoint[df_totals_ext_cols.TPS_DIFF_PERC]
  95. * (+1 if match_results_better_than_baseline else -1)
  96. > max_deviation
  97. ]
  98. return DfTotalsExtended(filtered_data)
  99. def to_scalability_change(
  100. self,
  101. change_type: type[SCALABILITY_CHANGE_TYPE],
  102. workload_name: str,
  103. other_endpoint: Endpoint,
  104. ) -> list[SCALABILITY_CHANGE_TYPE]:
  105. result = []
  106. for index, row in self.data.iterrows():
  107. regression = change_type(
  108. workload_name,
  109. concurrency=int(row[df_totals_ext_cols.CONCURRENCY]),
  110. count=int(row[df_totals_ext_cols.COUNT]),
  111. tps=row[df_totals_ext_cols.TPS_OTHER],
  112. tps_baseline=row[df_totals_ext_cols.TPS_BASELINE],
  113. tps_diff=row[df_totals_ext_cols.TPS_DIFF],
  114. tps_diff_percent=row[df_totals_ext_cols.TPS_DIFF_PERC],
  115. endpoint=other_endpoint,
  116. )
  117. result.append(regression)
  118. return result
  119. def concat_df_totals_extended(entries: list[DfTotalsExtended]) -> DfTotalsExtended:
  120. return DfTotalsExtended(concat_df_wrapper_data(entries))