# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
import re
from math import ceil, floor
from pathlib import Path
from textwrap import dedent

from parameterized import parameterized_class  # type: ignore

import materialize.optbench.sql
from materialize.feature_benchmark.action import Action, Kgen, TdAction
from materialize.feature_benchmark.measurement_source import (
    Lambda,
    MeasurementSource,
    Td,
)
from materialize.feature_benchmark.scenario import (
    BenchmarkingSequence,
    Scenario,
    ScenarioBig,
    ScenarioDisabled,
)
from materialize.feature_benchmark.scenario_version import ScenarioVersion

# for pdoc ignores
__pdoc__ = {}


class FastPath(Scenario):
    """Feature benchmarks related to the "fast path" in query execution, as described in the
    'Internals of One-off Queries' presentation.
    """


class FastPathFilterNoIndex(FastPath):
    """Measure the time it takes for the fast path to filter out all rows from a materialized view and return."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records
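    # self.n() is 10**SCALE, so SCALE = 7 means the view below is built over
    # ten million records.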

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]
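
    # The /* A */ and /* B */ markers delimit the measured interval: the
    # benchmark framework records when each marked statement completes and
    # reports the elapsed time between the two.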
    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )


class MFPPushdown(Scenario):
    """Test MFP pushdown -- WHERE clause with a suitable condition and no index defined."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )


class FastPathFilterIndex(FastPath):
    """Measure the time it takes for the fast path to filter out all rows from a materialized view using an index and return."""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]
    # Since an individual query of this particular type takes only ~1ms to execute,
    # the results are susceptible to a lot of random noise. As we cannot make the
    # query any slower by using e.g. a larger dataset, we run the query 1000 times
    # in a row and measure the total execution time.
    def benchmark(self) -> MeasurementSource:
        repeated_selects = "\n".join(
            "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for _ in range(0, 1000)
        )
        return Td(
            f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
  /* A */
1
{repeated_selects}
> SELECT 1
  /* B */
1
"""
        )


class FastPathOrderByLimit(FastPath):
    """Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1;
  /* A */
1
> SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
  /* B */
"""
            + "\n".join([str(x) for x in range(self.n() - 1000, self.n())])
        )


class FastPathLimit(FastPath):
    """Benchmark the case SELECT * FROM source LIMIT <i>, optimized by materialize#21615"""

    def init(self) -> list[Action]:
        return [
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1
                > SELECT * FROM v1 LIMIT 100
                  /* B */
                """
            )
            + "\n".join([str(x) for x in range(1, 101)])
        )


class DML(Scenario):
    """Benchmarks around the performance of DML statements"""


class Insert(DML):
    """Measure the time it takes for an INSERT statement to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
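        # max_result_size is raised to 16 GiB here, presumably so that the large
        # INSERT ... SELECT below is not rejected by the default result size limit.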
        return Td(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;

> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
  /* B */
"""
        )


class ManySmallInserts(DML):
    """Measure the time it takes for several small INSERT statements to return."""

    # Sometimes goes OOM
    SCALE = 3

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, self.n()):
            statements.append(f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})")
        insert_statements_str = "\n".join(statements)

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
{insert_statements_str}
  /* B */
"""
        )


class InsertBatch(DML):
    """Measure the time it takes for a batch of INSERT statements to return."""

    SCALE = 4

    def benchmark(self) -> MeasurementSource:
        inserts = "\n".join(
            f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n())
        )
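        # The INSERTs are wrapped in an explicit BEGIN/COMMIT below so that they
        # commit as a single batch; auto_route_introspection_queries is disabled,
        # presumably to keep every statement in the transaction on the same cluster.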
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> SET auto_route_introspection_queries TO false
> BEGIN
{inserts}
> COMMIT
  /* B */
"""
        )


class InsertMultiRow(DML):
    """Measure the time it takes for a single multi-row INSERT statement to return.

    When `sequence_insert` calls `constant_optimizer`, it should be able to reach a constant. Otherwise, we run the full
    logical optimizer, which makes this test show a regression.
    """

    SCALE = 4  # Larger scales fail with "FATAL: request larger than 2.0 MB"

    def benchmark(self) -> MeasurementSource:
        values = ", ".join(f"({i})" for i in range(0, self.n()))

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 VALUES {values}
  /* B */
"""
        )


class Update(DML):
    """Measure the time it takes for an UPDATE statement to return to client"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE TABLE t1 (f1 BIGINT);
> CREATE DEFAULT INDEX ON t1;
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> UPDATE t1 SET f1 = f1 + {self.n()}
  /* B */
"""
        )


class ManySmallUpdates(DML):
    """Measure the time it takes for several small UPDATE statements to return to client"""

    SCALE = 2  # runs ~2.5 hours with SCALE = 3

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 2, 0)

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE TABLE t1 (f1 INT, f2 INT);
> CREATE DEFAULT INDEX ON t1;
> INSERT INTO t1 SELECT generate_series(1, 10);
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, self.n()):
            statements.append(
                f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}"
            )
        update_statements_str = "\n".join(statements)

        return Td(
            f"""
> SELECT 1
  /* A */
1
{update_statements_str}
  /* B */
"""
        )


class UpdateMultiNoIndex(DML):
    """Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub issue database-issues#3233"""

    def before(self) -> Action:
        # Due to extreme variability in the results, we have no option but to drop and re-create
        # the table prior to each measurement
        return TdAction(
            f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 BIGINT);
> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> UPDATE t1 SET f1 = f1 + {self.n()}
> SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()}
  /* B */
{self.n()}
"""
        )


class InsertAndSelect(DML):
    """Measure the time it takes for an INSERT statement to return
    AND for a follow-up SELECT to return data, that is, for the
    dataflow to be completely caught up.
    """

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;

> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};
> SELECT 1 FROM t1 WHERE f1 = 1
  /* B */
1
"""
        )


class Dataflow(Scenario):
    """Benchmark scenarios around individual dataflow patterns/operators"""


class OrderBy(Dataflow):
    """Benchmark ORDER BY as executed by the dataflow layer,
    in contrast with an ORDER BY executed using a Finish step in the coordinator"""

    def init(self) -> Action:
        # Just to spice things up a bit, we perform individual
        # inserts here so that the rows are assigned separate timestamps
        inserts = "\n\n".join(f"> INSERT INTO ten VALUES ({i})" for i in range(0, 10))

        return TdAction(
            f"""
> CREATE TABLE ten (f1 INTEGER);
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
{inserts}
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        # Explicit LIMIT is needed for the ORDER BY to not be optimized away
        return Td(
            f"""
> DROP MATERIALIZED VIEW IF EXISTS v2
  /* A */
> CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999
> SELECT COUNT(*) FROM v2
  /* B */
{self.n()}
"""
        )


class CountDistinct(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> SELECT COUNT(DISTINCT f1) AS f1 FROM v1
  /* B */
{self.n()}
"""
        )


class MinMax(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> SELECT MIN(f1), MAX(f1) AS f1 FROM v1
  /* B */
0 {self.n()-1}
"""
        )


class MinMaxMaintained(Dataflow):
    """Benchmark MinMax as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2
  /* A */
> CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
0 {self.n()-1}
"""
        )


class GroupBy(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )


class GroupByMaintained(Dataflow):
    """Benchmark GroupBy as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */
> CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )


class CrossJoin(Dataflow):
    def init(self) -> Action:
        return self.view_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP MATERIALIZED VIEW IF EXISTS v1;
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
  /* A */
> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
  /* B */
true
"""
        )


class AccumulateReductions(Dataflow):
    """Benchmark the accumulation of reductions."""

    SCALE = 5

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)

    def before(self) -> Action:
        return TdAction(
            """
> DROP TABLE IF EXISTS t CASCADE;
> CREATE TABLE t (a int, b int, c int, d int);

> CREATE MATERIALIZED VIEW data AS
  SELECT a, a AS b FROM generate_series(1, 10000000) AS a
  UNION ALL
  SELECT a, b FROM t;

> INSERT INTO t (a, b) VALUES (1, 1);
> INSERT INTO t (a, b) VALUES (0, 0);

> DROP CLUSTER IF EXISTS idx_cluster CASCADE;
> CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1;

> CREATE VIEW accumulable AS
  SELECT
    a,
    sum(a) AS sum_a, count(a) as cnt_a,
    sum(b) AS sum_b, count(b) as cnt_b
  FROM data
  GROUP BY a;
"""
        )

    def benchmark(self) -> MeasurementSource:
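        # The ?[version>=...] / ?[version<...] prefixes are testdrive directives
        # that select which expected EXPLAIN output applies, depending on the
        # Materialize version under test.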
        sql = """
> SELECT 1
  /* A */
1

> CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a);

> SET CLUSTER = idx_cluster;

?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT count(*) FROM accumulable;
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // { arity: 1 }
        Project () // { arity: 0 }
          ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }
  Return // { arity: 1 }
    Union // { arity: 1 }
      Get l0 // { arity: 1 }
      Map (0) // { arity: 1 }
        Union // { arity: 0 }
          Negate // { arity: 0 }
            Project () // { arity: 0 }
              Get l0 // { arity: 1 }
          Constant // { arity: 0 }
            - ()

Used Indexes:
  - materialize.public.i_accumulable (*** full scan ***)

Target cluster: idx_cluster

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT count(*) FROM accumulable;
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // { arity: 1 }
        Project () // { arity: 0 }
          ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }
  Return // { arity: 1 }
    Union // { arity: 1 }
      Get l0 // { arity: 1 }
      Map (0) // { arity: 1 }
        Union // { arity: 0 }
          Negate // { arity: 0 }
            Project () // { arity: 0 }
              Get l0 // { arity: 1 }
          Constant // { arity: 0 }
            - ()

Used Indexes:
  - materialize.public.i_accumulable (*** full scan ***)

Target cluster: idx_cluster

> SELECT count(*) FROM accumulable;
  /* B */
10000001

> SET CLUSTER = default;
"""
        return Td(sql)


class Retraction(Dataflow):
    """Benchmark the time it takes to process a very large retraction"""

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP TABLE IF EXISTS ten CASCADE;
> CREATE TABLE ten (f1 INTEGER);
> INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1
  /* A */
1
> DELETE FROM ten;
> SELECT COUNT(*) FROM v1
  /* B */
0
"""
        )


class CreateIndex(Dataflow):
    """Measure the time it takes for CREATE INDEX to return *plus* the time
    it takes for a SELECT query that would use the index to return rows.
    """

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}

# Make sure the dataflow is fully hydrated
> SELECT 1 FROM t1 WHERE f1 = 0;
1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> DROP INDEX IF EXISTS i1;
  /* A */
> CREATE INDEX i1 ON t1(f1);
> SELECT COUNT(*)
  FROM t1 AS a1, t1 AS a2
  WHERE a1.f1 = a2.f1
    AND a1.f1 = 0
    AND a2.f1 = 0
  /* B */
1
"""
        )


class DeltaJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1

# Delta joins require 3+ tables
> SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
  /* B */
{self.n()}
"""
        )


class DeltaJoinMaintained(Dataflow):
    """Benchmark DeltaJoin as an indexed view with table-based data initialization, where the
    empty frontier is not emitted, in contrast with one-shot SELECT processing based on data
    initialized as a constant view"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

# Delta joins require 3+ tables
> CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
{self.n()}
"""
        )


class DifferentialJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1
> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
  /* B */
{self.n()}
"""
        )


class FullOuterJoin(Dataflow):
    def benchmark(self) -> BenchmarkingSequence:
        columns_select = ", ".join(
            [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))]
        )
        columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))])
        inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)])
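        # The number of join columns grows with the scenario scale, so larger
        # scales exercise a wider USING clause as well as more rows.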
        return [
            Td(
                f"""
> DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;
> DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;
> DROP TABLE IF EXISTS ten;
> CREATE TABLE ten (f1 INTEGER);
> CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
> SELECT 1;
  /* A */
1
> CREATE MATERIALIZED VIEW v2 AS
  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
  FROM v1 AS a1
  FULL OUTER JOIN v1 AS a2 USING ({columns_using});
{inserts}
> SELECT * FROM v2;
  /* B */
{self.n()} {self.n()}
"""
            )
        ]


class Finish(Scenario):
    """Benchmarks around the Finish stage of query processing"""


class FinishOrderByLimit(Finish):
    """Benchmark ORDER BY + LIMIT without the benefit of an index"""

    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1
> SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
  /* B */
{self.n()-1}
"""
        )


class Kafka(Scenario):
    pass


class KafkaEnvelopeNoneBytes(Kafka):
    def shared(self) -> Action:
        data = "a" * 512
        return TdAction(
            f"""
$ kafka-create-topic topic=kafka-envelope-none-bytes

$ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
{data}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}")
  FORMAT BYTES
  ENVELOPE NONE
  /* A */
> SELECT COUNT(*) = {self.n()} FROM s1_tbl
  /* B */
true
"""
        )


class KafkaUpsert(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-upsert

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }}

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}}
{{"f1": 2}} {{"f2": 2}}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL '${{testdrive.schema-registry-url}}'
  );
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-upsert-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT
  /* A */
> SELECT f1 FROM s1_tbl
  /* B */
1
2
"""
        )


class KafkaUpsertUnique(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=upsert-unique partitions=16

$ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  /* A */
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-upsert-unique-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT
> SELECT COUNT(*) FROM s1_tbl;
  /* B */
{self.n()}
"""
        )


class KafkaRestart(ScenarioDisabled):
    """This scenario dates from the pre-persistence era, when the entire topic was re-ingested from scratch.
    With persistence, however, no re-ingestion takes place and the scenario exhibits extreme variability.
    Instead of re-ingestion, we are mostly measuring the speed of COUNT(*), further obscured by
    the one-second timestamp granularity.
    """

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-recovery partitions=8

$ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}');
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-recovery-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT;

# Make sure we are fully caught up before continuing
> SELECT COUNT(*) FROM s1_tbl;
{self.n()}

# Give time for any background tasks (e.g. compaction) to settle down
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="10s"
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
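        # Restart Mz, then measure how long it takes for the post-restart
        # COUNT(*) to return the expected record count.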
        return [
            Lambda(lambda e: e.RestartMzClusterd()),
            Td(
                f"""
> SELECT COUNT(*) /* {self.n()} */ FROM s1_tbl;
  /* B */
{self.n()}
"""
            ),
        ]


class KafkaRestartBig(ScenarioBig):
    """Ingest 100M records without constructing
    a dataflow that would keep all of them in memory. For the purpose, we
    emit a bunch of "EOF" records after the primary ingestion is complete
    and consider that the source has caught up when all the EOF records have
    been seen.
    """

    SCALE = 8
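
    # Kgen drives the kgen bulk load generator, which can populate the topic
    # much faster than row-by-row testdrive ingestion.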
    def shared(self) -> list[Action]:
        return [
            TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"),
            # Ingest 10 ** SCALE records
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=random",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=32",
                    "--min-message-size=32",
                    "--key-min=256",
                    f"--key-max={256+(self.n()**2)}",
                ],
            ),
            # Add 256 EOF markers with key values <= 256.
            # This high number is chosen so as to guarantee that there will be an EOF marker
            # in each partition, even if the number of partitions is increased in the future.
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=sequential",
                    "--num-records=256",
                    "--values=bytes",
                    "--min-message-size=32",
                    "--max-message-size=32",
                ],
            ),
        ]

    def init(self) -> Action:
        return TdAction(
            f"""
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}');
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-recovery-big-${{testdrive.seed}}")
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT;

# Confirm that all the EOF markers generated above have been processed
> CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1_tbl WHERE key <= '\\x00000000000000ff'
> SELECT * FROM s1_is_complete;
true
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMzClusterd()),
            Td(
                """
> SELECT * FROM s1_is_complete
  /* B */
true
"""
            ),
        ]


for i in [5, 6, 7, 8, 9]:
    __pdoc__[f"KafkaEnvelopeNoneBytesScalability_scale_{i}"] = False
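

# parameterized_class (below) generates one scenario subclass per listed SCALE;
# the __pdoc__ entries above hide those generated variants from the pdoc output.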
@parameterized_class(
    [{"SCALE": i} for i in [5, 6, 7, 8, 9]], class_name_func=Scenario.name_with_scale
)
class KafkaEnvelopeNoneBytesScalability(ScenarioBig):
    """Run the same scenario across different scales. Do not materialize the entire
    source but rather just a non-memory-consuming view on top of it.
    """

    def shared(self) -> list[Action]:
        return [
            TdAction(
                """
$ kafka-create-topic topic=kafka-scalability partitions=8
"""
            ),
            Kgen(
                topic="kafka-scalability",
                args=[
                    "--keys=sequential",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=100",
                    "--min-message-size=100",
                ],
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-scalability-${{testdrive.seed}}")
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE NONE
  /* A */
> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1_tbl;
> SELECT c = {self.n()} FROM v1
  /* B */
true
"""
        )


class Sink(Scenario):
    pass


class ExactlyOnce(Sink):
    """Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited
    means to figure out when the complete output has been emitted, we have no option but to re-ingest
    the data to determine completion.
    """

    FIXED_SCALE = True  # TODO: Remove when database-issues#8705 is fixed

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=sink-input partitions=16

$ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}');
> CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-sink-input-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
> SELECT COUNT(*) FROM source1_tbl;
{self.n()}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP SINK IF EXISTS sink1;
> DROP SOURCE IF EXISTS sink1_check CASCADE;
  /* A */
> DROP CLUSTER IF EXISTS sink_cluster CASCADE
> CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink1
  IN CLUSTER sink_cluster
  FROM source1_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true

# Wait until all the records have been emitted from the sink, as observed by the sink1_check source
> CREATE SOURCE sink1_check
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}');
> CREATE TABLE sink1_check_tbl FROM SOURCE sink1_check (REFERENCE "testdrive-sink-output-${{testdrive.seed}}")
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;
> CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check_tbl;
> SELECT * FROM sink1_check_v
  /* B */
"""
            + str(self.n())
        )


class ManyKafkaSourcesOnSameCluster(Scenario):
    """Measure the time it takes to ingest data from many Kafka sources"""

    # Runs ~2 hours with 300 sources
    SCALE = 1.7  # 50 sources
    FIXED_SCALE = True
    COUNT_SOURCE_ENTRIES = 100000
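    # With the fractional SCALE above, self.n() works out to 50, matching the
    # "50 sources" note; each source topic is pre-populated with
    # COUNT_SOURCE_ENTRIES records.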

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 2, 0)

    def shared(self) -> Action:
        create_topics = "\n".join(
            f"""
$ kafka-create-topic topic=many-kafka-sources-{i}

$ kafka-ingest format=avro topic=many-kafka-sources-{i} schema=${{schema}} repeat={self.COUNT_SOURCE_ENTRIES}
{{"f2": ${{kafka-ingest.iteration}}}}
"""
            for i in range(0, self.n())
        )
        return TdAction(self.schema() + create_topics)

    def init(self) -> Action:
        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_sources = {self.n() * 4};
ALTER SYSTEM SET max_tables = {self.n() * 4};

> DROP OWNED BY materialize CASCADE;
> CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';
> DROP CLUSTER IF EXISTS kafka_source_cluster CASCADE;
> CREATE CLUSTER kafka_source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        drop_sources = "\n".join(
            f"""
> DROP SOURCE IF EXISTS kafka_source{i} CASCADE;
"""
            for i in range(0, self.n())
        )
        create_sources = "\n".join(
            f"""
> CREATE SOURCE kafka_source{i}
  IN CLUSTER kafka_source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-many-kafka-sources-{i}-${{testdrive.seed}}');
> CREATE TABLE kafka_source{i}_tbl FROM SOURCE kafka_source{i} (REFERENCE "testdrive-many-kafka-sources-{i}-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE;
"""
            for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) = {self.COUNT_SOURCE_ENTRIES} FROM kafka_source{i}_tbl;\ntrue"
            for i in range(0, self.n())
        )
        return [
            Td(
                self.schema()
                + f"""
{drop_sources}
> SELECT 1;
  /* A */
1
{create_sources}
{check_sources}
> SELECT 1;
  /* B */
1
"""
            ),
        ]


class PgCdc(Scenario):
    pass


class PgCdcInitialLoad(PgCdc):
    """Measure the time it takes to read 1M existing records from Postgres
    when creating a materialized source"""

    def shared(self) -> Action:
        return TdAction(
            f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x;
ALTER TABLE pk_table REPLICA IDENTITY FULL;
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
  HOST postgres,
  DATABASE postgres,
  USER postgres,
  PASSWORD SECRET pgpass
  )
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE mz_source_pgcdc
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source');
> CREATE TABLE pk_table FROM SOURCE mz_source_pgcdc (REFERENCE pk_table);
  /* A */
> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
"""
        )


class PgCdcStreaming(PgCdc):
    """Measure the time it takes to ingest records from Postgres post-snapshot"""

    SCALE = 5

    def shared(self) -> Action:
        return TdAction(
            """
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
DROP PUBLICATION IF EXISTS p1;
CREATE PUBLICATION p1 FOR ALL TABLES;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
ALTER TABLE t1 REPLICA IDENTITY FULL;

> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
  HOST postgres,
  DATABASE postgres,
  USER postgres,
  PASSWORD SECRET pgpass
  )
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1');
> CREATE TABLE t1 FROM SOURCE s1 (REFERENCE t1);
"""
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n() // 1000}) as x;\nCOMMIT;"
                for _ in range(0, 1000)
            ]
        )
        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ postgres-execute connection=postgres://postgres:postgres@postgres
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
"""
        )


class MySqlCdc(Scenario):
    pass


class MySqlInitialLoad(MySqlCdc):
    """Measure the time it takes to read 1M existing records from MySQL
    when creating a materialized source"""

    FIXED_SCALE = True  # TODO: Remove when database-issues#7556 is fixed

    def shared(self) -> Action:
        return TdAction(
            f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
SET @i:=0;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()};
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
  HOST mysql,
  USER root,
  PASSWORD SECRET mysqlpass
  )
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE mz_source_mysqlcdc
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE pk_table FROM SOURCE mz_source_mysqlcdc (REFERENCE public.pk_table);
  /* A */
> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
"""
        )


class MySqlStreaming(MySqlCdc):
    """Measure the time it takes to ingest records from MySQL post-snapshot"""

    SCALE = 5

    def shared(self) -> Action:
        return TdAction(
            """
$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);

> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
  HOST mysql,
  USER root,
  PASSWORD SECRET mysqlpass
  )
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE s1 (REFERENCE public.t1);
"""
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                dedent(
                    f"""
                    SET @i:=0;
                    INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)};
                    COMMIT;
                    """
                )
                for _ in range(0, 1000)
            ]
        )
        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
USE public;
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
"""
        )


class Coordinator(Scenario):
    """Feature benchmarks pertaining to the coordinator."""


class QueryLatency(Coordinator):
    """Measure the time it takes to run SELECT 1 queries"""

    SCALE = 3

    def benchmark(self) -> MeasurementSource:
        selects = "\n".join("> SELECT 1\n1\n" for _ in range(0, self.n()))

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{selects}

> SELECT 1;
  /* B */
1
"""
        )


class ConnectionLatency(Coordinator):
    """Measure the time it takes to establish connections to Mz"""

    SCALE = 2  # Many connections * many measurements = TCP port exhaustion

    def benchmark(self) -> MeasurementSource:
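        # Each $ postgres-execute block below uses an inline connection URL
        # rather than a named connection, so every iteration opens a fresh
        # connection; the measured A-to-B interval thus covers n handshakes.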
        connections = "\n".join(
            """
$ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
SELECT 1;
"""
            for _ in range(0, self.n())
        )

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{connections}

> SELECT 1;
  /* B */
1
"""
        )


class Startup(Scenario):
    pass


class StartupEmpty(Startup):
    """Measure the time it takes to restart an empty Mz instance."""

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMzClusterd()),
            Td(
                """
> SELECT 1;
  /* B */
1
"""
            ),
        ]


class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance
    and have all the dataflows be ready to return something"""

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Cannot scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        return TdAction(
            self.schema()
            + """
$ kafka-create-topic topic=startup-time

$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        )

    def init(self) -> Action:
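        # Create n of each object kind (tables, Kafka sources, materialized
        # views, sinks), first raising the system limits so the DDL succeeds.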
        create_tables = "\n".join(
            f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
            for i in range(0, self.n())
        )

        create_sources = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;

> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source{i}
  IN CLUSTER source{i}_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')

> CREATE TABLE source{i}_tbl FROM SOURCE source{i} (REFERENCE "testdrive-startup-time-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE
"""
            for i in range(0, self.n())
        )

        join = " ".join(
            f"LEFT JOIN source{i}_tbl USING (f2)" for i in range(1, ceil(self.scale()))
        )

        create_views = "\n".join(
            f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i}_tbl AS s {join} LIMIT {i+1}"
            for i in range(0, self.n())
        )

        create_sinks = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;

> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SINK sink{i}
  IN CLUSTER sink{i}_cluster
  FROM source{i}_tbl
  INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE DEBEZIUM
"""
            for i in range(0, self.n())
        )

        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};

> DROP OWNED BY materialize CASCADE;

> CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

{create_tables}

{create_sources}

{create_views}

{create_sinks}
"""
        )
    def benchmark(self) -> BenchmarkingSequence:
        check_tables = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
        )
        check_views = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
        )

        return [
            Lambda(lambda e: e.RestartMzClusterd()),
            Td(
                f"""
{check_views}

{check_sources}

{check_tables}

> SELECT 1;
  /* B */
1
"""
            ),
        ]


class StartupTpch(Scenario):
    """Measure the time it takes to restart a Mz instance populated with TPC-H
    and have all the dataflows be ready to return something"""

    # Runs ~3 hours with SCALE = 1.2
    SCALE = 0.1  # 1 object of each kind

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)

    def init(self) -> Action:
        # We need to massage the SQL statements so that Testdrive doesn't get confused.
        comment = re.compile(r"--.*?\n", re.IGNORECASE)
        newline = re.compile(r"\n", re.IGNORECASE)

        create_tables = "\n".join(
            f"""
> {newline.sub(" ", comment.sub("", ddl))}
"""
            for ddl in materialize.optbench.sql.parse_from_file(
                Path("misc/python/materialize/optbench/schema/tpch.sql")
            )
        )

        queries = [
            newline.sub(" ", comment.sub("", query))
            for query in materialize.optbench.sql.parse_from_file(
                Path("misc/python/materialize/optbench/workload/tpch.sql")
            )
        ]

        create_views = "\n".join(
            f"""
> CREATE VIEW v_{q}_{i} AS {query}
"""
            for q, query in enumerate(queries)
            for i in range(0, self.n())
        )

        create_indexes = "\n".join(
            f"""
> CREATE DEFAULT INDEX ON v_{q}_{i};
"""
            for q in range(0, len(queries))
            for i in range(0, self.n())
        )

        create_materialized_views = "\n".join(
            f"""
> CREATE MATERIALIZED VIEW mv_{q}_{i} AS {query}
"""
            for q, query in enumerate(queries)
            for i in range(0, self.n())
        )

        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 100};
ALTER SYSTEM SET max_materialized_views = {self.n() * 100};
ALTER SYSTEM SET max_tables = {self.n() * 100};

> DROP OWNED BY materialize CASCADE;

{create_tables}

{create_views}

{create_indexes}

{create_materialized_views}
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        num_queries = len(
            materialize.optbench.sql.parse_from_file(
                Path("misc/python/materialize/optbench/workload/tpch.sql")
            )
        )

        check_views = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM v_{q}_{i}\ntrue"
            for q in range(0, num_queries)
            for i in range(0, self.n())
        )
        check_materialized_views = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM mv_{q}_{i}\ntrue"
            for q in range(0, num_queries)
            for i in range(0, self.n())
        )

        return [
            Lambda(lambda e: e.RestartMzClusterd()),
            Td(
                f"""
{check_materialized_views}

{check_views}

> SELECT 1;
  /* B */
1
"""
            ),
        ]


class HydrateIndex(Scenario):
    """Measure the time it takes for an index to hydrate when a cluster comes online."""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
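        # Build the index while idx_cluster has no replicas (REPLICATION
        # FACTOR 0), churn the data, then re-add a replica: the final
        # COUNT(*) can only answer once the index has hydrated, so the
        # A-to-B interval captures hydration time. The EXPLAIN checks
        # confirm the query is actually served from index i1.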
        sql = f"""
> DROP TABLE IF EXISTS t1 CASCADE

> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)

> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)

> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)

> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}

> UPDATE t1 SET f1 = f1 + 100000

> UPDATE t1 SET f1 = f1 + 1000000

> UPDATE t1 SET f1 = f1 + 10000000

> UPDATE t1 SET f1 = f1 + 100000000

> UPDATE t1 SET f1 = f1 + 1000000000

> SELECT 1
  /* A */
1

> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)

> SET CLUSTER = idx_cluster

?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT COUNT(*) FROM t1
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // {{ arity: 1 }}
        Project () // {{ arity: 0 }}
          ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}
  Return // {{ arity: 1 }}
    Union // {{ arity: 1 }}
      Get l0 // {{ arity: 1 }}
      Map (0) // {{ arity: 1 }}
        Union // {{ arity: 0 }}
          Negate // {{ arity: 0 }}
            Project () // {{ arity: 0 }}
              Get l0 // {{ arity: 1 }}
          Constant // {{ arity: 0 }}
            - ()

Used Indexes:
  - materialize.public.i1 (*** full scan ***)

Target cluster: idx_cluster

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT COUNT(*) FROM t1
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // {{ arity: 1 }}
        Project () // {{ arity: 0 }}
          ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}
  Return // {{ arity: 1 }}
    Union // {{ arity: 1 }}
      Get l0 // {{ arity: 1 }}
      Map (0) // {{ arity: 1 }}
        Union // {{ arity: 0 }}
          Negate // {{ arity: 0 }}
            Project () // {{ arity: 0 }}
              Get l0 // {{ arity: 1 }}
          Constant // {{ arity: 0 }}
            - ()

Used Indexes:
  - materialize.public.i1 (*** full scan ***)

Target cluster: idx_cluster

> SELECT COUNT(*) FROM t1
  /* B */
{self._n}

> SET CLUSTER = default
"""
        return Td(sql)


def remove_arity_information_from_explain(sql: str) -> str:
    return re.sub(r" // { arity: \d+ }", "", sql)


def remove_target_cluster_from_explain(sql: str) -> str:
    return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
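

# Illustrative usage of the two helpers above; the inputs are made-up
# fragments of EXPLAIN output, not strings captured from this suite:
#
#   remove_arity_information_from_explain("Map (0) // { arity: 1 }")
#   # -> "Map (0)"
#
#   remove_target_cluster_from_explain("Union\n\nTarget cluster: quickstart\n")
#   # -> "Union"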


class SwapSchema(Scenario):
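    """Measure the time it takes to run ALTER SCHEMA ... SWAP WITH ... when
    many views depend on the swapped schemas."""
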
    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        blue_views_on_table = "\n".join(
            f"> CREATE VIEW blue.v{i} AS SELECT * FROM blue.t1;"
            for i in range(0, self.n())
        )
        green_views_on_table = "\n".join(
            f"> CREATE VIEW green.v{i} AS SELECT * FROM green.t1;"
            for i in range(0, self.n())
        )
        noise_views_on_blue_view = "\n".join(
            f"> CREATE VIEW noise.v{i} AS SELECT * FROM blue.v0;"
            for i in range(0, self.n())
        )
        noise_views_on_noise_view = "\n".join(
            f"> CREATE VIEW noise.extra_v{i} AS SELECT * FROM noise.v0;"
            for i in range(0, self.n())
        )

        return [
            TdAction(
                f"""
> CREATE SCHEMA blue;

> CREATE SCHEMA green;

> CREATE SCHEMA noise;

> CREATE TABLE blue.t1 (a int, b text);

> CREATE TABLE green.t1 (a int, b text);

{blue_views_on_table}

{green_views_on_table}

{noise_views_on_blue_view}

{noise_views_on_noise_view}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1

                > ALTER SCHEMA blue SWAP WITH green;

                > SELECT 1;
                  /* B */
                1
                """
            )
        )


class ReplicaExpiration(Scenario):
    # Causes "tried to kill container, but did not receive an exit event"
    # errors when killing the container afterwards
    SCALE = 5
    # Too slow with a larger scale
    FIXED_SCALE = True

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)

    def init(self) -> list[Action]:
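        # last_30_days below uses an mz_now() temporal filter: a row stays in
        # the view while mz_now() <= event_ts + 30 days, so freshly inserted
        # rows are visible immediately and age out a month later.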
        return [
            TdAction(
                """
> CREATE TABLE events_scale (
    scale INT NOT NULL,
    event_ts TIMESTAMP NOT NULL
  );

> CREATE VIEW events AS
  SELECT concat('somelongstringthatdoesntmattermuchatallbutrequiresmemorytostoreXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', x::text) AS content, (SELECT event_ts FROM events_scale LIMIT 1) AS event_ts FROM generate_series(1, (SELECT scale FROM events_scale LIMIT 1)) x;

> CREATE MATERIALIZED VIEW last_30_days AS
  SELECT event_ts, content
  FROM events
  WHERE mz_now() <= event_ts + INTERVAL '30 days';

> CREATE DEFAULT INDEX ON last_30_days
"""
            ),
        ]
    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                f"""
                > DELETE FROM events_scale;

                > SELECT COUNT(*) FROM last_30_days
                0

                > SELECT 1;
                  /* A */
                1

                > INSERT INTO events_scale VALUES ({self.n()}, now());

                > SELECT COUNT(*) FROM last_30_days
                {self.n()}

                > SELECT 1;
                  /* B */
                1
                """
            )
        )