optbench.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import re
  10. from pathlib import Path
  11. from typing import Any
  12. from parameterized import parameterized_class # type: ignore
  13. import materialize.optbench
  14. import materialize.optbench.sql
  15. from materialize.feature_benchmark.action import Action
  16. from materialize.feature_benchmark.executor import Executor
  17. from materialize.feature_benchmark.measurement import (
  18. MeasurementType,
  19. MeasurementUnit,
  20. WallclockDuration,
  21. )
  22. from materialize.feature_benchmark.measurement_source import (
  23. MeasurementSource,
  24. )
  25. from materialize.feature_benchmark.scenario import Scenario
  26. # for pdoc ignores
  27. __pdoc__ = {}
  28. from materialize.feature_benchmark.scenario_version import ScenarioVersion
  29. class OptbenchInit(Action):
  30. def __init__(self, scenario: str, no_indexes: bool = False) -> None:
  31. self._executor: Executor | None = None
  32. self._scenario = scenario
  33. self._no_indexes = no_indexes
  34. def run(self, executor: Executor | None = None) -> None:
  35. e = executor or self._executor
  36. statements = materialize.optbench.sql.parse_from_file(
  37. Path(f"misc/python/materialize/optbench/schema/{self._scenario}.sql")
  38. )
  39. if self._no_indexes:
  40. idx_re = re.compile(r"(create|create\s+default|drop)\s+index\s+")
  41. statements = [
  42. statement
  43. for statement in statements
  44. if not idx_re.match(statement.lower())
  45. ]
  46. e._composition.sql("\n".join(statements)) # type: ignore
  47. class OptbenchRun(MeasurementSource):
  48. def __init__(self, optbench_scenario: str, query: int):
  49. super().__init__()
  50. self._optbench_scenario = optbench_scenario
  51. self._query = query
  52. def run(self, executor: Executor | None = None) -> list[WallclockDuration]:
  53. assert not (executor is None and self._executor is None)
  54. assert not (executor is not None and self._executor is not None)
  55. e = executor or self._executor
  56. queries = materialize.optbench.sql.parse_from_file(
  57. Path(
  58. f"misc/python/materialize/optbench/workload/{self._optbench_scenario}.sql"
  59. )
  60. )
  61. assert 1 <= self._query <= len(queries)
  62. query = queries[self._query - 1]
  63. explain_query = materialize.optbench.sql.Query(query).explain(timing=True)
  64. explain_output = materialize.optbench.sql.ExplainOutput(
  65. e._composition.sql_query(explain_query)[0][0] # type: ignore
  66. )
  67. # Optimization time is in nanoseconds, divide by 3 to get a more readable number (still in wrong unit)
  68. optimization_time = explain_output.optimization_time()
  69. assert optimization_time is not None
  70. optimization_time_in_ns = optimization_time.astype("timedelta64[ns]")
  71. optimization_duration_in_ns = float(optimization_time_in_ns)
  72. timestamps = [
  73. WallclockDuration(0, MeasurementUnit.NANOSECONDS),
  74. WallclockDuration(
  75. optimization_duration_in_ns,
  76. MeasurementUnit.NANOSECONDS,
  77. ),
  78. ]
  79. return timestamps
  80. def name_with_query(
  81. cls: type["OptbenchTPCH"], num: int, params_dict: dict[str, Any]
  82. ) -> str:
  83. return f"OptbenchTPCHQ{params_dict['QUERY']:02d}"
  84. for i in range(1, 23):
  85. __pdoc__[f"OptbenchTPCHQ{i:02d}"] = False
  86. @parameterized_class(
  87. [{"QUERY": i} for i in range(1, 23)], class_name_func=name_with_query
  88. )
  89. class OptbenchTPCH(Scenario):
  90. """Run optbench TPCH for optimizer benchmarks"""
  91. QUERY = 1
  92. RELATIVE_THRESHOLD: dict[MeasurementType, float] = {
  93. MeasurementType.WALLCLOCK: 0.20, # increased because it's easy to regress
  94. MeasurementType.MEMORY_MZ: 0.20,
  95. MeasurementType.MEMORY_CLUSTERD: 0.50,
  96. }
  97. def init(self) -> list[Action]:
  98. return [OptbenchInit("tpch")]
  99. def benchmark(self) -> MeasurementSource:
  100. return OptbenchRun("tpch", self.QUERY)
  101. def version(self) -> ScenarioVersion:
  102. return ScenarioVersion.create(1, 1, 0)