measurement_source.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import re
  10. import textwrap
  11. import time
  12. from collections.abc import Callable
  13. from materialize.feature_benchmark.executor import Executor
  14. from materialize.feature_benchmark.measurement import (
  15. MeasurementUnit,
  16. WallclockDuration,
  17. )
  18. class MeasurementSource:
  19. def __init__(self) -> None:
  20. self._executor: Executor | None = None
  21. def run(
  22. self,
  23. executor: Executor | None = None,
  24. ) -> list[WallclockDuration]:
  25. raise NotImplementedError
  26. class Td(MeasurementSource):
  27. """Use testdrive to run the queries under benchmark and extract the timing information
  28. out of the testdrive output. The output looks like this:
  29. > /* A */ CREATE ...
  30. rows match; continuing at ts 1639561166.4809854
  31. > /* B */ SELECT ...
  32. rows didn't match; sleeping to see if dataflow catches up
  33. rows match; continuing at ts 1639561175.6951854
  34. So we fish for the /* A */ and /* B */ markers and the timestamps reported for each
  35. """
  36. def __init__(self, td_str: str, dedent: bool = True) -> None:
  37. self._td_str = textwrap.dedent(td_str) if dedent else td_str
  38. self._executor: Executor | None = None
  39. def run(
  40. self,
  41. executor: Executor | None = None,
  42. ) -> list[WallclockDuration]:
  43. assert not (executor is not None and self._executor is not None)
  44. executor = executor or self._executor
  45. assert executor
  46. # Print each query once so that it is easier to reproduce regressions
  47. # based on just the logs from CI
  48. if executor.add_known_fragment(self._td_str):
  49. print(self._td_str)
  50. td_output = executor.Td(self._td_str)
  51. lines = td_output.splitlines()
  52. lines = [l for l in lines if l]
  53. timestamps = []
  54. for marker in ["A", "B"]:
  55. timestamp = self._get_time_for_marker(lines, marker)
  56. if timestamp is not None:
  57. timestamps.append(WallclockDuration(timestamp, MeasurementUnit.SECONDS))
  58. return timestamps
  59. def _get_time_for_marker(self, lines: list[str], marker: str) -> None | float:
  60. matched_line_id = None
  61. for id, line in enumerate(lines):
  62. if f"/* {marker} */" in line:
  63. for id2 in range(id + 1, len(lines)):
  64. if "rows match" in lines[id2]:
  65. matched_line_id = id2
  66. break
  67. else:
  68. raise RuntimeError("row match not found")
  69. if not matched_line_id:
  70. # Marker /* ... */ not found
  71. return None
  72. matched_line = lines[matched_line_id]
  73. regex = re.search("at ts ([0-9.]+)", matched_line)
  74. assert regex, f"'at ts' string not found on line '{matched_line}'"
  75. return float(regex.group(1))
  76. class Lambda(MeasurementSource):
  77. # Execute a lambda, such as Mz restart, within a benchmark() block and record the end timestamp
  78. def __init__(self, _lambda: Callable) -> None:
  79. self._lambda = _lambda
  80. def run(
  81. self,
  82. executor: Executor | None = None,
  83. ) -> list[WallclockDuration]:
  84. e = executor or self._executor
  85. assert e is not None
  86. e.Lambda(self._lambda)
  87. return [WallclockDuration(time.time(), MeasurementUnit.SECONDS)]