# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from copy import deepcopy

from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.mysql import MySql
from materialize.parallel_benchmark.framework import (
    ClosedLoop,
    LoadPhase,
    OpenLoop,
    Periodic,
    PooledQuery,
    ReuseConnQuery,
    Scenario,
    StandaloneQuery,
    TdAction,
    TdPhase,
    disabled,
)
from materialize.util import PgConnInfo


class Kafka(Scenario):
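    """Ingest Avro records into a Kafka upsert source while concurrently
    reading an indexed materialized view defined on top of it."""
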
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    $ set keyschema={"type": "record", "name": "Key", "fields": [ { "name": "f1", "type": "long" } ] }
                    $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
                    $ kafka-create-topic topic=kafka
                    $ kafka-ingest format=avro topic=kafka key-format=avro key-schema=${keyschema} schema=${schema} repeat=10
                    {"f1": 1} {"f2": ${kafka-ingest.iteration} }

                    > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
                    > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                        URL '${testdrive.schema-registry-url}');
                    > CREATE SOURCE kafka
                      FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-${testdrive.seed}');
                    > CREATE TABLE kafka_tbl FROM SOURCE kafka (REFERENCE "testdrive-kafka-${testdrive.seed}")
                      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                      ENVELOPE UPSERT;
                    > CREATE MATERIALIZED VIEW kafka_mv AS SELECT * FROM kafka_tbl;
                    > CREATE DEFAULT INDEX ON kafka_mv;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=TdAction(
                                """
                                $ set keyschema={"type": "record", "name": "Key", "fields": [ { "name": "f1", "type": "long" } ] }
                                $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
                                $ kafka-ingest format=avro topic=kafka key-format=avro key-schema=${keyschema} schema=${schema} repeat=10
                                {"f1": 1} {"f2": ${kafka-ingest.iteration} }
                                """,
                                c,
                            ),
                            dist=Periodic(per_second=1),
                        )
                    ]
                    + [
                        ClosedLoop(
                            action=StandaloneQuery(
                                "SELECT * FROM kafka_mv",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
                "SELECT * FROM kafka_mv (standalone)": {"qps": 15, "p99": 400},
            },
        )


class PgReadReplica(Scenario):
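    """Insert into a Postgres table replicated through a Postgres source while
    reading an indexed COUNT(*) materialized view in Materialize."""
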
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > DROP SECRET IF EXISTS pgpass CASCADE
                    > CREATE SECRET pgpass AS 'postgres'
                    > CREATE CONNECTION pg TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET pgpass
                      )

                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    DROP PUBLICATION IF EXISTS mz_source;
                    DROP TABLE IF EXISTS t1 CASCADE;
                    ALTER USER postgres WITH replication;
                    CREATE TABLE t1 (f1 INTEGER);
                    ALTER TABLE t1 REPLICA IDENTITY FULL;
                    CREATE PUBLICATION mz_source FOR ALL TABLES;

                    > CREATE SOURCE mz_source
                      FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
                    > CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE t1);
                    > CREATE MATERIALIZED VIEW mv_sum AS
                      SELECT COUNT(*) FROM t1;
                    > CREATE DEFAULT INDEX ON mv_sum;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=StandaloneQuery(
                                "INSERT INTO t1 VALUES (1)",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=100),
                        )
                    ]
                    + [
                        ClosedLoop(
                            action=StandaloneQuery(
                                "SELECT * FROM mv_sum",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
                "SELECT * FROM mv_sum (standalone)": {"qps": 15, "p99": 400},
            },
        )


class PgReadReplicaRTR(Scenario):
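    """Like PgReadReplica, but the Materialize reads run with
    REAL_TIME_RECENCY enabled."""
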
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > DROP SECRET IF EXISTS pgpass CASCADE
                    > CREATE SECRET pgpass AS 'postgres'
                    > CREATE CONNECTION pg TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET pgpass
                      )

                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    DROP PUBLICATION IF EXISTS mz_source2;
                    DROP TABLE IF EXISTS t2 CASCADE;
                    ALTER USER postgres WITH replication;
                    CREATE TABLE t2 (f1 INTEGER);
                    ALTER TABLE t2 REPLICA IDENTITY FULL;
                    CREATE PUBLICATION mz_source2 FOR ALL TABLES;

                    > CREATE SOURCE mz_source2
                      FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source2');
                    > CREATE TABLE t2 FROM SOURCE mz_source2 (REFERENCE t2);
                    > CREATE MATERIALIZED VIEW mv_sum AS
                      SELECT COUNT(*) FROM t2;
                    > CREATE DEFAULT INDEX ON mv_sum;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=StandaloneQuery(
                                "INSERT INTO t2 VALUES (1)",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=100),
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "SET REAL_TIME_RECENCY TO TRUE; SELECT * FROM mv_sum",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=125),
                            report_regressions=False,  # TODO: Currently not stable enough, reenable when RTR becomes more consistent
                        ),
                    ],
                ),
            ],
            guarantees={
                # TODO(def-): Lower max when RTR becomes more performant
                "SET REAL_TIME_RECENCY TO TRUE; SELECT * FROM mv_sum (standalone)": {
                    "qps": 50,
                    "p99": 5000,
                },
            },
        )


class MySQLReadReplica(Scenario):
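    """Insert into a MySQL table replicated through a MySQL source while
    reading an indexed COUNT(*) materialized view in Materialize."""
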
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    f"""
                    > DROP SECRET IF EXISTS mysqlpass CASCADE
                    > CREATE SECRET mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'
                    > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (HOST mysql, USER root, PASSWORD SECRET mysqlpass)

                    $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                    $ mysql-execute name=mysql
                    DROP DATABASE IF EXISTS public;
                    CREATE DATABASE public;
                    USE public;
                    CREATE TABLE t3 (f1 INTEGER);

                    > CREATE SOURCE mysql_source
                      FROM MYSQL CONNECTION mysql_conn
                      FOR TABLES (public.t3);
                    > CREATE MATERIALIZED VIEW mv_sum_mysql AS
                      SELECT COUNT(*) FROM t3;
                    > CREATE DEFAULT INDEX ON mv_sum_mysql;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=TdAction(
                                f"""
                                $ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}
                                $ mysql-execute name=mysql
                                USE public;
                                {"INSERT INTO t3 VALUES (1); " * 100}
                                """,
                                c,
                            ),
                            dist=Periodic(per_second=1),
                        )
                    ]
                    + [
                        ClosedLoop(
                            action=StandaloneQuery(
                                "SELECT * FROM mv_sum_mysql",
                                conn_info=conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
                "SELECT * FROM mv_sum_mysql (standalone)": {"qps": 15, "p99": 400},
            },
        )


class OpenIndexedSelects(Scenario):
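    """Open-loop pooled SELECTs against a small indexed table."""
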
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > CREATE TABLE t4 (f1 TEXT, f2 INTEGER);
                    > CREATE DEFAULT INDEX ON t4;
                    > INSERT INTO t4 VALUES ('A', 1);
                    > INSERT INTO t4 VALUES ('B', 2);
                    > INSERT INTO t4 VALUES ('C', 3);
                    > INSERT INTO t4 VALUES ('D', 4);
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=PooledQuery(
                                "SELECT * FROM t4", conn_info=conn_infos["materialized"]
                            ),
                            dist=Periodic(per_second=400),
                        ),
                    ],
                ),
            ],
            conn_pool_size=100,
            guarantees={
                "SELECT * FROM t4 (pooled)": {"qps": 390, "p99": 100},
            },
        )


class ConnectRead(Scenario):
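    """Closed-loop SELECT 1 queries, each on a fresh connection, measuring
    connection setup plus read latency."""
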
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                LoadPhase(
                    duration=120,
                    actions=[
                        ClosedLoop(
                            action=StandaloneQuery(
                                "SELECT 1",
                                conn_info=conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
  301. "SELECT * FROM t4 (pooled)": {"qps": 35, "max": 700},
            },
        )


class FlagUpdate(Scenario):
  305. """Reproduces database-issues#8480"""
  306. def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
  307. self.init(
  308. [
  309. LoadPhase(
  310. duration=120,
  311. actions=[
  312. OpenLoop(
  313. action=ReuseConnQuery(
  314. # The particular flag and value used here
  315. # doesn't matter. It just needs to be a flag
  316. # that exists in both versions to be
  317. # benchmarked.
  318. "ALTER SYSTEM SET enable_disk_cluster_replicas = true",
  319. conn_info=conn_infos["mz_system"],
  320. ),
  321. dist=Periodic(per_second=1),
  322. report_regressions=False, # We don't care about this query getting slower
  323. ),
  324. ]
  325. + [
  326. ClosedLoop(
  327. action=ReuseConnQuery(
  328. "SELECT 1",
  329. conn_info=conn_infos["materialized"],
  330. strict_serializable=False,
  331. ),
  332. )
  333. for i in range(10)
  334. ],
  335. ),
  336. ],
  337. guarantees={
  338. # TODO(def-): Lower when database-issues#8480 is fixed to prevent regressions
  339. "SELECT 1 (reuse connection)": {"avg": 5, "max": 500, "slope": 0.1},
  340. },
  341. )
class Read(Scenario):
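    """Closed-loop SELECT 1 queries on reused connections."""
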
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                LoadPhase(
                    duration=120,
                    actions=[
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT 1",
                                conn_info=conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        )
                        for i in range(10)
                    ],
                ),
            ],
            guarantees={
                "SELECT 1 (reuse connection)": {"qps": 2000, "max": 100, "slope": 0.1},
            },
        )


class PoolRead(Scenario):
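    """Open-loop SELECT 1 queries issued through a connection pool."""
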
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=PooledQuery(
                                "SELECT 1", conn_info=conn_infos["materialized"]
                            ),
                            dist=Periodic(per_second=100),
                            # dist=Gaussian(mean=0.01, stddev=0.05),
                        ),
                    ],
                ),
            ],
            conn_pool_size=100,
            guarantees={
                "SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1},
            },
        )


class StatementLogging(Scenario):
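    """Same pooled SELECT 1 load as PoolRead, but with statement logging
    enabled at a 100% sample rate for the duration of the load phase."""
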
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                    ALTER SYSTEM SET statement_logging_max_sample_rate = 1.0;
                    ALTER SYSTEM SET statement_logging_default_sample_rate = 1.0;
                    ALTER SYSTEM SET enable_statement_lifecycle_logging = true;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=PooledQuery(
                                "SELECT 1", conn_info=conn_infos["materialized"]
                            ),
                            dist=Periodic(per_second=100),
                            # dist=Gaussian(mean=0.01, stddev=0.05),
                        ),
                    ],
                ),
                TdPhase(
                    """
                    $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                    ALTER SYSTEM SET statement_logging_default_sample_rate = 0;
                    ALTER SYSTEM SET statement_logging_max_sample_rate = 0;
                    ALTER SYSTEM SET enable_statement_lifecycle_logging = false;
                    """
                ),
            ],
            conn_pool_size=100,
            guarantees={
                "SELECT 1 (pooled)": {"avg": 5, "max": 200, "slope": 0.1},
            },
        )


class InsertWhereNotExists(Scenario):
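    """Open-loop INSERT ... WHERE NOT EXISTS queries against a growing table."""
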
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > CREATE TABLE insert_table (a int, b text);
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=ReuseConnQuery(
                                "INSERT INTO insert_table SELECT 1, '1' WHERE NOT EXISTS (SELECT 1 FROM insert_table WHERE a = 100)",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=5),
                        )
                    ],
                ),
            ],
            conn_pool_size=100,
            # TODO(def-): Bump per_second and add guarantees when database-issues#8510 is fixed
        )


class InsertsSelects(Scenario):
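    """Concurrent inserts and min() reads against the same table."""
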
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > CREATE TABLE insert_select_table (a int, b text);
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=ReuseConnQuery(
                                "INSERT INTO insert_select_table VALUES (1, '1')",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=1),
                            report_regressions=False,
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT min(a) FROM insert_select_table",
                                conn_infos["materialized"],
                                strict_serializable=False,
                            ),
                        ),
                    ],
                ),
            ],
            conn_pool_size=100,
            guarantees={
                "SELECT min(a) FROM insert_select_table (reuse connection)": {
                    "qps": 10,
                    "p99": 350,
                },
            },
        )


# TODO Try these scenarios' scaling behavior against cc sizes (locally and remote)
class CommandQueryResponsibilitySegregation(Scenario):
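    """Inserts, updates, and deletes against a Postgres-sourced table feeding
    an expensive self-joining materialized view that is read from Materialize."""
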
    # TODO: Have one Postgres source with many inserts/updates/deletes and multiple complex materialized views on top of it, read from Mz
    # This should be blocked by materialized view performance
    # We probably need strict serializable to make sure results stay up to date
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > DROP SECRET IF EXISTS pgpass CASCADE
                    > CREATE SECRET pgpass AS 'postgres'
                    > CREATE CONNECTION pg TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET pgpass
                      )

                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    DROP PUBLICATION IF EXISTS mz_cqrs_source;
                    DROP TABLE IF EXISTS t1 CASCADE;
                    ALTER USER postgres WITH replication;
                    CREATE TABLE t1 (id INTEGER, name TEXT, date TIMESTAMPTZ);
                    ALTER TABLE t1 REPLICA IDENTITY FULL;
                    CREATE PUBLICATION mz_cqrs_source FOR ALL TABLES;

                    > CREATE SOURCE mz_cqrs_source
                      FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_cqrs_source')
                    > CREATE TABLE t1 FROM SOURCE mz_cqrs_source (REFERENCE t1);
                    > CREATE MATERIALIZED VIEW mv_cqrs AS
                      SELECT t1.date, SUM(t1.id) FROM t1 JOIN t1 AS t2 ON true JOIN t1 AS t3 ON true JOIN t1 AS t4 ON true GROUP BY t1.date;
                    > CREATE DEFAULT INDEX ON mv_cqrs;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=StandaloneQuery(
                                "INSERT INTO t1 VALUES (1, '1', now())",
                                # "INSERT INTO t1 (id, name, date) SELECT i, i::text, now() FROM generate_series(1, 1000) AS s(i);",
                                conn_infos["postgres"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=100),
                            report_regressions=False,
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE t1 SET id = id + 1",
                                conn_infos["postgres"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=10),
                            report_regressions=False,
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "DELETE FROM t1 WHERE date < now() - INTERVAL '10 seconds'",
                                conn_infos["postgres"],
                                strict_serializable=False,
                            ),
                            dist=Periodic(per_second=1),
                            report_regressions=False,
                        ),
                    ]
                    + [
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM mv_cqrs",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                            report_regressions=False,  # TODO: Currently not stable enough
                        )
                    ],
                ),
            ],
        )


class OperationalDataStore(Scenario):
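    """High-volume inserts into a Postgres-sourced table combined with
    real-time-recency reads of an indexed materialized view."""
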
    # TODO: Get data from multiple sources with high volume (webhook source, Kafka, Postgres, MySQL), export to Kafka Sink and Subscribes
    # This should be blocked by read/write performance
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    > DROP SECRET IF EXISTS pgpass CASCADE
                    > CREATE SECRET pgpass AS 'postgres'
                    > CREATE CONNECTION pg TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET pgpass
                      )

                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    DROP PUBLICATION IF EXISTS mz_source;
                    DROP TABLE IF EXISTS t1 CASCADE;
                    ALTER USER postgres WITH replication;
                    CREATE TABLE t1 (f1 INTEGER);
                    ALTER TABLE t1 REPLICA IDENTITY FULL;
                    CREATE PUBLICATION mz_source FOR ALL TABLES;

                    > CREATE SOURCE mz_source
                      FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source');
                    > CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE t1);
                    > CREATE MATERIALIZED VIEW mv_sum AS
                      SELECT COUNT(*) FROM t1;
                    > CREATE DEFAULT INDEX ON mv_sum;

                    # TODO: Other sources
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=StandaloneQuery(
                                "INSERT INTO t1 (f1) SELECT i FROM generate_series(1, 50000) AS s(i);",
                                conn_infos["postgres"],
                                strict_serializable=False,
                            ),
                            report_regressions=False,
                            dist=Periodic(per_second=10),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SET REAL_TIME_RECENCY TO TRUE; SELECT * FROM mv_sum",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                            report_regressions=False,  # TODO: Currently not stable enough, reenable when RTR becomes more consistent
                        ),
                    ],
                ),
            ],
        )


class OperationalDataMesh(Scenario):
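    """Kafka source feeding a materialized view that is exported through a
    Kafka sink and re-ingested, with real-time-recency reads of the result."""
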
    # TODO: One Kafka source/sink, one data source, many materialized views, all exported to Kafka
    # This should be blocked by the number of source/sink combinations
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    $ set keyschema={"type": "record", "name": "Key", "fields": [ { "name": "f1", "type": "long" } ] }
                    $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
                    $ kafka-create-topic topic=kafka-mesh
                    $ kafka-ingest format=avro topic=kafka-mesh key-format=avro key-schema=${keyschema} schema=${schema} repeat=10
                    {"f1": 1} {"f2": ${kafka-ingest.iteration} }

                    > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
                    > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
                        URL '${testdrive.schema-registry-url}');
                    > CREATE SOURCE kafka_mesh
                      FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-mesh-${testdrive.seed}');
                    > CREATE TABLE kafka_mesh_tbl FROM SOURCE kafka_mesh (REFERENCE "testdrive-kafka-mesh-${testdrive.seed}")
                      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                      ENVELOPE UPSERT;
                    > CREATE MATERIALIZED VIEW kafka_mesh_mv AS SELECT * FROM kafka_mesh_tbl;
                    > CREATE DEFAULT INDEX ON kafka_mesh_mv;
                    > CREATE SINK sink FROM kafka_mesh_mv
                      INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink')
                      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                      ENVELOPE DEBEZIUM;

                    $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="10s"
                    #$ kafka-verify-topic sink=sink

                    > CREATE SOURCE sink_source
                      FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink');
                    > CREATE TABLE sink_source_tbl FROM SOURCE sink_source (REFERENCE "sink")
                      FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                      ENVELOPE NONE;
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=TdAction(
                                """
                                $ set keyschema={"type": "record", "name": "Key", "fields": [ { "name": "f1", "type": "long" } ] }
                                $ set schema={"type" : "record", "name" : "test", "fields": [ { "name": "f2", "type": "long" } ] }
                                $ kafka-ingest format=avro topic=kafka-mesh key-format=avro key-schema=${keyschema} schema=${schema} repeat=100000
                                {"f1": 1} {"f2": ${kafka-ingest.iteration} }
                                """,
                                c,
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        ClosedLoop(
                            action=StandaloneQuery(
                                # TODO: This doesn't actually measure rtr all the way
                                "SET REAL_TIME_RECENCY TO TRUE; SELECT * FROM sink_source",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                            report_regressions=False,  # TODO: Currently not stable enough, reenable when RTR becomes more consistent
                        ),
                    ],
                ),
            ],
        )


@disabled(
    "Not well suited to measure regressions since too many queries are running at once"
)
class ReadReplicaBenchmark(Scenario):
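    """Simplified read-replica workload: a Postgres source with several tables
    and indexed views in Materialize, Postgres-side updates, and point reads
    against those views."""
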
    # We might want to run a full version of rr-bench instead, this is not a
    # very realistic representation of it but might already help us catch some
    # regressions: https://github.com/MaterializeIncLabs/rr-bench
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        self.init(
            [
                TdPhase(
                    """
                    $ postgres-execute connection=postgres://postgres:postgres@postgres
                    DROP TABLE IF EXISTS customers CASCADE;
                    DROP TABLE IF EXISTS accounts CASCADE;
                    DROP TABLE IF EXISTS securities CASCADE;
                    DROP TABLE IF EXISTS trades CASCADE;
                    DROP TABLE IF EXISTS orders CASCADE;
                    DROP TABLE IF EXISTS market_data CASCADE;
                    CREATE TABLE customers (customer_id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, address VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    CREATE TABLE accounts (account_id SERIAL PRIMARY KEY, customer_id INT REFERENCES customers(customer_id) ON DELETE CASCADE, account_type VARCHAR(50) NOT NULL, balance DECIMAL(18, 2) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    CREATE TABLE securities (security_id SERIAL PRIMARY KEY, ticker VARCHAR(10) NOT NULL, name VARCHAR(255), sector VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    CREATE TABLE trades (trade_id SERIAL PRIMARY KEY, account_id INT REFERENCES accounts(account_id) ON DELETE CASCADE, security_id INT REFERENCES securities(security_id) ON DELETE CASCADE, trade_type VARCHAR(10) NOT NULL CHECK (trade_type IN ('buy', 'sell')), quantity INT NOT NULL, price DECIMAL(18, 4) NOT NULL, trade_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    CREATE TABLE orders (order_id SERIAL PRIMARY KEY, account_id INT REFERENCES accounts(account_id) ON DELETE CASCADE, security_id INT REFERENCES securities(security_id) ON DELETE CASCADE, order_type VARCHAR(10) NOT NULL CHECK (order_type IN ('buy', 'sell')), quantity INT NOT NULL, limit_price DECIMAL(18, 4), status VARCHAR(10) NOT NULL CHECK (status IN ('pending', 'completed', 'canceled')), order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    CREATE TABLE market_data (market_data_id SERIAL PRIMARY KEY, security_id INT REFERENCES securities(security_id) ON DELETE CASCADE, price DECIMAL(18, 4) NOT NULL, volume INT NOT NULL, market_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
                    DROP PUBLICATION IF EXISTS mz_source3;
                    ALTER USER postgres WITH replication;
                    ALTER TABLE customers REPLICA IDENTITY FULL;
                    ALTER TABLE accounts REPLICA IDENTITY FULL;
                    ALTER TABLE securities REPLICA IDENTITY FULL;
                    ALTER TABLE trades REPLICA IDENTITY FULL;
                    ALTER TABLE orders REPLICA IDENTITY FULL;
                    ALTER TABLE market_data REPLICA IDENTITY FULL;
                    CREATE PUBLICATION mz_source3 FOR ALL TABLES;
                    INSERT INTO customers (customer_id, name, address, created_at) VALUES (1, 'Elizabeth Ebert', 'Raleigh Motorway', '2024-09-11 15:27:44'), (2, 'Kelley Kuhlman', 'Marvin Circle', '2024-09-11 15:27:44'), (3, 'Frieda Waters', 'Jessy Roads', '2024-09-11 15:27:44'), (4, 'Ian Thiel', 'Rodriguez Squares', '2024-09-11 15:27:44'), (5, 'Clementine Hauck', 'Allen Junction', '2024-09-11 15:27:44'), (6, 'Caesar White', 'Cheyenne Green', '2024-09-11 15:27:44'), (7, 'Hudson Wintheiser', 'Wiza Plain', '2024-09-11 15:27:44'), (8, 'Kendall Marks', 'Kuhn Ports', '2024-09-11 15:27:44'), (9, 'Haley Schneider', 'Erwin Cliffs', '2024-09-11 15:27:44');
                    INSERT INTO accounts (account_id, customer_id, account_type, balance, created_at) VALUES (1, 1, 'Brokerage', 796.9554824679382, '2024-09-11 15:27:44'), (2, 2, 'Checking', 7808.991622105239, '2024-09-11 15:27:44'), (3, 3, 'Checking', 4540.988288421537, '2024-09-11 15:27:44'), (4, 4, 'Brokerage', 4607.257663873947, '2024-09-11 15:27:44'), (5, 5, 'Savings', 9105.123905180497, '2024-09-11 15:27:44'), (6, 6, 'Brokerage', 6072.871742690154, '2024-09-11 15:27:44'), (7, 7, 'Savings', 7374.831288928072, '2024-09-11 15:27:44'), (8, 8, 'Brokerage', 6554.8717824477, '2024-09-11 15:27:44'), (9, 9, 'Checking', 2629.393130856843, '2024-09-11 15:27:44');
                    INSERT INTO securities (security_id, ticker, name, sector, created_at) VALUES (1, 'Y1Fu', 'Goldner and Bechtelar LLC', 'Printing', '2024-09-11 15:27:44'), (2, 'MOF5', 'Adams and Homenick Inc', 'Market Research', '2024-09-11 15:27:44'), (3, 'Oo09', 'Tillman and Wilkinson Inc', 'Apparel & Fashion', '2024-09-11 15:27:44'), (4, 'zmAy', 'Toy and Williamson LLC', 'International Affairs', '2024-09-11 15:27:44'), (5, 'ORyo', 'Olson and Prohaska and Sons', 'Textiles', '2024-09-11 15:27:44'), (6, 'Fpn2', 'Gusikowski and Schinner Inc', 'Think Tanks', '2024-09-11 15:27:44'), (7, 'gTv2', 'Davis and Sons', 'Package / Freight Delivery', '2024-09-11 15:27:44'), (8, '38RH', 'Johns and Braun Group', 'Public Safety', '2024-09-11 15:27:44'), (9, 'Ym5u', 'Goyette Group', 'Cosmetics', '2024-09-11 15:27:44');
                    INSERT INTO trades (trade_id, account_id, security_id, trade_type, quantity, price, trade_date) VALUES (1, 1, 1, 'buy', 337, 464.45448203724607, '2024-09-11 15:27:44'), (2, 2, 2, 'buy', 312, 299.91031464748926, '2024-09-11 15:27:44'), (3, 3, 3, 'buy', 874, 338.5711431239059, '2024-09-11 15:27:44'), (4, 4, 4, 'buy', 523, 356.4236193709552, '2024-09-11 15:27:44'), (5, 5, 5, 'sell', 251, 354.6345239481285, '2024-09-11 15:27:44'), (6, 6, 6, 'buy', 810, 437.6742610108604, '2024-09-11 15:27:44'), (7, 7, 7, 'sell', 271, 116.70199857394587, '2024-09-11 15:27:44'), (8, 8, 8, 'buy', 84, 415.0658279744514, '2024-09-11 15:27:44'), (9, 9, 9, 'sell', 763, 312.3375311232852, '2024-09-11 15:27:44');
                    INSERT INTO orders (order_id, account_id, security_id, order_type, quantity, limit_price, status, order_date) VALUES (1, 1, 1, 'buy', 207, 456.0, 'completed', '2024-09-11 15:27:44'), (2, 2, 2, 'buy', 697, 515.0, 'canceled', '2024-09-11 15:27:44'), (3, 3, 3, 'buy', 789, 198.0, 'completed', '2024-09-11 15:27:44'), (4, 4, 4, 'sell', 280, 505.0, 'completed', '2024-09-11 15:27:44'), (5, 5, 5, 'buy', 368, 966.0, 'pending', '2024-09-11 15:27:44'), (6, 6, 6, 'buy', 439, 7.0, 'completed', '2024-09-11 15:27:44'), (7, 7, 7, 'sell', 345, 972.0, 'completed', '2024-09-11 15:27:44'), (8, 8, 8, 'sell', 867, 968.0, 'completed', '2024-09-11 15:27:44'), (9, 9, 9, 'sell', 472, 534.0, 'completed', '2024-09-11 15:27:44');
                    INSERT INTO market_data (market_data_id, security_id, price, volume, market_date) VALUES (1, 1, 134.07573356469547, 17326, '2024-09-11 15:27:44'), (2, 2, 107.2440801092168, 63229, '2024-09-11 15:27:44'), (3, 3, 498.13544872323644, 69305, '2024-09-11 15:27:44'), (4, 4, 194.24235075387645, 45224, '2024-09-11 15:27:44'), (5, 5, 352.2334739296001, 79796, '2024-09-11 15:27:44'), (6, 6, 241.83322476711587, 44295, '2024-09-11 15:27:44'), (7, 7, 226.93537920792713, 23212, '2024-09-11 15:27:44'), (8, 8, 169.2983285300141, 96883, '2024-09-11 15:27:44'), (9, 9, 331.36982054471935, 5651, '2024-09-11 15:27:44');

                    > DROP SECRET IF EXISTS pgpass CASCADE
                    > CREATE SECRET pgpass AS 'postgres'
                    > CREATE CONNECTION pg TO POSTGRES (
                        HOST postgres,
                        DATABASE postgres,
                        USER postgres,
                        PASSWORD SECRET pgpass
                      )
                    > CREATE SOURCE mz_source3
                      FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source3');
                    > CREATE TABLE customers FROM SOURCE mz_source3 (REFERENCE customers);
                    > CREATE TABLE accounts FROM SOURCE mz_source3 (REFERENCE accounts);
                    > CREATE TABLE securities FROM SOURCE mz_source3 (REFERENCE securities);
                    > CREATE TABLE trades FROM SOURCE mz_source3 (REFERENCE trades);
                    > CREATE TABLE orders FROM SOURCE mz_source3 (REFERENCE orders);
                    > CREATE TABLE market_data FROM SOURCE mz_source3 (REFERENCE market_data);
                    > CREATE VIEW customer_portfolio AS
                      SELECT c.customer_id, c.name, a.account_id, s.ticker, s.name AS security_name,
                             SUM(t.quantity * t.price) AS total_value
                      FROM customers c
                      JOIN accounts a ON c.customer_id = a.customer_id
                      JOIN trades t ON a.account_id = t.account_id
                      JOIN securities s ON t.security_id = s.security_id
                      GROUP BY c.customer_id, c.name, a.account_id, s.ticker, s.name;
                    > CREATE VIEW top_performers AS
                      WITH trade_volume AS (
                        SELECT security_id, SUM(quantity) AS total_traded_volume
                        FROM trades
                        GROUP BY security_id
                        ORDER BY SUM(quantity) DESC
                        LIMIT 10
                      )
                      SELECT s.ticker, s.name, t.total_traded_volume
                      FROM trade_volume t
                      JOIN securities s USING (security_id);
                    > CREATE VIEW market_overview AS
                      SELECT s.sector, AVG(md.price) AS avg_price, SUM(md.volume) AS total_volume,
                             MAX(md.market_date) AS last_update
                      FROM securities s
                      LEFT JOIN market_data md ON s.security_id = md.security_id
                      GROUP BY s.sector
                      HAVING MAX(md.market_date) + INTERVAL '5 minutes' > mz_now();
                    > CREATE VIEW recent_large_trades AS
                      SELECT t.trade_id, a.account_id, s.ticker, t.quantity, t.price, t.trade_date
                      FROM trades t
                      JOIN accounts a ON t.account_id = a.account_id
                      JOIN securities s ON t.security_id = s.security_id
                      WHERE t.quantity > (SELECT AVG(quantity) FROM trades) * 5
                      AND t.trade_date + INTERVAL '1 hour' > mz_now();
                    > CREATE VIEW customer_order_book AS
                      SELECT c.customer_id, c.name, COUNT(o.order_id) AS open_orders,
                             SUM(CASE WHEN o.status = 'completed' THEN 1 ELSE 0 END) AS completed_orders
                      FROM customers c
                      JOIN accounts a ON c.customer_id = a.customer_id
                      JOIN orders o ON a.account_id = o.account_id
                      GROUP BY c.customer_id, c.name;
                    > CREATE VIEW sector_performance AS
                      SELECT s.sector, AVG(t.price) AS avg_trade_price, COUNT(t.trade_id) AS trade_count,
                             SUM(t.quantity) AS total_volume
                      FROM trades t
                      JOIN securities s ON t.security_id = s.security_id
                      GROUP BY s.sector;
                    > CREATE VIEW account_activity_summary AS
                      SELECT a.account_id, COUNT(t.trade_id) AS trade_count,
                             SUM(t.quantity * t.price) AS total_trade_value,
                             MAX(t.trade_date) AS last_trade_date
                      FROM accounts a
                      LEFT JOIN trades t ON a.account_id = t.account_id
                      GROUP BY a.account_id;
                    > CREATE VIEW daily_market_movements AS
                      WITH last_two_days AS (
                        SELECT grp.security_id, price, market_date
                        FROM (SELECT DISTINCT security_id FROM market_data) grp,
                        LATERAL (
                          SELECT md.security_id, md.price, md.market_date
                          FROM market_data md
                          WHERE md.security_id = grp.security_id AND md.market_date + INTERVAL '1 day' > mz_now()
                          ORDER BY md.market_date DESC
                          LIMIT 2
                        )
                      ),
                      stg AS (
                        SELECT security_id, today.price AS current_price, yesterday.price AS previous_price, today.market_date
                        FROM last_two_days today
                        LEFT JOIN last_two_days yesterday USING (security_id)
                        WHERE today.market_date > yesterday.market_date
                      )
                      SELECT
                        security_id,
                        ticker,
                        name,
                        current_price,
                        previous_price,
                        current_price - previous_price AS price_change,
                        market_date
                      FROM stg
                      JOIN securities USING (security_id);
                    > CREATE VIEW high_value_customers AS
                      SELECT c.customer_id, c.name, SUM(a.balance) AS total_balance
                      FROM customers c
                      JOIN accounts a ON c.customer_id = a.customer_id
                      GROUP BY c.customer_id, c.name
                      HAVING SUM(a.balance) > 1000000;
                    > CREATE VIEW pending_orders_summary AS
                      SELECT s.ticker, s.name, COUNT(o.order_id) AS pending_order_count,
                             SUM(o.quantity) AS pending_volume,
                             AVG(o.limit_price) AS avg_limit_price
                      FROM orders o
                      JOIN securities s ON o.security_id = s.security_id
                      WHERE o.status = 'pending'
                      GROUP BY s.ticker, s.name;
                    > CREATE VIEW trade_volume_by_hour AS
                      SELECT EXTRACT(HOUR FROM t.trade_date) AS trade_hour,
                             COUNT(t.trade_id) AS trade_count,
                             SUM(t.quantity) AS total_quantity
                      FROM trades t
                      GROUP BY EXTRACT(HOUR FROM t.trade_date);
                    > CREATE VIEW top_securities_by_sector AS
                      SELECT grp.sector, ticker, name, total_volume
                      FROM (SELECT DISTINCT sector FROM securities) grp,
                      LATERAL (
                        SELECT s.sector, s.ticker, s.name, SUM(t.quantity) AS total_volume
                        FROM trades t
                        JOIN securities s ON t.security_id = s.security_id
                        WHERE s.sector = grp.sector
                        GROUP BY s.sector, s.ticker, s.name
                        ORDER BY total_volume DESC
                        LIMIT 5
                      );
                    > CREATE VIEW recent_trades_by_account AS
                      SELECT a.account_id, s.ticker, t.quantity, t.price, t.trade_date
                      FROM trades t
                      JOIN accounts a ON t.account_id = a.account_id
                      JOIN securities s ON t.security_id = s.security_id
                      WHERE t.trade_date + INTERVAL '1 day' > mz_now();
                    > CREATE VIEW order_fulfillment_rates AS
                      SELECT c.customer_id, c.name,
                             COUNT(o.order_id) AS total_orders,
                             SUM(CASE WHEN o.status = 'completed' THEN 1 ELSE 0 END) AS fulfilled_orders,
                             (SUM(CASE WHEN o.status = 'completed' THEN 1 ELSE 0 END) * 100.0 / COUNT(o.order_id)) AS fulfillment_rate
                      FROM customers c
                      JOIN accounts a ON c.customer_id = a.customer_id
                      JOIN orders o ON a.account_id = o.account_id
                      GROUP BY c.customer_id, c.name;
                    > CREATE VIEW sector_order_activity AS
                      SELECT s.sector, COUNT(o.order_id) AS order_count,
                             SUM(o.quantity) AS total_quantity,
                             AVG(o.limit_price) AS avg_limit_price
                      FROM orders o
                      JOIN securities s ON o.security_id = s.security_id
                      GROUP BY s.sector;
                    > CREATE INDEX ON securities (security_id);
                    > CREATE INDEX ON accounts (account_id);
                    > CREATE INDEX ON customers (customer_id);
                    > CREATE INDEX ON customer_portfolio (customer_id);
                    > CREATE INDEX ON top_performers (ticker);
                    > CREATE INDEX ON market_overview (sector);
                    > CREATE INDEX ON recent_large_trades (trade_id);
                    > CREATE INDEX ON customer_order_book (customer_id);
                    > CREATE INDEX ON account_activity_summary (account_id);
                    > CREATE INDEX ON daily_market_movements (security_id);
                    > CREATE INDEX ON high_value_customers (customer_id);
                    > CREATE INDEX ON pending_orders_summary (ticker);
                    > CREATE INDEX ON trade_volume_by_hour (trade_hour);
                    > CREATE INDEX ON top_securities_by_sector (sector);
                    > CREATE INDEX ON recent_trades_by_account (account_id);
                    > CREATE INDEX ON order_fulfillment_rates (customer_id);
                    > CREATE INDEX ON sector_order_activity (sector);
                    > CREATE INDEX ON sector_performance (sector);
                    """
                ),
                LoadPhase(
                    duration=120,
                    actions=[
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE customers SET address = 'foo' WHERE customer_id = 1",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE accounts SET balance = balance + 1 WHERE customer_id = 1",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE trades SET price = price + 1 WHERE trade_id = 1",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE orders SET status = 'pending', limit_price = limit_price + 1 WHERE order_id = 1",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        OpenLoop(
                            action=StandaloneQuery(
                                "UPDATE market_data SET price = price + 1, volume = volume + 1, market_date = CURRENT_TIMESTAMP WHERE market_data_id = 1",
                                conn_infos["postgres"],
                            ),
                            dist=Periodic(per_second=1),
                        ),
                        # TODO deletes
                        # DELETE FROM accounts WHERE account_id = $1
                        # DELETE FROM securities WHERE security_id = $1
                        # DELETE FROM trades WHERE trade_id = $1
                        # DELETE FROM orders WHERE order_id = $1
                        # DELETE FROM market_data WHERE market_data_id = $1
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM customer_portfolio WHERE customer_id = 1",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM top_performers",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM market_overview WHERE sector = 'Printing'",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM recent_large_trades WHERE account_id = 1",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM customer_order_book WHERE customer_id = 1",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM sector_performance WHERE sector = 'Printing'",
                                conn_infos["materialized"],
                                strict_serializable=True,
                            ),
                        ),
                        # TODO: More selects
                        # SELECT * FROM account_activity_summary WHERE account_id = $1
                        # SELECT * FROM daily_market_movements WHERE security_id = $1
                        # SELECT * FROM high_value_customers
                        # SELECT * FROM pending_orders_summary WHERE ticker = $1
                        # SELECT * FROM trade_volume_by_hour
                        # SELECT * FROM top_securities_by_sector WHERE sector = $1
                        # SELECT * FROM recent_trades_by_account WHERE account_id = $1
                        # SELECT * FROM order_fulfillment_rates WHERE customer_id = $1
                        # SELECT * FROM sector_order_activity WHERE sector = $1
                        # SELECT * FROM cascading_order_cancellation_alert
                    ],
                ),
            ]
        )


  999. @disabled("Only run separately in QA Canary pipeline")
  1000. class StagingBench(Scenario):
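    """Long-running benchmark against the QA canary environment, mixing pooled
    SELECT 1 queries with reads of the canary sources and views."""
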
    # TODO: Kafka source + sink
    # TODO: Webhook source
    def __init__(self, c: Composition, conn_infos: dict[str, PgConnInfo]):
        conn_infos = deepcopy(conn_infos)
        conn_infos["materialized"].cluster = "quickstart"
        self.init(
            [
                LoadPhase(
                    duration=82800,
                    actions=[
                        OpenLoop(
                            action=PooledQuery(
                                "SELECT 1", conn_info=conn_infos["materialized"]
                            ),
                            dist=Periodic(per_second=500),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT COUNT(DISTINCT l_returnflag) FROM qa_canary_environment.public_tpch.tpch_q01 WHERE sum_charge > 0",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT COUNT(DISTINCT c_name) FROM qa_canary_environment.public_tpch.tpch_q18 WHERE o_orderdate <= '2023-01-01'",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_pg_cdc.pg_wmr WHERE degree > 1",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT COUNT(DISTINCT a_name) FROM qa_canary_environment.public_mysql_cdc.mysql_wmr WHERE degree > 1",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT COUNT(DISTINCT count_star) FROM qa_canary_environment.public_loadgen.sales_product_product_category WHERE count_distinct_product_id > 0",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT * FROM qa_canary_environment.public_table.table_mv",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                        ClosedLoop(
                            action=ReuseConnQuery(
                                "SELECT min(c), max(c), count(*) FROM qa_canary_environment.public_table.table",
                                conn_info=conn_infos["materialized"],
                            ),
                        ),
                    ],
                ),
            ],
            conn_pool_size=100,
        )