benchmark_result_selection.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from __future__ import annotations
  10. from statistics import median
  11. from materialize.feature_benchmark.measurement import MeasurementType
  12. from materialize.feature_benchmark.report import Report
  13. class BenchmarkResultSelectorBase:
  14. def choose_report_per_scenario(
  15. self,
  16. reports: list[Report],
  17. ) -> dict[str, Report]:
  18. assert len(reports) > 0, "No reports"
  19. all_scenario_names = reports[0].get_scenario_names()
  20. result = dict()
  21. for scenario_name in all_scenario_names:
  22. result[scenario_name] = self.choose_report_of_single_scenario(
  23. reports, scenario_name
  24. )
  25. return result
  26. def choose_report_of_single_scenario(
  27. self, reports: list[Report], scenario_name: str
  28. ) -> Report:
  29. selectable_reports_by_wallclock_value: dict[float, list[Report]] = dict()
  30. available_wallclock_values = []
  31. for report in reports:
  32. scenario_result = report.get_scenario_result_by_name(scenario_name)
  33. metric_value = scenario_result.get_metric_by_measurement_type(
  34. MeasurementType.WALLCLOCK
  35. )
  36. if metric_value is None:
  37. continue
  38. wallclock_value = metric_value.this()
  39. if wallclock_value is None:
  40. continue
  41. reports_of_wallclock_value = selectable_reports_by_wallclock_value.get(
  42. wallclock_value, []
  43. )
  44. selectable_reports_by_wallclock_value[wallclock_value] = (
  45. reports_of_wallclock_value
  46. )
  47. reports_of_wallclock_value.append(report)
  48. # store wallclock values separately not to lose identical values
  49. available_wallclock_values.append(wallclock_value)
  50. if len(selectable_reports_by_wallclock_value) == 0:
  51. # pick the first report in this case
  52. return reports[0]
  53. return self._select_report_of_single_scenario(
  54. scenario_name,
  55. selectable_reports_by_wallclock_value,
  56. available_wallclock_values,
  57. )
  58. def _select_report_of_single_scenario(
  59. self,
  60. scenario_name: str,
  61. selectable_reports_by_wallclock_value: dict[float, list[Report]],
  62. available_wallclock_values: list[float],
  63. ) -> Report:
  64. raise NotImplementedError
  65. class MedianBenchmarkResultSelector(BenchmarkResultSelectorBase):
  66. """Chooses the report with the median wallclock value for each scenario"""
  67. def _select_report_of_single_scenario(
  68. self,
  69. scenario_name: str,
  70. selectable_reports_by_wallclock_value: dict[float, list[Report]],
  71. available_wallclock_values: list[float],
  72. ) -> Report:
  73. if len(available_wallclock_values) % 2 == 0:
  74. # in case of an even number of selectable reports, add zero to the values to get an existing value when computing the median
  75. available_wallclock_values.append(0)
  76. median_wallclock_value = median(available_wallclock_values)
  77. assert (
  78. median_wallclock_value in selectable_reports_by_wallclock_value.keys()
  79. ), f"Chosen median is {median_wallclock_value} but available values are {available_wallclock_values}"
  80. selected_report = selectable_reports_by_wallclock_value[median_wallclock_value][
  81. 0
  82. ]
  83. return selected_report
  84. class BestBenchmarkResultSelector(BenchmarkResultSelectorBase):
  85. """Chooses the report with the minimum wallclock value for each scenario and favors reports without regressions"""
  86. def _select_report_of_single_scenario(
  87. self,
  88. scenario_name: str,
  89. selectable_reports_by_wallclock_value: dict[float, list[Report]],
  90. available_wallclock_values: list[float],
  91. ) -> Report:
  92. best_report_with_regression: Report | None = None
  93. for wallclock_value in sorted(available_wallclock_values):
  94. reports = selectable_reports_by_wallclock_value[wallclock_value]
  95. for report in reports:
  96. if not report.has_scenario_regression(scenario_name):
  97. # this is the best report without regression (based on wallclock values)
  98. return report
  99. elif best_report_with_regression is None:
  100. best_report_with_regression = report
  101. assert best_report_with_regression is not None, "No report found"
  102. return best_report_with_regression
  103. def get_discarded_reports_per_scenario(
  104. reports: list[Report], selected_report_by_scenario_name: dict[str, Report]
  105. ) -> dict[str, list[Report]]:
  106. assert len(reports) > 0, "No reports"
  107. all_scenario_names = reports[0].get_scenario_names()
  108. result = dict()
  109. for scenario_name in all_scenario_names:
  110. selected_report = selected_report_by_scenario_name[scenario_name]
  111. discarded_reports = []
  112. for report in reports:
  113. if report.cycle_number != selected_report.cycle_number:
  114. discarded_reports.append(report)
  115. result[scenario_name] = discarded_reports
  116. return result