123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import queue
- import random
- import sqlite3
- import threading
- import time
- from collections import defaultdict
- from collections.abc import Iterator, Sequence
- from dataclasses import dataclass
- from textwrap import dedent
- import psycopg
- from materialize.mzcompose.composition import Composition
- from materialize.util import PgConnInfo
# On-disk SQLite database into which benchmark measurements are persisted.
DB_FILE = "parallel-benchmark.db"

# SQLiteStore shares one connection across many threads (check_same_thread=False),
# which is only safe when the sqlite3 module was built in "serialized" mode.
assert (
    sqlite3.threadsafety == 3
), f"Thread safety level 3 (serialized) required, but is: {sqlite3.threadsafety}"
@dataclass
class Measurement:
    """A single timed execution of an action.

    Attributes:
        duration: how long the action took, in seconds.
        timestamp: absolute start time of the action (time.time()).
    """

    duration: float
    timestamp: float

    def __str__(self) -> str:
        # "<timestamp> <duration>" — kept identical to the original format.
        return f"{self.timestamp} {self.duration}"
class MeasurementsStore:
    """Abstract interface for recording and querying benchmark measurements."""

    def add(self, action: str, measurement: Measurement) -> None:
        """Record one measurement for the named action."""
        raise NotImplementedError

    def actions(self) -> list[str]:
        """Return the names of all actions that have recorded measurements."""
        raise NotImplementedError

    def close(self) -> None:
        """Release any resources held by the store."""
        raise NotImplementedError

    def get_data(
        self, action: str, start_time: float, end_time: float
    ) -> tuple[list[float], list[float]]:
        """Return (timestamps relative to start_time, durations in ms) for an action."""
        raise NotImplementedError
class MemoryStore(MeasurementsStore):
    """In-memory MeasurementsStore backed by a dict of measurement lists."""

    def __init__(self):
        self.data: defaultdict[str, list[Measurement]] = defaultdict(list)

    def add(self, action: str, measurement: Measurement) -> None:
        self.data[action].append(measurement)

    def actions(self) -> list[str]:
        return list(self.data.keys())

    def close(self) -> None:
        # Nothing to release for the in-memory store.
        pass

    def get_data(
        self, action: str, start_time: float, end_time: float
    ) -> tuple[list[float], list[float]]:
        """Return (relative timestamps, durations in ms) for an action.

        Restricts results to measurements whose timestamp lies within
        [start_time, end_time], matching SQLiteStore.get_data's
        `timestamp BETWEEN ? AND ?` semantics (previously the window was
        ignored here, an inconsistency between the two stores).
        """
        selected = [
            m for m in self.data[action] if start_time <= m.timestamp <= end_time
        ]
        times: list[float] = [m.timestamp - start_time for m in selected]
        # Durations are stored in seconds but reported in milliseconds.
        durations: list[float] = [m.duration * 1000 for m in selected]
        return (times, durations)
class SQLiteStore(MeasurementsStore):
    """MeasurementsStore persisting to the shared on-disk SQLite database.

    One connection is shared by all threads (check_same_thread=False) and
    serialized with a lock. Cursors are now closed in `finally` blocks: the
    original leaked the cursor in get_data() entirely and in add() on the
    exception/re-raise path.
    """

    def __init__(self, scenario: str):
        self.scenario = scenario
        self.lock = threading.Lock()
        self.conn = sqlite3.connect(
            DB_FILE, check_same_thread=False, isolation_level=None
        )
        cursor = self.conn.cursor()
        try:
            # Benchmark data is disposable, so trade durability for speed.
            cursor.execute("PRAGMA journal_mode=WAL;")
            cursor.execute("PRAGMA synchronous=OFF;")
            cursor.execute("PRAGMA cache_size=-64000;")  # 64 MB
            cursor.execute("PRAGMA locking_mode=EXCLUSIVE;")
            self.conn.commit()
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS measurements (scenario TEXT NOT NULL, action TEXT NOT NULL, duration FLOAT NOT NULL, timestamp FLOAT NOT NULL);"
            )
            # Drop stale data from previous runs of the same scenario.
            cursor.execute(
                "DELETE FROM measurements WHERE scenario = ?", (self.scenario,)
            )
        finally:
            cursor.close()

    def add(self, action: str, measurement: Measurement) -> None:
        with self.lock:
            cursor = self.conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO measurements VALUES (?, ?, ?, ?)",
                    (
                        self.scenario,
                        action,
                        measurement.duration * 1000,  # store milliseconds
                        measurement.timestamp,
                    ),
                )
            except Exception as e:
                print(
                    f"Caught exception {str(e)} with values: {self.scenario}, {action}, {measurement.duration}, {measurement.timestamp}"
                )
                raise
            finally:
                # Close the cursor even when the INSERT raised.
                cursor.close()

    def actions(self) -> list[str]:
        with self.lock:
            cursor = self.conn.cursor()
            try:
                cursor.execute(
                    "SELECT DISTINCT action FROM measurements WHERE scenario = ?",
                    (self.scenario,),
                )
                return [row[0] for row in cursor.fetchall()]
            finally:
                cursor.close()

    def close(self) -> None:
        with self.lock:
            self.conn.close()

    def get_data(
        self, action: str, start_time: float, end_time: float
    ) -> tuple[list[float], list[float]]:
        with self.lock:
            cursor = self.conn.cursor()
            try:
                cursor.execute(
                    "SELECT duration, timestamp FROM measurements WHERE scenario = ? AND action = ? AND timestamp BETWEEN ? AND ?",
                    (self.scenario, action, start_time, end_time),
                )
                times: list[float] = []
                durations: list[float] = []
                for row in cursor:
                    durations.append(row[0])  # already stored in ms (see add())
                    times.append(row[1] - start_time)
                return (times, durations)
            finally:
                cursor.close()
@dataclass
class State:
    """Shared mutable state handed to every action and phase."""

    # Destination for all recorded measurements (memory- or SQLite-backed).
    measurements: MeasurementsStore
    # CLI override for LoadPhase duration; None keeps the scenario's default.
    load_phase_duration: int | None
    # Per-action rate overrides for the Periodic distribution.
    periodic_dists: dict[str, int]
def execute_query(cur: psycopg.Cursor, query: str) -> None:
    """Execute `query` on `cur`, retrying deadlocks and ignoring RTR timeouts.

    Any other exception propagates to the caller unchanged.
    """
    while True:
        try:
            cur.execute(query.encode())
        except Exception as e:
            message = str(e)
            if "deadlock detected" in message:
                # The server aborted one of two conflicting transactions;
                # the query is safe to re-issue.
                print(f"Deadlock detected, retrying: {query}")
                continue
            if (
                "timed out before ingesting the source's visible frontier when real-time-recency query issued"
                in message
            ):
                # Best-effort: treat a real-time-recency timeout as success.
                print("RTR timeout, ignoring")
                return
            raise
        else:
            return
class Action:
    """Base class for a single measurable benchmark operation."""

    def run(
        self,
        start_time: float,
        conns: queue.Queue,
        state: State,
    ):
        """Execute the action once and record its wall-clock duration."""
        self._run(conns)
        elapsed = time.time() - start_time
        state.measurements.add(str(self), Measurement(elapsed, start_time))

    def _run(self, conns: queue.Queue):
        """Subclasses implement the actual work here."""
        raise NotImplementedError
class TdAction(Action):
    """Action that runs a testdrive script through the composition."""

    def __init__(self, td: str, c: Composition):
        self.c = c
        self.td = dedent(td)

    def _run(self, conns: queue.Queue):
        # Runs via testdrive; the connection pool is not used.
        self.c.testdrive(self.td, quiet=True)

    def __str__(self) -> str:
        return "testdrive"
class StandaloneQuery(Action):
    """Action that opens a fresh connection for every query execution."""

    def __init__(
        self,
        query: str,
        conn_info: PgConnInfo,
        strict_serializable: bool = True,
    ):
        self.query = query
        self.conn_info = conn_info
        self.strict_serializable = strict_serializable

    def _run(self, conns: queue.Queue):
        conn = self.conn_info.connect()
        try:
            conn.autocommit = True
            with conn.cursor() as cur:
                if not self.strict_serializable:
                    # Only downgrade isolation; strict serializable is assumed
                    # to be the server default.
                    cur.execute("SET TRANSACTION_ISOLATION TO 'SERIALIZABLE'")
                execute_query(cur, self.query)
        finally:
            # Close even when the query raises, so repeated failures don't
            # leak one connection per run (the original only closed on the
            # success path).
            conn.close()

    def __str__(self) -> str:
        return f"{self.query} (standalone)"
class ReuseConnQuery(Action):
    """Action that reuses one long-lived connection for every execution."""

    def __init__(
        self,
        query: str,
        conn_info: PgConnInfo,
        strict_serializable: bool = True,
    ):
        self.query = query
        self.conn_info = conn_info
        self.strict_serializable = strict_serializable
        self._reconnect()

    def _reconnect(self) -> None:
        """(Re)establish the connection and apply the isolation level."""
        self.conn = self.conn_info.connect()
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        isolation = (
            "STRICT SERIALIZABLE" if self.strict_serializable else "SERIALIZABLE"
        )
        self.cur.execute(f"SET TRANSACTION_ISOLATION TO '{isolation}'")

    def _run(self, conns: queue.Queue):
        # The pooled-connection queue is ignored; we keep our own connection.
        execute_query(self.cur, self.query)

    def __str__(self) -> str:
        return f"{self.query} (reuse connection)"
class PooledQuery(Action):
    """Action that borrows a connection from the shared pool for each run."""

    def __init__(self, query: str, conn_info: PgConnInfo):
        self.query = query
        self.conn_info = conn_info

    def _run(self, conns: queue.Queue):
        conn = conns.get()
        try:
            with conn.cursor() as cur:
                execute_query(cur, self.query)
        except psycopg.OperationalError as e:
            print(f"Connection failed on query '{self.query}', reconnecting: {e}")
            conn.close()
            conn = self.conn_info.connect()
            # Pool connections are created with autocommit enabled (see
            # Scenario.setup); restore that on the replacement connection.
            conn.autocommit = True
            # BUG FIX: retry on a cursor of the NEW connection. The original
            # re-used `cur` from the connection it had just closed.
            with conn.cursor() as cur:
                execute_query(cur, self.query)
        conns.task_done()
        conns.put(conn)

    def __str__(self) -> str:
        return f"{self.query} (pooled)"
def sleep_until(timestamp: float) -> None:
    """Block until the wall clock reaches `timestamp`; no-op if it has passed."""
    remaining = timestamp - time.time()
    if remaining > 0:
        time.sleep(remaining)
class Distribution:
    """Abstract schedule of action start times across a load phase."""

    def generate(
        self, duration: int, action_name: str, state: State
    ) -> Iterator[float]:
        """Yield scheduled start timestamps spanning `duration` seconds."""
        raise NotImplementedError
class Periodic(Distribution):
    """Run the action in each thread in one second, spread apart by the 1/per_second"""

    def __init__(self, per_second: float):
        self.per_second = per_second

    def generate(
        self, duration: int, action_name: str, state: State
    ) -> Iterator[float]:
        # A per-action override from the CLI takes precedence over the
        # configured rate.
        rate = state.periodic_dists.get(action_name) or self.per_second
        interval = 1 / rate
        next_scheduled = time.time()
        for _ in range(int(duration * rate)):
            yield next_scheduled
            next_scheduled += interval
            sleep_until(next_scheduled)
class Gaussian(Distribution):
    """Run the action with a sleep time between actions drawn from a Gaussian distribution"""

    def __init__(self, mean: float, stddev: float):
        self.mean = mean
        self.stddev = stddev

    def generate(
        self, duration: int, action_name: str, state: State
    ) -> Iterator[float]:
        deadline = time.time() + duration
        next_scheduled = time.time()
        while time.time() < deadline:
            yield next_scheduled
            # Clamp negative samples so scheduled times never move backwards.
            next_scheduled += max(0, random.gauss(self.mean, self.stddev))
            sleep_until(next_scheduled)
class PhaseAction:
    """An action paired with the loop discipline used to drive it."""

    # Whether regressions detected for this action should be reported.
    report_regressions: bool
    action: Action

    def run(
        self,
        duration: int,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        """Drive `action` for `duration` seconds, using `jobs`/`conns` as needed."""
        raise NotImplementedError
class OpenLoop(PhaseAction):
    """Open-loop load: enqueue runs at the distribution's pace, independent of
    how long each run actually takes."""

    def __init__(
        self, action: Action, dist: Distribution, report_regressions: bool = True
    ):
        self.action = action
        self.dist = dist
        self.report_regressions = report_regressions

    def run(
        self,
        duration: int,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        for start_time in self.dist.generate(duration, str(self.action), state):
            # BUG FIX: bind start_time as a default argument. The lambda runs
            # later on a worker thread; a late-binding closure would read
            # whatever value the loop variable holds at execution time, so
            # every queued job could see a newer start_time and mis-measure.
            jobs.put(
                lambda start_time=start_time: self.action.run(
                    start_time, conns, state
                )
            )
class ClosedLoop(PhaseAction):
    """Closed-loop load: run the action back-to-back, each run starting as
    soon as the previous one finishes."""

    def __init__(self, action: Action, report_regressions: bool = True):
        self.action = action
        self.report_regressions = report_regressions

    def run(
        self,
        duration: int,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        deadline = time.time() + duration
        while time.time() < deadline:
            self.action.run(time.time(), conns, state)
class Phase:
    """One stage of a scenario (e.g. setup via testdrive, or a load phase)."""

    def run(
        self,
        c: Composition,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        """Execute the phase to completion."""
        raise NotImplementedError
class TdPhase(Phase):
    """Phase that runs a testdrive script to completion."""

    def __init__(self, td: str):
        self.td = dedent(td)

    def run(
        self,
        c: Composition,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        """Run the script; the jobs/conns queues are unused by this phase."""
        c.testdrive(self.td, quiet=True)
class LoadPhase(Phase):
    """Phase that drives a set of PhaseActions concurrently for a fixed time."""

    duration: int
    phase_actions: Sequence[PhaseAction]

    def __init__(self, duration: int, actions: Sequence[PhaseAction]):
        self.duration = duration
        self.phase_actions = actions

    def run(
        self,
        c: Composition,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        # A CLI-provided duration overrides the scenario's default.
        duration = state.load_phase_duration or self.duration
        print(f"Load phase for {duration}s")
        workers: list[threading.Thread] = []
        for phase_action in self.phase_actions:
            worker = threading.Thread(
                target=phase_action.run,
                args=(duration, jobs, conns, state),
            )
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
def run_job(jobs: queue.Queue) -> None:
    """Worker loop: run jobs from the queue until a falsy sentinel arrives.

    Every dequeued item is acknowledged with task_done(), including the
    sentinel, so queue.join() can be used for shutdown.
    """
    while True:
        job = jobs.get()
        try:
            if job:
                job()
            else:
                # Falsy item (None) is the shutdown signal.
                return
        finally:
            jobs.task_done()
class Scenario:
    """Base class for a parallel-benchmark scenario.

    Subclasses implement __init__ and call init() to configure their phases,
    pool sizes, and performance guarantees.
    """

    # Has to be set for the class already, not just in the constructor, so that
    # we can change the value for the entire class in the decorator
    enabled: bool = True
    phases: list[Phase]
    thread_pool_size: int
    conn_pool_size: int
    guarantees: dict[str, dict[str, float]]
    regression_thresholds: dict[str, dict[str, float]]
    jobs: queue.Queue
    conns: queue.Queue
    thread_pool: list[threading.Thread]
    version: str = "1.0.0"

    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        raise NotImplementedError

    @classmethod
    def name(cls) -> str:
        """Scenario name as used in reporting: the class name."""
        return cls.__name__

    def init(
        self,
        phases: list[Phase],
        thread_pool_size: int = 5000,
        conn_pool_size: int = 0,
        guarantees: dict[str, dict[str, float]] | None = None,
        regression_thresholds: dict[str, dict[str, float]] | None = None,
    ):
        """Configure the scenario; called from subclass constructors.

        The dict defaults were previously mutable `{}` literals — the classic
        shared-mutable-default pitfall — and are now None sentinels that
        expand to a fresh dict per call. Passing nothing behaves exactly as
        before.
        """
        self.phases = phases
        self.thread_pool_size = thread_pool_size
        self.conn_pool_size = conn_pool_size
        self.guarantees = guarantees if guarantees is not None else {}
        self.regression_thresholds = (
            regression_thresholds if regression_thresholds is not None else {}
        )
        self.jobs = queue.Queue()
        self.conns = queue.Queue()

    def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None:
        """Start the worker threads and pre-open the connection pool."""
        conn_info = conn_infos["materialized"]
        # Start threads and have them wait for work from a queue
        self.thread_pool = [
            threading.Thread(target=run_job, args=(self.jobs,))
            for i in range(self.thread_pool_size)
        ]
        for thread in self.thread_pool:
            thread.start()
        for i in range(self.conn_pool_size):
            conn = conn_info.connect()
            conn.autocommit = True
            self.conns.put(conn)

    def run(
        self,
        c: Composition,
        state: State,
    ) -> None:
        """Run all phases in order."""
        for phase in self.phases:
            phase.run(c, self.jobs, self.conns, state)

    def teardown(self) -> None:
        """Close pooled connections and shut down the worker threads."""
        while not self.conns.empty():
            conn = self.conns.get()
            conn.close()
            self.conns.task_done()
        for i in range(len(self.thread_pool)):
            # Indicate to every thread to stop working
            self.jobs.put(None)
        for thread in self.thread_pool:
            thread.join()
        self.jobs.join()
def disabled(ignore_reason: str):
    """Class decorator marking a Scenario subclass as disabled.

    `ignore_reason` documents why at the call site; only the class-level
    `enabled` flag is actually changed.
    """

    def mark(cls):
        cls.enabled = False
        return cls

    return mark
|