mzcompose.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. """
  10. Benchmark with scenarios combining closed and open loops, can run multiple
  11. actions concurrently, measures various kinds of statistics.
  12. """

import gc
import os
import time
from pathlib import Path

import matplotlib.pyplot as plt
import numpy
from matplotlib.markers import MarkerStyle

from materialize import MZ_ROOT, buildkite
from materialize.mz_env_util import get_cloud_hostname
from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.azurite import Azurite
from materialize.mzcompose.services.balancerd import Balancerd
from materialize.mzcompose.services.cockroach import Cockroach
from materialize.mzcompose.services.kafka import Kafka as KafkaService
from materialize.mzcompose.services.kgen import Kgen as KgenService
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import Postgres
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.mzcompose.test_result import (
    FailedTestExecutionError,
    TestFailureDetails,
)
from materialize.parallel_benchmark.framework import (
    DB_FILE,
    LoadPhase,
    MeasurementsStore,
    MemoryStore,
    Scenario,
    SQLiteStore,
    State,
)
from materialize.parallel_benchmark.scenarios import *  # noqa: F401 F403
from materialize.test_analytics.config.test_analytics_db_config import (
    create_test_analytics_config,
)
from materialize.test_analytics.data.parallel_benchmark import (
    parallel_benchmark_result_storage,
)
from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
from materialize.util import PgConnInfo, all_subclasses, parse_pg_conn_string
from materialize.version_list import resolve_ancestor_image_tag

PARALLEL_BENCHMARK_FRAMEWORK_VERSION = "1.2.0"


def known_regression(scenario: str, other_tag: str) -> bool:
    # Hook for marking a regression of a scenario against a specific other tag
    # as known/expected so that it does not fail the build; currently none are.
    return False
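

# Default thresholds used by check_regressions() when comparing THIS against
# OTHER, keyed by statistic name. None means the statistic is reported but
# never flagged; a ratio such as 1.2 flags a regression when THIS is more than
# 20% worse than OTHER (lower is worse for "qps", higher is worse for the
# latency statistics). Per-scenario values in Scenario.regression_thresholds
# take precedence over these defaults.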
REGRESSION_THRESHOLDS = {
    "queries": None,
    "qps": 1.2,
    "max": None,
    "min": None,
    "avg": 1.2,
    "p50": 1.2,
    "p95": 1.3,
    "p99": None,
    "p99_9": None,
    "p99_99": None,
    "p99_999": None,
    "p99_9999": None,
    "p99_99999": None,
    "p99_999999": None,
    "std": None,
    "slope": None,
}
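

# Services available to the composition. Materialized and Testdrive are only
# placeholders here: run_once() replaces them via c.override() depending on
# whether the benchmark runs locally or against a remote target.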
SERVICES = [
    Zookeeper(),
    KafkaService(),
    SchemaRegistry(),
    Redpanda(),
    Cockroach(setup_materialize=True),
    Minio(setup_materialize=True),
    Azurite(),
    KgenService(),
    Postgres(),
    MySql(),
    Balancerd(),
    # Overridden below
    Materialized(),
    Testdrive(),
    Mz(app_password=""),
]
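

# Latency statistics for a single action of a scenario. For a MemoryStore the
# statistics are computed in Python with numpy; for a SQLiteStore everything is
# computed in one SQL query, where the percentiles are derived by ranking
# durations with ROW_NUMBER() and the slope comes from the closed-form
# least-squares formula
# (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x).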
class Statistics:
    def __init__(self, action: str, m: MeasurementsStore, start_time: float):
        if isinstance(m, MemoryStore):
            times: list[float] = [x.timestamp - start_time for x in m.data[action]]
            durations: list[float] = [x.duration * 1000 for x in m.data[action]]
            self.queries: int = len(times)
            self.qps: float = len(times) / max(times)
            self.max: float = max(durations)
            self.min: float = min(durations)
            self.avg: float = float(numpy.mean(durations))
            self.p50: float = float(numpy.median(durations))
            self.p95: float = float(numpy.percentile(durations, 95))
            self.p99: float = float(numpy.percentile(durations, 99))
            self.p99_9: float = float(numpy.percentile(durations, 99.9))
            self.p99_99: float = float(numpy.percentile(durations, 99.99))
            self.p99_999: float = float(numpy.percentile(durations, 99.999))
            self.p99_9999: float = float(numpy.percentile(durations, 99.9999))
            self.p99_99999: float = float(numpy.percentile(durations, 99.99999))
            self.p99_999999: float = float(numpy.percentile(durations, 99.999999))
            self.std: float = float(numpy.std(durations, ddof=1))
            self.slope: float = float(numpy.polyfit(times, durations, 1)[0])
        elif isinstance(m, SQLiteStore):
            cursor = m.conn.cursor()
            cursor.execute(
                """
                WITH RankedDurations AS (
                    SELECT
                        duration,
                        ROW_NUMBER() OVER (ORDER BY duration ASC) AS row_num,
                        COUNT(*) OVER () AS total_rows
                    FROM measurements
                    WHERE scenario = ? AND action = ?
                ),
                Percentiles AS (
                    SELECT
                        MAX(CASE WHEN row_num <= total_rows * 0.50 THEN duration END) AS p50,
                        MAX(CASE WHEN row_num <= total_rows * 0.95 THEN duration END) AS p95,
                        MAX(CASE WHEN row_num <= total_rows * 0.99 THEN duration END) AS p99,
                        MAX(CASE WHEN row_num <= total_rows * 0.999 THEN duration END) AS p99_9,
                        MAX(CASE WHEN row_num <= total_rows * 0.9999 THEN duration END) AS p99_99,
                        MAX(CASE WHEN row_num <= total_rows * 0.99999 THEN duration END) AS p99_999,
                        MAX(CASE WHEN row_num <= total_rows * 0.999999 THEN duration END) AS p99_9999,
                        MAX(CASE WHEN row_num <= total_rows * 0.9999999 THEN duration END) AS p99_99999,
                        MAX(CASE WHEN row_num <= total_rows * 0.99999999 THEN duration END) AS p99_999999
                    FROM RankedDurations
                ),
                Regression AS (
                    SELECT
                        COUNT(*) AS n,
                        SUM(timestamp * duration) AS sum_xy,
                        SUM(timestamp) AS sum_x,
                        SUM(duration) AS sum_y,
                        SUM(timestamp * timestamp) AS sum_xx
                    FROM measurements
                    WHERE scenario = ? AND action = ?
                ),
                Stats AS (
                    SELECT
                        avg(duration) AS avg_duration,
                        COUNT(*) AS count_durations
                    FROM measurements
                    WHERE scenario = ? AND action = ?
                ),
                VarianceCalc AS (
                    SELECT
                        SUM((duration - (SELECT avg_duration FROM Stats)) * (duration - (SELECT avg_duration FROM Stats))) AS variance
                    FROM measurements
                    WHERE scenario = ? AND action = ?
                )
                SELECT
                    count(*),
                    count(*) / (max(timestamp) - ?),
                    max(duration),
                    min(duration),
                    avg(duration),
                    (sqrt(variance / count_durations)),
                    p50,
                    p95,
                    p99,
                    p99_9,
                    p99_99,
                    p99_999,
                    p99_9999,
                    p99_99999,
                    p99_999999,
                    (r.n * r.sum_xy - r.sum_x * r.sum_y) / (r.n * r.sum_xx - r.sum_x * r.sum_x)
                FROM measurements
                JOIN Percentiles ON true
                JOIN Regression r ON true
                JOIN Stats ON true
                JOIN VarianceCalc ON true
                WHERE scenario = ? AND action = ?
                """,
                (
                    m.scenario,
                    action,
                    m.scenario,
                    action,
                    m.scenario,
                    action,
                    m.scenario,
                    action,
                    start_time,
                    m.scenario,
                    action,
                ),
            )
            (
                self.queries,
                self.qps,
                self.max,
                self.min,
                self.avg,
                self.std,
                self.p50,
                self.p95,
                self.p99,
                self.p99_9,
                self.p99_99,
                self.p99_999,
                self.p99_9999,
                self.p99_99999,
                self.p99_999999,
                self.slope,
            ) = cursor.fetchone()
        else:
            raise ValueError(
                f"Unknown measurements store (for action {action}): {type(m)}"
            )

    def __str__(self) -> str:
        return f""" queries: {self.queries:>5}
     qps: {self.qps:>7.2f}
     min: {self.min:>7.2f}ms
     avg: {self.avg:>7.2f}ms
     p50: {self.p50:>7.2f}ms
     p95: {self.p95:>7.2f}ms
     p99: {self.p99:>7.2f}ms
     max: {self.max:>7.2f}ms
     std: {self.std:>7.2f}ms
   slope: {self.slope:>5.4f}"""
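
    # check_regressions() iterates over dir() of a Statistics object, so
    # overriding __dir__ limits the THIS-vs-OTHER comparison to this subset of
    # the statistics computed in __init__.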
    def __dir__(self) -> list[str]:
        return [
            "queries",
            "qps",
            "max",
            "min",
            "avg",
            "p50",
            "p95",
            "p99",
            "std",
            "slope",
        ]


def upload_plots(
    plot_paths: list[str],
    scenario_name: str,
    variant: str,
):
    if buildkite.is_in_buildkite():
        for plot_path in plot_paths:
            buildkite.upload_artifact(plot_path, cwd=MZ_ROOT, quiet=True)
        print(f"+++ Plot for {scenario_name} ({variant})")
        for plot_path in plot_paths:
            print(
                buildkite.inline_image(
                    f"artifact://{plot_path}", f"Plot for {scenario_name} ({variant})"
                )
            )
    else:
        print(f"Saving plots to {plot_paths}")
def report(
    mz_string: str,
    scenario: Scenario,
    measurements: MeasurementsStore,
    start_time: float,
    guarantees: bool,
    suffix: str,
) -> tuple[dict[str, Statistics], list[TestFailureDetails]]:
    scenario_name = type(scenario).name()
    stats: dict[str, Statistics] = {}
    failures: list[TestFailureDetails] = []
    end_time = time.time()

    for action in measurements.actions():
        stats[action] = Statistics(action, measurements, start_time)
        print(f"Statistics for {action}:\n{stats[action]}")
        if action in scenario.guarantees and guarantees:
            for stat, guarantee in scenario.guarantees[action].items():
                duration = getattr(stats[action], stat)
                less_than = less_than_is_regression(stat)
                if (duration < guarantee) if less_than else (duration > guarantee):
                    failure = f"Scenario {scenario_name} failed: {action}: {stat}: {duration:.2f} {'<' if less_than else '>'} {guarantee:.2f}"
                    print(failure)
                    failures.append(
                        TestFailureDetails(
                            message=failure,
                            details=str(stats[action]),
                            test_class_name_override=scenario_name,
                        )
                    )
                else:
                    print(
                        f"Scenario {scenario_name} succeeded: {action}: {stat}: {duration:.2f} {'>=' if less_than else '<='} {guarantee:.2f}"
                    )

    plot_paths: list[str] = []
    num_plots = 1 if isinstance(measurements, MemoryStore) else 24
    for i in range(num_plots):
        plt.figure(figsize=(10, 6))
        for action in measurements.actions():
            interval = (end_time - start_time) / num_plots
            times, durations = measurements.get_data(
                action, start_time + interval * i, start_time + interval * (i + 1)
            )
            plt.scatter(times, durations, label=action[:60], marker=MarkerStyle("+"))
        plt.xlabel("time [s]")
        plt.ylabel("latency [ms]")
        plt.yscale("log")
        title = f"{scenario_name}\nagainst {mz_string}"
        if num_plots > 1:
            title += f"\n(part {i+1}/{num_plots})"
        plt.title(title)
        plt.legend(loc="best")
        plt.grid(True)
        plt.ylim(bottom=0)
        plot_path = f"plots/{scenario_name}_{suffix}_{i}_timeline.png"
        plt.savefig(MZ_ROOT / plot_path, dpi=300)
        plot_paths.append(plot_path)
        plt.close()
    upload_plots(plot_paths, scenario_name, "timeline")

    if isinstance(measurements, MemoryStore):
        # Plot CCDF
        plt.grid(True, which="both")
        plt.xscale("log")
        plt.yscale("log")
        plt.ylabel("CCDF")
        plt.xlabel("latency [ms]")
        plt.title(f"{scenario_name} against {mz_string}")
        for key, m in measurements.data.items():
            durations = [x.duration * 1000.0 for x in m]
            durations.sort()
            (unique_durations, counts) = numpy.unique(durations, return_counts=True)
            counts = numpy.cumsum(counts)
            plt.plot(unique_durations, 1 - counts / counts.max(), label=key)
        plt.legend(loc="best")
        plot_path = f"plots/{scenario_name}_{suffix}_ccdf.png"
        plt.savefig(MZ_ROOT / plot_path, dpi=300)
        upload_plots([plot_path], scenario_name, "ccdf")
        plt.close()

    return stats, failures
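

# Runs each scenario once, either against an external target (--mz-url,
# --canary-env, --benchmarking-env) or against a freshly started local
# composition. Local services are killed and recreated between scenarios to
# prevent interference between them.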
def run_once(
    c: Composition,
    scenarios: list[type[Scenario]],
    service_names: list[str],
    tag: str | None,
    params: str | None,
    args,
    suffix: str,
    sqlite_store: bool,
) -> tuple[dict[Scenario, dict[str, Statistics]], list[TestFailureDetails]]:
    stats: dict[Scenario, dict[str, Statistics]] = {}
    failures: list[TestFailureDetails] = []

    overrides = []
    if args.benchmarking_env:
        assert not args.mz_url
        assert not args.canary_env
        region = "aws/us-east-1"
        environment = os.getenv("ENVIRONMENT", "staging")
        app_password = os.environ["QA_BENCHMARKING_APP_PASSWORD"]
        target = PgConnInfo(
            user="qabenchmarking",
            password=app_password,
            database="materialize",
            # Service accounts can't use mz
            host="4pe2w4etmpsnwx1iizersezg7.lb.us-east-1.aws.staging.materialize.cloud",
            # host=get_cloud_hostname(
            #     c, region=region, environment=environment, app_password=app_password
            # ),
            port=6875,
            ssl=True,
        )
    elif args.canary_env:
        assert not args.mz_url
        assert not args.benchmarking_env
        region = "aws/us-east-1"
        environment = os.getenv("ENVIRONMENT", "production")
        app_password = os.environ["CANARY_LOADTEST_APP_PASSWORD"]
        target = PgConnInfo(
            user=os.getenv(
                "CANARY_LOADTEST_USERNAME", "infra+qacanaryload@materialize.io"
            ),
            password=app_password,
            database="materialize",
            host=get_cloud_hostname(
                c, region=region, environment=environment, app_password=app_password
            ),
            port=6875,
            ssl=True,
        )
    elif args.mz_url:
        overrides = [
            Testdrive(
                no_reset=True,
                materialize_url=args.mz_url,
                no_consistency_checks=True,
            )
        ]
        target = parse_pg_conn_string(args.mz_url)
    else:
        overrides = [
            Materialized(
                image=f"materialize/materialized:{tag}" if tag else None,
                default_size=args.size,
                soft_assertions=False,
                external_metadata_store=True,
                external_blob_store=True,
                # TODO: Better azurite support detection
                blob_store_is_azure=args.azurite and bool(tag),
                sanity_restart=False,
                additional_system_parameter_defaults=ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
                | {"max_connections": "100000"},
                metadata_store="cockroach",
            ),
            Testdrive(
                no_reset=True,
                seed=1,
                metadata_store="cockroach",
                external_blob_store=True,
                # TODO: Better azurite support detection
                blob_store_is_azure=args.azurite and bool(tag),
            ),
        ]
        target = None

    c.silent = True

    with c.override(*overrides):
        for scenario_class in scenarios:
            if target:
                c.up({"name": "testdrive", "persistent": True})
                conn_infos = {"materialized": target}
                conn = target.connect()
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT version()"
                        if args.pure_postgres
                        else "SELECT mz_version()"
                    )
                    mz_version = cur.fetchall()[0][0]
                conn.close()
                mz_string = f"{mz_version} ({target.host})"
            else:
                print("~~~ Starting up services")
                c.up(*service_names, {"name": "testdrive", "persistent": True})
                mz_version = c.query_mz_version()
                mz_string = f"{mz_version} (docker)"
                conn_infos = {
                    "materialized": PgConnInfo(
                        user="materialize",
                        database="materialize",
                        host="127.0.0.1",
                        port=c.default_port("materialized"),
                    ),
                    "mz_system": PgConnInfo(
                        user="mz_system",
                        database="materialize",
                        host="127.0.0.1",
                        port=c.port("materialized", 6877),
                    ),
                    "postgres": PgConnInfo(
                        user="postgres",
                        password="postgres",
                        database="postgres",
                        host="127.0.0.1",
                        port=c.default_port("postgres"),
                    ),
                }

            scenario_name = scenario_class.name()
            print(f"--- Running scenario {scenario_name}")
            state = State(
                measurements=(
                    SQLiteStore(scenario_name) if sqlite_store else MemoryStore()
                ),
                load_phase_duration=args.load_phase_duration,
                periodic_dists={pd[0]: int(pd[1]) for pd in args.periodic_dist or []},
            )
            scenario = scenario_class(c, conn_infos)
            scenario.setup(c, conn_infos)
            start_time = time.time()
            Path(MZ_ROOT / "plots").mkdir(parents=True, exist_ok=True)
            try:
                if not args.benchmarking_env:
                    # Don't let the garbage collector interfere with our measurements
                    gc.disable()
                scenario.run(c, state)
                scenario.teardown()
                gc.collect()
                gc.enable()
            finally:
                new_stats, new_failures = report(
                    mz_string,
                    scenario,
                    state.measurements,
                    start_time,
                    args.guarantees,
                    suffix,
                )
                failures.extend(new_failures)
                stats[scenario] = new_stats
                state.measurements.close()

            if not target:
                print(
                    "~~~ Resetting materialized to prevent interference between scenarios"
                )
                c.kill("cockroach", "materialized", "testdrive", "minio")
                c.rm(
                    "cockroach",
                    "materialized",
                    "testdrive",
                    "minio",
                    destroy_volumes=True,
                )
                c.rm_volumes("mzdata")

    return stats, failures
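

# For most statistics a larger value (higher latency) is worse; for "qps" a
# smaller value is worse, so guarantee and regression comparisons are inverted.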
def less_than_is_regression(stat: str) -> bool:
    return stat == "qps"
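

# Compares the per-query statistics of THIS against OTHER. With a threshold of
# 1.2, for example, a latency statistic regresses when this > other * 1.2 and
# qps regresses when this < other / 1.2. Queries whose LoadPhase actions set
# report_regressions=False are reported but never flagged.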
def check_regressions(
    this_stats: dict[Scenario, dict[str, Statistics]],
    other_stats: dict[Scenario, dict[str, Statistics]],
    other_tag: str,
) -> list[TestFailureDetails]:
    failures: list[TestFailureDetails] = []

    assert len(this_stats) == len(other_stats)
    for scenario, other_scenario in zip(this_stats.keys(), other_stats.keys()):
        scenario_name = type(scenario).name()
        assert type(other_scenario).name() == scenario_name
        has_failed = False
        print(f"Comparing scenario {scenario_name}")
        output_lines = [
            f"{'QUERY':<40} | {'STAT':<7} | {'THIS':^12} | {'OTHER':^12} | {'CHANGE':^9} | {'THRESHOLD':^9} | {'REGRESSION?':^12}",
            "-" * 118,
        ]
        ignored_queries = set()
        for phase in scenario.phases:
            # We only care about LoadPhases, and only they have report_regressions
            if not isinstance(phase, LoadPhase):
                continue
            for phase_action in phase.phase_actions:
                if not phase_action.report_regressions:
                    ignored_queries.add(str(phase_action.action))

        for query in this_stats[scenario].keys():
            for stat in dir(this_stats[scenario][query]):
                this_value = getattr(this_stats[scenario][query], stat)
                other_value = getattr(other_stats[other_scenario][query], stat)
                less_than = less_than_is_regression(stat)
                try:
                    percentage = f"{(this_value / other_value - 1) * 100:.2f}%"
                except ZeroDivisionError:
                    percentage = ""
                threshold = (
                    None
                    if query in ignored_queries
                    else (
                        scenario.regression_thresholds.get(query, {}).get(stat)
                        or REGRESSION_THRESHOLDS[stat]
                    )
                )
                if threshold is None:
                    regression = ""
                elif (
                    this_value < other_value / threshold
                    if less_than
                    else this_value > other_value * threshold
                ):
                    regression = "!!YES!!"
                    if not known_regression(scenario_name, other_tag):
                        has_failed = True
                else:
                    regression = "no"
                threshold_text = (
                    f"{((threshold - 1) * 100):.0f}%" if threshold is not None else ""
                )
                output_lines.append(
                    f"{query[:40]:<40} | {stat:<7} | {this_value:>12.2f} | {other_value:>12.2f} | {percentage:>9} | {threshold_text:>9} | {regression:^12}"
                )
        print("\n".join(output_lines))

        if has_failed:
            failures.append(
                TestFailureDetails(
                    message=f"Scenario {scenario_name} regressed",
                    details="\n".join(output_lines),
                    test_class_name_override=scenario_name,
                )
            )
    return failures
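

# "common-ancestor" is resolved through resolve_ancestor_image_tag(); any other
# tag is used as given.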
def resolve_tag(tag: str) -> str:
    if tag == "common-ancestor":
        # TODO: We probably will need overrides too
        return resolve_ancestor_image_tag({})
    return tag
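

# Uploads the per-query statistics to the test-analytics database. This is a
# no-op outside of Buildkite, and upload errors are swallowed so that they
# never fail the build.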
def upload_results_to_test_analytics(
    c: Composition,
    load_phase_duration: int | None,
    stats: dict[Scenario, dict[str, Statistics]],
    was_successful: bool,
) -> None:
    if not buildkite.is_in_buildkite():
        return

    test_analytics = TestAnalyticsDb(create_test_analytics_config(c))
    test_analytics.builds.add_build_job(was_successful=was_successful)

    result_entries = []
    for scenario in stats.keys():
        scenario_name = type(scenario).name()
        scenario_version = scenario.version
        for query in stats[scenario].keys():
            result_entries.append(
                parallel_benchmark_result_storage.ParallelBenchmarkResultEntry(
                    scenario_name=scenario_name,
                    scenario_version=str(scenario_version),
                    query=query,
                    load_phase_duration=load_phase_duration,
                    queries=stats[scenario][query].queries,
                    qps=stats[scenario][query].qps,
                    min=stats[scenario][query].min,
                    max=stats[scenario][query].max,
                    avg=stats[scenario][query].avg,
                    p50=stats[scenario][query].p50,
                    p95=stats[scenario][query].p95,
                    p99=stats[scenario][query].p99,
                    p99_9=stats[scenario][query].p99_9,
                    p99_99=stats[scenario][query].p99_99,
                    p99_999=stats[scenario][query].p99_999,
                    p99_9999=stats[scenario][query].p99_9999,
                    p99_99999=stats[scenario][query].p99_99999,
                    p99_999999=stats[scenario][query].p99_999999,
                    std=stats[scenario][query].std,
                    slope=stats[scenario][query].slope,
                )
            )

    test_analytics.parallel_benchmark_results.add_result(
        framework_version=PARALLEL_BENCHMARK_FRAMEWORK_VERSION,
        results=result_entries,
    )

    try:
        test_analytics.submit_updates()
        print("Uploaded results.")
    except Exception as e:
        # An error during an upload must never cause the build to fail
        test_analytics.on_upload_failed(e)
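

# Entry point for the benchmark workflow: parse the arguments below, run all
# (or the selected) scenarios against THIS build, optionally run them again
# against --other-tag and check for regressions, upload the results to test
# analytics, and fail the build if any guarantee or regression check failed.
#
# Hypothetical invocation (the flags are defined below; the exact harness
# command depends on how mzcompose is wrapped in your checkout):
#   ./mzcompose run default --scenario=MyScenario --load-phase-duration=120 \
#       --other-tag=common-ancestor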
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    c.silent = True

    parser.add_argument(
        "--redpanda",
        action="store_true",
        help="run against Redpanda instead of the Confluent Platform",
    )
    parser.add_argument(
        "--guarantees",
        action="store_true",
        default=True,
        help="Check guarantees defined by test scenarios",
    )
    parser.add_argument(
        "--size",
        metavar="N-N",
        type=str,
        default="1",
        help="default SIZE",
    )
    parser.add_argument(
        "--scenario",
        metavar="SCENARIO",
        action="append",
        type=str,
        help="Scenario to run",
    )
    parser.add_argument(
        "--load-phase-duration",
        type=int,
        help="Override durations of LoadPhases",
    )
    parser.add_argument(
        "--periodic-dist",
        nargs=2,
        metavar=("action", "per_second"),
        action="append",
        help="Override periodic distribution for an action with specified name",
    )
    parser.add_argument(
        "--this-params",
        metavar="PARAMS",
        type=str,
        default=os.getenv("THIS_PARAMS", None),
        help="Semicolon-separated list of parameter=value pairs to apply to the 'THIS' Mz instance",
    )
    parser.add_argument(
        "--other-tag",
        metavar="TAG",
        type=str,
        default=None,
        help="'Other' Materialize container tag to benchmark. If not provided, the last released Mz version will be used.",
    )
    parser.add_argument(
        "--other-params",
        metavar="PARAMS",
        type=str,
        default=os.getenv("OTHER_PARAMS", None),
        help="Semicolon-separated list of parameter=value pairs to apply to the 'OTHER' Mz instance",
    )
  704. parser.add_argument("--mz-url", type=str, help="Remote Mz instance to run against")
  705. parser.add_argument(
  706. "--pure-postgres",
  707. action="store_true",
  708. help="Don't run any Materialize-specific preparation commands",
  709. )
  710. parser.add_argument(
  711. "--canary-env",
  712. action="store_true",
  713. help="Run against QA Canary production environment",
  714. )
  715. parser.add_argument(
  716. "--benchmarking-env",
  717. action="store_true",
  718. help="Run against QA Benchmarking staging environment",
  719. )
  720. parser.add_argument(
  721. "--sqlite-store",
  722. action="store_true",
  723. help="Store results in SQLite instead of in memory",
  724. )
  725. parser.add_argument(
  726. "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
  727. )
  728. args = parser.parse_args()

    if args.scenario:
        for scenario in args.scenario:
            assert scenario in globals(), f"scenario {scenario} does not exist"
        scenarios: list[type[Scenario]] = [
            globals()[scenario] for scenario in args.scenario
        ]
    else:
        scenarios = [
            scenario for scenario in all_subclasses(Scenario) if scenario.enabled
        ]

    sharded_scenarios = buildkite.shard_list(scenarios, lambda s: s.name())
    if not sharded_scenarios:
        return

    if args.sqlite_store and os.path.exists(DB_FILE):
        os.remove(DB_FILE)

    service_names = ["materialized", "postgres", "mysql"] + (
        ["redpanda"] if args.redpanda else ["zookeeper", "kafka", "schema-registry"]
    )

    this_stats, failures = run_once(
        c,
        sharded_scenarios,
        service_names,
        tag=None,
        params=args.this_params,
        args=args,
        suffix="this",
        sqlite_store=args.sqlite_store,
    )

    if args.other_tag:
        assert not args.mz_url, "Can't set both --mz-url and --other-tag"
        tag = resolve_tag(args.other_tag)
        print(f"--- Running against other tag for comparison: {tag}")
        args.guarantees = False
        other_stats, other_failures = run_once(
            c,
            sharded_scenarios,
            service_names,
            tag=tag,
            params=args.other_params,
            args=args,
            suffix="other",
            sqlite_store=args.sqlite_store,
        )
        failures.extend(other_failures)
        failures.extend(check_regressions(this_stats, other_stats, tag))

    upload_results_to_test_analytics(
        c, args.load_phase_duration, this_stats, not failures
    )

    if failures:
        raise FailedTestExecutionError(errors=failures)


# TODO: Choose an existing cluster name (for remote mz)
# TODO: Measure Memory?