- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import re
- from pathlib import Path
- from typing import Any
- from parameterized import parameterized_class # type: ignore
- import materialize.optbench
- import materialize.optbench.sql
- from materialize.feature_benchmark.action import Action
- from materialize.feature_benchmark.executor import Executor
- from materialize.feature_benchmark.measurement import (
- MeasurementType,
- MeasurementUnit,
- WallclockDuration,
- )
- from materialize.feature_benchmark.measurement_source import (
- MeasurementSource,
- )
- from materialize.feature_benchmark.scenario import Scenario
# Suppress pdoc documentation for the generated per-query classes (see the
# loop further below that registers one entry per TPCH query).
__pdoc__: dict[str, bool] = {}
- from materialize.feature_benchmark.scenario_version import ScenarioVersion
class OptbenchInit(Action):
    """Load the schema of an optbench scenario into the composition.

    When `no_indexes` is set, any `CREATE [DEFAULT] INDEX` / `DROP INDEX`
    statements are stripped from the schema before execution.
    """

    def __init__(self, scenario: str, no_indexes: bool = False) -> None:
        self._executor: Executor | None = None
        self._scenario = scenario
        self._no_indexes = no_indexes

    def run(self, executor: Executor | None = None) -> None:
        target = executor or self._executor
        schema_path = Path(
            f"misc/python/materialize/optbench/schema/{self._scenario}.sql"
        )
        statements = materialize.optbench.sql.parse_from_file(schema_path)
        if self._no_indexes:
            # Filter out index-related DDL statements.
            index_pattern = re.compile(r"(create|create\s+default|drop)\s+index\s+")
            kept = []
            for stmt in statements:
                if index_pattern.match(stmt.lower()):
                    continue
                kept.append(stmt)
            statements = kept
        target._composition.sql("\n".join(statements))  # type: ignore
class OptbenchRun(MeasurementSource):
    """Measure the optimization time of a single optbench workload query.

    The duration is taken from the server-reported timing in the query's
    EXPLAIN output, not from client-side wallclock timing.
    """

    def __init__(self, optbench_scenario: str, query: int):
        super().__init__()
        self._optbench_scenario = optbench_scenario
        self._query = query

    def run(self, executor: Executor | None = None) -> list[WallclockDuration]:
        # Exactly one of the explicit and the stored executor must be set.
        assert not (executor is None and self._executor is None)
        assert not (executor is not None and self._executor is not None)
        active = executor or self._executor

        workload_path = Path(
            f"misc/python/materialize/optbench/workload/{self._optbench_scenario}.sql"
        )
        queries = materialize.optbench.sql.parse_from_file(workload_path)
        # Query numbers are 1-based.
        assert 1 <= self._query <= len(queries)
        selected = queries[self._query - 1]

        explain_query = materialize.optbench.sql.Query(selected).explain(timing=True)
        explain_output = materialize.optbench.sql.ExplainOutput(
            active._composition.sql_query(explain_query)[0][0]  # type: ignore
        )

        optimization_time = explain_output.optimization_time()
        assert optimization_time is not None
        # Normalize the reported optimization time to nanoseconds.
        duration_ns = float(optimization_time.astype("timedelta64[ns]"))

        # A zero start marker followed by the measured duration.
        return [
            WallclockDuration(0, MeasurementUnit.NANOSECONDS),
            WallclockDuration(duration_ns, MeasurementUnit.NANOSECONDS),
        ]
def name_with_query(
    cls: type["OptbenchTPCH"], num: int, params_dict: dict[str, Any]
) -> str:
    """Derive the parameterized class name from the QUERY parameter.

    For example, ``{"QUERY": 1}`` yields ``"OptbenchTPCHQ01"``.
    """
    query_number = params_dict["QUERY"]
    return f"OptbenchTPCHQ{query_number:02d}"
# Hide every generated per-query class (Q01..Q22) from pdoc.
for query_number in range(1, 23):
    __pdoc__[f"OptbenchTPCHQ{query_number:02d}"] = False
@parameterized_class(
    [{"QUERY": i} for i in range(1, 23)], class_name_func=name_with_query
)
class OptbenchTPCH(Scenario):
    """Run optbench TPCH for optimizer benchmarks.

    One subclass per TPCH query (1..22) is generated via ``parameterized_class``;
    each subclass overrides ``QUERY`` with its own query number.
    """

    # Overridden per generated subclass by @parameterized_class.
    QUERY = 1

    # Maximum tolerated relative regression per measurement type.
    RELATIVE_THRESHOLD: dict[MeasurementType, float] = {
        MeasurementType.WALLCLOCK: 0.20,  # increased because it's easy to regress
        MeasurementType.MEMORY_MZ: 0.20,
        MeasurementType.MEMORY_CLUSTERD: 0.50,
    }

    def init(self) -> list[Action]:
        return [OptbenchInit("tpch")]

    def benchmark(self) -> MeasurementSource:
        return OptbenchRun("tpch", self.QUERY)

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)