mzcompose.py 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Stresses Materialize with large number of objects, large ingestions, etc. Good
  11. to prevent regressions in basic functionality for larger installations.
  12. """
  13. import contextlib
  14. import json
  15. import re
  16. import sys
  17. import time
  18. import traceback
  19. import uuid
  20. from io import StringIO
  21. from textwrap import dedent
  22. from urllib.parse import quote
  23. from materialize import MZ_ROOT, buildkite
  24. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  25. from materialize.mzcompose.services.balancerd import Balancerd
  26. from materialize.mzcompose.services.clusterd import Clusterd
  27. from materialize.mzcompose.services.cockroach import Cockroach
  28. from materialize.mzcompose.services.frontegg import FronteggMock
  29. from materialize.mzcompose.services.kafka import Kafka
  30. from materialize.mzcompose.services.materialized import Materialized
  31. from materialize.mzcompose.services.mysql import MySql
  32. from materialize.mzcompose.services.mz import Mz
  33. from materialize.mzcompose.services.postgres import Postgres
  34. from materialize.mzcompose.services.schema_registry import SchemaRegistry
  35. from materialize.mzcompose.services.test_certs import TestCerts
  36. from materialize.mzcompose.services.testdrive import Testdrive
  37. from materialize.mzcompose.services.zookeeper import Zookeeper
  38. from materialize.mzcompose.test_result import (
  39. FailedTestExecutionError,
  40. TestFailureDetails,
  41. )
  42. from materialize.test_analytics.config.test_analytics_db_config import (
  43. create_test_analytics_config,
  44. )
  45. from materialize.test_analytics.data.product_limits import (
  46. product_limits_result_storage,
  47. )
  48. from materialize.test_analytics.test_analytics_db import TestAnalyticsDb
  49. from materialize.util import all_subclasses
# Version identifier of this product-limits test framework.
# NOTE(review): presumably recorded together with stored results so that runs
# from different framework revisions can be told apart — confirm against usage.
PRODUCT_LIMITS_FRAMEWORK_VERSION = "1.0.0"
  51. class Statistics:
  52. def __init__(self, wallclock: float, explain_wallclock: float | None):
  53. self.wallclock = wallclock
  54. self.explain_wallclock = explain_wallclock
  55. def __str__(self) -> str:
  56. return f""" wallclock: {self.wallclock:>7.2f}
  57. explain_wallclock: {self.explain_wallclock:>7.2f}ms"""
  58. class Generator:
  59. """A common class for all the individual Generators.
  60. Provides a set of convenience iterators.
  61. """
  62. # By default, we create that many objects of the type under test
  63. # unless overriden on a per-test basis.
  64. #
  65. # For tests that deal with records, the number of records processed
  66. # is usually COUNT * 1000
  67. COUNT: int = 1000
  68. VERSION: str = "1.0.0"
  69. EXPLAIN: str | None = None
  70. MAX_COUNT: int | None = None
  71. @classmethod
  72. def header(cls) -> None:
  73. print(f"\n#\n# {cls}\n#\n")
  74. print(
  75. "$ postgres-connect name=mz_system url=postgres://mz_system@materialized:6877/materialize"
  76. )
  77. print("$ postgres-execute connection=mz_system")
  78. print("DROP SCHEMA IF EXISTS public CASCADE;")
  79. print(f"CREATE SCHEMA public /* {cls} */;")
  80. print("GRANT ALL PRIVILEGES ON SCHEMA public TO materialize")
  81. print(f'GRANT ALL PRIVILEGES ON SCHEMA public TO "{ADMIN_USER}"')
  82. print(
  83. f'GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO "{ADMIN_USER}";',
  84. )
  85. print(
  86. f'GRANT ALL PRIVILEGES ON CLUSTER single_worker_cluster TO "{ADMIN_USER}";',
  87. )
  88. print(
  89. f'GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}";',
  90. )
  91. @classmethod
  92. def body(cls) -> None:
  93. raise NotImplementedError
  94. @classmethod
  95. def store_explain_and_run(cls, query: str) -> str | None:
  96. cls.EXPLAIN = f"EXPLAIN {query}"
  97. print(f"> {query}")
  98. @classmethod
  99. def footer(cls) -> None:
  100. print()
  101. @classmethod
  102. def generate(cls) -> None:
  103. cls.header()
  104. cls.body()
  105. cls.footer()
  106. @classmethod
  107. def all(cls) -> range:
  108. return range(1, cls.COUNT + 1)
  109. @classmethod
  110. def no_first(cls) -> range:
  111. return range(2, cls.COUNT + 1)
  112. @classmethod
  113. def no_last(cls) -> range:
  114. return range(1, cls.COUNT)
  115. class Connections(Generator):
  116. @classmethod
  117. def body(cls) -> None:
  118. print("$ postgres-execute connection=mz_system")
  119. # three extra connections for mz_system, default connection, and one
  120. # since sqlparse 0.4.4. 3 reserved superuser connections since materialize#25666
  121. # try bumping limit a bit further since this is sometimes flaky
  122. print(f"ALTER SYSTEM SET max_connections = {Connections.COUNT+10};")
  123. for i in cls.all():
  124. print(
  125. f"$ postgres-connect name=conn{i} url=postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require"
  126. )
  127. for i in cls.all():
  128. print(f"$ postgres-execute connection=conn{i}\nSELECT 1;\n")
  129. class Tables(Generator):
  130. COUNT = 90 # https://github.com/MaterializeInc/database-issues/issues/3675 and https://github.com/MaterializeInc/database-issues/issues/7830
  131. MAX_COUNT = 2880 # Too long-running with 5760 tables
  132. @classmethod
  133. def body(cls) -> None:
  134. print("$ postgres-execute connection=mz_system")
  135. print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};")
  136. print("$ postgres-execute connection=mz_system")
  137. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  138. for i in cls.all():
  139. print(f"> CREATE TABLE t{i} (f1 INTEGER);")
  140. for i in cls.all():
  141. print(f"> INSERT INTO t{i} VALUES ({i});")
  142. print("> BEGIN")
  143. for i in cls.all():
  144. cls.store_explain_and_run(f"SELECT * FROM t{i}")
  145. print(f"{i}")
  146. print("> COMMIT")
  147. class Subscribe(Generator):
  148. COUNT = 100 # Each SUBSCRIBE instantiates a dataflow, so impossible to do 1K
  149. @classmethod
  150. def body(cls) -> None:
  151. print("> DROP TABLE IF EXISTS t1 CASCADE;")
  152. print("> CREATE TABLE t1 (f1 INTEGER);")
  153. print("> INSERT INTO t1 VALUES (-1);")
  154. print("> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) FROM t1;")
  155. for i in cls.all():
  156. print(
  157. f"$ postgres-connect name=conn{i} url=postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require"
  158. )
  159. for i in cls.all():
  160. print(f"$ postgres-execute connection=conn{i}")
  161. print("BEGIN;")
  162. for i in cls.all():
  163. print(f"$ postgres-execute connection=conn{i}")
  164. print(f"DECLARE c{i} CURSOR FOR SUBSCRIBE v1")
  165. for i in cls.all():
  166. print(f"$ postgres-execute connection=conn{i}")
  167. print(f"FETCH ALL FROM c{i};")
  168. class Indexes(Generator):
  169. MAX_COUNT = 2000 # Too long-running with count=2562
  170. @classmethod
  171. def body(cls) -> None:
  172. print("$ postgres-execute connection=mz_system")
  173. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  174. print(
  175. "> CREATE TABLE t (" + ", ".join(f"f{i} INTEGER" for i in cls.all()) + ");"
  176. )
  177. print("> INSERT INTO t VALUES (" + ", ".join(str(i) for i in cls.all()) + ");")
  178. for i in cls.all():
  179. print(f"> CREATE INDEX i{i} ON t(f{i})")
  180. for i in cls.all():
  181. cls.store_explain_and_run(f"SELECT f{i} FROM t")
  182. print(f"{i}")
  183. class KafkaTopics(Generator):
  184. COUNT = min(Generator.COUNT, 20) # CREATE SOURCE is slow
  185. MAX_COUNT = 640 # Too long-running with count=1280
  186. @classmethod
  187. def body(cls) -> None:
  188. print("$ postgres-execute connection=mz_system")
  189. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  190. print("$ postgres-execute connection=mz_system")
  191. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  192. print('$ set key-schema={"type": "string"}')
  193. print(
  194. '$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "f1", "type": "string"}]}'
  195. )
  196. print(
  197. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  198. FOR CONFLUENT SCHEMA REGISTRY
  199. URL '${testdrive.schema-registry-url}';
  200. """
  201. )
  202. print(
  203. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  204. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  205. """
  206. )
  207. for i in cls.all():
  208. topic = f"kafka-sources-{i}"
  209. print(f"$ kafka-create-topic topic={topic}")
  210. print(
  211. f"$ kafka-ingest format=avro topic={topic} key-format=avro key-schema=${{key-schema}} schema=${{value-schema}}"
  212. )
  213. print(f'"{i}" {{"f1": "{i}"}}')
  214. print(
  215. f"""> CREATE SOURCE s{i}
  216. IN CLUSTER single_replica_cluster
  217. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-{topic}-${{testdrive.seed}}')
  218. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  219. ENVELOPE NONE;
  220. """
  221. )
  222. for i in cls.all():
  223. cls.store_explain_and_run(f"SELECT * FROM s{i}")
  224. print(f"{i}")
  225. class KafkaSourcesSameTopic(Generator):
  226. COUNT = 500 # high memory consumption
  227. MAX_COUNT = COUNT # Too long-running with 750 sources
  228. @classmethod
  229. def body(cls) -> None:
  230. print("$ postgres-execute connection=mz_system")
  231. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  232. print("$ postgres-execute connection=mz_system")
  233. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  234. print('$ set key-schema={"type": "string"}')
  235. print(
  236. '$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "f1", "type": "string"}]}'
  237. )
  238. print("$ kafka-create-topic topic=topic")
  239. print(
  240. "$ kafka-ingest format=avro topic=topic key-format=avro key-schema=${key-schema} schema=${value-schema}"
  241. )
  242. print('"123" {"f1": "123"}')
  243. print(
  244. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  245. FOR CONFLUENT SCHEMA REGISTRY
  246. URL '${testdrive.schema-registry-url}';
  247. """
  248. )
  249. print(
  250. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  251. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  252. """
  253. )
  254. for i in cls.all():
  255. print(
  256. f"""> CREATE SOURCE s{i}
  257. IN CLUSTER single_replica_cluster
  258. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic-${{testdrive.seed}}')
  259. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  260. ENVELOPE NONE;
  261. """
  262. )
  263. for i in cls.all():
  264. cls.store_explain_and_run(f"SELECT * FROM s{i}")
  265. print("123")
  266. class KafkaPartitions(Generator):
  267. COUNT = min(Generator.COUNT, 100) # It takes 5+min to process 1K partitions
  268. MAX_COUNT = 3200 # Too long-running with 6400 partitions
  269. @classmethod
  270. def body(cls) -> None:
  271. print("$ postgres-execute connection=mz_system")
  272. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  273. print("$ postgres-execute connection=mz_system")
  274. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  275. # gh#12193 : topic_metadata_refresh_interval_ms is not observed so a default refresh interval of 300s applies
  276. print("$ set-sql-timeout duration=600s")
  277. print('$ set key-schema={"type": "string"}')
  278. print(
  279. '$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "f1", "type": "string"}]}'
  280. )
  281. print(
  282. f"$ kafka-create-topic topic=kafka-partitions partitions={round(cls.COUNT/2)}"
  283. )
  284. print(
  285. "$ kafka-ingest format=avro topic=kafka-partitions key-format=avro key-schema=${key-schema} schema=${value-schema} partition=-1"
  286. )
  287. for i in cls.all():
  288. print(f'"{i}" {{"f1": "{i}"}}')
  289. print(
  290. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  291. FOR CONFLUENT SCHEMA REGISTRY
  292. URL '${testdrive.schema-registry-url}';
  293. """
  294. )
  295. print(
  296. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  297. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  298. """
  299. )
  300. print(
  301. """> CREATE SOURCE s1
  302. IN CLUSTER single_replica_cluster
  303. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-partitions-${testdrive.seed}')
  304. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  305. ENVELOPE NONE;
  306. """
  307. )
  308. print("> CREATE DEFAULT INDEX ON s1")
  309. print(
  310. f"$ kafka-add-partitions topic=kafka-partitions total-partitions={cls.COUNT}"
  311. )
  312. print(
  313. "$ kafka-ingest format=avro topic=kafka-partitions key-format=avro key-schema=${key-schema} schema=${value-schema} partition=-1"
  314. )
  315. for i in cls.all():
  316. print(f'"{i}" {{"f1": "{i}"}}')
  317. cls.store_explain_and_run("SELECT COUNT(*) FROM s1")
  318. print(f"{cls.COUNT * 2}")
  319. class KafkaRecordsEnvelopeNone(Generator):
  320. COUNT = Generator.COUNT * 10_000
  321. MAX_COUNT = COUNT # Only runs into max unsigned int size, takes a while
  322. @classmethod
  323. def body(cls) -> None:
  324. print("$ postgres-execute connection=mz_system")
  325. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  326. print("$ postgres-execute connection=mz_system")
  327. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  328. print(
  329. '$ set kafka-records-envelope-none={"type": "record", "name": "r", "fields": [{"name": "f1", "type": "string"}]}'
  330. )
  331. print("$ kafka-create-topic topic=kafka-records-envelope-none")
  332. print(
  333. f"$ kafka-ingest format=avro topic=kafka-records-envelope-none schema=${{kafka-records-envelope-none}} repeat={cls.COUNT}"
  334. )
  335. print('{"f1": "123"}')
  336. print(
  337. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  338. FOR CONFLUENT SCHEMA REGISTRY
  339. URL '${testdrive.schema-registry-url}';
  340. """
  341. )
  342. print(
  343. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  344. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  345. """
  346. )
  347. print(
  348. """> CREATE SOURCE kafka_records_envelope_none
  349. IN CLUSTER single_replica_cluster
  350. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-records-envelope-none-${testdrive.seed}')
  351. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  352. ENVELOPE NONE;
  353. """
  354. )
  355. cls.store_explain_and_run("SELECT COUNT(*) FROM kafka_records_envelope_none")
  356. print(f"{cls.COUNT}")
  357. class KafkaRecordsEnvelopeUpsertSameValue(Generator):
  358. COUNT = Generator.COUNT * 10_000
  359. MAX_COUNT = COUNT # Only runs into max unsigned int size, takes a while
  360. @classmethod
  361. def body(cls) -> None:
  362. print("$ postgres-execute connection=mz_system")
  363. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  364. print("$ postgres-execute connection=mz_system")
  365. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  366. print(
  367. '$ set kafka-records-envelope-upsert-same-key={"type": "record", "name": "Key", "fields": [ {"name": "key", "type": "string"} ] }'
  368. )
  369. print(
  370. '$ set kafka-records-envelope-upsert-same-value={"type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] }'
  371. )
  372. print("$ kafka-create-topic topic=kafka-records-envelope-upsert-same")
  373. print(
  374. f"$ kafka-ingest format=avro topic=kafka-records-envelope-upsert-same key-format=avro key-schema=${{kafka-records-envelope-upsert-same-key}} schema=${{kafka-records-envelope-upsert-same-value}} repeat={cls.COUNT}"
  375. )
  376. print('{"key": "fish"} {"f1": "fish"}')
  377. print(
  378. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  379. FOR CONFLUENT SCHEMA REGISTRY
  380. URL '${testdrive.schema-registry-url}';
  381. """
  382. )
  383. print(
  384. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  385. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  386. """
  387. )
  388. print(
  389. """> CREATE SOURCE kafka_records_envelope_upsert_same
  390. IN CLUSTER single_replica_cluster
  391. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-records-envelope-upsert-same-${testdrive.seed}')
  392. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  393. ENVELOPE UPSERT;
  394. """
  395. )
  396. print("> SELECT * FROM kafka_records_envelope_upsert_same;\nfish fish")
  397. cls.store_explain_and_run(
  398. "SELECT COUNT(*) FROM kafka_records_envelope_upsert_same"
  399. )
  400. print("1")
  401. class KafkaRecordsEnvelopeUpsertDistinctValues(Generator):
  402. COUNT = Generator.COUNT * 1_000
  403. @classmethod
  404. def body(cls) -> None:
  405. print("$ postgres-execute connection=mz_system")
  406. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  407. print("$ postgres-execute connection=mz_system")
  408. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  409. print(
  410. '$ set kafka-records-envelope-upsert-distinct-key={"type": "record", "name": "Key", "fields": [ {"name": "key", "type": "string"} ] }'
  411. )
  412. print(
  413. '$ set kafka-records-envelope-upsert-distinct-value={"type" : "record", "name" : "test", "fields" : [ {"name":"f1", "type":"string"} ] }'
  414. )
  415. print("$ kafka-create-topic topic=kafka-records-envelope-upsert-distinct")
  416. print(
  417. f"$ kafka-ingest format=avro topic=kafka-records-envelope-upsert-distinct key-format=avro key-schema=${{kafka-records-envelope-upsert-distinct-key}} schema=${{kafka-records-envelope-upsert-distinct-value}} repeat={cls.COUNT}"
  418. )
  419. print(
  420. '{"key": "${kafka-ingest.iteration}"} {"f1": "${kafka-ingest.iteration}"}'
  421. )
  422. print(
  423. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  424. FOR CONFLUENT SCHEMA REGISTRY
  425. URL '${testdrive.schema-registry-url}';
  426. """
  427. )
  428. print(
  429. """> CREATE CONNECTION IF NOT EXISTS kafka_conn
  430. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  431. """
  432. )
  433. print(
  434. """> CREATE SOURCE kafka_records_envelope_upsert_distinct
  435. IN CLUSTER single_replica_cluster
  436. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-records-envelope-upsert-distinct-${testdrive.seed}')
  437. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  438. ENVELOPE UPSERT;
  439. """
  440. )
  441. cls.store_explain_and_run(
  442. "SELECT COUNT(*), COUNT(DISTINCT f1) FROM kafka_records_envelope_upsert_distinct"
  443. )
  444. print(f"{cls.COUNT} {cls.COUNT}")
  445. print(
  446. f"$ kafka-ingest format=avro topic=kafka-records-envelope-upsert-distinct key-format=avro key-schema=${{kafka-records-envelope-upsert-distinct-key}} schema=${{kafka-records-envelope-upsert-distinct-value}} repeat={cls.COUNT}"
  447. )
  448. print('{"key": "${kafka-ingest.iteration}"}')
  449. print(
  450. "> SELECT COUNT(*), COUNT(DISTINCT f1) FROM kafka_records_envelope_upsert_distinct;\n0 0"
  451. )
  452. class KafkaSinks(Generator):
  453. COUNT = min(Generator.COUNT, 50) # $ kafka-verify-data is slow
  454. MAX_COUNT = 1600 # Too long-running with 3200 sinks
  455. @classmethod
  456. def body(cls) -> None:
  457. print("$ postgres-execute connection=mz_system")
  458. print(f"ALTER SYSTEM SET max_materialized_views = {cls.COUNT * 10};")
  459. print("$ postgres-execute connection=mz_system")
  460. print(f"ALTER SYSTEM SET max_sinks = {cls.COUNT * 10};")
  461. print("$ postgres-execute connection=mz_system")
  462. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  463. for i in cls.all():
  464. print(f"> CREATE MATERIALIZED VIEW v{i} (f1) AS VALUES ({i})")
  465. print(
  466. """> CREATE CONNECTION IF NOT EXISTS csr_conn
  467. FOR CONFLUENT SCHEMA REGISTRY
  468. URL '${testdrive.schema-registry-url}';
  469. """
  470. )
  471. for i in cls.all():
  472. print(
  473. dedent(
  474. f"""
  475. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
  476. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  477. > CREATE SINK s{i}
  478. IN CLUSTER single_replica_cluster
  479. FROM v{i}
  480. INTO KAFKA CONNECTION kafka_conn (TOPIC 'kafka-sink-{i}')
  481. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  482. ENVELOPE DEBEZIUM;
  483. $ kafka-verify-topic sink=materialize.public.s{i}
  484. """
  485. )
  486. )
  487. for i in cls.all():
  488. print(
  489. dedent(
  490. f"""
  491. $ kafka-verify-data format=avro sink=materialize.public.s{i}
  492. {{"before": null, "after": {{"row": {{"f1": {i}}}}}}}
  493. """
  494. )
  495. )
  496. class KafkaSinksSameSource(Generator):
  497. COUNT = min(Generator.COUNT, 50) # $ kafka-verify-data is slow
  498. MAX_COUNT = 3200 # Too long-running with 6400 sinks
  499. @classmethod
  500. def body(cls) -> None:
  501. print("$ postgres-execute connection=mz_system")
  502. print(f"ALTER SYSTEM SET max_sinks = {cls.COUNT * 10};")
  503. print("$ postgres-execute connection=mz_system")
  504. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  505. print("> CREATE MATERIALIZED VIEW v1 (f1) AS VALUES (123)")
  506. print(
  507. """> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);"""
  508. )
  509. print(
  510. """> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}');"""
  511. )
  512. for i in cls.all():
  513. print(
  514. dedent(
  515. f"""
  516. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
  517. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  518. > CREATE SINK s{i}
  519. IN CLUSTER single_replica_cluster
  520. FROM v1
  521. INTO KAFKA CONNECTION kafka_conn (TOPIC 'kafka-sink-same-source-{i}')
  522. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  523. ENVELOPE DEBEZIUM
  524. $ kafka-verify-topic sink=materialize.public.s{i}
  525. """
  526. )
  527. )
  528. for i in cls.all():
  529. print(
  530. f'$ kafka-verify-data format=avro sink=materialize.public.s{i}\n{{"before": null, "after": {{"row": {{"f1": 123}}}}}}\n'
  531. )
  532. class Columns(Generator):
  533. @classmethod
  534. def body(cls) -> None:
  535. print(
  536. "> CREATE TABLE t (" + ", ".join(f"f{i} INTEGER" for i in cls.all()) + ");"
  537. )
  538. print("> INSERT INTO t VALUES (" + ", ".join(str(i) for i in cls.all()) + ");")
  539. print(
  540. "> CREATE MATERIALIZED VIEW v AS SELECT "
  541. + ", ".join(f"f{i} + 1 AS f{i}" for i in cls.all())
  542. + " FROM t;"
  543. )
  544. print("> CREATE DEFAULT INDEX ON v")
  545. cls.store_explain_and_run(
  546. "SELECT " + ", ".join(f"f{i} + 1" for i in cls.all()) + " FROM v;"
  547. )
  548. print(" ".join(str(i + 2) for i in cls.all()))
  549. class TablesCommaJoinNoCondition(Generator):
  550. COUNT = 100 # https://github.com/MaterializeInc/database-issues/issues/3682
  551. MAX_COUNT = 200 # Too long-running with 400 conditions
  552. @classmethod
  553. def body(cls) -> None:
  554. print("> CREATE TABLE t1 (f1 INTEGER);")
  555. print("> INSERT INTO t1 VALUES (1);")
  556. table_list = ", ".join(f"t1 as a{i}" for i in cls.all())
  557. cls.store_explain_and_run(f"SELECT * FROM {table_list};")
  558. print(" ".join("1" for i in cls.all()))
  559. class TablesCommaJoinWithJoinCondition(Generator):
  560. COUNT = 20 # Otherwise is very slow
  561. MAX_COUNT = 200 # Too long-running with 400 conditions
  562. @classmethod
  563. def body(cls) -> None:
  564. print("> CREATE TABLE t1 (f1 INTEGER);")
  565. print("> INSERT INTO t1 VALUES (1);")
  566. table_list = ", ".join(f"t1 as a{i}" for i in cls.all())
  567. condition_list = " AND ".join(f"a{i}.f1 = a{i+1}.f1" for i in cls.no_last())
  568. cls.store_explain_and_run(f"SELECT * FROM {table_list} WHERE {condition_list};")
  569. print(" ".join("1" for i in cls.all()))
  570. class TablesCommaJoinWithCondition(Generator):
  571. COUNT = min(Generator.COUNT, 100)
  572. @classmethod
  573. def body(cls) -> None:
  574. print("> CREATE TABLE t1 (f1 INTEGER);")
  575. print("> INSERT INTO t1 VALUES (1);")
  576. table_list = ", ".join(f"t1 as a{i}" for i in cls.all())
  577. condition_list = " AND ".join(f"a1.f1 = a{i}.f1" for i in cls.no_first())
  578. cls.store_explain_and_run(f"SELECT * FROM {table_list} WHERE {condition_list};")
  579. print(" ".join("1" for i in cls.all()))
  580. class TablesOuterJoinUsing(Generator):
  581. COUNT = min(Generator.COUNT, 100) # Otherwise is very slow
  582. @classmethod
  583. def body(cls) -> None:
  584. print("> CREATE TABLE t1 (f1 INTEGER);")
  585. print("> INSERT INTO t1 VALUES (1);")
  586. table_list = " LEFT JOIN ".join(
  587. f"t1 as a{i} USING (f1)" for i in cls.no_first()
  588. )
  589. cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 LEFT JOIN {table_list};")
  590. print("1")
  591. class TablesOuterJoinOn(Generator):
  592. COUNT = min(Generator.COUNT, 100) # Otherwise is very slow
  593. @classmethod
  594. def body(cls) -> None:
  595. print("> CREATE TABLE t1 (f1 INTEGER);")
  596. print("> INSERT INTO t1 VALUES (1);")
  597. table_list = " LEFT JOIN ".join(
  598. f"t1 as a{i} ON (a{i-1}.f1 = a{i}.f1)" for i in cls.no_first()
  599. )
  600. cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 LEFT JOIN {table_list};")
  601. print(" ".join("1" for i in cls.all()))
  602. class SubqueriesScalarSelectListWithCondition(Generator):
  603. COUNT = min(
  604. Generator.COUNT, 100
  605. ) # https://github.com/MaterializeInc/database-issues/issues/2626
  606. @classmethod
  607. def body(cls) -> None:
  608. print("> CREATE TABLE t1 (f1 INTEGER);")
  609. print("> INSERT INTO t1 VALUES (1);")
  610. select_list = ", ".join(
  611. f"(SELECT f1 FROM t1 AS a{i} WHERE a{i}.f1 + 1 = t1.f1 + 1)"
  612. for i in cls.no_first()
  613. )
  614. cls.store_explain_and_run(f"SELECT {select_list} FROM t1;")
  615. print(" ".join("1" for i in cls.no_first()))
  616. class SubqueriesScalarWhereClauseAnd(Generator):
  617. COUNT = min(
  618. Generator.COUNT, 10
  619. ) # https://github.com/MaterializeInc/database-issues/issues/2626
  620. @classmethod
  621. def body(cls) -> None:
  622. print("> CREATE TABLE t1 (f1 INTEGER);")
  623. print("> INSERT INTO t1 VALUES (1);")
  624. where_clause = " AND ".join(
  625. f"(SELECT * FROM t1 WHERE f1 <= {i}) = 1" for i in cls.all()
  626. )
  627. cls.store_explain_and_run(f"SELECT 1 WHERE {where_clause}")
  628. print("1")
  629. class SubqueriesExistWhereClause(Generator):
  630. COUNT = min(
  631. Generator.COUNT, 10
  632. ) # https://github.com/MaterializeInc/database-issues/issues/2626
  633. @classmethod
  634. def body(cls) -> None:
  635. print("> CREATE TABLE t1 (f1 INTEGER);")
  636. print("> INSERT INTO t1 VALUES (1);")
  637. where_clause = " AND ".join(
  638. f"EXISTS (SELECT * FROM t1 WHERE f1 <= {i})" for i in cls.all()
  639. )
  640. cls.store_explain_and_run(f"SELECT 1 WHERE {where_clause}")
  641. print("1")
  642. class SubqueriesInWhereClauseCorrelated(Generator):
  643. COUNT = min(
  644. Generator.COUNT, 10
  645. ) # https://github.com/MaterializeInc/database-issues/issues/6189
  646. @classmethod
  647. def body(cls) -> None:
  648. print("> CREATE TABLE t1 (f1 INTEGER);")
  649. print("> INSERT INTO t1 VALUES (1);")
  650. where_clause = " AND ".join(
  651. f"f1 IN (SELECT * FROM t1 WHERE f1 = a1.f1 AND f1 <= {i})"
  652. for i in cls.all()
  653. )
  654. cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 WHERE {where_clause}")
  655. print("1")
  656. class SubqueriesInWhereClauseUncorrelated(Generator):
  657. COUNT = min(
  658. Generator.COUNT, 10
  659. ) # https://github.com/MaterializeInc/database-issues/issues/6189
  660. @classmethod
  661. def body(cls) -> None:
  662. print("> CREATE TABLE t1 (f1 INTEGER);")
  663. print("> INSERT INTO t1 VALUES (1);")
  664. where_clause = " AND ".join(
  665. f"f1 IN (SELECT * FROM t1 WHERE f1 <= {i})" for i in cls.all()
  666. )
  667. cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 WHERE {where_clause}")
  668. print("1")
  669. class SubqueriesWhereClauseOr(Generator):
  670. COUNT = min(
  671. Generator.COUNT, 10
  672. ) # https://github.com/MaterializeInc/database-issues/issues/2630
  673. MAX_COUNT = 160 # Too long-running with count=320
  674. @classmethod
  675. def body(cls) -> None:
  676. print("> CREATE TABLE t1 (f1 INTEGER);")
  677. print("> INSERT INTO t1 VALUES (1);")
  678. where_clause = " OR ".join(
  679. f"(SELECT * FROM t1 WHERE f1 = {i}) = 1" for i in cls.all()
  680. )
  681. cls.store_explain_and_run(f"SELECT 1 WHERE {where_clause}")
  682. print("1")
  683. class SubqueriesNested(Generator):
  684. COUNT = min(
  685. Generator.COUNT, 40
  686. ) # Otherwise we exceed the 128 limit to nested expressions
  687. @classmethod
  688. def body(cls) -> None:
  689. print("> CREATE TABLE t1 (f1 INTEGER);")
  690. print("> INSERT INTO t1 VALUES (1);")
  691. cls.store_explain_and_run(
  692. "SELECT 1 WHERE 1 = "
  693. + " ".join(" (SELECT * FROM t1 WHERE f1 = " for i in cls.all())
  694. + " 1"
  695. + "".join(" )" for i in cls.all())
  696. )
  697. print("1")
  698. class ViewsNested(Generator):
  699. COUNT = min(
  700. Generator.COUNT, 10
  701. ) # https://github.com/MaterializeInc/database-issues/issues/2626
  702. @classmethod
  703. def body(cls) -> None:
  704. print("$ postgres-execute connection=mz_system")
  705. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  706. print("> CREATE TABLE t (f1 INTEGER);")
  707. print("> INSERT INTO t VALUES (0);")
  708. print("> CREATE VIEW v0 (f1) AS SELECT f1 FROM t;")
  709. for i in cls.all():
  710. print(f"> CREATE VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i-1};")
  711. cls.store_explain_and_run(f"SELECT * FROM v{cls.COUNT};")
  712. print(f"{cls.COUNT}")
  713. class ViewsMaterializedNested(Generator):
  714. COUNT = min(
  715. Generator.COUNT, 25
  716. ) # https://github.com/MaterializeInc/database-issues/issues/3958
  717. MAX_COUNT = 400 # Too long-running with 800 views
  718. @classmethod
  719. def body(cls) -> None:
  720. print("$ set-sql-timeout duration=300s")
  721. print("$ postgres-execute connection=mz_system")
  722. print(f"ALTER SYSTEM SET max_materialized_views = {cls.COUNT * 10};")
  723. print("$ postgres-execute connection=mz_system")
  724. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  725. print("> CREATE TABLE t (f1 INTEGER);")
  726. print("> INSERT INTO t VALUES (0);")
  727. print("> CREATE MATERIALIZED VIEW v0 (f1) AS SELECT f1 FROM t;")
  728. for i in cls.all():
  729. print(
  730. f"> CREATE MATERIALIZED VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i-1};"
  731. )
  732. cls.store_explain_and_run(f"SELECT * FROM v{cls.COUNT};")
  733. print(f"{cls.COUNT}")
  734. class CTEs(Generator):
  735. COUNT = min(
  736. Generator.COUNT, 10
  737. ) # https://github.com/MaterializeInc/database-issues/issues/2628
  738. MAX_COUNT = 240 # Too long-running with count=480
  739. @classmethod
  740. def body(cls) -> None:
  741. print("> CREATE TABLE t1 (f1 INTEGER);")
  742. print("> INSERT INTO t1 VALUES (1);")
  743. cte_list = ", ".join(
  744. f"a{i} AS (SELECT * FROM t1 WHERE f1 = 1)" for i in cls.all()
  745. )
  746. table_list = ", ".join(f"a{i}" for i in cls.all())
  747. cls.store_explain_and_run(f"WITH {cte_list} SELECT * FROM {table_list}")
  748. print(" ".join("1" for i in cls.all()))
  749. class NestedCTEsIndependent(Generator):
  750. COUNT = min(
  751. Generator.COUNT, 9
  752. ) # https://github.com/MaterializeInc/database-issues/issues/2628 and https://github.com/MaterializeInc/database-issues/issues/7830
  753. @classmethod
  754. def body(cls) -> None:
  755. print("> CREATE TABLE t1 (f1 INTEGER);")
  756. print("> INSERT INTO t1 VALUES " + ", ".join(f"({i})" for i in cls.all()))
  757. cte_list = ", ".join(
  758. f"a{i} AS (SELECT f1 + 1 AS f1 FROM a{i-1} WHERE f1 <= {i})"
  759. for i in cls.no_first()
  760. )
  761. table_list = ", ".join(f"a{i}" for i in cls.all())
  762. cls.store_explain_and_run(
  763. f"WITH a{1} AS (SELECT * FROM t1 WHERE f1 <= 1), {cte_list} SELECT * FROM {table_list}"
  764. )
  765. print(" ".join(f"{a}" for a in cls.all()))
  766. class NestedCTEsChained(Generator):
  767. COUNT = min(
  768. Generator.COUNT, 10
  769. ) # https://github.com/MaterializeInc/database-issues/issues/2629
  770. @classmethod
  771. def body(cls) -> None:
  772. print("> CREATE TABLE t1 (f1 INTEGER);")
  773. print("> INSERT INTO t1 VALUES (1)")
  774. cte_list = ", ".join(
  775. f"a{i} AS (SELECT a{i-1}.f1 + 0 AS f1 FROM a{i-1}, t1 WHERE a{i-1}.f1 = t1.f1)"
  776. for i in cls.no_first()
  777. )
  778. cls.store_explain_and_run(
  779. f"WITH a{1} AS (SELECT * FROM t1), {cte_list} SELECT * FROM a{cls.COUNT}"
  780. )
  781. print("1")
  782. class DerivedTables(Generator):
  783. COUNT = min(
  784. Generator.COUNT, 10
  785. ) # https://github.com/MaterializeInc/database-issues/issues/2630
  786. MAX_COUNT = 160 # Too long-running with count=320
  787. @classmethod
  788. def body(cls) -> None:
  789. print("> CREATE TABLE t1 (f1 INTEGER);")
  790. print("> INSERT INTO t1 VALUES (1)")
  791. table_list = ", ".join(
  792. f"(SELECT t1.f1 + {i} AS f1 FROM t1 WHERE f1 <= {i}) AS a{i}"
  793. for i in cls.all()
  794. )
  795. cls.store_explain_and_run(f"SELECT * FROM {table_list};")
  796. print(" ".join(f"{i+1}" for i in cls.all()))
  797. class Lateral(Generator):
  798. COUNT = min(
  799. Generator.COUNT, 10
  800. ) # https://github.com/MaterializeInc/database-issues/issues/2631
  801. MAX_COUNT = 160 # Too long-running with count=320
  802. @classmethod
  803. def body(cls) -> None:
  804. print("> CREATE TABLE t1 (f1 INTEGER);")
  805. print("> INSERT INTO t1 VALUES (1)")
  806. table_list = ", LATERAL ".join(
  807. f"(SELECT t1.f1 + {i-1} AS f1 FROM t1 WHERE f1 <= a{i-1}.f1) AS a{i}"
  808. for i in cls.no_first()
  809. )
  810. cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 , LATERAL {table_list};")
  811. print(" ".join(f"{i}" for i in cls.all()))
  812. class SelectExpression(Generator):
  813. # Stack exhaustion with COUNT=1000 due to unprotected path:
  814. # https://github.com/MaterializeInc/database-issues/issues/3107
  815. COUNT = min(Generator.COUNT, 500)
  816. @classmethod
  817. def body(cls) -> None:
  818. column_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  819. print(f"> CREATE TABLE t1 ({column_list});")
  820. value_list = ", ".join("1" for i in cls.all())
  821. print(f"> INSERT INTO t1 VALUES ({value_list});")
  822. const_expression = " + ".join(f"{i}" for i in cls.all())
  823. cls.store_explain_and_run(f"SELECT {const_expression} FROM t1;")
  824. print(f"{sum(cls.all())}")
  825. expression = " + ".join(f"f{i}" for i in cls.all())
  826. cls.store_explain_and_run(f"SELECT {expression} FROM t1;")
  827. print(f"{cls.COUNT}")
  828. class WhereExpression(Generator):
  829. # Stack exhaustion with COUNT=1000 due to unprotected path:
  830. # https://github.com/MaterializeInc/database-issues/issues/3107
  831. COUNT = min(Generator.COUNT, 500)
  832. @classmethod
  833. def body(cls) -> None:
  834. print("> SET statement_timeout='120s'")
  835. column_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  836. print(f"> CREATE TABLE t1 ({column_list});")
  837. value_list = ", ".join("1" for i in cls.all())
  838. print(f"> INSERT INTO t1 VALUES ({value_list});")
  839. expression = " + ".join(f"{i}" for i in cls.all())
  840. cls.store_explain_and_run(
  841. f"SELECT f1 FROM t1 WHERE {expression} = {sum(cls.all())};"
  842. )
  843. print("1")
  844. class WhereConditionAnd(Generator):
  845. # Stack overflow, see https://github.com/MaterializeInc/database-issues/issues/5731
  846. COUNT = min(Generator.COUNT, 500)
  847. @classmethod
  848. def body(cls) -> None:
  849. column_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  850. print(f"> CREATE TABLE t1 ({column_list});")
  851. value_list = ", ".join("1" for i in cls.all())
  852. print(f"> INSERT INTO t1 VALUES ({value_list});")
  853. where_condition = " AND ".join(f"f{i} = 1" for i in cls.all())
  854. cls.store_explain_and_run(f"SELECT f1 FROM t1 WHERE {where_condition};")
  855. print("1")
  856. class WhereConditionAndSameColumn(Generator):
  857. # Stack overflow, see https://github.com/MaterializeInc/database-issues/issues/5731
  858. COUNT = min(Generator.COUNT, 500)
  859. @classmethod
  860. def body(cls) -> None:
  861. print("> CREATE TABLE t1 (f1 INTEGER);")
  862. print("> INSERT INTO t1 VALUES (1);")
  863. where_condition = " AND ".join(f"f1 <= {i}" for i in cls.all())
  864. cls.store_explain_and_run(f"SELECT f1 FROM t1 WHERE {where_condition};")
  865. print("1")
  866. class WhereConditionOr(Generator):
  867. # Stack overflow, see https://github.com/MaterializeInc/database-issues/issues/5731
  868. COUNT = min(Generator.COUNT, 500)
  869. @classmethod
  870. def body(cls) -> None:
  871. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  872. print(f"> CREATE TABLE t1 ({create_list});")
  873. value_list = ", ".join("1" for i in cls.all())
  874. print(f"> INSERT INTO t1 VALUES ({value_list});")
  875. where_condition = " OR ".join(f"f{i} = 1" for i in cls.all())
  876. cls.store_explain_and_run(f"SELECT f1 FROM t1 WHERE {where_condition};")
  877. print("1")
  878. class WhereConditionOrSameColumn(Generator):
  879. # Stack overflow, see https://github.com/MaterializeInc/database-issues/issues/5731
  880. COUNT = min(Generator.COUNT, 500)
  881. @classmethod
  882. def body(cls) -> None:
  883. print("> CREATE TABLE t1 (f1 INTEGER);")
  884. print("> INSERT INTO t1 VALUES (1);")
  885. where_condition = " OR ".join(f"f1 = {i}" for i in cls.all())
  886. cls.store_explain_and_run(f"SELECT f1 FROM t1 WHERE {where_condition};")
  887. print("1")
  888. class InList(Generator):
  889. @classmethod
  890. def body(cls) -> None:
  891. print("> CREATE TABLE t1 (f1 INTEGER);")
  892. print(f"> INSERT INTO t1 VALUES ({cls.COUNT})")
  893. in_list = ", ".join(f"{i}" for i in cls.all())
  894. cls.store_explain_and_run(f"SELECT * FROM t1 WHERE f1 IN ({in_list})")
  895. print(f"{cls.COUNT}")
  896. class JoinUsing(Generator):
  897. COUNT = min(Generator.COUNT, 10) # Slow
  898. @classmethod
  899. def body(cls) -> None:
  900. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  901. print(f"> CREATE TABLE t1 ({create_list});")
  902. value_list = ", ".join("1" for i in cls.all())
  903. print(f"> INSERT INTO t1 VALUES ({value_list});")
  904. column_list = ", ".join(f"f{i}" for i in cls.all())
  905. cls.store_explain_and_run(
  906. f"SELECT * FROM t1 AS a1 LEFT JOIN t1 AS a2 USING ({column_list})"
  907. )
  908. print(" ".join("1" for i in cls.all()))
  909. class JoinOn(Generator):
  910. COUNT = min(Generator.COUNT, 10) # Slow
  911. @classmethod
  912. def body(cls) -> None:
  913. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  914. print(f"> CREATE TABLE t1 ({create_list});")
  915. value_list = ", ".join("1" for i in cls.all())
  916. print(f"> INSERT INTO t1 VALUES ({value_list});")
  917. on_clause = " AND ".join(f"a1.f{i} = a2.f1" for i in cls.all())
  918. cls.store_explain_and_run(
  919. f"SELECT COUNT(*) FROM t1 AS a1 LEFT JOIN t1 AS a2 ON({on_clause})"
  920. )
  921. print("1")
  922. class Aggregates(Generator):
  923. @classmethod
  924. def body(cls) -> None:
  925. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  926. print(f"> CREATE TABLE t1 ({create_list});")
  927. value_list = ", ".join("1" for i in cls.all())
  928. print(f"> INSERT INTO t1 VALUES ({value_list});")
  929. aggregate_list = ", ".join(f"AVG(f{i})" for i in cls.all())
  930. cls.store_explain_and_run(f"SELECT {aggregate_list} FROM t1")
  931. print(" ".join("1" for i in cls.all()))
  932. class AggregateExpression(Generator):
  933. # Stack exhaustion with COUNT=1000 due to unprotected path:
  934. # https://github.com/MaterializeInc/database-issues/issues/3107
  935. COUNT = min(Generator.COUNT, 500)
  936. @classmethod
  937. def body(cls) -> None:
  938. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  939. print(f"> CREATE TABLE t1 ({create_list});")
  940. value_list = ", ".join("1" for i in cls.all())
  941. print(f"> INSERT INTO t1 VALUES ({value_list});")
  942. aggregate_expr = " + ".join(f"f{i}" for i in cls.all())
  943. cls.store_explain_and_run(f"SELECT AVG({aggregate_expr}) FROM t1")
  944. print(cls.COUNT)
  945. class GroupBy(Generator):
  946. @classmethod
  947. def body(cls) -> None:
  948. create_list = ", ".join(f"f{i} INTEGER" for i in cls.all())
  949. print(f"> CREATE TABLE t1 ({create_list});")
  950. value_list = ", ".join("1" for i in cls.all())
  951. print(f"> INSERT INTO t1 VALUES ({value_list});")
  952. column_list_select = ", ".join(f"f{i} + 1 AS f{i}" for i in cls.all())
  953. column_list_group_by = ", ".join(f"f{i} + 1" for i in cls.all())
  954. print(
  955. f"> CREATE MATERIALIZED VIEW v AS SELECT COUNT(*), {column_list_select} FROM t1 GROUP BY {column_list_group_by};"
  956. )
  957. print("> CREATE DEFAULT INDEX ON v")
  958. cls.store_explain_and_run("SELECT * FROM v")
  959. print("1 " + " ".join("2" for i in cls.all()))
  960. class Unions(Generator):
  961. COUNT = min(
  962. Generator.COUNT, 10
  963. ) # https://github.com/MaterializeInc/database-issues/issues/2628
  964. @classmethod
  965. def body(cls) -> None:
  966. print("> CREATE TABLE t1 (f1 INTEGER);")
  967. print("> INSERT INTO t1 VALUES (0)")
  968. union_list = " UNION DISTINCT ".join(
  969. f"(SELECT f1 + {i} FROM t1 AS a{i})" for i in cls.all()
  970. )
  971. cls.store_explain_and_run(f"SELECT COUNT(*) FROM ({union_list})")
  972. print(f"{cls.COUNT}")
  973. class UnionsNested(Generator):
  974. COUNT = min(
  975. Generator.COUNT, 40
  976. ) # Otherwise we exceed the 128 limit to nested expressions
  977. @classmethod
  978. def body(cls) -> None:
  979. print("> CREATE TABLE t1 (f1 INTEGER);")
  980. print("> INSERT INTO t1 VALUES (1)")
  981. cls.store_explain_and_run(
  982. "SELECT f1 + 0 FROM t1 UNION DISTINCT "
  983. + "\n".join(
  984. f" (SELECT f1 + {i} - {i} FROM t1 UNION DISTINCT " for i in cls.all()
  985. )
  986. + "\n"
  987. + " SELECT * FROM t1 "
  988. + "".join(" )" for i in cls.all())
  989. )
  990. print("1")
  991. class CaseWhen(Generator):
  992. # Originally this was working with 1000, but after moving lowering and
  993. # decorrelation from the `plan_~` to the `sequence_~` method we had to
  994. # reduce it a bit in order to avoid overflowing the stack. See database-issues#7216
  995. # and database-issues#7407 for the latest occurrences of this.
  996. COUNT = 600
  997. @classmethod
  998. def body(cls) -> None:
  999. print(
  1000. "> CREATE TABLE t (" + ", ".join(f"f{i} INTEGER" for i in cls.all()) + ");"
  1001. )
  1002. print("> INSERT INTO t DEFAULT VALUES")
  1003. print(
  1004. "> CREATE MATERIALIZED VIEW v AS SELECT CASE "
  1005. + " ".join(f"WHEN f{i} IS NOT NULL THEN f{i}" for i in cls.all())
  1006. + " ELSE 123 END FROM t"
  1007. )
  1008. print("> CREATE DEFAULT INDEX ON v")
  1009. cls.store_explain_and_run("SELECT * FROM v")
  1010. print("123")
  1011. class Coalesce(Generator):
  1012. @classmethod
  1013. def body(cls) -> None:
  1014. print(
  1015. "> CREATE TABLE t (" + ", ".join(f"f{i} INTEGER" for i in cls.all()) + ");"
  1016. )
  1017. print("> INSERT INTO t DEFAULT VALUES")
  1018. print(
  1019. "> CREATE MATERIALIZED VIEW v AS SELECT COALESCE("
  1020. + ",".join(f"f{i}" for i in cls.all())
  1021. + ", 123) FROM t"
  1022. )
  1023. print("> CREATE DEFAULT INDEX ON v")
  1024. cls.store_explain_and_run("SELECT * FROM v")
  1025. print("123")
  1026. class Concat(Generator):
  1027. MAX_COUNT = 250_000 # Too long-running with 500_000
  1028. @classmethod
  1029. def body(cls) -> None:
  1030. print("> CREATE TABLE t (f STRING)")
  1031. print("> INSERT INTO t VALUES (REPEAT('A', 1024))")
  1032. print(
  1033. "> CREATE MATERIALIZED VIEW v AS SELECT CONCAT("
  1034. + ",".join("f" for i in cls.all())
  1035. + ") AS c FROM t"
  1036. )
  1037. print("> CREATE DEFAULT INDEX ON v")
  1038. cls.store_explain_and_run("SELECT LENGTH(c) FROM v")
  1039. print(f"{cls.COUNT*1024}")
  1040. class ArrayAgg(Generator):
  1041. COUNT = 50
  1042. @classmethod
  1043. def body(cls) -> None:
  1044. print("> SET statement_timeout='300s'")
  1045. print(
  1046. f"""> CREATE TABLE t ({
  1047. ", ".join(
  1048. ", ".join([
  1049. f"a{i} STRING",
  1050. f"b{i} STRING",
  1051. f"c{i} STRING",
  1052. f"d{i} STRING[]",
  1053. ])
  1054. for i in cls.all()
  1055. )
  1056. });"""
  1057. )
  1058. print("> INSERT INTO t DEFAULT VALUES;")
  1059. print(
  1060. f"""> CREATE MATERIALIZED VIEW v2 AS SELECT {
  1061. ", ".join(
  1062. f"ARRAY_AGG(a{i} ORDER BY b1) FILTER (WHERE 's{i}' = ANY(d{i})) AS r{i}"
  1063. for i in cls.all()
  1064. )
  1065. } FROM t GROUP BY a1;"""
  1066. )
  1067. print("> CREATE DEFAULT INDEX ON v2;")
  1068. cls.store_explain_and_run("SELECT COUNT(*) FROM v2")
  1069. print("1")
  1070. class FilterSubqueries(Generator):
  1071. """
  1072. Regression test for database-issues#6189.
  1073. Without the database-issues#6189 fix in materialize#20702 this will cause `environmend` to OOM
  1074. because of excessive memory allocations in the `RedundantJoin` transform.
  1075. """
  1076. COUNT = min(Generator.COUNT, 100)
  1077. MAX_COUNT = 111 # Too long-running with count=200
  1078. @classmethod
  1079. def body(cls) -> None:
  1080. print("> CREATE TABLE t1 (f1 INTEGER);")
  1081. print("> INSERT INTO t1 VALUES (1);")
  1082. # Increase SQL timeout to 10 minutes (~5 should be enough).
  1083. #
  1084. # Update: Now 20 minutes, not 10. This query appears to scale
  1085. # super-linear with COUNT.
  1086. print("$ set-sql-timeout duration=1200s")
  1087. cls.store_explain_and_run(
  1088. f"SELECT * FROM t1 AS a1 WHERE {' AND '.join(f'f1 IN (SELECT * FROM t1 WHERE f1 = a1.f1 AND f1 <= {i})' for i in cls.all())}"
  1089. )
  1090. print("1")
  1091. #
  1092. # Column width
  1093. #
  1094. class Column(Generator):
  1095. COUNT = 100_000_000
  1096. @classmethod
  1097. def body(cls) -> None:
  1098. print("> CREATE TABLE t1 (f1 STRING);")
  1099. print(f"> INSERT INTO t1 VALUES (REPEAT('x', {cls.COUNT}));")
  1100. print("> SELECT COUNT(DISTINCT f1) FROM t1;")
  1101. print("1")
  1102. cls.store_explain_and_run("SELECT LENGTH(f1) FROM t1")
  1103. print(f"{cls.COUNT}")
  1104. #
  1105. # Table size
  1106. #
  1107. class Rows(Generator):
  1108. COUNT = 10_000_000
  1109. @classmethod
  1110. def body(cls) -> None:
  1111. print("> SET statement_timeout='60s'")
  1112. print("> CREATE TABLE t1 (f1 INTEGER);")
  1113. print(f"> INSERT INTO t1 SELECT * FROM generate_series(1, {cls.COUNT})")
  1114. cls.store_explain_and_run("SELECT COUNT(*) FROM t1")
  1115. print(f"{cls.COUNT}")
  1116. class RowsAggregate(Generator):
  1117. COUNT = 1_000_000
  1118. @classmethod
  1119. def body(cls) -> None:
  1120. cls.store_explain_and_run(
  1121. f"SELECT COUNT(*), MIN(generate_series), MAX(generate_series), COUNT(DISTINCT generate_series) FROM generate_series(1, {cls.COUNT})"
  1122. )
  1123. print(f"{cls.COUNT} 1 {cls.COUNT} {cls.COUNT}")
  1124. class RowsOrderByLimit(Generator):
  1125. COUNT = 10_000_000
  1126. MAX_COUNT = 80_000_000 # Too long-running with 160_000_000
  1127. @classmethod
  1128. def body(cls) -> None:
  1129. cls.store_explain_and_run(
  1130. f"SELECT * FROM generate_series(1, {cls.COUNT}) ORDER BY generate_series DESC LIMIT 1"
  1131. )
  1132. print(f"{cls.COUNT}")
  1133. class RowsJoinOneToOne(Generator):
  1134. COUNT = 10_000_000
  1135. @classmethod
  1136. def body(cls) -> None:
  1137. print(
  1138. f"> CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {cls.COUNT});"
  1139. )
  1140. cls.store_explain_and_run(
  1141. "SELECT COUNT(*) FROM v1 AS a1, v1 AS a2 WHERE a1.generate_series = a2.generate_series"
  1142. )
  1143. print(f"{cls.COUNT}")
  1144. class RowsJoinOneToMany(Generator):
  1145. COUNT = 10_000_000
  1146. MAX_COUNT = 80_000_000 # Too long-running with 160_000_000
  1147. @classmethod
  1148. def body(cls) -> None:
  1149. print(
  1150. f"> CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {cls.COUNT});"
  1151. )
  1152. cls.store_explain_and_run("SELECT COUNT(*) FROM v1 AS a1, (SELECT 1) AS a2")
  1153. print(f"{cls.COUNT}")
  1154. class RowsJoinCross(Generator):
  1155. COUNT = 1_000_000
  1156. MAX_COUNT = 64_000_000 # Too long-running with 128_000_000
  1157. @classmethod
  1158. def body(cls) -> None:
  1159. print(
  1160. f"> CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {cls.COUNT});"
  1161. )
  1162. cls.store_explain_and_run("SELECT COUNT(*) FROM v1 AS a1, v1 AS a2")
  1163. print(f"{cls.COUNT**2}")
  1164. class RowsJoinLargeRetraction(Generator):
  1165. COUNT = 1_000_000
  1166. @classmethod
  1167. def body(cls) -> None:
  1168. print("> SET statement_timeout='60s'")
  1169. print("> CREATE TABLE t1 (f1 INTEGER);")
  1170. print(f"> INSERT INTO t1 SELECT * FROM generate_series(1, {cls.COUNT});")
  1171. print(
  1172. "> CREATE MATERIALIZED VIEW v1 AS SELECT a1.f1 AS col1 , a2.f1 AS col2 FROM t1 AS a1, t1 AS a2 WHERE a1.f1 = a2.f1;"
  1173. )
  1174. print("> SELECT COUNT(*) > 0 FROM v1;")
  1175. print("true")
  1176. print("> DELETE FROM t1")
  1177. cls.store_explain_and_run("SELECT COUNT(*) FROM v1")
  1178. print("0")
  1179. class RowsJoinDifferential(Generator):
  1180. COUNT = 1_000_000
  1181. @classmethod
  1182. def body(cls) -> None:
  1183. print(
  1184. f"> CREATE MATERIALIZED VIEW v1 AS SELECT generate_series AS f1, generate_series AS f2 FROM (SELECT * FROM generate_series(1, {cls.COUNT}));"
  1185. )
  1186. cls.store_explain_and_run(
  1187. "SELECT COUNT(*) FROM v1 AS a1, v1 AS a2 WHERE a1.f1 = a2.f1"
  1188. )
  1189. print(f"{cls.COUNT}")
  1190. class RowsJoinOuter(Generator):
  1191. COUNT = 1_000_000
  1192. @classmethod
  1193. def body(cls) -> None:
  1194. print(
  1195. f"> CREATE MATERIALIZED VIEW v1 AS SELECT generate_series AS f1, generate_series AS f2 FROM (SELECT * FROM generate_series(1, {cls.COUNT}));"
  1196. )
  1197. cls.store_explain_and_run(
  1198. "SELECT COUNT(*) FROM v1 AS a1 LEFT JOIN v1 AS a2 USING (f1)"
  1199. )
  1200. print(f"{cls.COUNT}")
class PostgresSources(Generator):
    """Creates one Postgres source (and one table from it) per value of `cls.all()`."""

    COUNT = 300  # high memory consumption, slower with source tables
    MAX_COUNT = 600  # Too long-running with count=1200

    @classmethod
    def body(cls) -> None:
        print("> SET statement_timeout='300s'")
        # Raise the source/object/table limits to ten times COUNT.
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};")
        # Everything up to the CREATE PUBLICATION line is printed under this
        # single postgres-execute command against the upstream Postgres.
        print("$ postgres-execute connection=postgres://postgres:postgres@postgres")
        print("ALTER USER postgres WITH replication;")
        print("DROP SCHEMA IF EXISTS public CASCADE;")
        print("DROP PUBLICATION IF EXISTS mz_source;")
        print("CREATE SCHEMA public;")
        # One upstream table per index, each holding its own index as the only row.
        for i in cls.all():
            print(f"CREATE TABLE t{i} (c int);")
            print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;")
            print(f"INSERT INTO t{i} VALUES ({i});")
        print("CREATE PUBLICATION mz_source FOR ALL TABLES;")
        print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'")
        # NOTE(review): continuation lines inside the multi-line literals below
        # are indented, presumably so the script reader treats them as part of
        # the same statement -- confirm exact whitespace against version control.
        print(
            """> CREATE CONNECTION pg TO POSTGRES (
                HOST postgres,
                DATABASE postgres,
                USER postgres,
                PASSWORD SECRET pgpass
              )"""
        )
        # One source p{i} per index, plus a table t{i} referencing upstream t{i}.
        for i in cls.all():
            print(
                f"""> CREATE SOURCE p{i}
              IN CLUSTER single_replica_cluster
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
              """
            )
            print(
                f"""> CREATE TABLE t{i}
              FROM SOURCE p{i} (REFERENCE t{i})
              """
            )
        # Each table is queried; the value printed afterwards is its index.
        for i in cls.all():
            cls.store_explain_and_run(f"SELECT * FROM t{i}")
            print(f"{i}")
class PostgresTables(Generator):
    """Creates a single Postgres source `FOR ALL TABLES` over one upstream table per value of `cls.all()`."""

    @classmethod
    def body(cls) -> None:
        print("> SET statement_timeout='300s'")
        # Raise the object/table/source limits to ten times COUNT.
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
        # Everything up to the CREATE PUBLICATION line is printed under this
        # single postgres-execute command against the upstream Postgres.
        print("$ postgres-execute connection=postgres://postgres:postgres@postgres")
        print("ALTER USER postgres WITH replication;")
        print("DROP SCHEMA IF EXISTS public CASCADE;")
        print("DROP PUBLICATION IF EXISTS mz_source;")
        print("CREATE SCHEMA public;")
        # One upstream table per index, each holding its own index as the only row.
        for i in cls.all():
            print(f"CREATE TABLE t{i} (c int);")
            print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;")
            print(f"INSERT INTO t{i} VALUES ({i});")
        print("CREATE PUBLICATION mz_source FOR ALL TABLES;")
        print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'")
        # NOTE(review): continuation lines inside the multi-line literals below
        # are indented, presumably so the script reader treats them as part of
        # the same statement -- confirm exact whitespace against version control.
        print(
            """> CREATE CONNECTION pg TO POSTGRES (
                HOST postgres,
                DATABASE postgres,
                USER postgres,
                PASSWORD SECRET pgpass
              )"""
        )
        # NOTE(review): this source uses `single_worker_cluster`, while the
        # sibling Postgres generators use `single_replica_cluster` -- confirm
        # this is intentional.
        print(
            """> CREATE SOURCE p
              IN CLUSTER single_worker_cluster
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
              FOR ALL TABLES
              """
        )
        print("> SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';")
        # Each table is queried; the value printed afterwards is its index.
        for i in cls.all():
            cls.store_explain_and_run(f"SELECT * FROM t{i}")
            print(f"{i}")
class PostgresTablesOldSyntax(Generator):
    """Creates a single Postgres source `FOR SCHEMAS (public)` over one upstream table per value of `cls.all()`."""

    @classmethod
    def body(cls) -> None:
        print("> SET statement_timeout='300s'")
        # Raise the source/object/table limits to ten times COUNT.
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
        print("$ postgres-execute connection=mz_system")
        print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};")
        # Everything up to the CREATE PUBLICATION line is printed under this
        # single postgres-execute command against the upstream Postgres.
        print("$ postgres-execute connection=postgres://postgres:postgres@postgres")
        print("ALTER USER postgres WITH replication;")
        print("DROP SCHEMA IF EXISTS public CASCADE;")
        print("DROP PUBLICATION IF EXISTS mz_source;")
        print("CREATE SCHEMA public;")
        # One upstream table per index, each holding its own index as the only row.
        for i in cls.all():
            print(f"CREATE TABLE t{i} (c int);")
            print(f"ALTER TABLE t{i} REPLICA IDENTITY FULL;")
            print(f"INSERT INTO t{i} VALUES ({i});")
        print("CREATE PUBLICATION mz_source FOR ALL TABLES;")
        print("> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'")
        # NOTE(review): continuation lines inside the multi-line literals below
        # are indented, presumably so the script reader treats them as part of
        # the same statement -- confirm exact whitespace against version control.
        print(
            """> CREATE CONNECTION pg TO POSTGRES (
                HOST postgres,
                DATABASE postgres,
                USER postgres,
                PASSWORD SECRET pgpass
              )"""
        )
        print(
            """> CREATE SOURCE p
              IN CLUSTER single_replica_cluster
              FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') FOR SCHEMAS (public)
              """
        )
        print("> SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';")
        # Each table is queried; the SQL timeout line is re-emitted before
        # every query, and the value printed afterwards is the table's index.
        for i in cls.all():
            print("$ set-sql-timeout duration=300s")
            cls.store_explain_and_run(f"SELECT * FROM t{i}")
            print(f"{i}")
  1327. class MySqlSources(Generator):
  1328. COUNT = 300 # high memory consumption, slower with source tables
  1329. MAX_COUNT = 400 # Too long-running with count=473
  1330. @classmethod
  1331. def body(cls) -> None:
  1332. print("$ set-sql-timeout duration=300s")
  1333. print("$ postgres-execute connection=mz_system")
  1334. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  1335. print("$ postgres-execute connection=mz_system")
  1336. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  1337. print("$ postgres-execute connection=mz_system")
  1338. print(f"ALTER SYSTEM SET max_tables = {cls.COUNT * 10};")
  1339. print(
  1340. f"$ mysql-connect name=mysql url=mysql://root@mysql password={MySql.DEFAULT_ROOT_PASSWORD}"
  1341. )
  1342. print("$ mysql-execute name=mysql")
  1343. print(f"SET GLOBAL max_connections={cls.COUNT * 2 + 2}")
  1344. print("DROP DATABASE IF EXISTS public;")
  1345. print("CREATE DATABASE public;")
  1346. print("USE public;")
  1347. for i in cls.all():
  1348. print(f"CREATE TABLE t{i} (c int);")
  1349. print(f"INSERT INTO t{i} VALUES ({i});")
  1350. print(
  1351. f"> CREATE SECRET IF NOT EXISTS mysqlpass AS '{MySql.DEFAULT_ROOT_PASSWORD}'"
  1352. )
  1353. print(
  1354. """> CREATE CONNECTION mysql TO MYSQL (
  1355. HOST mysql,
  1356. USER root,
  1357. PASSWORD SECRET mysqlpass
  1358. )"""
  1359. )
  1360. for i in cls.all():
  1361. print(
  1362. f"""> CREATE SOURCE m{i}
  1363. IN CLUSTER single_replica_cluster
  1364. FROM MYSQL CONNECTION mysql
  1365. """
  1366. )
  1367. print(
  1368. f"""> CREATE TABLE t{i}
  1369. FROM SOURCE m{i} (REFERENCE public.t{i})
  1370. """
  1371. )
  1372. for i in cls.all():
  1373. cls.store_explain_and_run(f"SELECT * FROM t{i}")
  1374. print(f"{i}")
  1375. class WebhookSources(Generator):
  1376. COUNT = 100 # TODO: Remove when database-issues#8508 is fixed
  1377. MAX_COUNT = 400 # timeout expired with count=800
  1378. @classmethod
  1379. def body(cls) -> None:
  1380. print("$ postgres-execute connection=mz_system")
  1381. print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
  1382. print("$ postgres-execute connection=mz_system")
  1383. print(f"ALTER SYSTEM SET max_objects_per_schema = {cls.COUNT * 10};")
  1384. for i in cls.all():
  1385. print(
  1386. f"> CREATE SOURCE w{i} IN CLUSTER single_replica_cluster FROM WEBHOOK BODY FORMAT TEXT;"
  1387. )
  1388. for i in cls.all():
  1389. print(f"$ webhook-append database=materialize schema=public name=w{i}")
  1390. print(f"text{i}")
  1391. for i in cls.all():
  1392. cls.store_explain_and_run(f"SELECT * FROM w{i}")
  1393. print(f"text{i}")
  1394. TENANT_ID = str(uuid.uuid4())
  1395. ADMIN_USER = "u1@example.com"
  1396. ADMIN_ROLE = "MaterializePlatformAdmin"
  1397. OTHER_ROLE = "MaterializePlatform"
  1398. USERS = {
  1399. ADMIN_USER: {
  1400. "email": ADMIN_USER,
  1401. "password": str(uuid.uuid4()),
  1402. "id": str(uuid.uuid4()),
  1403. "tenant_id": TENANT_ID,
  1404. "initial_api_tokens": [
  1405. {
  1406. "client_id": str(uuid.uuid4()),
  1407. "secret": str(uuid.uuid4()),
  1408. }
  1409. ],
  1410. "roles": [OTHER_ROLE, ADMIN_ROLE],
  1411. }
  1412. }
  1413. FRONTEGG_URL = "http://frontegg-mock:6880"
  1414. def app_password(email: str) -> str:
  1415. api_token = USERS[email]["initial_api_tokens"][0]
  1416. password = f"mzp_{api_token['client_id']}{api_token['secret']}".replace("-", "")
  1417. return password
  1418. MAX_CLUSTERS = 8
  1419. MAX_REPLICAS = 4
  1420. MAX_NODES = 4
  1421. SERVICES = [
  1422. Zookeeper(),
  1423. Kafka(),
  1424. Postgres(
  1425. max_wal_senders=Generator.COUNT,
  1426. max_replication_slots=Generator.COUNT,
  1427. volumes=["sourcedata_512Mb:/var/lib/postgresql/data"],
  1428. ),
  1429. MySql(),
  1430. SchemaRegistry(),
  1431. # We create all sources, sinks and dataflows by default with SIZE '1'
  1432. # The workflow_instance_size workflow is testing multi-process clusters
  1433. Testdrive(
  1434. default_timeout="60s",
  1435. materialize_url=f"postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require",
  1436. materialize_use_https=True,
  1437. no_reset=True,
  1438. ),
  1439. TestCerts(),
  1440. FronteggMock(
  1441. issuer=FRONTEGG_URL,
  1442. encoding_key_file="/secrets/frontegg-mock.key",
  1443. decoding_key_file="/secrets/frontegg-mock.crt",
  1444. users=json.dumps(list(USERS.values())),
  1445. depends_on=["test-certs"],
  1446. volumes=[
  1447. "secrets:/secrets",
  1448. ],
  1449. ),
  1450. Balancerd(
  1451. command=[
  1452. "service",
  1453. "--pgwire-listen-addr=0.0.0.0:6875",
  1454. "--https-listen-addr=0.0.0.0:6876",
  1455. "--internal-http-listen-addr=0.0.0.0:6878",
  1456. "--frontegg-resolver-template=materialized:6875",
  1457. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  1458. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  1459. f"--frontegg-admin-role={ADMIN_ROLE}",
  1460. "--https-resolver-template=materialized:6876",
  1461. "--tls-key=/secrets/balancerd.key",
  1462. "--tls-cert=/secrets/balancerd.crt",
  1463. "--internal-tls",
  1464. # Nonsensical but we don't need cancellations here
  1465. "--cancellation-resolver-dir=/secrets",
  1466. ],
  1467. depends_on=["test-certs"],
  1468. volumes=[
  1469. "secrets:/secrets",
  1470. ],
  1471. ),
  1472. Cockroach(in_memory=True),
  1473. Materialized(
  1474. memory="8G",
  1475. cpu="2",
  1476. default_size=1,
  1477. options=[
  1478. # Enable TLS on the public port to verify that balancerd is connecting to the balancerd port.
  1479. "--tls-mode=require",
  1480. "--tls-key=/secrets/materialized.key",
  1481. "--tls-cert=/secrets/materialized.crt",
  1482. f"--frontegg-tenant={TENANT_ID}",
  1483. "--frontegg-jwk-file=/secrets/frontegg-mock.crt",
  1484. f"--frontegg-api-token-url={FRONTEGG_URL}/identity/resources/auth/v1/api-token",
  1485. f"--frontegg-admin-role={ADMIN_ROLE}",
  1486. ],
  1487. depends_on=["test-certs"],
  1488. volumes_extra=[
  1489. "secrets:/secrets",
  1490. ],
  1491. sanity_restart=False,
  1492. external_metadata_store=True,
  1493. metadata_store="cockroach",
  1494. listeners_config_path=f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth_https.json",
  1495. ),
  1496. Mz(app_password=""),
  1497. ]
  1498. for cluster_id in range(1, MAX_CLUSTERS + 1):
  1499. for replica_id in range(1, MAX_REPLICAS + 1):
  1500. for node_id in range(1, MAX_NODES + 1):
  1501. SERVICES.append(
  1502. Clusterd(name=f"clusterd_{cluster_id}_{replica_id}_{node_id}", cpu="2")
  1503. )
  1504. service_names = [
  1505. "zookeeper",
  1506. "kafka",
  1507. "schema-registry",
  1508. "postgres",
  1509. "mysql",
  1510. "materialized",
  1511. "balancerd",
  1512. "frontegg-mock",
  1513. "cockroach",
  1514. "clusterd_1_1_1",
  1515. "clusterd_1_1_2",
  1516. "clusterd_1_2_1",
  1517. "clusterd_1_2_2",
  1518. "clusterd_2_1_1",
  1519. "clusterd_2_1_2",
  1520. "clusterd_3_1_1",
  1521. ]
  1522. def setup(c: Composition, workers: int) -> None:
  1523. c.up(*service_names)
  1524. c.sql(
  1525. "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
  1526. port=6877,
  1527. user="mz_system",
  1528. )
  1529. # Ensure the admin user exists
  1530. c.sql(
  1531. "SELECT 1;",
  1532. port=6875,
  1533. user=ADMIN_USER,
  1534. password=app_password(ADMIN_USER),
  1535. )
  1536. c.sql(
  1537. f"""
  1538. DROP CLUSTER quickstart cascade;
  1539. CREATE CLUSTER quickstart REPLICAS (
  1540. replica1 (
  1541. STORAGECTL ADDRESSES ['clusterd_1_1_1:2100', 'clusterd_1_1_2:2100'],
  1542. STORAGE ADDRESSES ['clusterd_1_1_1:2103', 'clusterd_1_1_2:2103'],
  1543. COMPUTECTL ADDRESSES ['clusterd_1_1_1:2101', 'clusterd_1_1_2:2101'],
  1544. COMPUTE ADDRESSES ['clusterd_1_1_1:2102', 'clusterd_1_1_2:2102'],
  1545. WORKERS {workers}
  1546. ),
  1547. replica2 (
  1548. STORAGECTL ADDRESSES ['clusterd_1_2_1:2100', 'clusterd_1_2_2:2100'],
  1549. STORAGE ADDRESSES ['clusterd_1_2_1:2103', 'clusterd_1_2_2:2103'],
  1550. COMPUTECTL ADDRESSES ['clusterd_1_2_1:2101', 'clusterd_1_2_2:2101'],
  1551. COMPUTE ADDRESSES ['clusterd_1_2_1:2102', 'clusterd_1_2_2:2102'],
  1552. WORKERS {workers}
  1553. )
  1554. );
  1555. DROP CLUSTER IF EXISTS single_replica_cluster CASCADE;
  1556. CREATE CLUSTER single_replica_cluster REPLICAS (
  1557. replica1 (
  1558. STORAGECTL ADDRESSES ['clusterd_2_1_1:2100', 'clusterd_2_1_2:2100'],
  1559. STORAGE ADDRESSES ['clusterd_2_1_1:2103', 'clusterd_2_1_2:2103'],
  1560. COMPUTECTL ADDRESSES ['clusterd_2_1_1:2101', 'clusterd_2_1_2:2101'],
  1561. COMPUTE ADDRESSES ['clusterd_2_1_1:2102', 'clusterd_2_1_2:2102'],
  1562. WORKERS {workers}
  1563. )
  1564. );
  1565. GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO materialize;
  1566. DROP CLUSTER IF EXISTS single_worker_cluster CASCADE;
  1567. CREATE CLUSTER single_worker_cluster REPLICAS (
  1568. replica1 (
  1569. STORAGECTL ADDRESSES ['clusterd_3_1_1:2100'],
  1570. STORAGE ADDRESSES ['clusterd_3_1_1:2103'],
  1571. COMPUTECTL ADDRESSES ['clusterd_3_1_1:2101'],
  1572. COMPUTE ADDRESSES ['clusterd_3_1_1:2102'],
  1573. WORKERS 1
  1574. )
  1575. );
  1576. GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO materialize;
  1577. GRANT ALL PRIVILEGES ON CLUSTER single_replica_cluster TO "{ADMIN_USER}";
  1578. GRANT ALL PRIVILEGES ON CLUSTER quickstart TO "{ADMIN_USER}";
  1579. """,
  1580. port=6877,
  1581. user="mz_system",
  1582. )
  1583. def upload_results_to_test_analytics(
  1584. c: Composition,
  1585. stats: dict[tuple[type[Generator], int], Statistics],
  1586. was_successful: bool,
  1587. ) -> None:
  1588. if not buildkite.is_in_buildkite():
  1589. return
  1590. test_analytics = TestAnalyticsDb(create_test_analytics_config(c))
  1591. test_analytics.builds.add_build_job(was_successful=was_successful)
  1592. result_entries = []
  1593. for (scenario, count), stat in stats.items():
  1594. scenario_name = scenario.__name__
  1595. scenario_version = scenario.VERSION
  1596. result_entries.append(
  1597. product_limits_result_storage.ProductLimitsResultEntry(
  1598. scenario_name=scenario_name,
  1599. scenario_version=str(scenario_version),
  1600. count=count,
  1601. wallclock=stat.wallclock,
  1602. explain_wallclock=stat.explain_wallclock,
  1603. )
  1604. )
  1605. test_analytics.product_limits_results.add_result(
  1606. framework_version=PRODUCT_LIMITS_FRAMEWORK_VERSION,
  1607. results=result_entries,
  1608. )
  1609. try:
  1610. test_analytics.submit_updates()
  1611. print("Uploaded results.")
  1612. except Exception as e:
  1613. # An error during an upload must never cause the build to fail
  1614. test_analytics.on_upload_failed(e)
  1615. def workflow_default(c: Composition) -> None:
  1616. def process(name: str) -> None:
  1617. if name == "default":
  1618. return
  1619. with c.test_case(name):
  1620. c.workflow(name)
  1621. c.test_parts(list(c.workflows.keys()), process)
  1622. def workflow_main(c: Composition, parser: WorkflowArgumentParser) -> None:
  1623. """Run all the limits tests against a multi-node, multi-replica cluster"""
  1624. parser.add_argument(
  1625. "--scenario", metavar="SCENARIO", type=str, help="Scenario to run."
  1626. )
  1627. parser.add_argument(
  1628. "--workers",
  1629. type=int,
  1630. metavar="N",
  1631. default=2,
  1632. help="set the default number of workers",
  1633. )
  1634. parser.add_argument(
  1635. "--find-limit",
  1636. action="store_true",
  1637. help="Increase limit until the test fails, record higehst limit that works",
  1638. )
  1639. args = parser.parse_args()
  1640. scenarios = buildkite.shard_list(
  1641. (
  1642. [globals()[args.scenario]]
  1643. if args.scenario
  1644. else list(all_subclasses(Generator))
  1645. ),
  1646. lambda s: s.__name__,
  1647. )
  1648. print(
  1649. f"Scenarios in shard with index {buildkite.get_parallelism_index()}: {scenarios}"
  1650. )
  1651. if not scenarios:
  1652. return
  1653. with c.override(
  1654. Clusterd(
  1655. name="clusterd_1_1_1",
  1656. workers=args.workers,
  1657. process_names=["clusterd_1_1_1", "clusterd_1_1_2"],
  1658. ),
  1659. Clusterd(
  1660. name="clusterd_1_1_2",
  1661. workers=args.workers,
  1662. process_names=["clusterd_1_1_1", "clusterd_1_1_2"],
  1663. ),
  1664. Clusterd(
  1665. name="clusterd_1_2_1",
  1666. workers=args.workers,
  1667. process_names=["clusterd_1_2_1", "clusterd_1_2_2"],
  1668. ),
  1669. Clusterd(
  1670. name="clusterd_1_2_2",
  1671. workers=args.workers,
  1672. process_names=["clusterd_1_2_1", "clusterd_1_2_2"],
  1673. ),
  1674. Clusterd(
  1675. name="clusterd_2_1_1",
  1676. workers=args.workers,
  1677. process_names=["clusterd_2_1_1", "clusterd_2_1_2"],
  1678. ),
  1679. Clusterd(
  1680. name="clusterd_2_1_2",
  1681. workers=args.workers,
  1682. process_names=["clusterd_2_1_1", "clusterd_2_1_2"],
  1683. ),
  1684. Clusterd(
  1685. name="clusterd_3_1_1",
  1686. workers=1,
  1687. ),
  1688. ):
  1689. run_scenarios(c, scenarios, args.find_limit, args.workers)
  1690. def run_scenarios(
  1691. c: Composition, scenarios: list[type[Generator]], find_limit: bool, workers: int
  1692. ):
  1693. c.up({"name": "testdrive", "persistent": True})
  1694. setup(c, workers)
  1695. failures: list[TestFailureDetails] = []
  1696. stats: dict[tuple[type[Generator], int], Statistics] = {}
  1697. for scenario in scenarios:
  1698. if find_limit:
  1699. good_count = None
  1700. bad_count = None
  1701. while True:
  1702. print(
  1703. f"--- Running scenario {scenario.__name__} with count {scenario.COUNT}"
  1704. )
  1705. f = StringIO()
  1706. with contextlib.redirect_stdout(f):
  1707. scenario.generate()
  1708. sys.stdout.flush()
  1709. try:
  1710. start_time = time.time()
  1711. c.testdrive(f.getvalue(), quiet=True).stdout
  1712. wallclock = time.time() - start_time
  1713. except Exception as e:
  1714. print(
  1715. f"Failed scenario {scenario.__name__} with count {scenario.COUNT}: {e}"
  1716. )
  1717. i = 0
  1718. while True:
  1719. try:
  1720. c.kill(*service_names)
  1721. c.rm(*service_names)
  1722. c.rm_volumes("mzdata")
  1723. break
  1724. except:
  1725. if i > 10:
  1726. raise
  1727. i += 1
  1728. print(
  1729. re.sub(
  1730. r"mzp_[a-z1-9]*",
  1731. "[REDACTED]",
  1732. traceback.format_exc(),
  1733. )
  1734. )
  1735. print("Retrying in a minute...")
  1736. time.sleep(60)
  1737. setup(c, workers)
  1738. bad_count = scenario.COUNT
  1739. previous_count = scenario.COUNT
  1740. scenario.COUNT = (
  1741. scenario.COUNT // 2
  1742. if good_count is None
  1743. else (good_count + bad_count) // 2
  1744. )
  1745. if scenario.COUNT >= bad_count:
  1746. if not good_count:
  1747. failures.append(
  1748. TestFailureDetails(
  1749. message=str(e),
  1750. details=traceback.format_exc(),
  1751. test_class_name_override=f"{scenario.__name__} with count {previous_count}",
  1752. )
  1753. )
  1754. break
  1755. continue
  1756. else:
  1757. if scenario.EXPLAIN:
  1758. with c.sql_cursor(
  1759. sslmode="require",
  1760. user=ADMIN_USER,
  1761. password=app_password(ADMIN_USER),
  1762. ) as cur:
  1763. start_time = time.time()
  1764. cur.execute(scenario.EXPLAIN.encode())
  1765. explain_wallclock = time.time() - start_time
  1766. explain_wallclock_str = (
  1767. f", explain took {explain_wallclock:.2f} s"
  1768. )
  1769. else:
  1770. explain_wallclock = None
  1771. explain_wallclock_str = ""
  1772. print(
  1773. f"Scenario {scenario.__name__} with count {scenario.COUNT} took {wallclock:.2f} s{explain_wallclock_str}"
  1774. )
  1775. stats[(scenario, scenario.COUNT)] = Statistics(
  1776. wallclock, explain_wallclock
  1777. )
  1778. good_count = scenario.COUNT
  1779. if bad_count is None:
  1780. scenario.COUNT *= 2
  1781. else:
  1782. scenario.COUNT = (good_count + bad_count) // 2
  1783. if scenario.MAX_COUNT is not None:
  1784. scenario.COUNT = min(scenario.COUNT, scenario.MAX_COUNT)
  1785. if scenario.COUNT <= good_count:
  1786. break
  1787. print(f"Final good count: {good_count}")
  1788. else:
  1789. print(f"--- Running scenario {scenario.__name__}")
  1790. f = StringIO()
  1791. with contextlib.redirect_stdout(f):
  1792. scenario.generate()
  1793. sys.stdout.flush()
  1794. try:
  1795. c.testdrive(f.getvalue())
  1796. except Exception as e:
  1797. failures.append(
  1798. TestFailureDetails(
  1799. message=str(e),
  1800. details=traceback.format_exc(),
  1801. test_class_name_override=scenario.__name__,
  1802. )
  1803. )
  1804. if find_limit:
  1805. upload_results_to_test_analytics(c, stats, not failures)
  1806. if failures:
  1807. raise FailedTestExecutionError(errors=failures)
  1808. def workflow_instance_size(c: Composition, parser: WorkflowArgumentParser) -> None:
  1809. """Create multiple clusters with multiple nodes and replicas each"""
  1810. parser.add_argument(
  1811. "--workers",
  1812. type=int,
  1813. metavar="N",
  1814. default=2,
  1815. help="set the default number of workers",
  1816. )
  1817. parser.add_argument(
  1818. "--clusters",
  1819. type=int,
  1820. metavar="N",
  1821. default=8,
  1822. help="set the number of clusters to create",
  1823. )
  1824. parser.add_argument(
  1825. "--replicas",
  1826. type=int,
  1827. metavar="N",
  1828. default=4,
  1829. help="set the number of replicas per cluster",
  1830. )
  1831. parser.add_argument(
  1832. "--nodes",
  1833. type=int,
  1834. metavar="N",
  1835. default=4,
  1836. help="set the number of nodes per cluster replica",
  1837. )
  1838. args = parser.parse_args()
  1839. assert args.clusters <= MAX_CLUSTERS, "SERVICES have to be static"
  1840. assert args.replicas <= MAX_REPLICAS, "SERVICES have to be static"
  1841. assert args.nodes <= MAX_NODES, "SERVICES have to be static"
  1842. c.up(
  1843. "zookeeper",
  1844. "kafka",
  1845. "schema-registry",
  1846. "materialized",
  1847. "balancerd",
  1848. "frontegg-mock",
  1849. {"name": "testdrive", "persistent": True},
  1850. )
  1851. # Construct the requied Clusterd instances and peer them into clusters
  1852. node_names = []
  1853. node_overrides = []
  1854. for cluster_id in range(1, args.clusters + 1):
  1855. for replica_id in range(1, args.replicas + 1):
  1856. names = [
  1857. f"clusterd_{cluster_id}_{replica_id}_{i}"
  1858. for i in range(1, args.nodes + 1)
  1859. ]
  1860. for node_name in names:
  1861. node_names.append(node_name)
  1862. node_overrides.append(
  1863. Clusterd(
  1864. name=node_name,
  1865. workers=args.workers,
  1866. process_names=names,
  1867. )
  1868. )
  1869. with c.override(
  1870. Testdrive(
  1871. seed=1,
  1872. materialize_url=f"postgres://{quote(ADMIN_USER)}:{app_password(ADMIN_USER)}@balancerd:6875?sslmode=require",
  1873. materialize_use_https=True,
  1874. no_reset=True,
  1875. ),
  1876. *node_overrides,
  1877. ):
  1878. c.up(*node_names)
  1879. # Increase resource limits
  1880. c.testdrive(
  1881. dedent(
  1882. f"""
  1883. $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
  1884. ALTER SYSTEM SET max_clusters = {args.clusters * 10}
  1885. ALTER SYSTEM SET max_replicas_per_cluster = {args.replicas * 10}
  1886. CREATE CLUSTER single_replica_cluster SIZE = '4';
  1887. GRANT ALL ON CLUSTER single_replica_cluster TO materialize;
  1888. GRANT ALL ON CLUSTER single_replica_cluster TO "{ADMIN_USER}";
  1889. GRANT ALL PRIVILEGES ON SCHEMA public TO "{ADMIN_USER}";
  1890. """
  1891. )
  1892. )
  1893. # Create some input data
  1894. c.testdrive(
  1895. dedent(
  1896. """
  1897. > CREATE TABLE ten (f1 INTEGER);
  1898. > INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
  1899. $ set schema={
  1900. "type" : "record",
  1901. "name" : "test",
  1902. "fields" : [
  1903. {"name":"f1", "type":"string"}
  1904. ]
  1905. }
  1906. $ kafka-create-topic topic=instance-size
  1907. $ kafka-ingest format=avro topic=instance-size schema=${schema} repeat=10000
  1908. {"f1": "fish"}
  1909. """
  1910. )
  1911. )
  1912. # Construct the required CREATE CLUSTER statements
  1913. for cluster_id in range(1, args.clusters + 1):
  1914. replica_definitions = []
  1915. for replica_id in range(1, args.replicas + 1):
  1916. nodes = []
  1917. for node_id in range(1, args.nodes + 1):
  1918. node_name = f"clusterd_{cluster_id}_{replica_id}_{node_id}"
  1919. nodes.append(node_name)
  1920. replica_name = f"replica_u{cluster_id}_{replica_id}"
  1921. replica_definitions.append(
  1922. f"{replica_name} (STORAGECTL ADDRESSES ["
  1923. + ", ".join(f"'{n}:2100'" for n in nodes)
  1924. + "], STORAGE ADDRESSES ["
  1925. + ", ".join(f"'{n}:2103'" for n in nodes)
  1926. + "], COMPUTECTL ADDRESSES ["
  1927. + ", ".join(f"'{n}:2101'" for n in nodes)
  1928. + "], COMPUTE ADDRESSES ["
  1929. + ", ".join(f"'{n}:2102'" for n in nodes)
  1930. + f"], WORKERS {args.workers})"
  1931. )
  1932. c.sql(
  1933. "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
  1934. port=6877,
  1935. user="mz_system",
  1936. )
  1937. c.sql(
  1938. f"CREATE CLUSTER cluster_u{cluster_id} REPLICAS ("
  1939. + ",".join(replica_definitions)
  1940. + ")",
  1941. port=6877,
  1942. user="mz_system",
  1943. )
  1944. c.sql(
  1945. f"GRANT ALL PRIVILEGES ON CLUSTER cluster_u{cluster_id} TO materialize",
  1946. port=6877,
  1947. user="mz_system",
  1948. )
  1949. c.sql(
  1950. f'GRANT ALL PRIVILEGES ON CLUSTER cluster_u{cluster_id} TO "{ADMIN_USER}"',
  1951. port=6877,
  1952. user="mz_system",
  1953. )
  1954. # Construct some dataflows in each cluster
  1955. for cluster_id in range(1, args.clusters + 1):
  1956. cluster_name = f"cluster_u{cluster_id}"
  1957. c.testdrive(
  1958. dedent(
  1959. f"""
  1960. > SET cluster={cluster_name}
  1961. > CREATE DEFAULT INDEX ON ten;
  1962. > CREATE MATERIALIZED VIEW v_{cluster_name} AS
  1963. SELECT COUNT(*) AS c1 FROM ten AS a1, ten AS a2, ten AS a3, ten AS a4;
  1964. > CREATE CONNECTION IF NOT EXISTS kafka_conn
  1965. TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
  1966. > CREATE CONNECTION IF NOT EXISTS csr_conn
  1967. FOR CONFLUENT SCHEMA REGISTRY
  1968. URL '${{testdrive.schema-registry-url}}';
  1969. > CREATE SOURCE s_{cluster_name}
  1970. IN CLUSTER single_replica_cluster
  1971. FROM KAFKA CONNECTION kafka_conn (TOPIC
  1972. 'testdrive-instance-size-${{testdrive.seed}}')
  1973. > CREATE TABLE s_{cluster_name}_tbl FROM SOURCE s_{cluster_name} (REFERENCE "testdrive-instance-size-${{testdrive.seed}}")
  1974. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  1975. ENVELOPE NONE
  1976. """
  1977. )
  1978. )
  1979. # Validate that each individual cluster is operating properly
  1980. for cluster_id in range(1, args.clusters + 1):
  1981. cluster_name = f"cluster_u{cluster_id}"
  1982. c.testdrive(
  1983. dedent(
  1984. f"""
  1985. > SET cluster={cluster_name}
  1986. > SELECT c1 FROM v_{cluster_name};
  1987. 10000
  1988. > SELECT COUNT(*) FROM s_{cluster_name}_tbl
  1989. 10000
  1990. """
  1991. )
  1992. )