# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Simple benchmark of mostly individual queries using testdrive. Can find
  11. wallclock/memorys regressions in single-connection query executions, not
  12. suitable for concurrency.
  13. """

import argparse
import os
import sys
import time
import uuid
from textwrap import dedent

from materialize import buildkite
from materialize.docker import is_image_tag_of_release_version
from materialize.feature_benchmark.benchmark_result_evaluator import (
    BenchmarkResultEvaluator,
)
from materialize.feature_benchmark.benchmark_result_selection import (
    BestBenchmarkResultSelector,
    get_discarded_reports_per_scenario,
)
from materialize.feature_benchmark.report import (
    Report,
    determine_scenario_classes_with_regressions,
)
from materialize.mz_version import MzVersion
from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.test_result import (
    FailedTestExecutionError,
    TestFailureDetails,
)
from materialize.test_analytics.config.test_analytics_db_config import (
    create_test_analytics_config,
)
from materialize.test_analytics.data.feature_benchmark import (
    feature_benchmark_result_storage,
)
from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
from materialize.version_ancestor_overrides import (
    get_ancestor_overrides_for_performance_regressions,
)
from materialize.version_list import (
    get_commits_of_accepted_regressions_between_versions,
    get_latest_published_version,
    resolve_ancestor_image_tag,
)

# mzcompose may start this script from the root of the Mz repository,
# so we need to explicitly add this directory to the Python module search path
sys.path.append(os.path.dirname(__file__))

from materialize.feature_benchmark.aggregation import Aggregation, MinAggregation
from materialize.feature_benchmark.benchmark import Benchmark
from materialize.feature_benchmark.benchmark_result import (
    BenchmarkScenarioResult,
)
from materialize.feature_benchmark.executor import Docker
from materialize.feature_benchmark.filter import Filter, FilterFirst, NoFilter
from materialize.feature_benchmark.measurement import MeasurementType
from materialize.feature_benchmark.scenarios.benchmark_main import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.benchmark_main import (
    Scenario,
)
from materialize.feature_benchmark.scenarios.concurrency import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.customer import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.optbench import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.scale import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.skew import *  # noqa: F401 F403
from materialize.feature_benchmark.scenarios.subscribe import *  # noqa: F401 F403
from materialize.feature_benchmark.termination import (
    NormalDistributionOverlap,
    ProbForMin,
    RunAtMost,
    TerminationCondition,
)
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.cockroach import Cockroach
from materialize.mzcompose.services.kafka import Kafka as KafkaService
from materialize.mzcompose.services.kgen import Kgen as KgenService
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import all_subclasses

#
# Global feature benchmark thresholds and termination conditions
#

FEATURE_BENCHMARK_FRAMEWORK_VERSION = "1.5.0"
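
# The three factories below configure each Benchmark run (wiring inferred from
# the Benchmark(...) call in run_one_scenario): make_filter decides which
# measurements to discard, make_termination_conditions decides when to stop
# measuring, and make_aggregation_class collapses the remaining measurements
# into the single value that gets reported.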


def make_filter(args: argparse.Namespace) -> Filter:
    # Discard the first measurement unless a small --max-measurements limit
    # is explicitly set
    if args.max_measurements <= 5:
        return NoFilter()
    else:
        return FilterFirst()


def make_termination_conditions(args: argparse.Namespace) -> list[TerminationCondition]:
    return [
        NormalDistributionOverlap(threshold=0.95),
        ProbForMin(threshold=0.90),
        RunAtMost(threshold=args.max_measurements),
    ]


def make_aggregation_class() -> type[Aggregation]:
    return MinAggregation


default_timeout = "1800s"

SERVICES = [
    Zookeeper(),
    KafkaService(),
    SchemaRegistry(),
    Redpanda(),
    Cockroach(setup_materialize=True),
    Minio(setup_materialize=True),
    Azurite(),
    KgenService(),
    Postgres(),
    MySql(),
    Balancerd(),
    # Overridden below
    Materialized(),
    Clusterd(),
    Testdrive(),
    Mz(app_password=""),
]
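
# Note: the Materialized, Clusterd, and Testdrive entries above are only
# placeholders; run_one_scenario replaces them via c.override(...) with
# services built from the --this-*/--other-* flags for each benchmark instance.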


def run_one_scenario(
    c: Composition, scenario_class: type[Scenario], args: argparse.Namespace
) -> BenchmarkScenarioResult:
    scenario_name = scenario_class.__name__
    print(f"--- Now benchmarking {scenario_name} ...")

    measurement_types = [MeasurementType.WALLCLOCK]
    if args.measure_memory:
        measurement_types.append(MeasurementType.MEMORY_MZ)
        measurement_types.append(MeasurementType.MEMORY_CLUSTERD)

    result = BenchmarkScenarioResult(scenario_class, measurement_types)

    common_seed = round(time.time())
    early_abort = False

    for mz_id, instance in enumerate(["this", "other"]):
        balancerd, tag, size, params = (
            (args.this_balancerd, args.this_tag, args.this_size, args.this_params)
            if instance == "this"
            else (
                args.other_balancerd,
                args.other_tag,
                args.other_size,
                args.other_params,
            )
        )

        tag = resolve_tag(tag, scenario_class, args.scale)
        entrypoint_host = "balancerd" if balancerd else "materialized"

        c.up({"name": "testdrive", "persistent": True})

        additional_system_parameter_defaults = (
            ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
            | {
                "max_clusters": "15",
                "enable_unorchestrated_cluster_replicas": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            }
        )

        if params is not None:
            for param in params.split(";"):
                param_name, param_value = param.split("=")
                additional_system_parameter_defaults[param_name] = param_value

        mz_image = f"materialize/materialized:{tag}" if tag else None
        # TODO: Better azurite support detection
        mz = create_mz_service(
            mz_image,
            size,
            additional_system_parameter_defaults,
            args.azurite and instance == "this",
        )
        clusterd_image = f"materialize/clusterd:{tag}" if tag else None
        clusterd = create_clusterd_service(
            clusterd_image, size, additional_system_parameter_defaults
        )

        if tag is not None and not c.try_pull_service_image(mz):
            print(
                f"Unable to find materialize image with tag {tag}, proceeding with latest instead!"
            )
            mz_image = "materialize/materialized:latest"
            # TODO: Better azurite support detection
            mz = create_mz_service(
                mz_image,
                size,
                additional_system_parameter_defaults,
                args.azurite and instance == "this",
            )
            clusterd_image = f"materialize/clusterd:{tag}" if tag else None
            clusterd = create_clusterd_service(
                clusterd_image, size, additional_system_parameter_defaults
            )

        start_overridden_mz_clusterd_and_cockroach(c, mz, clusterd, instance)

        if balancerd:
            c.up("balancerd")

        with c.override(
            Testdrive(
                materialize_url=f"postgres://materialize@{entrypoint_host}:6875",
                default_timeout=default_timeout,
                materialize_params={"statement_timeout": f"'{default_timeout}'"},
                metadata_store="cockroach",
                external_blob_store=True,
                blob_store_is_azure=args.azurite,
            )
        ):
            c.testdrive(
                dedent(
                    """
                    $[version<9000] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                    ALTER SYSTEM SET enable_unmanaged_cluster_replicas = true;

                    $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                    CREATE CLUSTER cluster_default REPLICAS (r1 (STORAGECTL ADDRESSES ['clusterd:2100'], STORAGE ADDRESSES ['clusterd:2103'], COMPUTECTL ADDRESSES ['clusterd:2101'], COMPUTE ADDRESSES ['clusterd:2102'], WORKERS 1));
                    ALTER SYSTEM SET cluster = cluster_default;
                    GRANT ALL PRIVILEGES ON CLUSTER cluster_default TO materialize;"""
                ),
            )

            executor = Docker(
                composition=c, seed=common_seed, materialized=mz, clusterd=clusterd
            )
            mz_version = MzVersion.parse_mz(c.query_mz_version())

            benchmark = Benchmark(
                mz_id=mz_id,
                mz_version=mz_version,
                scenario_cls=scenario_class,
                scale=args.scale,
                executor=executor,
                filter=make_filter(args),
                termination_conditions=make_termination_conditions(args),
                aggregation_class=make_aggregation_class(),
                measure_memory=args.measure_memory,
                default_size=size,
                seed=common_seed,
            )

            if not scenario_class.can_run(mz_version):
                print(
                    f"Skipping scenario {scenario_class}: not supported in version {mz_version}"
                )
                early_abort = True
            else:
                aggregations = benchmark.run()
                scenario_version = benchmark.create_scenario_instance().version()
                result.set_scenario_version(scenario_version)

                for aggregation, metric in zip(aggregations, result.metrics):
                    assert (
                        aggregation.measurement_type == metric.measurement_type
                        or aggregation.measurement_type is None
                    ), f"Aggregation contains {aggregation.measurement_type} but metric contains {metric.measurement_type} as measurement type"
                    metric.append_point(
                        aggregation.aggregate(),
                        aggregation.unit(),
                        aggregation.name(),
                    )

        c.kill("cockroach", "materialized", "clusterd", "testdrive")
        c.rm("cockroach", "materialized", "clusterd", "testdrive")
        c.rm_volumes("mzdata")

        if early_abort:
            result.empty()
            break

    return result
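

# Cache of resolved ancestor image tags, keyed by (tag, ancestor overrides), so
# that each distinct override set is resolved at most once per invocation.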
resolved_tags: dict[tuple[str, frozenset[tuple[str, MzVersion]]], str] = {}


def resolve_tag(tag: str, scenario_class: type[Scenario], scale: str | None) -> str:
    if tag == "common-ancestor":
        overrides = get_ancestor_overrides_for_performance_regressions(
            scenario_class, scale
        )
        key = (tag, frozenset(overrides.items()))
        if key not in resolved_tags:
            resolved_tags[key] = resolve_ancestor_image_tag(overrides)
        return resolved_tags[key]
    return tag


def create_mz_service(
    mz_image: str | None,
    default_size: int,
    additional_system_parameter_defaults: dict[str, str] | None,
    azurite: bool,
) -> Materialized:
    return Materialized(
        image=mz_image,
        default_size=default_size,
        # Avoid clashes with the Kafka sink progress topic across restarts
        environment_id=f"local-az1-{uuid.uuid4()}-0",
        soft_assertions=False,
        additional_system_parameter_defaults=additional_system_parameter_defaults,
        external_metadata_store=True,
        metadata_store="cockroach",
        external_blob_store=True,
        blob_store_is_azure=azurite,
        sanity_restart=False,
    )


def create_clusterd_service(
    clusterd_image: str | None,
    default_size: int,
    additional_system_parameter_defaults: dict[str, str] | None,
) -> Clusterd:
    # default_size and additional_system_parameter_defaults are accepted for
    # signature parity with create_mz_service but are currently unused
    return Clusterd(image=clusterd_image)


def start_overridden_mz_clusterd_and_cockroach(
    c: Composition, mz: Materialized, clusterd: Clusterd, instance: str
) -> None:
    with c.override(mz, clusterd):
        version_request_command = c.run(
            "materialized",
            "-c",
            "environmentd --version | grep environmentd",
            entrypoint="bash",
            rm=True,
            capture=True,
        )
        version = version_request_command.stdout.strip()
        print(f"The version of the '{instance.upper()}' Mz instance is: {version}")

        c.up("cockroach", "materialized", "clusterd")


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Feature benchmark framework."""
    c.silent = True

    parser.add_argument(
        "--redpanda",
        action="store_true",
        help="run against Redpanda instead of the Confluent Platform",
    )
    parser.add_argument(
        "--measure-memory",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Measure memory usage",
    )
    parser.add_argument(
        "--this-tag",
        metavar="TAG",
        type=str,
        default=os.getenv("THIS_TAG", None),
        help="'This' Materialize container tag to benchmark. If not provided, the current source will be used.",
    )
    parser.add_argument(
        "--this-balancerd",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="Use balancerd for THIS",
    )
    parser.add_argument(
        "--this-params",
        metavar="PARAMS",
        type=str,
        default=os.getenv("THIS_PARAMS", None),
        help="Semicolon-separated list of parameter=value pairs to apply to the 'THIS' Mz instance",
    )
    parser.add_argument(
        "--other-tag",
        metavar="TAG",
        type=str,
        default=os.getenv("OTHER_TAG", str(get_latest_published_version())),
        help="'Other' Materialize container tag to benchmark. If not provided, the last released Mz version will be used.",
    )
    parser.add_argument(
        "--other-params",
        metavar="PARAMS",
        type=str,
        default=os.getenv("OTHER_PARAMS", None),
        help="Semicolon-separated list of parameter=value pairs to apply to the 'OTHER' Mz instance",
    )
    parser.add_argument(
        "--other-balancerd",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="Use balancerd for OTHER",
    )
    parser.add_argument(
        "--root-scenario",
        "--scenario",
        metavar="SCENARIO",
        type=str,
        default="Scenario",
        help="Scenario or scenario family to benchmark. See scenarios.py for available scenarios.",
    )
    parser.add_argument(
        "--scale",
        metavar="+N | -N | N",
        type=str,
        default=None,
        help="Absolute or relative scale to apply.",
    )
    parser.add_argument(
        "--max-measurements",
        metavar="N",
        type=int,
        default=99,
        help="Limit the number of measurements to N.",
    )
    parser.add_argument(
        "--runs-per-scenario",
        metavar="N",
        type=int,
        default=5,
        help="Number of times to run each scenario; the best result is kept.",
    )
    parser.add_argument(
        "--this-size",
        metavar="N",
        type=int,
        default=4,
        help="SIZE to use for 'THIS'",
    )
    parser.add_argument(
        "--ignore-other-tag-missing",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="Skip the run instead of failing when the 'OTHER' image is missing",
    )
    parser.add_argument(
        "--other-size", metavar="N", type=int, default=4, help="SIZE to use for 'OTHER'"
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )

    args = parser.parse_args()

    print(
        dedent(
            f"""
            this_tag: {args.this_tag}
            this_size: {args.this_size}
            this_balancerd: {args.this_balancerd}
            other_tag: {args.other_tag}
            other_size: {args.other_size}
            other_balancerd: {args.other_balancerd}
            root_scenario: {args.root_scenario}"""
        )
    )

    specified_root_scenario = globals()[args.root_scenario]

    if specified_root_scenario.__subclasses__():
        selected_scenarios = sorted(
            # collect all leaves of the specified root scenario
            [
                s
                for s in all_subclasses(specified_root_scenario)
                if not s.__subclasses__()
            ],
            key=repr,
        )
    else:
        # the specified root scenario is itself a leaf
        selected_scenarios = [specified_root_scenario]

    dependencies = ["postgres", "mysql"]
    if args.redpanda:
        dependencies += ["redpanda"]
    else:
        dependencies += ["zookeeper", "kafka", "schema-registry"]
    c.up(*dependencies)

    scenario_classes_scheduled_to_run: list[type[Scenario]] = buildkite.shard_list(
        selected_scenarios, lambda scenario_cls: scenario_cls.__name__
    )

    if (
        len(scenario_classes_scheduled_to_run) == 0
        and buildkite.is_in_buildkite()
        and not os.getenv("CI_EXTRA_ARGS")
    ):
        raise FailedTestExecutionError(
            error_summary="No scenarios were selected", errors=[]
        )

    reports = []

    for run_index in range(0, args.runs_per_scenario):
        run_number = run_index + 1
        print(
            f"Run {run_number} with scenarios: {', '.join([scenario.__name__ for scenario in scenario_classes_scheduled_to_run])}"
        )
        report = Report(cycle_number=run_number)
        reports.append(report)

        for scenario_class in scenario_classes_scheduled_to_run:
            try:
                scenario_result = run_one_scenario(c, scenario_class, args)
            except RuntimeError as e:
                if (
                    "No image found for commit hash" in str(e)
                    and args.ignore_other_tag_missing
                ):
                    print(
                        "Missing image for base, which can happen when main branch fails to build, ignoring"
                    )
                    return
                raise e

            if scenario_result.is_empty():
                continue

            report.add_scenario_result(scenario_result)

        print(f"+++ Benchmark Report for run {run_number}:")
        print(report)

    benchmark_result_selector = BestBenchmarkResultSelector()
    selected_report_by_scenario_name = (
        benchmark_result_selector.choose_report_per_scenario(reports)
    )
    discarded_reports_by_scenario_name = get_discarded_reports_per_scenario(
        reports, selected_report_by_scenario_name
    )

    scenarios_with_regressions = determine_scenario_classes_with_regressions(
        selected_report_by_scenario_name
    )

    if len(scenarios_with_regressions) > 0:
        justification_by_scenario_name = _check_regressions_justified(
            scenarios_with_regressions,
            this_tag=args.this_tag,
            baseline_tag=args.other_tag,
            scale=args.scale,
        )
        justifications = [
            justification
            for justification in justification_by_scenario_name.values()
            if justification is not None
        ]
        all_regressions_justified = len(justification_by_scenario_name) == len(
            justifications
        )

        print("+++ Regressions")
        print(
            f"{'INFO' if all_regressions_justified else 'ERROR'}:"
            f" The following scenarios have regressions:"
            f" {', '.join([scenario.__name__ for scenario in scenarios_with_regressions])}"
        )

        if all_regressions_justified:
            print("All regressions are justified:")
            print("\n".join(justifications))
            successful_run = True
        elif len(justifications) > 0:
            print("Some regressions are justified:")
            print("\n".join(justifications))
            successful_run = False
        else:
            successful_run = False
    else:
        successful_run = True
        justification_by_scenario_name = dict()

    upload_results_to_test_analytics(
        c,
        args.this_tag,
        scenario_classes_scheduled_to_run,
        scenarios_with_regressions,
        args.scale,
        selected_report_by_scenario_name,
        discarded_reports_by_scenario_name,
        successful_run,
    )

    if not successful_run:
        raise FailedTestExecutionError(
            error_summary="At least one regression occurred",
            errors=_regressions_to_failure_details(
                scenarios_with_regressions,
                selected_report_by_scenario_name,
                justification_by_scenario_name,
                baseline_tag=args.other_tag,
                scale=args.scale,
            ),
        )


def is_regression(
    evaluator: BenchmarkResultEvaluator, scenario_result: BenchmarkScenarioResult
) -> bool:
    return any([evaluator.is_regression(metric) for metric in scenario_result.metrics])


def _check_regressions_justified(
    scenarios_with_regressions: list[type[Scenario]],
    this_tag: str | None,
    baseline_tag: str | None,
    scale: str | None,
) -> dict[str, str | None]:
    """
    :return: a justification per scenario name if the regression is justified, else None
    """
    justification_by_scenario_name: dict[str, str | None] = dict()

    for scenario_class in scenarios_with_regressions:
        regressions_justified, comment = _is_regression_justified(
            scenario_class, this_tag=this_tag, baseline_tag=baseline_tag, scale=scale
        )
        justification_by_scenario_name[scenario_class.__name__] = (
            comment if regressions_justified else None
        )

    return justification_by_scenario_name


def _is_regression_justified(
    scenario_class: type[Scenario],
    this_tag: str | None,
    baseline_tag: str | None,
    scale: str | None,
) -> tuple[bool, str]:
    if (
        this_tag is None
        or not _tag_references_release_version(this_tag)
        or not _tag_references_release_version(baseline_tag)
    ):
        return False, ""

    # Checked in _tag_references_release_version
    assert this_tag is not None
    assert baseline_tag is not None

    this_version = MzVersion.parse_mz(this_tag)
    baseline_version = MzVersion.parse_mz(baseline_tag)

    commits_with_regressions = get_commits_of_accepted_regressions_between_versions(
        get_ancestor_overrides_for_performance_regressions(scenario_class, scale),
        since_version_exclusive=baseline_version,
        to_version_inclusive=this_version,
    )

    if len(commits_with_regressions) == 0:
        return False, ""

    return (
        True,
        f"* {scenario_class.__name__}: Justified regressions were introduced with commits {commits_with_regressions}.",
    )


def _tag_references_release_version(image_tag: str | None) -> bool:
    if image_tag is None:
        return False
    return is_image_tag_of_release_version(
        image_tag
    ) and MzVersion.is_valid_version_string(image_tag)
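

# For illustration (an assumption about tag formats, not taken from this file):
# a tag like "v0.100.0" would count as a release version here, while "latest"
# or a commit-hash tag would not.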


def _regressions_to_failure_details(
    scenarios_with_regressions: list[type[Scenario]],
    latest_report_by_scenario_name: dict[str, Report],
    justification_by_scenario_name: dict[str, str | None],
    baseline_tag: str,
    scale: str | None,
) -> list[TestFailureDetails]:
    failure_details = []

    for scenario_cls in scenarios_with_regressions:
        scenario_name = scenario_cls.__name__

        if justification_by_scenario_name[scenario_name] is not None:
            # justified regressions do not produce failure details
            continue

        regression_against_tag = resolve_tag(baseline_tag, scenario_cls, scale)
        report = latest_report_by_scenario_name[scenario_name]
        failure_details.append(
            TestFailureDetails(
                test_case_name_override=f"Scenario '{scenario_name}'",
                message=f"New regression against {regression_against_tag}",
                details=report.as_string(
                    use_colors=False, limit_to_scenario=scenario_name
                ),
            )
        )

    return failure_details


def upload_results_to_test_analytics(
    c: Composition,
    this_tag: str | None,
    scenario_classes: list[type[Scenario]],
    scenarios_with_regressions: list[type[Scenario]],
    scale: str | None,
    latest_report_by_scenario_name: dict[str, Report],
    discarded_reports_by_scenario_name: dict[str, list[Report]],
    was_successful: bool,
) -> None:
    if not buildkite.is_in_buildkite():
        return

    if this_tag is not None:
        # only include measurements on HEAD
        return

    test_analytics = TestAnalyticsDb(create_test_analytics_config(c))
    test_analytics.builds.add_build_job(was_successful=was_successful)

    result_entries = []
    discarded_entries = []

    for scenario_cls in scenario_classes:
        scenario_name = scenario_cls.__name__
        report = latest_report_by_scenario_name[scenario_name]

        result_entries.append(
            _create_feature_benchmark_result_entry(
                scenario_cls,
                report,
                scale,
                is_regression=scenario_cls in scenarios_with_regressions,
            )
        )

        for discarded_report in discarded_reports_by_scenario_name.get(
            scenario_name, []
        ):
            discarded_entries.append(
                _create_feature_benchmark_result_entry(
                    scenario_cls, discarded_report, scale, is_regression=True
                )
            )

    test_analytics.benchmark_results.add_result(
        framework_version=FEATURE_BENCHMARK_FRAMEWORK_VERSION,
        results=result_entries,
    )
    test_analytics.benchmark_results.add_discarded_entries(discarded_entries)

    try:
        test_analytics.submit_updates()
        print("Uploaded results.")
    except Exception as e:
        # An error during an upload must never cause the build to fail
        test_analytics.on_upload_failed(e)


def _create_feature_benchmark_result_entry(
    scenario_cls: type[Scenario], report: Report, scale: str | None, is_regression: bool
) -> feature_benchmark_result_storage.FeatureBenchmarkResultEntry:
    scenario_name = scenario_cls.__name__
    scenario_group = scenario_cls.__bases__[0].__name__
    scenario_version = report.get_scenario_version(scenario_name)
    measurements = report.measurements_of_this(scenario_name)

    return feature_benchmark_result_storage.FeatureBenchmarkResultEntry(
        scenario_name=scenario_name,
        scenario_group=scenario_group,
        scenario_version=str(scenario_version),
        cycle=report.cycle_number,
        scale=scale or "default",
        is_regression=is_regression,
        wallclock=measurements[MeasurementType.WALLCLOCK],
        memory_mz=measurements[MeasurementType.MEMORY_MZ],
        memory_clusterd=measurements[MeasurementType.MEMORY_CLUSTERD],
    )