- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """
- Simple benchmark of mostly individual queries using testdrive. Can find
- wallclock/memory regressions in single-connection query executions; not
- suitable for concurrency testing.
- """
- import argparse
- import os
- import sys
- import time
- import uuid
- from textwrap import dedent
- from materialize import buildkite
- from materialize.docker import is_image_tag_of_release_version
- from materialize.feature_benchmark.benchmark_result_evaluator import (
- BenchmarkResultEvaluator,
- )
- from materialize.feature_benchmark.benchmark_result_selection import (
- BestBenchmarkResultSelector,
- get_discarded_reports_per_scenario,
- )
- from materialize.feature_benchmark.report import (
- Report,
- determine_scenario_classes_with_regressions,
- )
- from materialize.mz_version import MzVersion
- from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
- from materialize.mzcompose.services.mysql import MySql
- from materialize.mzcompose.test_result import (
- FailedTestExecutionError,
- TestFailureDetails,
- )
- from materialize.test_analytics.config.test_analytics_db_config import (
- create_test_analytics_config,
- )
- from materialize.test_analytics.data.feature_benchmark import (
- feature_benchmark_result_storage,
- )
- from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
- from materialize.version_ancestor_overrides import (
- get_ancestor_overrides_for_performance_regressions,
- )
- from materialize.version_list import (
- get_commits_of_accepted_regressions_between_versions,
- get_latest_published_version,
- resolve_ancestor_image_tag,
- )
- # mzcompose may start this script from the root of the Mz repository,
- # so we need to explicitly add this directory to the Python module search path
- sys.path.append(os.path.dirname(__file__))
- from materialize.feature_benchmark.aggregation import Aggregation, MinAggregation
- from materialize.feature_benchmark.benchmark import Benchmark
- from materialize.feature_benchmark.benchmark_result import (
- BenchmarkScenarioResult,
- )
- from materialize.feature_benchmark.executor import Docker
- from materialize.feature_benchmark.filter import Filter, FilterFirst, NoFilter
- from materialize.feature_benchmark.measurement import MeasurementType
- from materialize.feature_benchmark.scenarios.benchmark_main import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.benchmark_main import (
- Scenario,
- )
- from materialize.feature_benchmark.scenarios.concurrency import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.customer import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.optbench import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.scale import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.skew import * # noqa: F401 F403
- from materialize.feature_benchmark.scenarios.subscribe import * # noqa: F401 F403
- from materialize.feature_benchmark.termination import (
- NormalDistributionOverlap,
- ProbForMin,
- RunAtMost,
- TerminationCondition,
- )
- from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
- from materialize.mzcompose.services.azurite import Azurite
- from materialize.mzcompose.services.balancerd import Balancerd
- from materialize.mzcompose.services.clusterd import Clusterd
- from materialize.mzcompose.services.cockroach import Cockroach
- from materialize.mzcompose.services.kafka import Kafka as KafkaService
- from materialize.mzcompose.services.kgen import Kgen as KgenService
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.minio import Minio
- from materialize.mzcompose.services.mz import Mz
- from materialize.mzcompose.services.postgres import Postgres
- from materialize.mzcompose.services.redpanda import Redpanda
- from materialize.mzcompose.services.schema_registry import SchemaRegistry
- from materialize.mzcompose.services.testdrive import Testdrive
- from materialize.mzcompose.services.zookeeper import Zookeeper
- from materialize.util import all_subclasses
- #
- # Global feature benchmark thresholds and termination conditions
- #
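- # The framework version is uploaded with every result, presumably so that test
- # analytics can distinguish results produced by different versions of this framework.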
- FEATURE_BENCHMARK_FRAMEWORK_VERSION = "1.5.0"
- def make_filter(args: argparse.Namespace) -> Filter:
- # Discard the first measurement unless a small --max-measurements limit is explicitly set
- if args.max_measurements <= 5:
- return NoFilter()
- else:
- return FilterFirst()
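- # Measuring stops once any condition below is met: the measurement distribution has
- # stabilized (NormalDistributionOverlap), the minimum has likely been observed
- # (ProbForMin), or the --max-measurements cap is reached (RunAtMost); semantics
- # inferred from the condition class names.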
- def make_termination_conditions(args: argparse.Namespace) -> list[TerminationCondition]:
- return [
- NormalDistributionOverlap(threshold=0.95),
- ProbForMin(threshold=0.90),
- RunAtMost(threshold=args.max_measurements),
- ]
- def make_aggregation_class() -> type[Aggregation]:
- return MinAggregation
- default_timeout = "1800s"
- SERVICES = [
- Zookeeper(),
- KafkaService(),
- SchemaRegistry(),
- Redpanda(),
- Cockroach(setup_materialize=True),
- Minio(setup_materialize=True),
- Azurite(),
- KgenService(),
- Postgres(),
- MySql(),
- Balancerd(),
- # Overridden below
- Materialized(),
- Clusterd(),
- Testdrive(),
- Mz(app_password=""),
- ]
- def run_one_scenario(
- c: Composition, scenario_class: type[Scenario], args: argparse.Namespace
- ) -> BenchmarkScenarioResult:
- scenario_name = scenario_class.__name__
- print(f"--- Now benchmarking {scenario_name} ...")
- measurement_types = [MeasurementType.WALLCLOCK]
- if args.measure_memory:
- measurement_types.append(MeasurementType.MEMORY_MZ)
- measurement_types.append(MeasurementType.MEMORY_CLUSTERD)
- result = BenchmarkScenarioResult(scenario_class, measurement_types)
- common_seed = round(time.time())
- early_abort = False
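- # Each scenario is measured against two Mz instances: "this" (the candidate) and
- # "other" (the baseline), each brought up with its own tag, size, and parameters.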
- for mz_id, instance in enumerate(["this", "other"]):
- balancerd, tag, size, params = (
- (args.this_balancerd, args.this_tag, args.this_size, args.this_params)
- if instance == "this"
- else (
- args.other_balancerd,
- args.other_tag,
- args.other_size,
- args.other_params,
- )
- )
- tag = resolve_tag(tag, scenario_class, args.scale)
- entrypoint_host = "balancerd" if balancerd else "materialized"
- c.up({"name": "testdrive", "persistent": True})
- additional_system_parameter_defaults = (
- ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
- | {
- "max_clusters": "15",
- "enable_unorchestrated_cluster_replicas": "true",
- "unsafe_enable_unorchestrated_cluster_replicas": "true",
- }
- )
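- # --this-params/--other-params arrive as a semicolon-separated list of
- # name=value pairs, e.g. "some_flag=true;some_limit=100" (hypothetical names).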
- if params is not None:
- for param in params.split(";"):
- param_name, param_value = param.split("=")
- additional_system_parameter_defaults[param_name] = param_value
- mz_image = f"materialize/materialized:{tag}" if tag else None
- # TODO: Better azurite support detection
- mz = create_mz_service(
- mz_image,
- size,
- additional_system_parameter_defaults,
- args.azurite and instance == "this",
- )
- clusterd_image = f"materialize/clusterd:{tag}" if tag else None
- clusterd = create_clusterd_service(
- clusterd_image, size, additional_system_parameter_defaults
- )
- if tag is not None and not c.try_pull_service_image(mz):
- print(
- f"Unable to find materialize image with tag {tag}, proceeding with latest instead!"
- )
- mz_image = "materialize/materialized:latest"
- # TODO: Better azurite support detection
- mz = create_mz_service(
- mz_image,
- size,
- additional_system_parameter_defaults,
- args.azurite and instance == "this",
- )
- clusterd_image = f"materialize/clusterd:{tag}" if tag else None
- clusterd = create_clusterd_service(
- clusterd_image, size, additional_system_parameter_defaults
- )
- start_overridden_mz_clusterd_and_cockroach(c, mz, clusterd, instance)
- if balancerd:
- c.up("balancerd")
- with c.override(
- Testdrive(
- materialize_url=f"postgres://materialize@{entrypoint_host}:6875",
- default_timeout=default_timeout,
- materialize_params={"statement_timeout": f"'{default_timeout}'"},
- metadata_store="cockroach",
- external_blob_store=True,
- blob_store_is_azure=args.azurite,
- )
- ):
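- # Create the default cluster on the externally started clusterd container,
- # presumably so its memory can be measured separately (MEMORY_CLUSTERD).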
- c.testdrive(
- dedent(
- """
- $[version<9000] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER SYSTEM SET enable_unmanaged_cluster_replicas = true;
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- CREATE CLUSTER cluster_default REPLICAS (r1 (STORAGECTL ADDRESSES ['clusterd:2100'], STORAGE ADDRESSES ['clusterd:2103'], COMPUTECTL ADDRESSES ['clusterd:2101'], COMPUTE ADDRESSES ['clusterd:2102'], WORKERS 1));
- ALTER SYSTEM SET cluster = cluster_default;
- GRANT ALL PRIVILEGES ON CLUSTER cluster_default TO materialize;"""
- ),
- )
- executor = Docker(
- composition=c, seed=common_seed, materialized=mz, clusterd=clusterd
- )
- mz_version = MzVersion.parse_mz(c.query_mz_version())
- benchmark = Benchmark(
- mz_id=mz_id,
- mz_version=mz_version,
- scenario_cls=scenario_class,
- scale=args.scale,
- executor=executor,
- filter=make_filter(args),
- termination_conditions=make_termination_conditions(args),
- aggregation_class=make_aggregation_class(),
- measure_memory=args.measure_memory,
- default_size=size,
- seed=common_seed,
- )
- if not scenario_class.can_run(mz_version):
- print(
- f"Skipping scenario {scenario_class} not supported in version {mz_version}"
- )
- early_abort = True
- else:
- aggregations = benchmark.run()
- scenario_version = benchmark.create_scenario_instance().version()
- result.set_scenario_version(scenario_version)
- for aggregation, metric in zip(aggregations, result.metrics):
- assert (
- aggregation.measurement_type == metric.measurement_type
- or aggregation.measurement_type is None
- ), f"Aggregation contains {aggregation.measurement_type} but metric contains {metric.measurement_type} as measurement type"
- metric.append_point(
- aggregation.aggregate(),
- aggregation.unit(),
- aggregation.name(),
- )
- c.kill("cockroach", "materialized", "clusterd", "testdrive")
- c.rm("cockroach", "materialized", "clusterd", "testdrive")
- c.rm_volumes("mzdata")
- if early_abort:
- result.empty()
- break
- return result
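- # Cache of resolved "common-ancestor" tags, keyed by (tag, ancestor overrides),
- # so ancestor resolution runs only once per distinct override set.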
- resolved_tags: dict[tuple[str, frozenset[tuple[str, MzVersion]]], str] = {}
- def resolve_tag(tag: str, scenario_class: type[Scenario], scale: str | None) -> str:
- if tag == "common-ancestor":
- overrides = get_ancestor_overrides_for_performance_regressions(
- scenario_class, scale
- )
- key = (tag, frozenset(overrides.items()))
- if key not in resolved_tags:
- resolved_tags[key] = resolve_ancestor_image_tag(overrides)
- return resolved_tags[key]
- return tag
- def create_mz_service(
- mz_image: str | None,
- default_size: int,
- additional_system_parameter_defaults: dict[str, str] | None,
- azurite: bool,
- ) -> Materialized:
- return Materialized(
- image=mz_image,
- default_size=default_size,
- # Avoid clashes with the Kafka sink progress topic across restarts
- environment_id=f"local-az1-{uuid.uuid4()}-0",
- soft_assertions=False,
- additional_system_parameter_defaults=additional_system_parameter_defaults,
- external_metadata_store=True,
- metadata_store="cockroach",
- external_blob_store=True,
- blob_store_is_azure=azurite,
- sanity_restart=False,
- )
- def create_clusterd_service(
- clusterd_image: str | None,
- default_size: int,
- additional_system_parameter_defaults: dict[str, str] | None,
- ) -> Clusterd:
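- # Size and system parameter defaults are accepted for signature parity with
- # create_mz_service but are not currently applied to the clusterd container.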
- return Clusterd(image=clusterd_image)
- def start_overridden_mz_clusterd_and_cockroach(
- c: Composition, mz: Materialized, clusterd: Clusterd, instance: str
- ) -> None:
- with c.override(mz, clusterd):
- version_request_command = c.run(
- "materialized",
- "-c",
- "environmentd --version | grep environmentd",
- entrypoint="bash",
- rm=True,
- capture=True,
- )
- version = version_request_command.stdout.strip()
- print(f"The version of the '{instance.upper()}' Mz instance is: {version}")
- c.up("cockroach", "materialized", "clusterd")
- def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
- """Feature benchmark framework."""
- c.silent = True
- parser.add_argument(
- "--redpanda",
- action="store_true",
- help="run against Redpanda instead of the Confluent Platform",
- )
- parser.add_argument(
- "--measure-memory",
- default=True,
- action=argparse.BooleanOptionalAction,
- help="Measure memory usage",
- )
- parser.add_argument(
- "--this-tag",
- metavar="TAG",
- type=str,
- default=os.getenv("THIS_TAG", None),
- help="'This' Materialize container tag to benchmark. If not provided, the current source will be used.",
- )
- parser.add_argument(
- "--this-balancerd",
- action=argparse.BooleanOptionalAction,
- default=False,
- help="Use balancerd for THIS",
- )
- parser.add_argument(
- "--this-params",
- metavar="PARAMS",
- type=str,
- default=os.getenv("THIS_PARAMS", None),
- help="Semicolon-separated list of parameter=value pairs to apply to the 'THIS' Mz instance",
- )
- parser.add_argument(
- "--other-tag",
- metavar="TAG",
- type=str,
- default=os.getenv("OTHER_TAG", str(get_latest_published_version())),
- help="'Other' Materialize container tag to benchmark. If not provided, the last released Mz version will be used.",
- )
- parser.add_argument(
- "--other-params",
- metavar="PARAMS",
- type=str,
- default=os.getenv("OTHER_PARAMS", None),
- help="Semicolon-separated list of parameter=value pairs to apply to the 'OTHER' Mz instance",
- )
- parser.add_argument(
- "--other-balancerd",
- action=argparse.BooleanOptionalAction,
- default=False,
- help="Use balancerd for OTHER",
- )
- parser.add_argument(
- "--root-scenario",
- "--scenario",
- metavar="SCENARIO",
- type=str,
- default="Scenario",
- help="Scenario or scenario family to benchmark. See scenarios.py for available scenarios.",
- )
- parser.add_argument(
- "--scale",
- metavar="+N | -N | N",
- type=str,
- default=None,
- help="Absolute or relative scale to apply.",
- )
- parser.add_argument(
- "--max-measurements",
- metavar="N",
- type=int,
- default=99,
- help="Limit the number of measurements to N.",
- )
- parser.add_argument(
- "--runs-per-scenario",
- metavar="N",
- type=int,
- default=5,
- help="Number of times to run each scenario; the best report per scenario is kept.",
- )
- parser.add_argument(
- "--this-size",
- metavar="N",
- type=int,
- default=4,
- help="SIZE use for 'THIS'",
- )
- parser.add_argument(
- "--ignore-other-tag-missing",
- action=argparse.BooleanOptionalAction,
- default=False,
- help="Don't run anything if 'OTHER' tag is missing",
- )
- parser.add_argument(
- "--other-size", metavar="N", type=int, default=4, help="SIZE to use for 'OTHER'"
- )
- parser.add_argument(
- "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
- )
- args = parser.parse_args()
- print(
- dedent(
- f"""
- this_tag: {args.this_tag}
- this_size: {args.this_size}
- this_balancerd: {args.this_balancerd}
- other_tag: {args.other_tag}
- other_size: {args.other_size}
- other_balancerd: {args.other_balancerd}
- root_scenario: {args.root_scenario}"""
- )
- )
- specified_root_scenario = globals()[args.root_scenario]
- if specified_root_scenario.__subclasses__():
- selected_scenarios = sorted(
- # collect all leaf scenarios under the specified root scenario
- [
- s
- for s in all_subclasses(specified_root_scenario)
- if not s.__subclasses__()
- ],
- key=repr,
- )
- else:
- # specified root scenario is a leaf
- selected_scenarios = [specified_root_scenario]
- dependencies = ["postgres", "mysql"]
- if args.redpanda:
- dependencies += ["redpanda"]
- else:
- dependencies += ["zookeeper", "kafka", "schema-registry"]
- c.up(*dependencies)
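- # In Buildkite the selected scenarios are sharded across parallel jobs;
- # outside Buildkite all selected scenarios run in this process.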
- scenario_classes_scheduled_to_run: list[type[Scenario]] = buildkite.shard_list(
- selected_scenarios, lambda scenario_cls: scenario_cls.__name__
- )
- if (
- len(scenario_classes_scheduled_to_run) == 0
- and buildkite.is_in_buildkite()
- and not os.getenv("CI_EXTRA_ARGS")
- ):
- raise FailedTestExecutionError(
- error_summary="No scenarios were selected", errors=[]
- )
- reports = []
- for run_index in range(0, args.runs_per_scenario):
- run_number = run_index + 1
- print(
- f"Run {run_number} with scenarios: {', '.join([scenario.__name__ for scenario in scenario_classes_scheduled_to_run])}"
- )
- report = Report(cycle_number=run_number)
- reports.append(report)
- for scenario_class in scenario_classes_scheduled_to_run:
- try:
- scenario_result = run_one_scenario(c, scenario_class, args)
- except RuntimeError as e:
- if (
- "No image found for commit hash" in str(e)
- and args.ignore_other_tag_missing
- ):
- print(
- "Missing image for base, which can happen when main branch fails to build, ignoring"
- )
- return
- raise e
- if scenario_result.is_empty():
- continue
- report.add_scenario_result(scenario_result)
- print(f"+++ Benchmark Report for run {run_number}:")
- print(report)
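- # Keep the best report per scenario across all runs; the remaining reports are
- # tracked as discarded so they can still be uploaded to test analytics.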
- benchmark_result_selector = BestBenchmarkResultSelector()
- selected_report_by_scenario_name = (
- benchmark_result_selector.choose_report_per_scenario(reports)
- )
- discarded_reports_by_scenario_name = get_discarded_reports_per_scenario(
- reports, selected_report_by_scenario_name
- )
- scenarios_with_regressions = determine_scenario_classes_with_regressions(
- selected_report_by_scenario_name
- )
- if len(scenarios_with_regressions) > 0:
- justification_by_scenario_name = _check_regressions_justified(
- scenarios_with_regressions,
- this_tag=args.this_tag,
- baseline_tag=args.other_tag,
- scale=args.scale,
- )
- justifications = [
- justification
- for justification in justification_by_scenario_name.values()
- if justification is not None
- ]
- all_regressions_justified = len(justification_by_scenario_name) == len(
- justifications
- )
- print("+++ Regressions")
- print(
- f"{'INFO' if all_regressions_justified else 'ERROR'}:"
- f" The following scenarios have regressions:"
- f" {', '.join([scenario.__name__ for scenario in scenarios_with_regressions])}"
- )
- if all_regressions_justified:
- print("All regressions are justified:")
- print("\n".join(justifications))
- successful_run = True
- elif len(justifications) > 0:
- print("Some regressions are justified:")
- print("\n".join(justifications))
- successful_run = False
- else:
- successful_run = False
- else:
- successful_run = True
- justification_by_scenario_name = dict()
- upload_results_to_test_analytics(
- c,
- args.this_tag,
- scenario_classes_scheduled_to_run,
- scenarios_with_regressions,
- args.scale,
- selected_report_by_scenario_name,
- discarded_reports_by_scenario_name,
- successful_run,
- )
- if not successful_run:
- raise FailedTestExecutionError(
- error_summary="At least one regression occurred",
- errors=_regressions_to_failure_details(
- scenarios_with_regressions,
- selected_report_by_scenario_name,
- justification_by_scenario_name,
- baseline_tag=args.other_tag,
- scale=args.scale,
- ),
- )
- def is_regression(
- evaluator: BenchmarkResultEvaluator, scenario_result: BenchmarkScenarioResult
- ) -> bool:
- return any([evaluator.is_regression(metric) for metric in scenario_result.metrics])
- def _check_regressions_justified(
- scenarios_with_regressions: list[type[Scenario]],
- this_tag: str | None,
- baseline_tag: str | None,
- scale: str | None,
- ) -> dict[str, str | None]:
- """
- :return: justification text per scenario name if the regression is justified, else None
- """
- justification_by_scenario_name: dict[str, str | None] = dict()
- for scenario_class in scenarios_with_regressions:
- regressions_justified, comment = _is_regression_justified(
- scenario_class, this_tag=this_tag, baseline_tag=baseline_tag, scale=scale
- )
- justification_by_scenario_name[scenario_class.__name__] = (
- comment if regressions_justified else None
- )
- return justification_by_scenario_name
- def _is_regression_justified(
- scenario_class: type[Scenario],
- this_tag: str | None,
- baseline_tag: str | None,
- scale: str | None,
- ) -> tuple[bool, str]:
- if (
- this_tag is None
- or not _tag_references_release_version(this_tag)
- or not _tag_references_release_version(baseline_tag)
- ):
- return False, ""
- # Checked in _tag_references_release_version
- assert this_tag is not None
- assert baseline_tag is not None
- this_version = MzVersion.parse_mz(this_tag)
- baseline_version = MzVersion.parse_mz(baseline_tag)
- commits_with_regressions = get_commits_of_accepted_regressions_between_versions(
- get_ancestor_overrides_for_performance_regressions(scenario_class, scale),
- since_version_exclusive=baseline_version,
- to_version_inclusive=this_version,
- )
- if len(commits_with_regressions) == 0:
- return False, ""
- return (
- True,
- f"* {scenario_class.__name__}: Justified regressions were introduced with commits {commits_with_regressions}.",
- )
- def _tag_references_release_version(image_tag: str | None) -> bool:
- if image_tag is None:
- return False
- return is_image_tag_of_release_version(
- image_tag
- ) and MzVersion.is_valid_version_string(image_tag)
- def _regressions_to_failure_details(
- scenarios_with_regressions: list[type[Scenario]],
- latest_report_by_scenario_name: dict[str, Report],
- justification_by_scenario_name: dict[str, str | None],
- baseline_tag: str,
- scale: str | None,
- ) -> list[TestFailureDetails]:
- failure_details = []
- for scenario_cls in scenarios_with_regressions:
- scenario_name = scenario_cls.__name__
- if justification_by_scenario_name[scenario_name] is not None:
- continue
- regression_against_tag = resolve_tag(baseline_tag, scenario_cls, scale)
- report = latest_report_by_scenario_name[scenario_name]
- failure_details.append(
- TestFailureDetails(
- test_case_name_override=f"Scenario '{scenario_name}'",
- message=f"New regression against {regression_against_tag}",
- details=report.as_string(
- use_colors=False, limit_to_scenario=scenario_name
- ),
- )
- )
- return failure_details
- def upload_results_to_test_analytics(
- c: Composition,
- this_tag: str | None,
- scenario_classes: list[type[Scenario]],
- scenarios_with_regressions: list[type[Scenario]],
- scale: str | None,
- latest_report_by_scenario_name: dict[str, Report],
- discarded_reports_by_scenario_name: dict[str, list[Report]],
- was_successful: bool,
- ) -> None:
- if not buildkite.is_in_buildkite():
- return
- if this_tag is not None:
- # only upload measurements when benchmarking the current source (no 'THIS' tag set)
- return
- test_analytics = TestAnalyticsDb(create_test_analytics_config(c))
- test_analytics.builds.add_build_job(was_successful=was_successful)
- result_entries = []
- discarded_entries = []
- for scenario_cls in scenario_classes:
- scenario_name = scenario_cls.__name__
- report = latest_report_by_scenario_name[scenario_name]
- result_entries.append(
- _create_feature_benchmark_result_entry(
- scenario_cls,
- report,
- scale,
- is_regression=scenario_cls in scenarios_with_regressions,
- )
- )
- for discarded_report in discarded_reports_by_scenario_name.get(
- scenario_name, []
- ):
- discarded_entries.append(
- _create_feature_benchmark_result_entry(
- scenario_cls, discarded_report, scale, is_regression=True
- )
- )
- test_analytics.benchmark_results.add_result(
- framework_version=FEATURE_BENCHMARK_FRAMEWORK_VERSION,
- results=result_entries,
- )
- test_analytics.benchmark_results.add_discarded_entries(discarded_entries)
- try:
- test_analytics.submit_updates()
- print("Uploaded results.")
- except Exception as e:
- # An error during an upload must never cause the build to fail
- test_analytics.on_upload_failed(e)
- def _create_feature_benchmark_result_entry(
- scenario_cls: type[Scenario], report: Report, scale: str | None, is_regression: bool
- ) -> feature_benchmark_result_storage.FeatureBenchmarkResultEntry:
- scenario_name = scenario_cls.__name__
- scenario_group = scenario_cls.__bases__[0].__name__
- scenario_version = report.get_scenario_version(scenario_name)
- measurements = report.measurements_of_this(scenario_name)
- return feature_benchmark_result_storage.FeatureBenchmarkResultEntry(
- scenario_name=scenario_name,
- scenario_group=scenario_group,
- scenario_version=str(scenario_version),
- cycle=report.cycle_number,
- scale=scale or "default",
- is_regression=is_regression,
- wallclock=measurements[MeasurementType.WALLCLOCK],
- memory_mz=measurements[MeasurementType.MEMORY_MZ],
- memory_clusterd=measurements[MeasurementType.MEMORY_CLUSTERD],
- )