benchmark_executor.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import pathlib
import threading
import time
from concurrent import futures
from typing import Any

import pandas as pd
from psycopg import Cursor

from materialize.scalability.config.benchmark_config import BenchmarkConfiguration
from materialize.scalability.df import df_details_cols, df_totals_cols
from materialize.scalability.df.df_details import DfDetails, concat_df_details
from materialize.scalability.df.df_totals import DfTotals, concat_df_totals
from materialize.scalability.endpoint.endpoint import Endpoint
from materialize.scalability.io import paths
from materialize.scalability.operation.scalability_operation import Operation
from materialize.scalability.result.comparison_outcome import ComparisonOutcome
from materialize.scalability.result.result_analyzer import ResultAnalyzer
from materialize.scalability.result.scalability_result import BenchmarkResult
from materialize.scalability.result.workload_result import WorkloadResult
from materialize.scalability.schema.schema import Schema
from materialize.scalability.workload.workload import Workload, WorkloadWithContext
from materialize.scalability.workload.workloads.connection_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.ddl_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.dml_dql_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.self_test_workloads import *  # noqa: F401 F403

# Number of retries in addition to the first run.
MAX_RETRIES_ON_REGRESSION = 2
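

# Runs each configured workload against a baseline endpoint (if any) and a set
# of other endpoints, compares the results, and retries workloads that show a
# potential regression.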
class BenchmarkExecutor:
    def __init__(
        self,
        config: BenchmarkConfiguration,
        schema: Schema,
        baseline_endpoint: Endpoint | None,
        other_endpoints: list[Endpoint],
        result_analyzer: ResultAnalyzer,
    ):
        self.config = config
        self.schema = schema
        self.baseline_endpoint = baseline_endpoint
        self.other_endpoints = other_endpoints
        self.result_analyzer = result_analyzer
        self.result = BenchmarkResult()

    def run_workloads(self) -> BenchmarkResult:
        for workload_cls in self.config.workload_classes:
            assert issubclass(
                workload_cls, Workload
            ), f"{workload_cls} is not a Workload"
            self.run_workload_for_all_endpoints(workload_cls)

        return self.result
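
    # The baseline is measured once per workload; each other endpoint is then
    # measured and compared against that single baseline result.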
    def run_workload_for_all_endpoints(
        self,
        workload_cls: type[Workload],
    ):
        if self.baseline_endpoint is not None:
            baseline_result = self.run_workload_for_endpoint(
                self.baseline_endpoint,
                self.create_workload_instance(
                    workload_cls, endpoint=self.baseline_endpoint
                ),
            )
        else:
            baseline_result = None

        for other_endpoint in self.other_endpoints:
            comparison_outcome = self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=0
            )
            self.result.add_regression(comparison_outcome)
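
    # On a potential regression, the workload is re-run up to
    # MAX_RETRIES_ON_REGRESSION times to rule out flakiness before reporting it.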
    def run_and_evaluate_workload_for_endpoint(
        self,
        workload_cls: type[Workload],
        other_endpoint: Endpoint,
        baseline_result: WorkloadResult | None,
        try_count: int,
    ) -> ComparisonOutcome | None:
        workload_name = workload_cls.__name__
        other_endpoint_result = self.run_workload_for_endpoint(
            other_endpoint,
            self.create_workload_instance(workload_cls, endpoint=other_endpoint),
        )

        if self.baseline_endpoint is None or baseline_result is None:
            return None

        outcome = self.result_analyzer.perform_comparison_in_workload(
            workload_name,
            self.baseline_endpoint,
            other_endpoint,
            baseline_result,
            other_endpoint_result,
        )

        if outcome.has_regressions() and try_count < MAX_RETRIES_ON_REGRESSION:
            print(
                f"Potential regression in workload {workload_name} at endpoint {other_endpoint},"
                f" triggering retry {try_count + 1} of {MAX_RETRIES_ON_REGRESSION}"
            )
            return self.run_and_evaluate_workload_for_endpoint(
                workload_cls, other_endpoint, baseline_result, try_count=try_count + 1
            )

        return outcome
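
    # Runs the workload once per configured concurrency level, accumulates the
    # per-operation and per-concurrency data frames, and writes both to CSV.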
    def run_workload_for_endpoint(
        self,
        endpoint: Endpoint,
        workload: Workload,
    ) -> WorkloadResult:
        print(f"--- Running workload {workload.name()} on {endpoint}")
        self.result.record_workload_metadata(workload)

        df_totals = DfTotals()
        df_details = DfDetails()
        concurrencies = self._get_concurrencies()
        print(f"Concurrencies: {concurrencies}")

        for concurrency in concurrencies:
            df_total, df_detail = self.run_workload_for_endpoint_with_concurrency(
                endpoint,
                workload,
                concurrency,
                self.config.get_count_for_concurrency(concurrency),
            )
            df_totals = concat_df_totals([df_totals, df_total])
            df_details = concat_df_details([df_details, df_detail])

        endpoint_version_name = endpoint.try_load_version()
        pathlib.Path(paths.endpoint_dir(endpoint_version_name)).mkdir(
            parents=True, exist_ok=True
        )
        df_totals.to_csv(paths.df_totals_csv(endpoint_version_name, workload.name()))
        df_details.to_csv(paths.df_details_csv(endpoint_version_name, workload.name()))

        result = WorkloadResult(workload, endpoint, df_totals, df_details)
        self._record_results(result)
        return result
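
    # One benchmark run: initialize the schema, build a cursor pool sized to
    # the concurrency level, execute `count` operations on a thread pool, and
    # reduce the individual measurements into per-run totals.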
    def run_workload_for_endpoint_with_concurrency(
        self,
        endpoint: Endpoint,
        workload: Workload,
        concurrency: int,
        count: int,
    ) -> tuple[DfTotals, DfDetails]:
        print(
            f"Preparing benchmark for workload '{workload.name()}' at concurrency {concurrency} ..."
        )
        endpoint.up()

        # Initialize the schema and run the workload's init operations on a
        # dedicated autocommit connection.
        init_sqls = self.schema.init_sqls()
        init_conn = endpoint.sql_connection()
        init_conn.autocommit = True
        init_cursor = init_conn.cursor()
        for init_sql in init_sqls:
            print(init_sql)
            init_cursor.execute(init_sql.encode("utf8"))

        for init_operation in workload.init_operations():
            workload.execute_operation(
                init_operation, init_cursor, -1, -1, self.config.verbose
            )

        print(
            f"Creating a cursor pool with {concurrency} entries against endpoint: {endpoint.url()}"
        )
        cursor_pool = self._create_cursor_pool(concurrency, endpoint)

        print(
            f"Benchmarking workload '{workload.name()}' at concurrency {concurrency} ..."
        )
        operations = workload.operations()

        # Worker threads pick their cursor by worker ID; the counter is reset
        # for every benchmark run.
        global next_worker_id
        next_worker_id = 0
        local = threading.local()
        lock = threading.Lock()

        start = time.time()
        with futures.ThreadPoolExecutor(
            concurrency, initializer=self.initialize_worker, initargs=(local, lock)
        ) as executor:
            # Round-robin the workload's operations over `count` executions;
            # i // len(operations) counts completed passes over the operation
            # list and serves as the transaction index.
            measurements = executor.map(
                self.execute_operation,
                [
                    (
                        workload,
                        concurrency,
                        local,
                        cursor_pool,
                        operations[i % len(operations)],
                        i // len(operations),
                    )
                    for i in range(count)
                ],
            )
        wallclock_total = time.time() - start

        df_detail = pd.DataFrame(measurements)
        print("Best and worst individual measurements:")
        print(df_detail.sort_values(by=[df_details_cols.WALLCLOCK]))
        print(
            f"concurrency: {concurrency}; wallclock_total: {wallclock_total}; tps = {count / wallclock_total}"
        )

        df_total = pd.DataFrame(
            [
                {
                    df_totals_cols.CONCURRENCY: concurrency,
                    df_totals_cols.WALLCLOCK: wallclock_total,
                    df_totals_cols.WORKLOAD: workload.name(),
                    df_totals_cols.COUNT: count,
                    df_totals_cols.TPS: count / wallclock_total,
                    df_totals_cols.MEAN_TX_DURATION: df_detail[
                        df_details_cols.WALLCLOCK
                    ].mean(),
                    df_totals_cols.MEDIAN_TX_DURATION: df_detail[
                        df_details_cols.WALLCLOCK
                    ].median(),
                    df_totals_cols.MIN_TX_DURATION: df_detail[
                        df_details_cols.WALLCLOCK
                    ].min(),
                    df_totals_cols.MAX_TX_DURATION: df_detail[
                        df_details_cols.WALLCLOCK
                    ].max(),
                }
            ]
        )
        return DfTotals(df_total), DfDetails(df_detail)
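
    # Executed on a worker thread: each worker uses the cursor matching its
    # worker ID so that connections are never shared across threads.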
    def execute_operation(
        self, args: tuple[Workload, int, threading.local, list[Cursor], Operation, int]
    ) -> dict[str, Any]:
        workload, concurrency, local, cursor_pool, operation, transaction_index = args
        worker_id = local.worker_id
        assert (
            len(cursor_pool) >= worker_id + 1
        ), f"len(cursor_pool) is {len(cursor_pool)} but local.worker_id is {worker_id}"
        cursor = cursor_pool[worker_id]

        start = time.time()
        workload.execute_operation(
            operation, cursor, worker_id, transaction_index, self.config.verbose
        )
        wallclock = time.time() - start

        return {
            df_details_cols.CONCURRENCY: concurrency,
            df_details_cols.WALLCLOCK: wallclock,
            df_details_cols.OPERATION: type(operation).__name__,
            df_details_cols.WORKLOAD: workload.name(),
            df_details_cols.TRANSACTION_INDEX: transaction_index,
        }
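
    # Workloads that need access to the endpoint or schema (WorkloadWithContext)
    # get both injected before use.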
    def create_workload_instance(
        self, workload_cls: type[Workload], endpoint: Endpoint
    ) -> Workload:
        workload = workload_cls()
        if isinstance(workload, WorkloadWithContext):
            workload.set_endpoint(endpoint)
            workload.set_schema(self.schema)
        return workload

    def initialize_worker(self, local: threading.local, lock: threading.Lock):
        """Give each worker thread a unique ID."""
        with lock:
            global next_worker_id
            local.worker_id = next_worker_id
            next_worker_id += 1
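
    # The concurrency levels form a geometric ladder: round(exponent_base**c)
    # for c = 0, 1, 2, ..., deduplicated and clamped to the configured range.
    # For example, exponent_base=2.0 with min_concurrency=1 and
    # max_concurrency=256 yields [1, 2, 4, 8, 16, 32, 64, 128, 256].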
    def _get_concurrencies(self) -> list[int]:
        # Bases below 2.0 grow more slowly, so use a longer exponent range for
        # them to still reach high concurrencies.
        range_end = 1024 if self.config.exponent_base < 2.0 else 32
        concurrencies: list[int] = [
            round(self.config.exponent_base**c) for c in range(0, range_end)
        ]
        concurrencies = sorted(set(concurrencies))
        return [
            c
            for c in concurrencies
            if self.config.min_concurrency <= c <= self.config.max_concurrency
        ]

    def _create_cursor_pool(self, concurrency: int, endpoint: Endpoint) -> list[Cursor]:
        # One autocommit connection (and cursor) per worker thread.
        connect_sqls = self.schema.connect_sqls()
        cursor_pool = []
        for _ in range(concurrency):
            conn = endpoint.sql_connection()
            conn.autocommit = True
            cursor = conn.cursor()
            for connect_sql in connect_sqls:
                cursor.execute(connect_sql.encode("utf8"))
            cursor_pool.append(cursor)
        return cursor_pool

    def _record_results(self, result: WorkloadResult) -> None:
        endpoint_version_info = result.endpoint.try_load_version()
        print(
            f"Collecting results of endpoint {result.endpoint} with name {endpoint_version_info}"
        )
        self.result.append_workload_result(endpoint_version_info, result)
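

# A minimal usage sketch (hypothetical wiring; the real setup lives in the
# scalability test harness, and the constructor arguments shown here are
# assumptions):
#
#   config = BenchmarkConfiguration(...)
#   executor = BenchmarkExecutor(
#       config=config,
#       schema=Schema(...),
#       baseline_endpoint=baseline,  # or None to skip regression comparison
#       other_endpoints=[endpoint_under_test],
#       result_analyzer=analyzer,  # some ResultAnalyzer implementation
#   )
#   result = executor.run_workloads()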