framework.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import queue
  10. import random
  11. import sqlite3
  12. import threading
  13. import time
  14. from collections import defaultdict
  15. from collections.abc import Iterator, Sequence
  16. from dataclasses import dataclass
  17. from textwrap import dedent
  18. import psycopg
  19. from materialize.mzcompose.composition import Composition
  20. from materialize.util import PgConnInfo
  21. DB_FILE = "parallel-benchmark.db"
  22. assert (
  23. sqlite3.threadsafety == 3
  24. ), f"Thread safety level 3 (serialized) required, but is: {sqlite3.threadsafety}"
  25. class Measurement:
  26. duration: float
  27. timestamp: float
  28. def __init__(self, duration: float, timestamp: float):
  29. self.duration = duration
  30. self.timestamp = timestamp
  31. def __str__(self) -> str:
  32. return f"{self.timestamp} {self.duration}"
class MeasurementsStore:
    """Abstract interface for recording and querying benchmark measurements."""

    def add(self, action: str, measurement: Measurement) -> None:
        """Record one measurement for the named action."""
        raise NotImplementedError

    def actions(self) -> list[str]:
        """Return the names of all actions that have recorded measurements."""
        raise NotImplementedError

    def close(self) -> None:
        """Release any resources held by the store."""
        raise NotImplementedError

    def get_data(
        self, action: str, start_time: float, end_time: float
    ) -> tuple[list[float], list[float]]:
        """Return (timestamps relative to start_time, durations) for an action."""
        raise NotImplementedError
  44. class MemoryStore(MeasurementsStore):
  45. def __init__(self):
  46. self.data: defaultdict[str, list[Measurement]] = defaultdict(list)
  47. def add(self, action: str, measurement: Measurement) -> None:
  48. self.data[action].append(measurement)
  49. def actions(self) -> list[str]:
  50. return list(self.data.keys())
  51. def close(self) -> None:
  52. pass
  53. def get_data(
  54. self, action: str, start_time: float, end_time: float
  55. ) -> tuple[list[float], list[float]]:
  56. times: list[float] = [x.timestamp - start_time for x in self.data[action]]
  57. durations: list[float] = [x.duration * 1000 for x in self.data[action]]
  58. return (times, durations)
  59. class SQLiteStore(MeasurementsStore):
  60. def __init__(self, scenario: str):
  61. self.scenario = scenario
  62. self.lock = threading.Lock()
  63. self.conn = sqlite3.connect(
  64. DB_FILE, check_same_thread=False, isolation_level=None
  65. )
  66. cursor = self.conn.cursor()
  67. cursor.execute("PRAGMA journal_mode=WAL;")
  68. cursor.execute("PRAGMA synchronous=OFF;")
  69. cursor.execute("PRAGMA cache_size=-64000;") # 64 MB
  70. cursor.execute("PRAGMA locking_mode=EXCLUSIVE;")
  71. self.conn.commit()
  72. cursor.execute(
  73. "CREATE TABLE IF NOT EXISTS measurements (scenario TEXT NOT NULL, action TEXT NOT NULL, duration FLOAT NOT NULL, timestamp FLOAT NOT NULL);"
  74. )
  75. cursor.execute("DELETE FROM measurements WHERE scenario = ?", (self.scenario,))
  76. cursor.close()
  77. def add(self, action: str, measurement: Measurement) -> None:
  78. with self.lock:
  79. cursor = self.conn.cursor()
  80. try:
  81. cursor.execute(
  82. "INSERT INTO measurements VALUES (?, ?, ?, ?)",
  83. (
  84. self.scenario,
  85. action,
  86. measurement.duration * 1000,
  87. measurement.timestamp,
  88. ),
  89. )
  90. except Exception as e:
  91. print(
  92. f"Caught exception {str(e)} with values: {self.scenario}, {action}, {measurement.duration}, {measurement.timestamp}"
  93. )
  94. raise
  95. cursor.close()
  96. def actions(self) -> list[str]:
  97. with self.lock:
  98. cursor = self.conn.cursor()
  99. cursor.execute(
  100. "SELECT DISTINCT action FROM measurements WHERE scenario = ?",
  101. (self.scenario,),
  102. )
  103. result = [row[0] for row in cursor.fetchall()]
  104. cursor.close()
  105. return result
  106. def close(self) -> None:
  107. with self.lock:
  108. self.conn.close()
  109. def get_data(
  110. self, action: str, start_time: float, end_time: float
  111. ) -> tuple[list[float], list[float]]:
  112. with self.lock:
  113. cursor = self.conn.cursor()
  114. cursor.execute(
  115. "SELECT duration, timestamp FROM measurements WHERE scenario = ? AND action = ? AND timestamp BETWEEN ? AND ?",
  116. (self.scenario, action, start_time, end_time),
  117. )
  118. times: list[float] = []
  119. durations: list[float] = []
  120. for row in cursor:
  121. durations.append(row[0])
  122. times.append(row[1] - start_time)
  123. return (times, durations)
@dataclass
class State:
    """Run-wide state shared between phases, actions and worker threads."""

    # Sink that all actions record their timings into.
    measurements: MeasurementsStore
    # When set, overrides LoadPhase's configured duration (see LoadPhase.run).
    load_phase_duration: int | None
    # Per-action overrides for Periodic's per_second rate, keyed by action
    # name (see Periodic.generate).
    periodic_dists: dict[str, int]
  129. def execute_query(cur: psycopg.Cursor, query: str) -> None:
  130. while True:
  131. try:
  132. cur.execute(query.encode())
  133. break
  134. except Exception as e:
  135. if "deadlock detected" in str(e):
  136. print(f"Deadlock detected, retrying: {query}")
  137. elif (
  138. "timed out before ingesting the source's visible frontier when real-time-recency query issued"
  139. in str(e)
  140. ):
  141. print("RTR timeout, ignoring")
  142. break
  143. else:
  144. raise
  145. class Action:
  146. def run(
  147. self,
  148. start_time: float,
  149. conns: queue.Queue,
  150. state: State,
  151. ):
  152. self._run(conns)
  153. duration = time.time() - start_time
  154. state.measurements.add(str(self), Measurement(duration, start_time))
  155. def _run(self, conns: queue.Queue):
  156. raise NotImplementedError
class TdAction(Action):
    """Action that runs a testdrive script against the composition."""

    def __init__(self, td: str, c: Composition):
        # The script is dedented once at construction time.
        self.td = dedent(td)
        self.c = c

    def _run(self, conns: queue.Queue):
        # testdrive manages its own connections; the connection pool is unused.
        self.c.testdrive(self.td, quiet=True)

    def __str__(self) -> str:
        return "testdrive"
  165. class StandaloneQuery(Action):
  166. def __init__(
  167. self,
  168. query: str,
  169. conn_info: PgConnInfo,
  170. strict_serializable: bool = True,
  171. ):
  172. self.query = query
  173. self.conn_info = conn_info
  174. self.strict_serializable = strict_serializable
  175. def _run(self, conns: queue.Queue):
  176. conn = self.conn_info.connect()
  177. conn.autocommit = True
  178. with conn.cursor() as cur:
  179. if not self.strict_serializable:
  180. cur.execute("SET TRANSACTION_ISOLATION TO 'SERIALIZABLE'")
  181. execute_query(cur, self.query)
  182. conn.close()
  183. def __str__(self) -> str:
  184. return f"{self.query} (standalone)"
  185. class ReuseConnQuery(Action):
  186. def __init__(
  187. self,
  188. query: str,
  189. conn_info: PgConnInfo,
  190. strict_serializable: bool = True,
  191. ):
  192. self.query = query
  193. self.conn_info = conn_info
  194. self.strict_serializable = strict_serializable
  195. self._reconnect()
  196. def _reconnect(self) -> None:
  197. self.conn = self.conn_info.connect()
  198. self.conn.autocommit = True
  199. self.cur = self.conn.cursor()
  200. self.cur.execute(
  201. f"SET TRANSACTION_ISOLATION TO '{'STRICT SERIALIZABLE' if self.strict_serializable else 'SERIALIZABLE'}'"
  202. )
  203. def _run(self, conns: queue.Queue):
  204. execute_query(self.cur, self.query)
  205. def __str__(self) -> str:
  206. return f"{self.query} (reuse connection)"
  207. class PooledQuery(Action):
  208. def __init__(self, query: str, conn_info: PgConnInfo):
  209. self.query = query
  210. self.conn_info = conn_info
  211. def _run(self, conns: queue.Queue):
  212. conn = conns.get()
  213. with conn.cursor() as cur:
  214. try:
  215. execute_query(cur, self.query)
  216. except psycopg.OperationalError as e:
  217. print(f"Connection failed on query '{self.query}', reconnecting: {e}")
  218. conn.close()
  219. conn = self.conn_info.connect()
  220. execute_query(cur, self.query)
  221. conns.task_done()
  222. conns.put(conn)
  223. def __str__(self) -> str:
  224. return f"{self.query} (pooled)"
  225. def sleep_until(timestamp: float) -> None:
  226. time_to_sleep = timestamp - time.time()
  227. if time_to_sleep > 0:
  228. time.sleep(time_to_sleep)
class Distribution:
    """Strategy deciding when an action should be issued during a phase."""

    def generate(
        self, duration: int, action_name: str, state: State
    ) -> Iterator[float]:
        """Yield the intended start timestamp of each execution, sleeping
        between yields to pace the caller."""
        raise NotImplementedError
  234. class Periodic(Distribution):
  235. """Run the action in each thread in one second, spread apart by the 1/per_second"""
  236. def __init__(self, per_second: float):
  237. self.per_second = per_second
  238. def generate(
  239. self, duration: int, action_name: str, state: State
  240. ) -> Iterator[float]:
  241. per_second = state.periodic_dists.get(action_name) or self.per_second
  242. next_time = time.time()
  243. for i in range(int(duration * per_second)):
  244. yield next_time
  245. next_time += 1 / per_second
  246. sleep_until(next_time)
  247. class Gaussian(Distribution):
  248. """Run the action with a sleep time between actions drawn from a Gaussian distribution"""
  249. def __init__(self, mean: float, stddev: float):
  250. self.mean = mean
  251. self.stddev = stddev
  252. def generate(
  253. self, duration: int, action_name: str, state: State
  254. ) -> Iterator[float]:
  255. end_time = time.time() + duration
  256. next_time = time.time()
  257. while time.time() < end_time:
  258. yield next_time
  259. next_time += max(0, random.gauss(self.mean, self.stddev))
  260. sleep_until(next_time)
class PhaseAction:
    """One action inside a load phase, paired with a driving strategy
    (open or closed loop); subclasses implement `run`."""

    # Whether measured regressions for this action should be reported.
    report_regressions: bool
    # The underlying action to execute.
    action: Action

    def run(
        self,
        duration: int,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        """Drive `self.action` for `duration` seconds."""
        raise NotImplementedError
  272. class OpenLoop(PhaseAction):
  273. def __init__(
  274. self, action: Action, dist: Distribution, report_regressions: bool = True
  275. ):
  276. self.action = action
  277. self.dist = dist
  278. self.report_regressions = report_regressions
  279. def run(
  280. self,
  281. duration: int,
  282. jobs: queue.Queue,
  283. conns: queue.Queue,
  284. state: State,
  285. ) -> None:
  286. for start_time in self.dist.generate(duration, str(self.action), state):
  287. jobs.put(lambda: self.action.run(start_time, conns, state))
  288. class ClosedLoop(PhaseAction):
  289. def __init__(self, action: Action, report_regressions: bool = True):
  290. self.action = action
  291. self.report_regressions = report_regressions
  292. def run(
  293. self,
  294. duration: int,
  295. jobs: queue.Queue,
  296. conns: queue.Queue,
  297. state: State,
  298. ) -> None:
  299. end_time = time.time() + duration
  300. while time.time() < end_time:
  301. self.action.run(time.time(), conns, state)
class Phase:
    """One stage of a scenario (e.g. testdrive setup or a timed load phase)."""

    def run(
        self,
        c: Composition,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        """Execute the phase; blocks until the phase has finished."""
        raise NotImplementedError
class TdPhase(Phase):
    """Phase that runs a testdrive script (dedented at construction time)."""

    def __init__(self, td: str):
        self.td = dedent(td)

    def run(
        self,
        c: Composition,
        jobs: queue.Queue,
        conns: queue.Queue,
        state: State,
    ) -> None:
        # testdrive runs synchronously; the job/connection queues are unused.
        c.testdrive(self.td, quiet=True)
  322. class LoadPhase(Phase):
  323. duration: int
  324. phase_actions: Sequence[PhaseAction]
  325. def __init__(self, duration: int, actions: Sequence[PhaseAction]):
  326. self.duration = duration
  327. self.phase_actions = actions
  328. def run(
  329. self,
  330. c: Composition,
  331. jobs: queue.Queue,
  332. conns: queue.Queue,
  333. state: State,
  334. ) -> None:
  335. duration = state.load_phase_duration or self.duration
  336. print(f"Load phase for {duration}s")
  337. threads = [
  338. threading.Thread(
  339. target=phase_action.run,
  340. args=(duration, jobs, conns, state),
  341. )
  342. for phase_action in self.phase_actions
  343. ]
  344. for thread in threads:
  345. thread.start()
  346. for thread in threads:
  347. thread.join()
  348. def run_job(jobs: queue.Queue) -> None:
  349. while True:
  350. job = jobs.get()
  351. try:
  352. if not job:
  353. return
  354. job()
  355. finally:
  356. jobs.task_done()
  357. class Scenario:
  358. # Has to be set for the class already, not just in the constructor, so that
  359. # we can change the value for the entire class in the decorator
  360. enabled: bool = True
  361. phases: list[Phase]
  362. thread_pool_size: int
  363. conn_pool_size: int
  364. guarantees: dict[str, dict[str, float]]
  365. regression_thresholds: dict[str, dict[str, float]]
  366. jobs: queue.Queue
  367. conns: queue.Queue
  368. thread_pool: list[threading.Thread]
  369. version: str = "1.0.0"
  370. def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
  371. raise NotImplementedError
  372. @classmethod
  373. def name(cls) -> str:
  374. return cls.__name__
  375. def init(
  376. self,
  377. phases: list[Phase],
  378. thread_pool_size: int = 5000,
  379. conn_pool_size: int = 0,
  380. guarantees: dict[str, dict[str, float]] = {},
  381. regression_thresholds: dict[str, dict[str, float]] = {},
  382. ):
  383. self.phases = phases
  384. self.thread_pool_size = thread_pool_size
  385. self.conn_pool_size = conn_pool_size
  386. self.guarantees = guarantees
  387. self.regression_thresholds = regression_thresholds
  388. self.jobs = queue.Queue()
  389. self.conns = queue.Queue()
  390. def setup(self, c: Composition, conn_infos: dict[str, PgConnInfo]) -> None:
  391. conn_info = conn_infos["materialized"]
  392. self.thread_pool = [
  393. threading.Thread(target=run_job, args=(self.jobs,))
  394. for i in range(self.thread_pool_size)
  395. ]
  396. for thread in self.thread_pool:
  397. thread.start()
  398. # Start threads and have them wait for work from a queue
  399. for i in range(self.conn_pool_size):
  400. conn = conn_info.connect()
  401. conn.autocommit = True
  402. self.conns.put(conn)
  403. def run(
  404. self,
  405. c: Composition,
  406. state: State,
  407. ) -> None:
  408. for phase in self.phases:
  409. phase.run(c, self.jobs, self.conns, state)
  410. def teardown(self) -> None:
  411. while not self.conns.empty():
  412. conn = self.conns.get()
  413. conn.close()
  414. self.conns.task_done()
  415. for i in range(len(self.thread_pool)):
  416. # Indicate to every thread to stop working
  417. self.jobs.put(None)
  418. for thread in self.thread_pool:
  419. thread.join()
  420. self.jobs.join()
  421. def disabled(ignore_reason: str):
  422. def decorator(cls):
  423. cls.enabled = False
  424. return cls
  425. return decorator