benchmark.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import time
  10. from materialize.feature_benchmark.aggregation import Aggregation
  11. from materialize.feature_benchmark.executor import Executor
  12. from materialize.feature_benchmark.filter import Filter
  13. from materialize.feature_benchmark.measurement import (
  14. Measurement,
  15. MeasurementType,
  16. MeasurementUnit,
  17. WallclockDuration,
  18. )
  19. from materialize.feature_benchmark.measurement_source import MeasurementSource
  20. from materialize.feature_benchmark.scenario import Scenario
  21. from materialize.feature_benchmark.termination import TerminationCondition
  22. from materialize.mz_version import MzVersion
  23. class Benchmark:
  24. def __init__(
  25. self,
  26. mz_id: int,
  27. mz_version: MzVersion,
  28. scenario_cls: type[Scenario],
  29. executor: Executor,
  30. filter: Filter,
  31. termination_conditions: list[TerminationCondition],
  32. aggregation_class: type[Aggregation],
  33. default_size: int,
  34. seed: int,
  35. scale: str | None = None,
  36. measure_memory: bool = True,
  37. ) -> None:
  38. self._scale = scale
  39. self._mz_id = mz_id
  40. self._mz_version = mz_version
  41. self._scenario_cls = scenario_cls
  42. self._executor = executor
  43. self._filter = filter
  44. self._termination_conditions = termination_conditions
  45. self._performance_aggregation = aggregation_class()
  46. self._default_size = default_size
  47. self._seed = seed
  48. if measure_memory:
  49. self._memory_mz_aggregation = aggregation_class()
  50. self._memory_clusterd_aggregation = aggregation_class()
  51. def create_scenario_instance(self) -> Scenario:
  52. scale = self._scenario_cls.SCALE
  53. if self._scale and not self._scenario_cls.FIXED_SCALE:
  54. if self._scale.startswith("+"):
  55. scale = scale + float(self._scale.lstrip("+"))
  56. elif self._scale.startswith("-"):
  57. scale = scale - float(self._scale.lstrip("-"))
  58. elif float(self._scale) > 0:
  59. scale = float(self._scale)
  60. scenario_class = self._scenario_cls
  61. return scenario_class(
  62. scale=scale,
  63. mz_version=self._mz_version,
  64. default_size=self._default_size,
  65. seed=self._seed,
  66. )
  67. def run(self) -> list[Aggregation]:
  68. scenario = self.create_scenario_instance()
  69. print(
  70. f"--- Running scenario {scenario.name()}, scale = {scenario.scale()}, N = {scenario.n()}"
  71. )
  72. start_time = time.time()
  73. # Run the shared() section once for both Mzs under measurement
  74. self.run_shared(scenario)
  75. # Run the init() section once for each Mz
  76. self.run_init(scenario)
  77. i = 0
  78. while True:
  79. # Run the before() section once for each measurement
  80. self.run_before(scenario)
  81. performance_measurement = self.run_measurement(scenario, i)
  82. if self.shall_terminate(performance_measurement):
  83. duration = time.time() - start_time
  84. print(
  85. f"Scenario {scenario.name()}, scale = {scenario.scale()}, N = {scenario.n()} took {duration:.0f}s to run"
  86. )
  87. return [
  88. self._performance_aggregation,
  89. self._memory_mz_aggregation,
  90. self._memory_clusterd_aggregation,
  91. ]
  92. i = i + 1
  93. def run_shared(self, scenario: Scenario) -> None:
  94. shared = scenario.shared()
  95. if self._mz_id == 0 and shared is not None:
  96. print(
  97. f"Running the shared() section for scenario {scenario.name()} with {self._mz_version} ..."
  98. )
  99. for shared_item in shared if isinstance(shared, list) else [shared]:
  100. shared_item.run(executor=self._executor)
  101. print("shared() done")
  102. def run_init(self, scenario: Scenario) -> None:
  103. init = scenario.init()
  104. if init is not None:
  105. print(
  106. f"Running the init() section for scenario {scenario.name()} with {self._mz_version} ..."
  107. )
  108. for init_item in init if isinstance(init, list) else [init]:
  109. init_item.run(executor=self._executor)
  110. print("init() done")
  111. def run_before(self, scenario: Scenario) -> None:
  112. print(
  113. f"Running the before() section for scenario {scenario.name()} with {self._mz_version} ..."
  114. )
  115. before = scenario.before()
  116. if before is not None:
  117. for before_item in before if isinstance(before, list) else [before]:
  118. before_item.run(executor=self._executor)
  119. def shall_terminate(self, performance_measurement: Measurement) -> bool:
  120. for termination_condition in self._termination_conditions:
  121. if termination_condition.terminate(performance_measurement):
  122. return True
  123. return False
  124. def run_measurement(self, scenario: Scenario, i: int) -> Measurement:
  125. print(
  126. f"Running the benchmark for scenario {scenario.name()} with {self._mz_version} ..."
  127. )
  128. # Collect timestamps from any part of the workload being benchmarked
  129. timestamps: list[WallclockDuration] = []
  130. benchmark = scenario.benchmark()
  131. for benchmark_item in benchmark if isinstance(benchmark, list) else [benchmark]:
  132. assert isinstance(
  133. benchmark_item, MeasurementSource
  134. ), f"Benchmark item is of type {benchmark_item.__class__} but not a MeasurementSource"
  135. item_timestamps = benchmark_item.run(executor=self._executor)
  136. timestamps.extend(item_timestamps)
  137. self._validate_measurement_timestamps(scenario.name(), timestamps)
  138. performance_measurement = Measurement(
  139. type=MeasurementType.WALLCLOCK,
  140. value=timestamps[1].duration - timestamps[0].duration,
  141. unit=timestamps[0].unit,
  142. notes=f"Unit: {timestamps[0].unit}",
  143. )
  144. self._collect_performance_measurement(i, performance_measurement)
  145. if self._memory_mz_aggregation:
  146. self._collect_memory_measurement(
  147. i, MeasurementType.MEMORY_MZ, self._memory_mz_aggregation
  148. )
  149. if self._memory_clusterd_aggregation:
  150. self._collect_memory_measurement(
  151. i, MeasurementType.MEMORY_CLUSTERD, self._memory_clusterd_aggregation
  152. )
  153. return performance_measurement
  154. def _validate_measurement_timestamps(
  155. self, scenario_name: str, timestamps: list[WallclockDuration]
  156. ) -> None:
  157. assert (
  158. len(timestamps) == 2
  159. ), f"benchmark() did not return exactly 2 timestamps: scenario: {scenario_name}, timestamps: {timestamps}"
  160. assert (
  161. timestamps[0].unit == timestamps[1].unit
  162. ), f"benchmark() returned timestamps with different units: scenario: {scenario_name}, timestamps: {timestamps}"
  163. assert timestamps[1].is_equal_or_after(
  164. timestamps[0]
  165. ), f"Second timestamp reported not greater than first: scenario: {scenario_name}, timestamps: {timestamps}"
  166. def _collect_performance_measurement(
  167. self, i: int, performance_measurement: Measurement
  168. ) -> None:
  169. if not self._filter or not self._filter.filter(performance_measurement):
  170. print(f"{i} {performance_measurement}")
  171. self._performance_aggregation.append_measurement(performance_measurement)
  172. def _collect_memory_measurement(
  173. self, i: int, memory_measurement_type: MeasurementType, aggregation: Aggregation
  174. ) -> None:
  175. if memory_measurement_type == MeasurementType.MEMORY_MZ:
  176. value = self._executor.DockerMemMz()
  177. elif memory_measurement_type == MeasurementType.MEMORY_CLUSTERD:
  178. value = self._executor.DockerMemClusterd()
  179. else:
  180. raise ValueError(f"Unknown measurement type {memory_measurement_type}")
  181. memory_measurement = Measurement(
  182. type=memory_measurement_type,
  183. value=value / 2**20, # Convert to Mb
  184. unit=MeasurementUnit.MEGABYTE,
  185. )
  186. if memory_measurement.value > 0:
  187. if not self._filter or not self._filter.filter(memory_measurement):
  188. print(f"{i} {memory_measurement}")
  189. aggregation.append_measurement(memory_measurement)