# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import argparse
import datetime
import gc
import os
import random
import sys
import threading
import time
from collections import Counter, defaultdict

import psycopg

from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.parallel_workload.action import (
    Action,
    ActionList,
    BackupRestoreAction,
    CancelAction,
    KillAction,
    StatisticsAction,
    ZeroDowntimeDeployAction,
    action_lists,
    ddl_action_list,
    dml_nontrans_action_list,
    fetch_action_list,
    read_action_list,
    write_action_list,
)
from materialize.parallel_workload.database import (
    MAX_CLUSTER_REPLICAS,
    MAX_CLUSTERS,
    MAX_KAFKA_SINKS,
    MAX_KAFKA_SOURCES,
    MAX_POSTGRES_SOURCES,
    MAX_ROLES,
    MAX_SCHEMAS,
    MAX_TABLES,
    MAX_VIEWS,
    MAX_WEBHOOK_SOURCES,
    Database,
)
from materialize.parallel_workload.executor import Executor, initialize_logging
from materialize.parallel_workload.settings import Complexity, Scenario
from materialize.parallel_workload.worker import Worker
from materialize.parallel_workload.worker_exception import WorkerFailedException

SEED_RANGE = 1_000_000
REPORT_TIME = 10  # seconds between QPS reports


def run(
    host: str,
    ports: dict[str, int],
    seed: str,
    runtime: int,
    complexity: Complexity,
    scenario: Scenario,
    num_threads: int | None,
    naughty_identifiers: bool,
    replicas: int,
    composition: Composition | None,
    azurite: bool,
    sanity_restart: bool,
) -> None:
    num_threads = num_threads or os.cpu_count() or 10
    rng = random.Random(random.randrange(SEED_RANGE))

    print(
        f"+++ Running with: --seed={seed} --threads={num_threads} --runtime={runtime} --complexity={complexity.value} --scenario={scenario.value} {'--naughty-identifiers ' if naughty_identifiers else ''}--replicas={replicas} (--host={host})"
    )
    initialize_logging()

    end_time = (
        datetime.datetime.now() + datetime.timedelta(seconds=runtime)
    ).timestamp()

    database = Database(
        rng, seed, host, ports, complexity, scenario, naughty_identifiers
    )
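
    # As mz_system, raise the per-object-type limits well above the workload's
    # nominal maxima so resource caps don't become the dominant failure mode.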
    system_conn = psycopg.connect(
        host=host, port=ports["mz_system"], user="mz_system", dbname="materialize"
    )
    system_conn.autocommit = True
    with system_conn.cursor() as system_cur:
        system_exe = Executor(rng, system_cur, None, database)
        system_exe.execute(
            f"ALTER SYSTEM SET max_schemas_per_database = {MAX_SCHEMAS * 40 + num_threads}"
        )
        # The presence of ALTER TABLE RENAME can cause the total number of
        # tables to exceed MAX_TABLES.
        system_exe.execute(
            f"ALTER SYSTEM SET max_tables = {MAX_TABLES * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_materialized_views = {MAX_VIEWS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_sources = {(MAX_WEBHOOK_SOURCES + MAX_KAFKA_SOURCES + MAX_POSTGRES_SOURCES) * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_sinks = {MAX_KAFKA_SINKS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_roles = {MAX_ROLES * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_clusters = {MAX_CLUSTERS * 40 + num_threads}"
        )
        system_exe.execute(
            f"ALTER SYSTEM SET max_replicas_per_cluster = {MAX_CLUSTER_REPLICAS * 40 + num_threads}"
        )
        system_exe.execute("ALTER SYSTEM SET max_secrets = 1000000")
        system_exe.execute("ALTER SYSTEM SET idle_in_transaction_session_timeout = 0")
        # Most queries should not fail because of privileges
        for object_type in [
            "TABLES",
            "TYPES",
            "SECRETS",
            "CONNECTIONS",
            "DATABASES",
            "SCHEMAS",
            "CLUSTERS",
        ]:
            system_exe.execute(
                f"ALTER DEFAULT PRIVILEGES FOR ALL ROLES GRANT ALL PRIVILEGES ON {object_type} TO PUBLIC"
            )

        if replicas > 1:
            system_exe.execute("DROP CLUSTER quickstart CASCADE")
            replica_names = [f"r{replica_id}" for replica_id in range(0, replicas)]
            replica_string = ",".join(
                f"{replica_name} (SIZE '4')" for replica_name in replica_names
            )
            system_exe.execute(
                f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
            )
    system_conn.close()

    conn = psycopg.connect(
        host=host,
        port=ports["materialized"],
        user="materialize",
        dbname="materialize",
    )
    conn.autocommit = True
    with conn.cursor() as cur:
        assert composition
        database.create(Executor(rng, cur, None, database), composition)
    conn.close()
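
    # One query worker per thread: each gets its own RNG (seeded from the
    # top-level RNG for reproducibility) and a randomly chosen action list;
    # the weights below select among [read, fetch, write, dml_nontrans, ddl]
    # depending on the complexity level.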
    workers = []
    threads = []
    for i in range(num_threads):
        weights: list[float]
        if complexity == Complexity.DDL:
            weights = [60, 30, 30, 30, 100]
        elif complexity == Complexity.DML:
            weights = [60, 30, 30, 30, 0]
        elif complexity == Complexity.Read:
            weights = [60, 30, 0, 0, 0]
        elif complexity == Complexity.DDLOnly:
            weights = [0, 0, 0, 0, 100]
        else:
            raise ValueError(f"Unknown complexity {complexity}")
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        action_list = worker_rng.choices(
            [
                read_action_list,
                fetch_action_list,
                write_action_list,
                dml_nontrans_action_list,
                ddl_action_list,
            ],
            weights,
        )[0]
        actions = [
            action_class(worker_rng, composition)
            for action_class in action_list.action_classes
        ]
        worker = Worker(
            worker_rng,
            actions,
            action_list.weights,
            end_time,
            action_list.autocommit,
            system=False,
            composition=composition,
            action_list=action_list,
        )
        thread_name = f"worker_{i}"
        print(
            f"{thread_name}: {', '.join(action_class.__name__.removesuffix('Action') for action_class in action_list.action_classes)}"
        )
        workers.append(worker)

        thread = threading.Thread(
            name=thread_name,
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
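
    # Some scenarios add one dedicated disruption worker on top of the query
    # workers: Cancel kills running queries from a privileged mz_system
    # connection, Kill and ZeroDowntimeDeploy restart or redeploy Materialize
    # through the composition, and BackupRestore exercises backup and restore.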
    if scenario == Scenario.Cancel:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        worker = Worker(
            worker_rng,
            [CancelAction(worker_rng, composition, workers)],
            [1],
            end_time,
            autocommit=False,
            system=True,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="cancel",
            target=worker.run,
            args=(host, ports["mz_system"], ports["http"], "mz_system", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.Kill:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "Kill scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [KillAction(worker_rng, composition, azurite, sanity_restart)],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="kill",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.ZeroDowntimeDeploy:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "ZeroDowntimeDeploy scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [
                ZeroDowntimeDeployAction(
                    worker_rng,
                    composition,
                    azurite,
                    sanity_restart,
                )
            ],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="zero-downtime-deploy",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario == Scenario.BackupRestore:
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        assert composition, "Backup & Restore scenario only works in mzcompose"
        worker = Worker(
            worker_rng,
            [BackupRestoreAction(worker_rng, composition, database)],
            [1],
            end_time,
            autocommit=False,
            system=False,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="backup-restore",
            target=worker.run,
            args=(host, ports["materialized"], ports["http"], "materialize", database),
        )
        thread.start()
        threads.append(thread)
    elif scenario in (Scenario.Regression, Scenario.Rename):
        pass
    else:
        raise ValueError(f"Unknown scenario {scenario}")

    if False:  # sanity check for debugging
        worker_rng = random.Random(rng.randrange(SEED_RANGE))
        worker = Worker(
            worker_rng,
            [StatisticsAction(worker_rng, composition)],
            [1],
            end_time,
            autocommit=False,
            system=True,
            composition=composition,
        )
        workers.append(worker)
        thread = threading.Thread(
            name="statistics",
            target=worker.run,
            args=(host, ports["mz_system"], ports["http"], "mz_system", database),
        )
        thread.start()
        threads.append(thread)
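
    # Main monitoring loop: fail fast if any worker thread dies, otherwise
    # report per-worker QPS every REPORT_TIME seconds and fold each worker's
    # per-action query counts into the global tally.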
    num_queries = defaultdict(Counter)
    try:
        while time.time() < end_time:
            for thread in threads:
                if not thread.is_alive():
                    occurred_exception = None
                    for worker in workers:
                        worker.end_time = time.time()
                        occurred_exception = (
                            occurred_exception or worker.occurred_exception
                        )
                    raise WorkerFailedException(
                        f"^^^ +++ Thread {thread.name} failed, exiting",
                        occurred_exception,
                    )
            time.sleep(REPORT_TIME)
            print(
                "QPS: "
                + " ".join(
                    f"{worker.num_queries.total() / REPORT_TIME:05.1f}"
                    for worker in workers
                )
            )
            for worker in workers:
                for action in worker.num_queries.elements():
                    num_queries[worker.action_list][action] += worker.num_queries[
                        action
                    ]
                worker.num_queries.clear()
    except KeyboardInterrupt:
        print("Keyboard interrupt, exiting")
        for worker in workers:
            worker.end_time = time.time()

    stopping_time = (
        datetime.datetime.now() + datetime.timedelta(seconds=300)
    ).timestamp()
    while time.time() < stopping_time:
        for thread in threads:
            thread.join(timeout=1)
        if all(not thread.is_alive() for thread in threads):
            break
    else:
        for worker, thread in zip(workers, threads):
            if thread.is_alive():
                print(
                    f"{thread.name} still running ({worker.exe.mz_service}): {worker.exe.last_log} ({worker.exe.last_status})"
                )
        print_stats(num_queries, workers, num_threads)
        if num_threads >= 50:
            # Under high load some queries can't finish quickly, especially
            # UPDATE/DELETE, so don't treat a slow shutdown as a failure.
            os._exit(0)
        if scenario == Scenario.ZeroDowntimeDeploy:
            # With 0dt deploys, connections against the currently-fenced-out
            # environmentd will be stuck forever, and the promoted environmentd
            # can take > 5 minutes to become responsive as well.
            os._exit(0)
        print("Threads have not stopped within 5 minutes, exiting hard")
        os._exit(1)
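
    # Shut down cleanly: reconnect for cleanup, falling back to the second
    # environmentd if the first one was fenced out by a zero-downtime deploy.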
    try:
        conn = psycopg.connect(
            host=host, port=ports["materialized"], user="materialize"
        )
    except Exception as e:
        if scenario == Scenario.ZeroDowntimeDeploy:
            print(f"Failed connecting to materialized, using materialized2: {e}")
            conn = psycopg.connect(
                host=host, port=ports["materialized2"], user="materialize"
            )
        else:
            raise
    conn.autocommit = True
    with conn.cursor() as cur:
        # Dropping the database also releases the long running connections
        # used by database objects.
        database.drop(Executor(rng, cur, None, database))

        # Make sure all unreachable connections are closed too
        gc.collect()

        stopping_time = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < stopping_time:
            cur.execute(
                "SELECT * FROM mz_internal.mz_sessions WHERE connection_id <> pg_backend_pid()"
            )
            sessions = cur.fetchall()
            if len(sessions) == 0:
                break
            print(
                f"Sessions are still running even though all threads are done: {sessions}"
            )
        # TODO(def-): Why is this failing with psycopg?
        # else:
        #     raise ValueError("Sessions did not clean up within 30s of threads stopping")
    conn.close()

    print_stats(num_queries, workers, num_threads)


def print_stats(
    num_queries: defaultdict[ActionList, Counter[type[Action]]],
    workers: list[Worker],
    num_threads: int,
) -> None:
    ignored_errors: defaultdict[str, Counter[type[Action]]] = defaultdict(Counter)
    num_failures = 0
    for worker in workers:
        for action_class, counter in worker.ignored_errors.items():
            ignored_errors[action_class].update(counter)
    for counter in ignored_errors.values():
        for count in counter.values():
            num_failures += count

    total_queries = sum(sub.total() for sub in num_queries.values())
    failed = 100.0 * num_failures / total_queries if total_queries else 0
    print(f"Queries executed: {total_queries} ({failed:.0f}% failed)")
    print("--- Action statistics:")
    for action_list in action_lists:
        text = ", ".join(
            [
                f"{action_class.__name__.removesuffix('Action')}: {num_queries[action_list][action_class]}"
                for action_class in action_list.action_classes
            ]
        )
        print(f"  {text}")
    print("--- Error statistics:")
    for error, counter in ignored_errors.items():
        text = ", ".join(
            f"{action_class.__name__}: {count}"
            for action_class, count in counter.items()
        )
        print(f"  {error}: {text}")
    # Tolerate a higher failure rate when running with many threads.
    assert failed < (50 if num_threads < 50 else 75)


def parse_common_args(parser: argparse.ArgumentParser) -> None:
    parser.add_argument("--seed", type=str, default=str(int(time.time())))
    parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
    parser.add_argument(
        "--complexity",
        default="ddl",
        type=str,
        choices=[elem.value for elem in Complexity] + ["random"],
    )
    parser.add_argument(
        "--scenario",
        default="regression",
        type=str,
        choices=[elem.value for elem in Scenario] + ["random"],
    )
    parser.add_argument(
        "--threads",
        type=int,
        help="Number of threads to run, by default the number of SMT threads",
    )
    parser.add_argument(
        "--naughty-identifiers",
        action="store_true",
        help="Use naughty strings as identifiers; makes the queries unreadable",
    )
    parser.add_argument(
        "--fast-startup",
        action="store_true",
        help="Skip initializing expensive parts like SQLsmith, sources, and sinks (for fast local testing; reduces coverage)",
    )
    parser.add_argument(
        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
    )
    parser.add_argument("--replicas", type=int, default=2, help="Use multiple replicas")
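

# Standalone entry point: connects to an already-running Materialize on the
# given ports and applies the default system parameters before starting the
# workload. Note that run() asserts a Composition in several places, so this
# script is normally driven from mzcompose rather than invoked directly.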
def main() -> int:
    parser = argparse.ArgumentParser(
        prog="parallel-workload",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Run a parallel workload against Materialize",
    )

    parser.add_argument("--host", default="localhost", type=str)
    parser.add_argument("--port", default=6875, type=int)
    parser.add_argument("--system-port", default=6877, type=int)
    parser.add_argument("--http-port", default=6876, type=int)
    parse_common_args(parser)

    args = parser.parse_args()

    ports: dict[str, int] = {
        "materialized": args.port,
        "mz_system": args.system_port,
        "http": args.http_port,
        "kafka": 9092,
        "schema-registry": 8081,
    }

    system_conn = psycopg.connect(
        host=args.host,
        port=ports["mz_system"],
        user="mz_system",
        dbname="materialize",
    )
    system_conn.autocommit = True
    with system_conn.cursor() as cur:
        # TODO: Currently the same as mzcompose default settings, add
        # more settings and shuffle them
        for key, value in get_default_system_parameters().items():
            cur.execute(f"ALTER SYSTEM SET {key} = '{value}'".encode())
    system_conn.close()

    random.seed(args.seed)

    run(
        args.host,
        ports,
        args.seed,
        args.runtime,
        Complexity(args.complexity),
        Scenario(args.scenario),
        args.threads,
        args.naughty_identifiers,
        args.replicas,
        composition=None,  # only works in mzcompose
        azurite=args.azurite,
        sanity_restart=False,  # only works in mzcompose
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())