# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Benchmark for how various queries scale, compares against old Materialize versions.
  11. """
import argparse
import sys
from pathlib import Path

import pandas as pd
from jupyter_core.command import main as jupyter_core_command_main
from matplotlib import pyplot as plt

from materialize import buildkite, git
from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.cockroach import Cockroach
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.test_result import FailedTestExecutionError
from materialize.scalability.config.benchmark_config import BenchmarkConfiguration
from materialize.scalability.df import df_totals_cols
from materialize.scalability.df.df_totals import DfTotalsExtended
from materialize.scalability.endpoint.endpoint import Endpoint
from materialize.scalability.endpoint.endpoints import (
    TARGET_HEAD,
    TARGET_MATERIALIZE_LOCAL,
    TARGET_MATERIALIZE_REMOTE,
    TARGET_POSTGRES,
    MaterializeContainer,
    MaterializeLocal,
    MaterializeRemote,
    PostgresContainer,
    endpoint_name_to_description,
)
from materialize.scalability.executor.benchmark_executor import BenchmarkExecutor
from materialize.scalability.io import paths
from materialize.scalability.plot.plot import (
    plot_duration_by_connections_for_workload,
    plot_duration_by_endpoints_for_workload,
    plot_tps_per_connections,
)
from materialize.scalability.result.comparison_outcome import ComparisonOutcome
from materialize.scalability.result.regression_assessment import RegressionAssessment
from materialize.scalability.result.result_analyzer import ResultAnalyzer
from materialize.scalability.result.result_analyzers import DefaultResultAnalyzer
from materialize.scalability.result.scalability_result import BenchmarkResult
from materialize.scalability.schema.schema import Schema, TransactionIsolation
from materialize.scalability.workload.workload import Workload
from materialize.scalability.workload.workload_markers import WorkloadMarker
from materialize.scalability.workload.workloads.connection_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.ddl_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.dml_dql_workloads import *  # noqa: F401 F403
from materialize.scalability.workload.workloads.self_test_workloads import *  # noqa: F401 F403
from materialize.test_analytics.config.test_analytics_db_config import (
    create_test_analytics_config,
)
from materialize.test_analytics.data.scalability_framework import (
    scalability_framework_result_storage,
)
from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
from materialize.util import YesNoOnce, all_subclasses
from materialize.version_ancestor_overrides import (
    ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS,
)
from materialize.version_list import (
    resolve_ancestor_image_tag,
)
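
# Containers provisioned by mzcompose for these workflows: Materialize under
# test (with CockroachDB as its external metadata store), Postgres as a
# comparison target, and balancerd for routed connections.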
SERVICES = [
    Cockroach(setup_materialize=True),
    Materialized(
        image="materialize/materialized:latest",
        sanity_restart=False,
        additional_system_parameter_defaults=ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS,
        external_metadata_store=True,
        metadata_store="cockroach",
    ),
    Postgres(),
    Balancerd(),
    Mz(app_password=""),
]
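
# Maximum relative deterioration versus the baseline (0.2 == 20%) before a
# result is reported as a regression; override with --regression-threshold.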
DEFAULT_REGRESSION_THRESHOLD = 0.2

SCALABILITY_FRAMEWORK_VERSION = "1.5.0"
INCLUDE_ZERO_IN_Y_AXIS = True


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
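    """Run the scalability benchmark against all configured targets, plot and
    upload the results, and fail the build on unjustified regressions."""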
    parser.add_argument(
        "--target",
        help="Target for the benchmark: 'HEAD', 'local', 'remote', 'common-ancestor', 'Postgres', or a DockerHub tag",
        action="append",
        default=[],
    )
    parser.add_argument(
        "--regression-against",
        type=str,
        help="Detect regression against: 'HEAD', 'local', 'remote', 'common-ancestor', 'Postgres', or a DockerHub tag",
        default=None,
    )
    parser.add_argument(
        "--regression-threshold",
        type=float,
        help="Regression threshold (max. relative deterioration from target) as percent decimal",
        default=DEFAULT_REGRESSION_THRESHOLD,
    )
    parser.add_argument(
        "--exponent-base",
        type=float,
        help="Exponent base to use when deciding what concurrencies to test",
        default=2,
    )
    parser.add_argument(
        "--min-concurrency", type=int, help="Minimum concurrency to test", default=1
    )
    parser.add_argument(
        "--max-concurrency",
        type=int,
        help="Maximum concurrency to test",
        default=256,
    )
    parser.add_argument(
        "--workload",
        metavar="WORKLOAD",
        action="append",
        help="Workload(s) to run.",
    )
    parser.add_argument(
        "--workload-group-marker",
        type=str,
        help="Workload group to run. Required if --workload is not set.",
    )
    parser.add_argument(
        "--count",
        metavar="COUNT",
        type=int,
        default=512,
        help="Number of individual operations to benchmark at concurrency 1 (and COUNT * SQRT(concurrency) for higher concurrencies)",
    )
    parser.add_argument(
        "--object-count",
        metavar="COUNT",
        type=int,
        default=1,
        help="Number of database objects",
    )
    parser.add_argument(
        "--create-index",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Execute a CREATE INDEX",
    )
    parser.add_argument(
        "--transaction-isolation",
        type=TransactionIsolation,
        choices=TransactionIsolation,
        default=None,
        help="SET transaction_isolation",
    )
    parser.add_argument(
        "--materialize-url",
        type=str,
        help="URL to connect to for remote targets",
        action="append",
    )
    parser.add_argument(
        "--use-balancerd",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Whether to communicate through balancerd (only applicable to Materialize containers)",
    )
    parser.add_argument(
        "--verbose",
        default=False,
        action=argparse.BooleanOptionalAction,
    )
    parser.add_argument("--cluster-name", type=str, help="Cluster to SET CLUSTER to")

    args = parser.parse_args()
    regression_against_target = args.regression_against

    validate_and_adjust_targets(args, regression_against_target)

    baseline_endpoint, other_endpoints = get_baseline_and_other_endpoints(
        c, args, regression_against_target
    )

    workload_classes = get_workload_classes(args)

    print(f"Targets: {args.target}")
    print(f"Checking regression against: {regression_against_target}")
    print("Workloads:")
    for workload_cls in workload_classes:
        print(f"* {workload_cls.__name__}")
    print(f"Baseline: {baseline_endpoint}")
    print("Other endpoints:")
    for other_endpoint in other_endpoints:
        print(f"* {other_endpoint}")

    # fetch main branch and git tags so that their commit messages can be resolved
    git.fetch(remote=git.get_remote(), branch="main", include_tags=YesNoOnce.ONCE)

    schema = Schema(
        create_index=args.create_index,
        transaction_isolation=args.transaction_isolation,
        cluster_name=args.cluster_name,
        object_count=args.object_count,
    )

    regression_threshold = args.regression_threshold
    result_analyzer = create_result_analyzer(regression_threshold)

    workload_names = [workload_cls.__name__ for workload_cls in workload_classes]
    df_workloads = pd.DataFrame(data={"workload": workload_names})
    df_workloads.to_csv(paths.workloads_csv())

    config = BenchmarkConfiguration(
        workload_classes=workload_classes,
        exponent_base=args.exponent_base,
        min_concurrency=args.min_concurrency,
        max_concurrency=args.max_concurrency,
        count=args.count,
        verbose=args.verbose,
    )

    executor = BenchmarkExecutor(
        config, schema, baseline_endpoint, other_endpoints, result_analyzer
    )
    benchmark_result = executor.run_workloads()

    result_file_paths = store_results_in_files(benchmark_result)
    upload_results_to_buildkite(result_file_paths)

    create_plots(benchmark_result, baseline_endpoint)
    upload_plots_to_buildkite()

    regression_assessment = RegressionAssessment(
        baseline_endpoint,
        benchmark_result.overall_comparison_outcome,
    )

    report_regression_result(
        baseline_endpoint,
        regression_threshold,
        benchmark_result.overall_comparison_outcome,
    )

    report_assessment(regression_assessment)

    is_failure = regression_assessment.has_unjustified_regressions()
    upload_results_to_test_analytics(
        c, other_endpoints, benchmark_result, not is_failure
    )

    if is_failure:
        raise FailedTestExecutionError(
            error_summary="At least one regression occurred",
            errors=regression_assessment.to_failure_details(),
        )


def validate_and_adjust_targets(
    args: argparse.Namespace, regression_against_target: str | None
) -> None:
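    """Default to HEAD when no target is given and ensure the regression
    baseline is itself among the benchmark targets."""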
    if args.materialize_url is not None and "remote" not in args.target:
        raise RuntimeError("--materialize-url requires --target=remote")

    if len(args.target) == 0:
        args.target = ["HEAD"]

    if (
        regression_against_target is not None
        and regression_against_target not in args.target
    ):
        print(f"Adding {regression_against_target} as target")
        args.target.append(regression_against_target)


def get_baseline_and_other_endpoints(
    c: Composition, args: argparse.Namespace, regression_against_target: str | None
) -> tuple[Endpoint | None, list[Endpoint]]:
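    """Create an Endpoint for each requested target and split off the one that
    serves as the regression baseline."""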
    use_balancerd = args.use_balancerd

    baseline_endpoint: Endpoint | None = None
    other_endpoints: list[Endpoint] = []
    for i, specified_target in enumerate(args.target):
        endpoint: Endpoint | None = None

        if specified_target == TARGET_MATERIALIZE_LOCAL:
            endpoint = MaterializeLocal()
        elif specified_target == TARGET_MATERIALIZE_REMOTE:
            endpoint = MaterializeRemote(materialize_url=args.materialize_url[i])
        elif specified_target == TARGET_POSTGRES:
            endpoint = PostgresContainer(composition=c)
        elif specified_target == TARGET_HEAD:
            endpoint = MaterializeContainer(
                composition=c,
                specified_target=specified_target,
                resolved_target=specified_target,
                use_balancerd=use_balancerd,
            )
        else:
            resolved_target = specified_target
            if specified_target == "common-ancestor":
                resolved_target = resolve_ancestor_image_tag(
                    ANCESTOR_OVERRIDES_FOR_SCALABILITY_REGRESSIONS
                )
            endpoint = MaterializeContainer(
                composition=c,
                specified_target=specified_target,
                resolved_target=resolved_target,
                use_balancerd=use_balancerd,
                image=f"materialize/materialized:{resolved_target}",
                alternative_image="materialize/materialized:latest",
            )
        assert endpoint is not None

        if specified_target == regression_against_target:
            baseline_endpoint = endpoint
        else:
            other_endpoints.append(endpoint)

    return baseline_endpoint, other_endpoints


def get_workload_classes(args: argparse.Namespace) -> list[type[Workload]]:
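    """Resolve the workload classes to run: either the explicitly named ones or
    all subclasses of the given workload group marker."""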
    if args.workload:
        workload_classes = [globals()[workload] for workload in args.workload]
    else:
        assert (
            args.workload_group_marker is not None
        ), "--workload-group-marker must be set"
        workload_group_marker_class: type[WorkloadMarker] = globals()[
            args.workload_group_marker
        ]
        workload_classes: list[type[Workload]] = [
            workload_cls for workload_cls in all_subclasses(workload_group_marker_class)
        ]

    assert len(workload_classes) > 0, "No workload class matched"

    # sort classes to ensure a stable order
    workload_classes.sort(key=lambda cls: cls.__name__)

    return workload_classes


def report_regression_result(
    baseline_endpoint: Endpoint | None,
    regression_threshold: float,
    outcome: ComparisonOutcome,
) -> None:
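    """Print detected scalability changes and, when running on Buildkite,
    upload them as CSV artifacts."""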
    if baseline_endpoint is None:
        print(
            "No regression detection because the '--regression-against' parameter is not set"
        )
        return

    baseline_desc = endpoint_name_to_description(baseline_endpoint.try_load_version())

    print("+++ Scalability changes")
    if outcome.has_scalability_changes():
        print(
            f"{'ERROR' if outcome.has_regressions() else 'INFO'}: "
            f"The following scalability changes were detected "
            f"(threshold: {regression_threshold}, baseline: {baseline_desc}):\n"
            f"{outcome.to_description()}"
        )
        if buildkite.is_in_buildkite():
            upload_regressions_to_buildkite(outcome)
            upload_significant_improvements_to_buildkite(outcome)
    else:
        print("No scalability changes were detected.")


def report_assessment(regression_assessment: RegressionAssessment) -> None:
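    """Summarize each detected regression and whether a commit marked as an
    accepted regression justifies it."""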
    print("+++ Assessment of regressions")

    if not regression_assessment.has_comparison_target():
        print("No comparison was performed because no baseline was specified")
        return

    assert regression_assessment.baseline_endpoint is not None

    if not regression_assessment.has_regressions():
        print("No regressions were detected")
        return

    baseline_desc = endpoint_name_to_description(
        regression_assessment.baseline_endpoint.try_load_version()
    )

    for (
        endpoint_with_regression,
        justification,
    ) in regression_assessment.endpoints_with_regressions_and_justifications.items():
        endpoint_desc = endpoint_name_to_description(
            endpoint_with_regression.try_load_version()
        )
        if justification is None:
            print(
                f"* There are regressions between baseline {baseline_desc} and endpoint {endpoint_desc} that need to be checked."
            )
        else:
            print(
                f"* Although there are regressions between baseline {baseline_desc} and endpoint {endpoint_desc},"
                f" they can be explained by the following commits that are marked as accepted regressions: {justification}."
            )


def create_result_analyzer(regression_threshold: float) -> ResultAnalyzer:
    return DefaultResultAnalyzer(max_deviation_as_percent_decimal=regression_threshold)


def create_plots(result: BenchmarkResult, baseline_endpoint: Endpoint | None) -> None:
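    """Render one TPS plot and two duration plots per workload as PNG files."""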
    paths.plot_dir().mkdir(parents=True, exist_ok=True)

    for (
        workload_name,
        results_by_endpoint,
    ) in result.get_df_total_by_workload_and_endpoint().items():
        fig = plt.figure(layout="constrained", figsize=(16, 6))
        subfigure = fig.subfigures(1, 1)
        plot_tps_per_connections(
            workload_name,
            subfigure,
            results_by_endpoint,
            baseline_version_name=(
                baseline_endpoint.try_load_version() if baseline_endpoint else None
            ),
            include_zero_in_y_axis=INCLUDE_ZERO_IN_Y_AXIS,
            include_workload_in_title=True,
        )
        plt.savefig(paths.plot_png("tps", workload_name), bbox_inches="tight", dpi=300)
        plt.close()

    for (
        workload_name,
        results_by_endpoint,
    ) in result.get_df_details_by_workload_and_endpoint().items():
        fig = plt.figure(layout="constrained", figsize=(16, 10))
        subfigure = fig.subfigures(1, 1)
        plot_duration_by_connections_for_workload(
            workload_name,
            subfigure,
            results_by_endpoint,
            include_zero_in_y_axis=INCLUDE_ZERO_IN_Y_AXIS,
            include_workload_in_title=True,
        )
        plt.savefig(
            paths.plot_png("duration_by_connections", workload_name),
            bbox_inches="tight",
            dpi=300,
        )
        plt.close()

        fig = plt.figure(layout="constrained", figsize=(16, 10))
        subfigure = fig.subfigures(1, 1)
        plot_duration_by_endpoints_for_workload(
            workload_name,
            subfigure,
            results_by_endpoint,
            include_zero_in_y_axis=INCLUDE_ZERO_IN_Y_AXIS,
            include_workload_in_title=True,
        )
        plt.savefig(
            paths.plot_png("duration_by_endpoints", workload_name),
            bbox_inches="tight",
            dpi=300,
        )
        plt.close()


def upload_regressions_to_buildkite(outcome: ComparisonOutcome) -> None:
    if outcome.has_regressions():
        _upload_scalability_changes_to_buildkite(
            outcome.regression_df, paths.regressions_csv()
        )


def upload_significant_improvements_to_buildkite(outcome: ComparisonOutcome) -> None:
    if outcome.has_significant_improvements():
        _upload_scalability_changes_to_buildkite(
            outcome.significant_improvement_df, paths.significant_improvements_csv()
        )


def _upload_scalability_changes_to_buildkite(
    scalability_changes: DfTotalsExtended, file_path: Path
) -> None:
    scalability_changes.to_csv(file_path)
    buildkite.upload_artifact(
        file_path.relative_to(paths.RESULTS_DIR),
        cwd=paths.RESULTS_DIR,
    )


def store_results_in_files(result: BenchmarkResult) -> list[Path]:
    created_files = []
    for endpoint_name in result.get_endpoint_names():
        print(
            f"Writing results of {endpoint_name} to {paths.results_csv(endpoint_name)}"
        )
        df_total = result.get_df_total_by_endpoint_name(endpoint_name)
        file_path = paths.results_csv(endpoint_name)
        df_total.to_csv(file_path)
        created_files.append(file_path)
    return created_files


def upload_results_to_buildkite(result_file_paths: list[Path]) -> None:
    if not buildkite.is_in_buildkite():
        return

    for path in result_file_paths:
        buildkite.upload_artifact(
            path.relative_to(paths.RESULTS_DIR),
            cwd=paths.RESULTS_DIR,
        )


def upload_plots_to_buildkite() -> None:
    if not buildkite.is_in_buildkite():
        return

    buildkite.upload_artifact(
        f"{paths.plot_dir().relative_to(paths.RESULTS_DIR)}/*.png",
        cwd=paths.RESULTS_DIR,
    )


def upload_results_to_test_analytics(
    c: Composition,
    endpoints: list[Endpoint],
    benchmark_result: BenchmarkResult,
    was_successful: bool,
) -> None:
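    """Store the HEAD endpoint's per-concurrency results in the test-analytics
    database; an upload failure never fails the build."""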
    if not buildkite.is_in_buildkite():
        return

    head_target_endpoint = _get_head_target_endpoint(endpoints)

    if head_target_endpoint is None:
        print(
            "Not uploading results because no HEAD version is included in the endpoints"
        )
        return

    endpoint_version_info = head_target_endpoint.try_load_version()
    results_of_endpoint = benchmark_result.df_total_by_endpoint_name_and_workload[
        endpoint_version_info
    ]

    test_analytics = TestAnalyticsDb(create_test_analytics_config(c))
    test_analytics.builds.add_build_job(was_successful=was_successful)

    result_entries = []
    for workload_name, result in results_of_endpoint.items():
        workload_version = benchmark_result.workload_version_by_name[workload_name]
        workload_group = benchmark_result.workload_group_by_name[workload_name]
        for _, row in result.data.iterrows():
            result_entries.append(
                scalability_framework_result_storage.ScalabilityFrameworkResultEntry(
                    workload_name=workload_name,
                    workload_group=workload_group,
                    workload_version=str(workload_version),
                    concurrency=row[df_totals_cols.CONCURRENCY],
                    count=row[df_totals_cols.COUNT],
                    tps=row[df_totals_cols.TPS],
                )
            )

    test_analytics.scalability_results.add_result(
        framework_version=SCALABILITY_FRAMEWORK_VERSION,
        results=result_entries,
    )

    try:
        test_analytics.submit_updates()
        print("Uploaded results.")
    except Exception as e:
        # An error during an upload must never cause the build to fail
        test_analytics.on_upload_failed(e)


def _get_head_target_endpoint(endpoints: list[Endpoint]) -> Endpoint | None:
    for endpoint in endpoints:
        if endpoint.specified_target() == TARGET_HEAD:
            return endpoint
    return None
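

# Convenience workflows for exploring benchmark results interactively; both
# reuse the Jupyter CLI entry point by rewriting sys.argv before invoking it.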
def workflow_lab(c: Composition) -> None:
    sys.argv = ["jupyter", "lab", "--no-browser"]
    jupyter_core_command_main()


def workflow_notebook(c: Composition) -> None:
    sys.argv = ["jupyter", "notebook", "--no-browser"]
    jupyter_core_command_main()