12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import random
- import re
- from math import ceil, floor
- from pathlib import Path
- from textwrap import dedent
- from parameterized import parameterized_class # type: ignore
- import materialize.optbench.sql
- from materialize.feature_benchmark.action import Action, Kgen, TdAction
- from materialize.feature_benchmark.measurement_source import (
- Lambda,
- MeasurementSource,
- Td,
- )
- from materialize.feature_benchmark.scenario import (
- BenchmarkingSequence,
- Scenario,
- ScenarioBig,
- ScenarioDisabled,
- )
- from materialize.feature_benchmark.scenario_version import ScenarioVersion
# Registry consulted by pdoc for per-name documentation overrides/ignores.
# It starts out empty here; nothing in the visible part of the file adds entries.
__pdoc__ = {}
class FastPath(Scenario):
    """Common parent of the benchmarks that exercise the query "fast path",
    as described in the 'Internals of One-off Queries' presentation.
    """
class FastPathFilterNoIndex(FastPath):
    """Measure how long the fast path needs to filter out every row of a materialized view and return."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        """Create v1 (f1 from generate_series, f2 constant 1), index it and check its row count."""
        ten = self.table_ten()
        row_count = self.n()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {row_count});
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {row_count} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Run a trivial query tagged /* A */ and a filter that matches no row (f2 < 0) tagged /* B */."""
        script = """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        return Td(script)
class MFPPushdown(Scenario):
    """Exercise MFP pushdown: a WHERE clause with a suitable condition on a view that has no index."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        """Create v1 (f1 from generate_series, f2 constant 1) without an index and check its row count."""
        ten = self.table_ten()
        row_count = self.n()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {row_count});
> SELECT COUNT(*) = {row_count} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Run a trivial query tagged /* A */ and a filter that matches no row (f2 < 0) tagged /* B */."""
        script = """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        return Td(script)
class FastPathFilterIndex(FastPath):
    """Measure how long the fast path needs to answer an equality filter on an indexed materialized view."""

    def init(self) -> list[Action]:
        """Create v1 with a default index and check that it holds n() rows."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    # A single query of this kind takes about 1ms, which makes individual timings very
    # noisy, and a larger dataset would not make the query any slower. The lookup is
    # therefore issued 1000 times back to back and the total execution time is measured.
    def benchmark(self) -> MeasurementSource:
        """Issue the f1 = 1 lookup 1000 times between the /* A */ and /* B */ markers."""
        lookup = "> SELECT * FROM v1 WHERE f1 = 1;\n1\n"
        repeated_lookups = "\n".join([lookup] * 1000)

        script = f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
  /* A */
1
{repeated_lookups}
> SELECT 1
  /* B */
1
"""
        return Td(script)
class FastPathOrderByLimit(FastPath):
    """Benchmark SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>."""

    def init(self) -> list[Action]:
        """Create v1 with a default index and check that it holds n() rows."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> CREATE DEFAULT INDEX ON v1;
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Fetch the 1000 largest f1 values; the expected rows are n()-1000 .. n()-1."""
        upper = self.n()
        expected_rows = "\n".join(str(value) for value in range(upper - 1000, upper))
        statements = """
> SELECT 1;
  /* A */
1
> SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
  /* B */
"""
        return Td(statements + expected_rows)
class FastPathLimit(FastPath):
    """Benchmark SELECT * FROM source LIMIT <i>, the case optimized by materialize#21615."""

    def init(self) -> list[Action]:
        """Create v1 from generate_series(1, n())."""
        create_view = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
"""
        )
        return [create_view]

    def benchmark(self) -> MeasurementSource:
        """Select 100 rows with LIMIT; the expected rows are 1..100."""
        statements = dedent(
            """
            > SELECT 1;
              /* A */
            1
            > SELECT * FROM v1 LIMIT 100
              /* B */
            """
        )
        expected_rows = "\n".join(map(str, range(1, 101)))
        return Td(statements + expected_rows)
class DML(Scenario):
    """Common parent of the benchmarks that measure the performance of DML statements."""
class Insert(DML):
    """Measure how long a single bulk INSERT ... SELECT statement takes to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Raise max_result_size as mz_system, recreate t1 (/* A */) and bulk-insert into it (/* B */)."""
        script = f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
  /* B */
"""
        return Td(script)
class ManySmallInserts(DML):
    """Measure how long a series of small single-row INSERT statements takes to return."""

    # Sometimes goes OoM
    SCALE = 3

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Emit n() single-row INSERTs with seeded pseudo-random values between /* A */ and /* B */."""
        # Seeding from the scenario keeps the generated values reproducible.
        random.seed(self.seed())
        inserts = "\n".join(
            [
                f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})"
                for _ in range(self.n())
            ]
        )

        script = f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
{inserts}
  /* B */
"""
        return Td(script)
class InsertBatch(DML):
    """Measure how long a batch of INSERT statements wrapped in BEGIN/COMMIT takes to return."""

    SCALE = 4

    def benchmark(self) -> MeasurementSource:
        """Recreate t1 (/* A */), then insert 0..n()-1 one row at a time and COMMIT (/* B */)."""
        batch = "\n".join(
            [f"> INSERT INTO t1 VALUES ({value});" for value in range(self.n())]
        )

        script = f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> SET auto_route_introspection_queries TO false
> BEGIN
{batch}
> COMMIT
  /* B */
"""
        return Td(script)
class InsertMultiRow(DML):
    """Measure how long one multi-row INSERT statement takes to return.

    When `sequence_insert` calls `constant_optimizer`, it should be able to reach a constant.
    Otherwise the full logical optimizer runs, which shows up as a regression in this test.
    """

    SCALE = 4  # FATAL: request larger than 2.0 MB

    def benchmark(self) -> MeasurementSource:
        """Recreate t1 (/* A */) and insert 0..n()-1 with a single VALUES list (/* B */)."""
        value_list = ", ".join([f"({value})" for value in range(self.n())])

        script = f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 VALUES {value_list}
  /* B */
"""
        return Td(script)
class Update(DML):
    """Measure how long an UPDATE over the whole table takes to return to the client."""

    def init(self) -> list[Action]:
        """Create the indexed table t1 and fill it with an INSERT ... SELECT."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE TABLE t1 (f1 BIGINT);
> CREATE DEFAULT INDEX ON t1;
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Add n() to every f1 value with a single UPDATE (/* B */)."""
        script = f"""
> SELECT 1
  /* A */
1
> UPDATE t1 SET f1 = f1 + {self.n()}
  /* B */
"""
        return Td(script)
class ManySmallUpdates(DML):
    """Measure how long a series of small UPDATE statements takes to return to the client."""

    SCALE = 2  # runs ~2.5 hours with SCALE = 3

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 2, 0)

    def init(self) -> list[Action]:
        """Create the indexed two-column table t1 and insert the values 1..10 into f1."""
        ten = self.table_ten()
        populate = TdAction(
            """
> CREATE TABLE t1 (f1 INT, f2 INT);
> CREATE DEFAULT INDEX ON t1;
> INSERT INTO t1 SELECT generate_series(1, 10);
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Emit n() UPDATEs with seeded pseudo-random assignments and predicates."""
        # Seeding from the scenario keeps the generated statements reproducible.
        random.seed(self.seed())

        # NOTE(review): random.randint includes both bounds, so the predicate value can
        # be 10, and `f1 % 10 = 10` never matches a row. Confirm whether that is intended
        # before changing it, since doing so alters the measured workload.
        updates = "\n".join(
            [
                f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}"
                for _ in range(self.n())
            ]
        )

        script = f"""
> SELECT 1
  /* A */
1
{updates}
  /* B */
"""
        return Td(script)
class UpdateMultiNoIndex(DML):
    """Measure how long it takes to update the same records of a non-indexed table. GitHub Issue database-issues#3233"""

    def before(self) -> Action:
        """Drop and re-create t1, then fill it with generate_series(0, n())."""
        # The results vary extremely between runs, so the table has to be dropped and
        # re-created before each measurement.
        reset = f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 BIGINT);
> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
        return TdAction(reset)

    def benchmark(self) -> MeasurementSource:
        """Shift every f1 by n() and then count the rows above n(), expecting n() of them."""
        n = self.n()
        script = f"""
> SELECT 1
  /* A */
1
> UPDATE t1 SET f1 = f1 + {n}
> SELECT COUNT(*) FROM t1 WHERE f1 > {n}
  /* B */
{n}
"""
        return Td(script)
class InsertAndSelect(DML):
    """Measure how long an INSERT statement takes to return AND how long a
    follow-up SELECT takes to return data, that is, until the dataflow has
    completely caught up.
    """

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Raise max_result_size as mz_system, recreate t1 (/* A */), bulk-insert and read one row back (/* B */)."""
        script = f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_result_size = 17179869184;
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 INTEGER)
  /* A */
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};
> SELECT 1 FROM t1 WHERE f1 = 1
  /* B */
1
"""
        return Td(script)
class Dataflow(Scenario):
    """Common parent of the benchmarks built around individual dataflow patterns/operators."""
class OrderBy(Dataflow):
    """Benchmark ORDER BY as executed by the dataflow layer, as opposed to an
    ORDER BY executed by a Finish step in the coordinator."""

    def init(self) -> Action:
        """Create table ten and view v1, then insert 0..9 into ten one statement at a time."""
        # The rows are inserted individually so that they are assigned separate
        # timestamps, just to spice things up a bit.
        single_row_inserts = "\n\n".join(
            [f"> INSERT INTO ten VALUES ({digit})" for digit in range(10)]
        )

        script = f"""
> CREATE TABLE ten (f1 INTEGER);
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
{single_row_inserts}
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        return TdAction(script)

    def benchmark(self) -> MeasurementSource:
        """Re-create the ordered view v2 (/* A */) and count its rows (/* B */), expecting n()."""
        # Without the explicit LIMIT the ORDER BY would be optimized away.
        script = f"""
> DROP MATERIALIZED VIEW IF EXISTS v2
  /* A */
> CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999
> SELECT COUNT(*) FROM v2
  /* B */
{self.n()}
"""
        return Td(script)
class CountDistinct(Dataflow):
    """Benchmark a one-off SELECT COUNT(DISTINCT ...) over a materialized view."""

    def init(self) -> list[Action]:
        """Create the two-column view v1 and check that it holds n() rows."""
        ten = self.view_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Count the distinct f1 values (/* B */), expecting n()."""
        script = f"""
> SELECT 1
  /* A */
1
> SELECT COUNT(DISTINCT f1) AS f1 FROM v1
  /* B */
{self.n()}
"""
        return Td(script)
class MinMax(Dataflow):
    """Benchmark a one-off SELECT MIN(...), MAX(...) over a materialized view."""

    def init(self) -> list[Action]:
        """Create view v1 and check that it holds n() rows."""
        ten = self.view_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Select MIN and MAX of f1 (/* B */), expecting 0 and n()-1."""
        largest = self.n() - 1
        script = f"""
> SELECT 1
  /* A */
1
> SELECT MIN(f1), MAX(f1) AS f1 FROM v1
  /* B */
0 {largest}
"""
        return Td(script)
class MinMaxMaintained(Dataflow):
    """Benchmark MinMax as an indexed view, which renders a dataflow for
    incremental maintenance, as opposed to one-shot SELECT processing."""

    def init(self) -> list[Action]:
        """Create view v1 and check that it holds n() rows."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Re-create and index the MIN/MAX view v2, then read it (/* B */), expecting 0 and n()-1."""
        largest = self.n() - 1
        script = f"""
> DROP VIEW IF EXISTS v2
  /* A */
> CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
0 {largest}
"""
        return Td(script)
class GroupBy(Dataflow):
    """Benchmark a one-off GROUP BY with MIN/MAX aggregates over a materialized view."""

    def init(self) -> list[Action]:
        """Create the two-column view v1 and check that it holds n() rows."""
        ten = self.view_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Group by f2 and summarize the groups (/* B */), expecting n() groups spanning 0..n()-1."""
        n = self.n()
        script = f"""
> SELECT 1
  /* A */
1
> SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
  /* B */
{n} 0 {n - 1}
"""
        return Td(script)
class GroupByMaintained(Dataflow):
    """Benchmark GroupBy as an indexed view, which renders a dataflow for
    incremental maintenance, as opposed to one-shot SELECT processing."""

    def init(self) -> list[Action]:
        """Create the two-column view v1 and check that it holds n() rows."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Re-create and index the grouped view v2, then read it (/* B */)."""
        n = self.n()
        script = f"""
> DROP VIEW IF EXISTS v2;
  /* A */
> CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
{n} 0 {n - 1}
"""
        return Td(script)
class CrossJoin(Dataflow):
    """Benchmark creating a materialized view over self.join() and counting its rows."""

    def init(self) -> Action:
        return self.view_ten()

    def benchmark(self) -> MeasurementSource:
        """Re-create v1 (/* A */) and verify that it holds n() rows (/* B */)."""
        script = f"""
> DROP MATERIALIZED VIEW IF EXISTS v1;
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
  /* A */
> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
  /* B */
true
"""
        return Td(script)
class AccumulateReductions(Dataflow):
    """Benchmark the accumulation of reductions.

    The scenario builds a view of sum/count aggregates grouped by `a`, indexes
    it in a dedicated cluster and counts the resulting groups.
    """

    SCALE = 5

    def version(self) -> ScenarioVersion:
        return ScenarioVersion.create(1, 1, 0)

    def before(self) -> Action:
        """Re-create table t, the `data` view, the idx_cluster cluster and the `accumulable` view.

        `data` is the union of generate_series(1, 10000000) and the contents of
        t; the two rows inserted into t use a = 1 and a = 0.
        """
        return TdAction(
            """
> DROP TABLE IF EXISTS t CASCADE;
> CREATE TABLE t (a int, b int, c int, d int);
> CREATE MATERIALIZED VIEW data AS
  SELECT a, a AS b FROM generate_series(1, 10000000) AS a
  UNION ALL
  SELECT a, b FROM t;
> INSERT INTO t (a, b) VALUES (1, 1);
> INSERT INTO t (a, b) VALUES (0, 0);
> DROP CLUSTER IF EXISTS idx_cluster CASCADE;
> CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1;
> CREATE VIEW accumulable AS
  SELECT
    a,
    sum(a) AS sum_a, count(a) as cnt_a,
    sum(b) AS sum_b, count(b) as cnt_b
  FROM data
  GROUP BY a;
"""
        )

    def benchmark(self) -> MeasurementSource:
        """Index `accumulable` in idx_cluster, check the plan and count the groups.

        The two `?[version...]` commands assert the same expected plan; they
        differ only in the EXPLAIN syntax used on either side of version 13500.
        The expected count is 10000001: the values 1..10000000 from
        generate_series plus the extra group a = 0 inserted into t.
        """
        sql = """
> SELECT 1
  /* A */
1
> CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a);
> SET CLUSTER = idx_cluster;
?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT count(*) FROM accumulable;
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // { arity: 1 }
        Project () // { arity: 0 }
          ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }
  Return // { arity: 1 }
    Union // { arity: 1 }
      Get l0 // { arity: 1 }
      Map (0) // { arity: 1 }
        Union // { arity: 0 }
          Negate // { arity: 0 }
            Project () // { arity: 0 }
              Get l0 // { arity: 1 }
          Constant // { arity: 0 }
            - ()

Used Indexes:
  - materialize.public.i_accumulable (*** full scan ***)

Target cluster: idx_cluster

?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT count(*) FROM accumulable;
Explained Query:
  With
    cte l0 =
      Reduce aggregates=[count(*)] // { arity: 1 }
        Project () // { arity: 0 }
          ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }
  Return // { arity: 1 }
    Union // { arity: 1 }
      Get l0 // { arity: 1 }
      Map (0) // { arity: 1 }
        Union // { arity: 0 }
          Negate // { arity: 0 }
            Project () // { arity: 0 }
              Get l0 // { arity: 1 }
          Constant // { arity: 0 }
            - ()

Used Indexes:
  - materialize.public.i_accumulable (*** full scan ***)

Target cluster: idx_cluster

> SELECT count(*) FROM accumulable;
  /* B */
10000001
> SET CLUSTER = default;
"""
        return Td(sql)
class Retraction(Dataflow):
    """Benchmark how long it takes to process a very large retraction."""

    def before(self) -> Action:
        """Re-create table ten with the rows 0..9 and view v1, and check that v1 holds n() rows."""
        setup = f"""
> DROP TABLE IF EXISTS ten CASCADE;
> CREATE TABLE ten (f1 INTEGER);
> INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
true
"""
        return TdAction(setup)

    def benchmark(self) -> MeasurementSource:
        """Delete every row of ten and wait until v1 reports zero rows (/* B */)."""
        script = """
> SELECT 1
  /* A */
1
> DELETE FROM ten;
> SELECT COUNT(*) FROM v1
  /* B */
0
"""
        return Td(script)
class CreateIndex(Dataflow):
    """Measure how long CREATE INDEX takes to return *plus* how long a SELECT
    query that would use the index takes to return rows.
    """

    def init(self) -> list[Action]:
        """Create and fill table t1, then run a query against it before measuring."""
        ten = self.table_ten()
        populate = TdAction(
            f"""
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}

# Make sure the dataflow is fully hydrated
> SELECT 1 FROM t1 WHERE f1 = 0;
1
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Drop i1 (/* A */), re-create it and run a self-join on f1 = 0 (/* B */)."""
        script = """
> DROP INDEX IF EXISTS i1;
  /* A */
> CREATE INDEX i1 ON t1(f1);
> SELECT COUNT(*)
  FROM t1 AS a1, t1 AS a2
  WHERE a1.f1 = a2.f1
  AND a1.f1 = 0
  AND a2.f1 = 0
  /* B */
1
"""
        return Td(script)
class DeltaJoin(Dataflow):
    """Benchmark a one-off three-way self-join over a materialized view."""

    def init(self) -> list[Action]:
        """Create the single-column view v1."""
        ten = self.view_ten()
        create_view = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
        )
        return [ten, create_view]

    def benchmark(self) -> MeasurementSource:
        """Join v1 with itself three times on f1 (/* B */), expecting n() rows."""
        script = f"""
> SELECT 1;
  /* A */
1

# Delta joins require 3+ tables
> SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
  /* B */
{self.n()}
"""
        return Td(script)
class DeltaJoinMaintained(Dataflow):
    """Benchmark DeltaJoin as an indexed view with table-based data
    initialization, where the empty frontier is not emitted, as opposed to
    one-shot SELECT processing over data initialized as a constant view."""

    def init(self) -> list[Action]:
        """Create the single-column view v1."""
        ten = self.table_ten()
        create_view = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
        )
        return [ten, create_view]

    def benchmark(self) -> MeasurementSource:
        """Re-create and index the three-way join view v2, then read it (/* B */), expecting n()."""
        script = f"""
> DROP VIEW IF EXISTS v2;
  /* A */

# Delta joins require 3+ tables
> CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
> CREATE DEFAULT INDEX ON v2
> SELECT * FROM v2
  /* B */
{self.n()}
"""
        return Td(script)
class DifferentialJoin(Dataflow):
    """Benchmark a one-off two-way self-join (JOIN ... USING) over a materialized view."""

    def init(self) -> list[Action]:
        """Create the two-column view v1."""
        ten = self.view_ten()
        create_view = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""
        )
        return [ten, create_view]

    def benchmark(self) -> MeasurementSource:
        """Join v1 with itself on f1 (/* B */), expecting n() rows."""
        script = f"""
> SELECT 1;
  /* A */
1
> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
  /* B */
{self.n()}
"""
        return Td(script)
class FullOuterJoin(Dataflow):
    """Benchmark a FULL OUTER JOIN of a view with itself, maintained while rows are inserted."""

    def benchmark(self) -> BenchmarkingSequence:
        """Build v1 with floor(scale()) columns, create the join view v2, insert 1..10 and read v2."""
        # One column f1..fN per whole unit of scale, each taken from the
        # correspondingly numbered alias a1..aN.
        column_numbers = range(1, floor(self.scale()) + 1)
        columns_select = ", ".join(f"a{k}.f1 AS f{k}" for k in column_numbers)
        columns_using = ", ".join(f"f{k}" for k in column_numbers)
        inserts = "\n".join(f"> INSERT INTO ten VALUES ({k})" for k in range(1, 11))

        script = f"""
> DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;
> DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;
> DROP TABLE IF EXISTS ten;
> CREATE TABLE ten (f1 INTEGER);
> CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
> SELECT 1;
  /* A */
1
> CREATE MATERIALIZED VIEW v2 AS
  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
  FROM v1 AS a1
  FULL OUTER JOIN v1 AS a2 USING ({columns_using});
{inserts}
> SELECT * FROM v2;
  /* B */
{self.n()} {self.n()}
"""
        return [Td(script)]
class Finish(Scenario):
    """Common parent of the benchmarks around the Finish stage of query processing."""
class FinishOrderByLimit(Finish):
    """Benchmark ORDER BY + LIMIT on a view that has no index to help."""

    def init(self) -> list[Action]:
        """Create the two-column view v1 and check that it holds n() rows."""
        ten = self.view_ten()
        populate = TdAction(
            f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )
        return [ten, populate]

    def benchmark(self) -> MeasurementSource:
        """Fetch the largest f2 value (/* B */), expecting n()-1."""
        largest = self.n() - 1
        script = f"""
> SELECT 1
  /* A */
1
> SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
  /* B */
{largest}
"""
        return Td(script)
class Kafka(Scenario):
    """Common parent of the benchmarks that ingest data from Kafka topics."""
class KafkaEnvelopeNoneBytes(Kafka):
    """Benchmark ingesting a Kafka topic with FORMAT BYTES and ENVELOPE NONE."""

    def shared(self) -> Action:
        """Create the topic and ingest n() copies of a 512-character payload."""
        payload = 512 * "a"
        script = f"""
$ kafka-create-topic topic=kafka-envelope-none-bytes
$ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
{payload}
"""
        return TdAction(script)

    def benchmark(self) -> MeasurementSource:
        """Re-create connection, cluster, source and table (/* A */), then wait for n() rows (/* B */)."""
        script = f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}")
  FORMAT BYTES
  ENVELOPE NONE
  /* A */
> SELECT COUNT(*) = {self.n()} FROM s1_tbl
  /* B */
true
"""
        return Td(script)
class KafkaUpsert(Kafka):
    """Benchmark ingesting an Avro topic with ENVELOPE UPSERT where almost all records share one key."""

    def shared(self) -> Action:
        """Ingest n() records under key f1 = 1 followed by a single record under key f1 = 2."""
        key_schema = self.keyschema()
        value_schema = self.schema()
        ingestion = f"""
$ kafka-create-topic topic=kafka-upsert
$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }}
$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}}
{{"f1": 2}} {{"f2": 2}}
"""
        return TdAction(key_schema + value_schema + ingestion)

    def benchmark(self) -> MeasurementSource:
        """Re-create connections, cluster, source and table (/* A */), then expect the keys 1 and 2 (/* B */)."""
        script = f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL '${{testdrive.schema-registry-url}}'
  );
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-upsert-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT
  /* A */
> SELECT f1 FROM s1_tbl
  /* B */
1
2
"""
        return Td(script)
class KafkaUpsertUnique(Kafka):
    """Benchmark ingesting an Avro topic with ENVELOPE UPSERT where every record has its own key."""

    def shared(self) -> Action:
        """Create a 16-partition topic and ingest n() records keyed by the ingest iteration."""
        key_schema = self.keyschema()
        value_schema = self.schema()
        ingestion = f"""
$ kafka-create-topic topic=upsert-unique partitions=16
$ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        return TdAction(key_schema + value_schema + ingestion)

    def benchmark(self) -> MeasurementSource:
        """Re-create the connections (/* A */), then cluster, source and table, and count n() rows (/* B */)."""
        script = f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  /* A */
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-upsert-unique-${{testdrive.seed}}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT
> SELECT COUNT(*) FROM s1_tbl;
  /* B */
{self.n()}
"""
        return Td(script)
class KafkaRestart(ScenarioDisabled):
    """Restart clusterd with a fully ingested upsert Kafka source in place.

    This scenario dates from the pre-persistence era, when the entire topic
    was re-ingested from scratch after a restart. With persistence no
    re-ingestion takes place and the scenario exhibits extreme variability:
    what is measured is mostly the speed of COUNT(*), further obscured by the
    one-second timestamp granularity.
    """

    def shared(self) -> Action:
        """Create the 8-partition ``kafka-recovery`` topic with ``n()`` records."""
        preamble = self.keyschema() + self.schema()
        ingest = f"""
$ kafka-create-topic topic=kafka-recovery partitions=8
$ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        return TdAction(preamble + ingest)

    def init(self) -> Action:
        """Create the source, wait for it to catch up, then pause for 10s."""
        script = f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}');
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-recovery-${{testdrive.seed}}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE UPSERT;
# Make sure we are fully caught up before continuing
> SELECT COUNT(*) FROM s1_tbl;
{self.n()}
# Give time for any background tasks (e.g. compaction) to settle down
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="10s"
"""
        return TdAction(script)

    def benchmark(self) -> BenchmarkingSequence:
        """Restart clusterd, then count the rows of ``s1_tbl`` again."""
        restart = Lambda(lambda e: e.RestartMzClusterd())
        recount = Td(
            f"""
> SELECT COUNT(*) /* {self.n()} */ FROM s1_tbl;
/* B */
{self.n()}
"""
        )
        return [restart, recount]
class KafkaRestartBig(ScenarioBig):
    """Restart clusterd with a large upsert source that is never fully held in memory.

    The records are ingested without constructing a dataflow that would keep
    all of them in memory. To detect completion, a batch of "EOF" records is
    emitted after the primary ingestion; the source counts as caught up once
    all EOF records have been seen.
    """

    SCALE = 8

    def shared(self) -> list[Action]:
        """Create the topic, then generate the bulk records and the EOF markers."""
        create_topic = TdAction(
            "$ kafka-create-topic topic=kafka-recovery-big partitions=8"
        )

        # Ingest 10 ** SCALE records
        bulk_records = Kgen(
            topic="kafka-recovery-big",
            args=[
                "--keys=random",
                f"--num-records={self.n()}",
                "--values=bytes",
                "--max-message-size=32",
                "--min-message-size=32",
                "--key-min=256",
                f"--key-max={256 + self.n() ** 2}",
            ],
        )

        # Add 256 EOF markers with key values <= 256.
        # This high number is chosen as to guarantee that there will be an EOF marker
        # in each partition, even if the number of partitions is increased in the future.
        eof_markers = Kgen(
            topic="kafka-recovery-big",
            args=[
                "--keys=sequential",
                "--num-records=256",
                "--values=bytes",
                "--min-message-size=32",
                "--max-message-size=32",
            ],
        )

        return [create_topic, bulk_records, eof_markers]

    def init(self) -> Action:
        """Create the source and block until all 256 EOF markers are visible."""
        script = f"""
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}');
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-recovery-big-${{testdrive.seed}}")
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE UPSERT;
# Confirm that all the EOF markers generated above have been processed
> CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1_tbl WHERE key <= '\\x00000000000000ff'
> SELECT * FROM s1_is_complete;
true
"""
        return TdAction(script)

    def benchmark(self) -> BenchmarkingSequence:
        """Restart clusterd and wait for ``s1_is_complete`` to report true."""
        restart = Lambda(lambda e: e.RestartMzClusterd())
        wait_complete = Td(
            """
> SELECT * FROM s1_is_complete
/* B */
true
"""
        )
        return [restart, wait_complete]
# Switch off the pdoc entries for the per-scale classes named
# KafkaEnvelopeNoneBytesScalability_scale_5 .. _scale_9.
for i in range(5, 10):
    __pdoc__[f"KafkaEnvelopeNoneBytesScalability_scale_{i}"] = False
@parameterized_class(
    [{"SCALE": scale} for scale in (5, 6, 7, 8, 9)],
    class_name_func=Scenario.name_with_scale,
)
class KafkaEnvelopeNoneBytesScalability(ScenarioBig):
    """Run one ENVELOPE NONE bytes-ingestion scenario at several scales.

    The source itself is not materialized in full; only a COUNT(*) view,
    which does not consume memory proportional to the data, sits on top of it.
    """

    def shared(self) -> list[Action]:
        """Create the ``kafka-scalability`` topic and generate ``n()`` records."""
        create_topic = TdAction(
            """
$ kafka-create-topic topic=kafka-scalability partitions=8
"""
        )
        generate = Kgen(
            topic="kafka-scalability",
            args=[
                "--keys=sequential",
                f"--num-records={self.n()}",
                "--values=bytes",
                "--max-message-size=100",
                "--min-message-size=100",
            ],
        )
        return [create_topic, generate]

    def benchmark(self) -> MeasurementSource:
        """Recreate the source and wait until the counting view reaches ``n()``."""
        script = f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
> CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-kafka-scalability-${{testdrive.seed}}")
KEY FORMAT BYTES
VALUE FORMAT BYTES
ENVELOPE NONE
/* A */
> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1_tbl;
> SELECT c = {self.n()} FROM v1
/* B */
true
"""
        return Td(script)
class Sink(Scenario):
    """Common base class for the sink scenarios; adds nothing to Scenario."""
class ExactlyOnce(Sink):
    """Measure the time it takes a Kafka sink to emit the records of ``source1_tbl``.

    There are only limited means of telling when the complete output has been
    emitted, so the sink's output topic is re-ingested through the
    ``sink1_check`` source and the scenario waits for its row count to reach
    ``n()``.
    """

    FIXED_SCALE = True  # TODO: Remove when database-issues#8705 is fixed

    def version(self) -> ScenarioVersion:
        """Return the version of this scenario definition."""
        return ScenarioVersion.create(1, 1, 0)

    def shared(self) -> Action:
        """Create the 16-partition ``sink-input`` topic holding ``n()`` records."""
        preamble = self.keyschema() + self.schema()
        ingest = f"""
$ kafka-create-topic topic=sink-input partitions=16
$ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        return TdAction(preamble + ingest)

    def init(self) -> Action:
        """Ingest the input topic into ``source1_tbl`` and wait for all rows."""
        script = f"""
> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CONNECTION IF NOT EXISTS csr_conn
FOR CONFLUENT SCHEMA REGISTRY
URL '${{testdrive.schema-registry-url}}';
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source1
IN CLUSTER source_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}');
> CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-sink-input-${{testdrive.seed}}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
> SELECT COUNT(*) FROM source1_tbl;
{self.n()}
"""
        return TdAction(script)

    def benchmark(self) -> MeasurementSource:
        """Create the sink, re-ingest its output and wait for ``n()`` rows."""
        # The expected row count is the final line of the script, with no
        # trailing newline after it.
        script = f"""
> DROP SINK IF EXISTS sink1;
> DROP SOURCE IF EXISTS sink1_check CASCADE;
/* A */
> DROP CLUSTER IF EXISTS sink_cluster CASCADE
> CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink1
IN CLUSTER sink_cluster
FROM source1_tbl
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
KEY (f1)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM
$ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true
# Wait until all the records have been emited from the sink, as observed by the sink1_check source
> CREATE SOURCE sink1_check
IN CLUSTER source_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}');
> CREATE TABLE sink1_check_tbl FROM SOURCE sink1_check (REFERENCE "testdrive-sink-output-${{testdrive.seed}}")
KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT;
> CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check_tbl;
> SELECT * FROM sink1_check_v
/* B */
{self.n()}"""
        return Td(script)
class ManyKafkaSourcesOnSameCluster(Scenario):
    """Measure the time it takes to ingest data from many Kafka sources.

    ``n()`` topics are created, each with ``COUNT_SOURCE_ENTRIES`` records,
    and one source per topic is created in the single cluster
    ``kafka_source_cluster``.
    """

    # Runs ~2 hours with 300 sources
    SCALE = 1.7  # 50 sources
    FIXED_SCALE = True

    COUNT_SOURCE_ENTRIES = 100000

    def version(self) -> ScenarioVersion:
        """Return the version of this scenario definition."""
        return ScenarioVersion.create(1, 2, 0)

    def shared(self) -> Action:
        """Create and populate one ``many-kafka-sources-<i>`` topic per source."""
        per_topic = [
            f"""
$ kafka-create-topic topic=many-kafka-sources-{idx}
$ kafka-ingest format=avro topic=many-kafka-sources-{idx} schema=${{schema}} repeat={self.COUNT_SOURCE_ENTRIES}
{{"f2": ${{kafka-ingest.iteration}}}}
"""
            for idx in range(self.n())
        ]
        create_topics = "\n".join(per_topic)
        return TdAction(self.schema() + create_topics)

    def init(self) -> Action:
        """Raise the source/table limits and recreate connections and cluster."""
        script = f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_sources = {self.n() * 4};
ALTER SYSTEM SET max_tables = {self.n() * 4};
> DROP OWNED BY materialize CASCADE;
> CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
FOR CONFLUENT SCHEMA REGISTRY
URL '${{testdrive.schema-registry-url}}';
> DROP CLUSTER IF EXISTS kafka_source_cluster CASCADE;
> CREATE CLUSTER kafka_source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
"""
        return TdAction(script)

    def benchmark(self) -> BenchmarkingSequence:
        """Drop, recreate and verify every source in one testdrive script."""
        drop_sources = "\n".join(
            [
                f"""
> DROP SOURCE IF EXISTS kafka_source{idx} CASCADE;
"""
                for idx in range(self.n())
            ]
        )

        create_sources = "\n".join(
            [
                f"""
> CREATE SOURCE kafka_source{idx}
IN CLUSTER kafka_source_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-many-kafka-sources-{idx}-${{testdrive.seed}}');
> CREATE TABLE kafka_source{idx}_tbl FROM SOURCE kafka_source{idx} (REFERENCE "testdrive-many-kafka-sources-{idx}-${{testdrive.seed}}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE NONE;
"""
                for idx in range(self.n())
            ]
        )

        # Each source's table must hold COUNT_SOURCE_ENTRIES rows.
        check_sources = "\n".join(
            [
                f"> SELECT COUNT(*) = {self.COUNT_SOURCE_ENTRIES} FROM kafka_source{idx}_tbl;\ntrue"
                for idx in range(self.n())
            ]
        )

        script = self.schema() + f"""
{drop_sources}
> SELECT 1;
/* A */
1
{create_sources}
{check_sources}
> SELECT 1;
/* B */
1
"""
        return [Td(script)]
class PgCdc(Scenario):
    """Common base class for the Postgres CDC scenarios; adds nothing to Scenario."""
class PgCdcInitialLoad(PgCdc):
    """Measure the time it takes to read ``n()`` pre-existing Postgres records
    when a source and its table are created on top of them."""

    def shared(self) -> Action:
        """Reset the Postgres schema and fill ``pk_table`` with ``n()`` rows."""
        script = f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x;
ALTER TABLE pk_table REPLICA IDENTITY FULL;
"""
        return TdAction(script)

    def before(self) -> Action:
        """Drop the source and its cluster left over from a previous run."""
        cleanup = """
> DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
"""
        return TdAction(cleanup)

    def benchmark(self) -> MeasurementSource:
        """Create the Postgres source and wait until all rows are counted."""
        script = f"""
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
HOST postgres,
DATABASE postgres,
USER postgres,
PASSWORD SECRET pgpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE mz_source_pgcdc
IN CLUSTER source_cluster
FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source');
> CREATE TABLE pk_table FROM SOURCE mz_source_pgcdc (REFERENCE pk_table);
/* A */
> SELECT count(*) FROM pk_table
/* B */
{self.n()}
"""
        return Td(script)
class PgCdcStreaming(PgCdc):
    """Measure the time it takes to ingest records from Postgres post-snapshot.

    ``before()`` recreates an empty ``t1`` together with the source on top of
    it; ``benchmark()`` then inserts ``n()`` rows in 1000 separately committed
    batches and waits for all of them to be counted.
    """

    SCALE = 5

    def shared(self) -> Action:
        """Reset the Postgres schema and create publication ``p1``."""
        return TdAction(
            """
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;
DROP PUBLICATION IF EXISTS p1;
CREATE PUBLICATION p1 FOR ALL TABLES;
"""
        )

    def before(self) -> Action:
        """Recreate the empty upstream table, the cluster and the source."""
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;
$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
ALTER TABLE t1 REPLICA IDENTITY FULL;
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'
> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
HOST postgres,
DATABASE postgres,
USER postgres,
PASSWORD SECRET pgpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1');
> CREATE TABLE t1 FROM SOURCE s1 (REFERENCE t1);
"""
        )

    def benchmark(self) -> MeasurementSource:
        """Insert ``n()`` rows upstream in 1000 batches and wait for the count."""
        # Split n() into integer batch sizes that sum to exactly n(). The
        # previous `n()/1000` put a float bound into generate_series and, when
        # n() is not a multiple of 1000, inserted a total different from the
        # count awaited below, so the check could never succeed.
        batches = 1000
        base, extra = divmod(self.n(), batches)
        insertions = "\n".join(
            f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {base + (1 if batch < extra else 0)}) as x;\nCOMMIT;"
            for batch in range(batches)
        )

        return Td(
            f"""
> SELECT 1;
/* A */
1
$ postgres-execute connection=postgres://postgres:postgres@postgres
{insertions}
> SELECT count(*) FROM t1
/* B */
{self.n()}
"""
        )
class MySqlCdc(Scenario):
    """Common base class for the MySQL CDC scenarios; adds nothing to Scenario."""
class MySqlInitialLoad(MySqlCdc):
    """Measure the time it takes to read pre-existing MySQL records (the
    insert is limited to ``n()`` rows) when a source and its table are
    created on top of them."""

    FIXED_SCALE = True  # TODO: Remove when database-issues#7556 is fixed

    def shared(self) -> Action:
        """Recreate the ``public`` database and fill ``pk_table``."""
        script = f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
SET @i:=0;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()};
"""
        return TdAction(script)

    def before(self) -> Action:
        """Drop the source and its cluster left over from a previous run."""
        cleanup = """
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
"""
        return TdAction(cleanup)

    def benchmark(self) -> MeasurementSource:
        """Create the MySQL source and wait until all rows are counted."""
        script = f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE mz_source_mysqlcdc
IN CLUSTER source_cluster
FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE pk_table FROM SOURCE mz_source_mysqlcdc (REFERENCE public.pk_table);
/* A */
> SELECT count(*) FROM pk_table
/* B */
{self.n()}
"""
        return Td(script)
class MySqlStreaming(MySqlCdc):
    """Measure the time it takes to ingest records from MySQL post-snapshot.

    ``before()`` recreates an empty ``t1`` together with the source on top of
    it; ``benchmark()`` then inserts rows in 1000 separately committed batches
    sized to total ``n()`` and waits for ``n()`` rows to be counted.
    """

    SCALE = 5

    def shared(self) -> Action:
        """Recreate the empty ``public`` database."""
        return TdAction(
            """
$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
        )

    def before(self) -> Action:
        """Recreate the empty upstream table, the cluster and the source."""
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM MYSQL CONNECTION mysql_conn;
> CREATE TABLE t1 FROM SOURCE s1 (REFERENCE public.t1);
"""
        )

    def benchmark(self) -> MeasurementSource:
        """Insert rows upstream in 1000 batches and wait for ``n()`` rows."""
        # Split n() into integer LIMITs that sum to exactly n(). The previous
        # `round(n()/1000)` repeated 1000 times differs from n() whenever n()
        # is not a multiple of 1000, so the count awaited below could never
        # be reached.
        batches = 1000
        base, extra = divmod(self.n(), batches)
        insertions = "\n".join(
            dedent(
                f"""
SET @i:=0;
INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {base + (1 if batch < extra else 0)};
COMMIT;
"""
            )
            for batch in range(batches)
        )

        return Td(
            f"""
> SELECT 1;
/* A */
1
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
USE public;
{insertions}
> SELECT count(*) FROM t1
/* B */
{self.n()}
"""
        )
class Coordinator(Scenario):
    """Common base class for the feature benchmarks that pertain to the coordinator."""
class QueryLatency(Coordinator):
    """Measure the time it takes to run SELECT 1 queries"""

    # The docstring has to be the first statement of the class body; placed
    # after SCALE it was a discarded expression and __doc__ stayed None.
    SCALE = 3

    def benchmark(self) -> MeasurementSource:
        """Issue ``n()`` ``SELECT 1`` statements inside a single transaction."""
        selects = "\n".join("> SELECT 1\n1\n" for _ in range(self.n()))

        return Td(
            f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
/* A */
1
{selects}
> SELECT 1;
/* B */
1
"""
        )
class ConnectionLatency(Coordinator):
    """Measure the time it takes to establish connections to Mz"""

    SCALE = 2  # Many connections * many measurements = TCP port exhaustion

    def benchmark(self) -> MeasurementSource:
        """Run ``n()`` ``postgres-execute`` commands that each issue ``SELECT 1``."""
        one_connection = """
$ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
SELECT 1;
"""
        connections = "\n".join([one_connection] * self.n())

        script = f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
/* A */
1
{connections}
> SELECT 1;
/* B */
1
"""
        return Td(script)
class Startup(Scenario):
    """Common base class for startup scenarios; adds nothing to Scenario."""
class StartupEmpty(Startup):
    """Measure the time it takes to restart an empty Mz instance."""

    def benchmark(self) -> BenchmarkingSequence:
        """Restart clusterd, then wait for a ``SELECT 1`` to succeed."""
        restart = Lambda(lambda e: e.RestartMzClusterd())
        probe = Td(
            """
> SELECT 1;
/* B */
1
"""
        )
        return [restart, probe]
class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance and have
    all the dataflows be ready to return something.

    ``init()`` creates ``n()`` tables, sources (one cluster each),
    materialized views and sinks (one cluster each); ``benchmark()`` restarts
    clusterd and queries every view, source and table.
    """

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Can not scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        """Create the ``startup-time`` topic holding a single record."""
        ingest = """
$ kafka-create-topic topic=startup-time
$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        return TdAction(self.schema() + ingest)

    def init(self) -> Action:
        """Raise the object limits and create all tables, sources, views and sinks."""
        create_tables = "\n".join(
            [
                f"> CREATE TABLE t{idx} (f1 INTEGER);\n> INSERT INTO t{idx} DEFAULT VALUES;"
                for idx in range(self.n())
            ]
        )

        create_sources = "\n".join(
            [
                f"""
> DROP CLUSTER IF EXISTS source{idx}_cluster CASCADE;
> CREATE CLUSTER source{idx}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source{idx}
IN CLUSTER source{idx}_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
> CREATE TABLE source{idx}_tbl FROM SOURCE source{idx} (REFERENCE "testdrive-startup-time-${{testdrive.seed}}")
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE NONE
"""
                for idx in range(self.n())
            ]
        )

        # Every view joins the same set of source tables; how many depends on
        # ceil(scale()), not on n().
        join = " ".join(
            [
                f"LEFT JOIN source{idx}_tbl USING (f2)"
                for idx in range(1, ceil(self.scale()))
            ]
        )

        create_views = "\n".join(
            [
                f"> CREATE MATERIALIZED VIEW v{idx} AS SELECT * FROM source{idx}_tbl AS s {join} LIMIT {idx+1}"
                for idx in range(self.n())
            ]
        )

        create_sinks = "\n".join(
            [
                f"""
> DROP CLUSTER IF EXISTS sink{idx}_cluster;
> CREATE CLUSTER sink{idx}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{idx}
IN CLUSTER sink{idx}_cluster
FROM source{idx}_tbl
INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
KEY (f2)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE DEBEZIUM
"""
                for idx in range(self.n())
            ]
        )

        script = f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};
> DROP OWNED BY materialize CASCADE;
> CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
FOR CONFLUENT SCHEMA REGISTRY
URL '${{testdrive.schema-registry-url}}';
{create_tables}
{create_sources}
{create_views}
{create_sinks}
"""
        return TdAction(script)

    def benchmark(self) -> BenchmarkingSequence:
        """Restart clusterd, then query every view, source and table."""
        check_tables = "\n".join(
            [f"> SELECT COUNT(*) >= 0 FROM t{idx}\ntrue" for idx in range(self.n())]
        )
        check_sources = "\n".join(
            [f"> SELECT COUNT(*) > 0 FROM source{idx}\ntrue" for idx in range(self.n())]
        )
        check_views = "\n".join(
            [f"> SELECT COUNT(*) > 0 FROM v{idx}\ntrue" for idx in range(self.n())]
        )

        restart = Lambda(lambda e: e.RestartMzClusterd())
        verify = Td(
            f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1;
/* B */
1
"""
        )
        return [restart, verify]
class StartupTpch(Scenario):
    """Measure the time it takes to restart a Mz instance populated with TPC-H
    and have all the dataflows be ready to return something.

    For every query in the optbench TPC-H workload, ``n()`` indexed views and
    ``n()`` materialized views are created on top of the TPC-H schema.
    """

    # Runs ~3 hours with SCALE = 1.2
    SCALE = 0.1  # 1 object of each kind

    def version(self) -> ScenarioVersion:
        """Return the version of this scenario definition."""
        return ScenarioVersion.create(1, 1, 0)

    def init(self) -> Action:
        """Create the TPC-H tables plus the views, indexes and materialized views."""
        # We need to massage the SQL statements so that Testdrive doesn't get confused.
        comment = re.compile(r"--.*?\n", re.IGNORECASE)
        newline = re.compile(r"\n", re.IGNORECASE)

        def single_line(sql):
            # Strip `--` comments first, then fold what remains onto one line.
            return newline.sub(" ", comment.sub("", sql))

        create_tables = "\n".join(
            [
                f"""
> {single_line(ddl)}
"""
                for ddl in materialize.optbench.sql.parse_from_file(
                    Path("misc/python/materialize/optbench/schema/tpch.sql")
                )
            ]
        )

        queries = [
            single_line(query)
            for query in materialize.optbench.sql.parse_from_file(
                Path("misc/python/materialize/optbench/workload/tpch.sql")
            )
        ]

        create_views = "\n".join(
            [
                f"""
> CREATE VIEW v_{q}_{i} AS {query}
"""
                for q, query in enumerate(queries)
                for i in range(self.n())
            ]
        )

        create_indexes = "\n".join(
            [
                f"""
> CREATE DEFAULT INDEX ON v_{q}_{i};
"""
                for q in range(len(queries))
                for i in range(self.n())
            ]
        )

        create_materialized_views = "\n".join(
            [
                f"""
> CREATE MATERIALIZED VIEW mv_{q}_{i} AS {query}
"""
                for q, query in enumerate(queries)
                for i in range(self.n())
            ]
        )

        script = f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 100};
ALTER SYSTEM SET max_materialized_views = {self.n() * 100};
ALTER SYSTEM SET max_tables = {self.n() * 100};
> DROP OWNED BY materialize CASCADE;
{create_tables}
{create_views}
{create_indexes}
{create_materialized_views}
"""
        return TdAction(script)

    def benchmark(self) -> BenchmarkingSequence:
        """Restart clusterd, then query every materialized view and view."""
        workload = materialize.optbench.sql.parse_from_file(
            Path("misc/python/materialize/optbench/workload/tpch.sql")
        )
        num_queries = len(workload)

        check_views = "\n".join(
            [
                f"> SELECT COUNT(*) >= 0 FROM v_{q}_{i}\ntrue"
                for q in range(num_queries)
                for i in range(self.n())
            ]
        )

        check_materialized_views = "\n".join(
            [
                f"> SELECT COUNT(*) >= 0 FROM mv_{q}_{i}\ntrue"
                for q in range(num_queries)
                for i in range(self.n())
            ]
        )

        restart = Lambda(lambda e: e.RestartMzClusterd())
        verify = Td(
            f"""
{check_materialized_views}
{check_views}
> SELECT 1;
/* B */
1
"""
        )
        return [restart, verify]
class HydrateIndex(Scenario):
    """Measure the time it takes for an index to hydrate when a cluster comes online."""

    def init(self) -> list[Action]:
        """Run ``table_ten()`` and create the dedicated ``idx_cluster``."""
        create_cluster = TdAction(
            """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
        )
        return [self.table_ten(), create_cluster]

    def benchmark(self) -> MeasurementSource:
        """Build the index with zero replicas, add a replica, then query through it.

        The table is filled and updated while ``idx_cluster`` has replication
        factor 0. The replication factor is then set back to 1, the EXPLAIN
        output is checked for a read of index ``i1``, and the rows are counted.
        """
        return Td(
            f"""
> DROP TABLE IF EXISTS t1 CASCADE
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)
> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
> UPDATE t1 SET f1 = f1 + 100000
> UPDATE t1 SET f1 = f1 + 1000000
> UPDATE t1 SET f1 = f1 + 10000000
> UPDATE t1 SET f1 = f1 + 100000000
> UPDATE t1 SET f1 = f1 + 1000000000
> SELECT 1
/* A */
1
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = idx_cluster
?[version>=13500] EXPLAIN OPTIMIZED PLAN AS VERBOSE TEXT FOR SELECT COUNT(*) FROM t1
Explained Query:
With
cte l0 =
Reduce aggregates=[count(*)] // {{ arity: 1 }}
Project () // {{ arity: 0 }}
ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}
Return // {{ arity: 1 }}
Union // {{ arity: 1 }}
Get l0 // {{ arity: 1 }}
Map (0) // {{ arity: 1 }}
Union // {{ arity: 0 }}
Negate // {{ arity: 0 }}
Project () // {{ arity: 0 }}
Get l0 // {{ arity: 1 }}
Constant // {{ arity: 0 }}
- ()
Used Indexes:
- materialize.public.i1 (*** full scan ***)
Target cluster: idx_cluster
?[version<13500] EXPLAIN OPTIMIZED PLAN FOR SELECT COUNT(*) FROM t1
Explained Query:
With
cte l0 =
Reduce aggregates=[count(*)] // {{ arity: 1 }}
Project () // {{ arity: 0 }}
ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}
Return // {{ arity: 1 }}
Union // {{ arity: 1 }}
Get l0 // {{ arity: 1 }}
Map (0) // {{ arity: 1 }}
Union // {{ arity: 0 }}
Negate // {{ arity: 0 }}
Project () // {{ arity: 0 }}
Get l0 // {{ arity: 1 }}
Constant // {{ arity: 0 }}
- ()
Used Indexes:
- materialize.public.i1 (*** full scan ***)
Target cluster: idx_cluster
> SELECT COUNT(*) FROM t1
/* B */
{self._n}
> SET CLUSTER = default
"""
        )
def remove_arity_information_from_explain(sql: str) -> str:
    """Return *sql* with every `` // { arity: N }`` annotation removed."""
    arity_suffix = re.compile(r" // { arity: \d+ }")
    return arity_suffix.sub("", sql)
def remove_target_cluster_from_explain(sql: str) -> str:
    """Return *sql* without its ``Target cluster: <name>`` line.

    The match also consumes the newline in front of the line, any whitespace
    between that newline and the text, and the newline that ends the line.
    """
    target_cluster_line = re.compile(r"\n\s*Target cluster: \w+\n")
    return target_cluster_line.sub("", sql)
class SwapSchema(Scenario):
    """Time ``ALTER SCHEMA blue SWAP WITH green``.

    ``init()`` creates schemas ``blue``, ``green`` and ``noise``, with
    ``n()`` views per group depending on the tables and on each other.
    """

    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        """Create the three schemas, the two tables and the dependent views."""
        blue_views_on_table = "\n".join(
            [
                f"> CREATE VIEW blue.v{idx} AS SELECT * FROM blue.t1;"
                for idx in range(self.n())
            ]
        )

        green_views_on_table = "\n".join(
            [
                f"> CREATE VIEW green.v{idx} AS SELECT * FROM green.t1;"
                for idx in range(self.n())
            ]
        )

        # Views in `noise` that read from the first view in `blue`.
        noise_views_on_blue_view = "\n".join(
            [
                f"> CREATE VIEW noise.v{idx} AS SELECT * FROM blue.v0;"
                for idx in range(self.n())
            ]
        )

        # Views in `noise` that read from the first `noise` view.
        noise_views_on_noise_view = "\n".join(
            [
                f"> CREATE VIEW noise.extra_v{idx} AS SELECT * FROM noise.v0;"
                for idx in range(self.n())
            ]
        )

        setup = TdAction(
            f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;
> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);
{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
        )
        return [setup]

    def benchmark(self) -> MeasurementSource:
        """Swap the ``blue`` and ``green`` schemas between two ``SELECT 1`` probes."""
        script = dedent(
            """
> SELECT 1;
/* A */
1
> ALTER SCHEMA blue SWAP WITH green;
> SELECT 1;
/* B */
1
"""
        )
        return Td(script)
class ReplicaExpiration(Scenario):
    """Populate an indexed materialized view that has a temporal filter.

    ``last_30_days`` keeps the rows of ``events`` for which
    ``mz_now() <= event_ts + INTERVAL '30 days'``. The number of rows in
    ``events`` is driven by the single row stored in ``events_scale``.
    """

    # Causes "tried to kill container, but did not receive an exit event" errors when killing container afterwards
    SCALE = 5
    # Too slow with larger scale
    FIXED_SCALE = True

    def version(self) -> ScenarioVersion:
        """Return the version of this scenario definition."""
        return ScenarioVersion.create(1, 1, 0)

    def init(self) -> list[Action]:
        """Create the control table, the generated view and the indexed MV."""
        setup = TdAction(
            """
> CREATE TABLE events_scale (
scale INT NOT NULL,
event_ts TIMESTAMP NOT NULL
);
> CREATE VIEW events AS
SELECT concat('somelongstringthatdoesntmattermuchatallbutrequiresmemorytostoreXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', x::text) AS content, (SELECT event_ts FROM events_scale LIMIT 1) AS event_ts FROM generate_series(1, (SELECT scale FROM events_scale LIMIT 1)) x;
> CREATE MATERIALIZED VIEW last_30_days AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30 days';
> CREATE DEFAULT INDEX ON last_30_days
"""
        )
        return [setup]

    def benchmark(self) -> MeasurementSource:
        """Empty the view, then insert the scale row and wait for ``n()`` rows."""
        script = dedent(
            f"""
> DELETE FROM events_scale;
> SELECT COUNT(*) FROM last_30_days
0
> SELECT 1;
/* A */
1
> INSERT INTO events_scale VALUES ({self.n()}, now());
> SELECT COUNT(*) FROM last_30_days
{self.n()}
> SELECT 1;
/* B */
1
"""
        )
        return Td(script)
|