action.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import datetime
import json
import random
import threading
import time
import urllib.parse
from collections.abc import Callable
from typing import TYPE_CHECKING

import psycopg
import requests
import websocket
from pg8000.native import identifier
from psycopg import Connection
from psycopg.errors import OperationalError

import materialize.parallel_workload.database
from materialize.data_ingest.data_type import NUMBER_TYPES, Text, TextTextMap
from materialize.data_ingest.query_error import QueryError
from materialize.data_ingest.row import Operation
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose.composition import Composition
from materialize.mzcompose.services.materialized import (
    LEADER_STATUS_HEALTHCHECK,
    DeploymentStatus,
    Materialized,
)
from materialize.mzcompose.services.minio import minio_blob_uri
from materialize.parallel_workload.database import (
    DATA_TYPES,
    DB,
    MAX_CLUSTERS,
    MAX_COLUMNS,
    MAX_DBS,
    MAX_INDEXES,
    MAX_KAFKA_SINKS,
    MAX_KAFKA_SOURCES,
    MAX_MYSQL_SOURCES,
    MAX_POSTGRES_SOURCES,
    MAX_ROLES,
    MAX_ROWS,
    MAX_SCHEMAS,
    MAX_TABLES,
    MAX_VIEWS,
    MAX_WEBHOOK_SOURCES,
    Cluster,
    ClusterReplica,
    Column,
    Database,
    DBObject,
    Index,
    KafkaSink,
    KafkaSource,
    MySqlSource,
    PostgresSource,
    Role,
    Schema,
    Table,
    View,
    WebhookSource,
)
from materialize.parallel_workload.executor import Executor, Http
from materialize.parallel_workload.settings import Complexity, Scenario
from materialize.sqlsmith import known_errors

if TYPE_CHECKING:
    from materialize.parallel_workload.worker import Worker

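# Open a connection to Materialize's experimental WebSocket SQL API and
# perform the startup handshake: send the auth/options payload, then read
# messages until the "connected to Materialize" notice arrives, returning the
# conn_id and secret_key reported in BackendKeyData.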
def ws_connect(ws: websocket.WebSocket, host, port, user: str) -> tuple[int, int]:
    thread_name = threading.current_thread().getName()
    ws.connect(f"ws://{host}:{port}/api/experimental/sql", origin=thread_name)
    ws.send(
        json.dumps(
            {
                "user": user,
                "password": "",
                "options": {
                    "application_name": thread_name,
                    "max_query_result_size": "1000000",
                    "cluster": "quickstart",
                    "database": "materialize",
                    "search_path": "public",
                },
            }
        )
    )
    ws_conn_id = -1
    ws_secret_key = -1
    ws_ready = False
    while True:
        result = json.loads(ws.recv())
        result_type = result["type"]
        if result_type == "ParameterStatus":
            continue
        elif result_type == "BackendKeyData":
            ws_conn_id = result["payload"]["conn_id"]
            ws_secret_key = result["payload"]["secret_key"]
        elif result_type == "ReadyForQuery":
            ws_ready = True
        elif result_type == "Notice":
            assert "connected to Materialize" in result["payload"]["message"], result
            break
        else:
            raise RuntimeError(f"Unexpected result type: {result_type} in: {result}")
    assert ws_ready
    return (ws_conn_id, ws_secret_key)

# TODO: CASCADE in DROPs, keep track of what will be deleted
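# Base class for all workload actions: each subclass implements run() against
# an Executor and lists the error messages that are acceptable under the
# current complexity/scenario in errors_to_ignore().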
class Action:
    rng: random.Random
    composition: Composition | None

    def __init__(self, rng: random.Random, composition: Composition | None):
        self.rng = rng
        self.composition = composition

    def run(self, exe: Executor) -> bool:
        raise NotImplementedError

    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = [
            "permission denied for",
            "must be owner of",
            "HTTP read timeout",
            "result exceeds max size of",
        ]
        if exe.db.complexity in (Complexity.DDL, Complexity.DDLOnly):
            result.extend(
                [
                    "query could not complete",
                    "cached plan must not change result type",
                    "violates not-null constraint",
                    "unknown catalog item",  # Expected, see database-issues#6124
                    "was concurrently dropped",  # role was dropped
                    "unknown cluster",  # cluster was dropped
                    "unknown schema",  # schema was dropped
                    "the transaction's active cluster has been dropped",  # cluster was dropped
                    "was removed",  # dependency was removed, started with moving optimization off main thread, see database-issues#7285
                    "real-time source dropped before ingesting the upstream system's visible frontier",  # Expected, see https://buildkite.com/materialize/nightly/builds/9399#0191be17-1f4c-4321-9b51-edc4b08b71c5
                    "object state changed while transaction was in progress",
                ]
            )
        if exe.db.scenario == Scenario.Cancel:
            result.extend(
                [
                    "canceling statement due to user request",
                ]
            )
        if exe.db.scenario == Scenario.ZeroDowntimeDeploy:
            result.extend(
                [
                    "cannot write in read-only mode",
                    "500: internal storage failure! ReadOnly",
                ]
            )
        if exe.db.scenario in (
            Scenario.Kill,
            Scenario.BackupRestore,
            Scenario.ZeroDowntimeDeploy,
        ):
            result.extend(
                [
                    # psycopg
                    "server closed the connection unexpectedly",
                    "Can't create a connection to host",
                    "Connection refused",
                    "Cursor closed",
                    # websockets
                    "Connection to remote host was lost.",
                    "socket is already closed.",
                    "Broken pipe",
                    "WS connect",
                    # http
                    "Remote end closed connection without response",
                    "Connection aborted",
                    "Connection refused",
                ]
            )
        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
            # Expected, see database-issues#6156
            result.extend(["unknown catalog item", "unknown schema"])
        if exe.db.scenario == Scenario.Rename:
            result.extend(["unknown schema", "ambiguous reference to schema name"])
        if materialize.parallel_workload.database.NAUGHTY_IDENTIFIERS:
            result.extend(["identifier length exceeds 255 bytes"])
        return result

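# Exercises SUBSCRIBE through a cursor: DECLARE c CURSOR FOR SUBSCRIBE on a
# random object (optionally with an UPSERT/DEBEZIUM envelope), then FETCH in
# a loop with random row counts and timeouts.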
class FetchAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = super().errors_to_ignore(exe)
        result.extend(
            [
                "is not of expected type",  # TODO(def-) Remove when database-issues#7857 is fixed
            ]
        )
        if exe.db.complexity == Complexity.DDL:
            result.extend(
                [
                    "does not exist",
                    "subscribe has been terminated because underlying relation",
                ]
            )
        return result

    def run(self, exe: Executor) -> bool:
        obj = self.rng.choice(exe.db.db_objects())
        # Unsupported via this API
        # See https://github.com/MaterializeInc/database-issues/issues/6159
        (
            exe.rollback(http=Http.NO)
            if self.rng.choice([True, False])
            else exe.commit(http=Http.NO)
        )
        query = f"SUBSCRIBE {obj}"
        if self.rng.choice([True, False]):
            envelope = "UPSERT" if self.rng.choice([True, False]) else "DEBEZIUM"
            columns = self.rng.sample(obj.columns, len(obj.columns))
            key = ", ".join(column.name(True) for column in columns)
            query += f" ENVELOPE {envelope} (KEY ({key}))"
        exe.execute(f"DECLARE c CURSOR FOR {query}", http=Http.NO)
        while True:
            rows = self.rng.choice(["ALL", self.rng.randrange(1000)])
            timeout = self.rng.randrange(10)
            query = f"FETCH {rows} c WITH (timeout='{timeout}s')"
            exe.execute(query, http=Http.NO, fetch=True)
            if self.rng.choice([True, False]):
                break
        (
            exe.rollback(http=Http.NO)
            if self.rng.choice([True, False])
            else exe.commit(http=Http.NO)
        )
        return True

class SelectOneAction(Action):
    def run(self, exe: Executor) -> bool:
        exe.execute("SELECT 1", explainable=True, http=Http.RANDOM, fetch=True)
        return True

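# Issues a random SELECT ... LIMIT 1, optionally joining two objects on a
# same-typed column and optionally appending a window function over sampled
# columns, with REAL_TIME_RECENCY toggled on for a random subset of queries.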
class SelectAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = super().errors_to_ignore(exe)
        result.extend(
            [
                "in the same timedomain",
                'is not allowed from the "mz_catalog_server" cluster',
                "timed out before ingesting the source's visible frontier when real-time-recency query issued",
            ]
        )
        if exe.db.complexity == Complexity.DDL:
            result.extend(
                [
                    "does not exist",
                ]
            )
        return result

    def run(self, exe: Executor) -> bool:
        obj = self.rng.choice(exe.db.db_objects())
        column = self.rng.choice(obj.columns)
        obj2 = self.rng.choice(exe.db.db_objects_without_views())
        obj_name = str(obj)
        obj2_name = str(obj2)
        columns = [
            c
            for c in obj2.columns
            if c.data_type == column.data_type and c.data_type != TextTextMap
        ]
        join = obj_name != obj2_name and obj not in exe.db.views and columns
        if join:
            all_columns = list(obj.columns) + list(obj2.columns)
        else:
            all_columns = obj.columns
        if self.rng.choice([True, False]):
            expressions = ", ".join(
                str(column)
                for column in self.rng.sample(
                    all_columns, k=self.rng.randint(1, len(all_columns))
                )
            )
            if self.rng.choice([True, False]):
                column1 = self.rng.choice(all_columns)
                column2 = self.rng.choice(all_columns)
                column3 = self.rng.choice(all_columns)
                fns = ["COUNT"]
                if column1.data_type in NUMBER_TYPES:
                    fns.extend(["SUM", "AVG", "MAX", "MIN"])
                window_fn = self.rng.choice(fns)
                expressions += f", {window_fn}({column1}) OVER (PARTITION BY {column2} ORDER BY {column3})"
        else:
            expressions = "*"
        query = f"SELECT {expressions} FROM {obj_name} "
        if join:
            column2 = self.rng.choice(columns)
            query += f"JOIN {obj2_name} ON {column} = {column2}"
        query += " LIMIT 1"
        rtr = self.rng.choice([True, False])
        if rtr:
            exe.execute("SET REAL_TIME_RECENCY TO TRUE", explainable=False)
        exe.execute(query, explainable=True, http=Http.RANDOM, fetch=True)
        if rtr:
            exe.execute("SET REAL_TIME_RECENCY TO FALSE", explainable=False)
        return True

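# Replays queries generated by SQLsmith: refill_sqlsmith() fetches batches of
# 100 queries from the sqlsmith service (seeded from this action's RNG and
# fed the database's saved state), and run() pops one query per invocation.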
class SQLsmithAction(Action):
    composition: Composition
    queries: list[str]

    def __init__(self, rng: random.Random, composition: Composition | None):
        super().__init__(rng, composition)
        self.queries = []
        assert self.composition

    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = super().errors_to_ignore(exe)
        result.extend(known_errors)
        result.extend(
            [
                "in the same timedomain",
                'is not allowed from the "mz_catalog_server" cluster',
            ]
        )
        if exe.db.complexity == Complexity.DDL:
            result.extend(
                [
                    "does not exist",
                ]
            )
        return result

    def refill_sqlsmith(self, exe: Executor) -> None:
        self.composition.silent = True
        seed = self.rng.randrange(2**31)
        try:
            result = self.composition.run(
                "sqlsmith",
                "--max-joins=0",
                "--target=host=materialized port=6875 dbname=materialize user=materialize",
                "--read-state",
                "--dry-run",
                "--max-queries=100",
                f"--seed={seed}",
                stdin=exe.db.sqlsmith_state,
                capture=True,
                capture_stderr=True,
                rm=True,
            )
            if result.returncode != 0:
                raise ValueError(
                    f"SQLsmith failed: {result.returncode} (seed {seed})\nStderr: {result.stderr}\nState: {exe.db.sqlsmith_state}"
                )
            try:
                data = json.loads(result.stdout)
                self.queries.extend(data["queries"])
            except:
                print(f"Loading json failed: {result.stdout}")
                # TODO(def-) SQLsmith sporadically fails to output
                # the entire json, I believe this to be a bug in the C++
                # json library used or the interaction with Python reading from
                # it. Ignore for now
                return
        except:
            if exe.db.scenario not in (
                Scenario.Kill,
                Scenario.BackupRestore,
                Scenario.ZeroDowntimeDeploy,
            ):
                raise
        finally:
            self.composition.silent = False

    def run(self, exe: Executor) -> bool:
        while not self.queries:
            self.refill_sqlsmith(exe)
        query = self.queries.pop()
        exe.execute(query, explainable=True, http=Http.RANDOM, fetch=True)
        return True

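# COPY ... TO a unique s3://copytos3/<n> path in csv or parquet format, using
# the shared exe.db.s3_path counter (incremented under the db lock) so
# concurrent workers never write to the same location.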
class CopyToS3Action(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = super().errors_to_ignore(exe)
        result.extend(
            [
                "in the same timedomain",
                'is not allowed from the "mz_catalog_server" cluster',
                "copy has been terminated because underlying relation",
                "Relation contains unimplemented arrow types",
                "Cannot encode the following columns/types",
                "timeout: error trying to connect",
                "cannot represent decimal value",  # parquet limitation
            ]
        )
        if exe.db.complexity == Complexity.DDL:
            result.extend(
                [
                    "does not exist",
                ]
            )
        return result

    def run(self, exe: Executor) -> bool:
        obj = self.rng.choice(exe.db.db_objects())
        obj_name = str(obj)
        with exe.db.lock:
            location = exe.db.s3_path
            exe.db.s3_path += 1
        format = "csv" if self.rng.choice([True, False]) else "parquet"
        query = f"COPY (SELECT * FROM {obj_name}) TO 's3://copytos3/{location}' WITH (AWS CONNECTION = aws_conn, FORMAT = '{format}')"
        exe.execute(query, explainable=False, http=Http.NO, fetch=False)
        return True

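# Inserts up to 100 random rows into a table, preferring the table of the
# currently open insert transaction (exe.insert_table); when that table is
# full the transaction is randomly committed or rolled back and another
# table below MAX_ROWS is picked instead.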
class InsertAction(Action):
    def run(self, exe: Executor) -> bool:
        table = None
        if exe.insert_table is not None:
            for t in exe.db.tables:
                if t.table_id == exe.insert_table:
                    table = t
                    if table.num_rows >= MAX_ROWS:
                        (
                            exe.commit()
                            if self.rng.choice([True, False])
                            else exe.rollback()
                        )
                        table = None
                    break
            else:
                exe.commit() if self.rng.choice([True, False]) else exe.rollback()
        if not table:
            tables = [table for table in exe.db.tables if table.num_rows < MAX_ROWS]
            if not tables:
                return False
            table = self.rng.choice(tables)
        column_names = ", ".join(column.name(True) for column in table.columns)
        column_values = []
        max_rows = min(100, MAX_ROWS - table.num_rows)
        for i in range(self.rng.randrange(1, max_rows + 1)):
            column_values.append(
                ", ".join(column.value(self.rng, True) for column in table.columns)
            )
        all_column_values = ", ".join(f"({v})" for v in column_values)
        query = f"INSERT INTO {table} ({column_names}) VALUES {all_column_values}"
        exe.execute(query, http=Http.RANDOM)
        table.num_rows += len(column_values)
        exe.insert_table = table.table_id
        return True

class InsertReturningAction(Action):
    def run(self, exe: Executor) -> bool:
        table = None
        if exe.insert_table is not None:
            for t in exe.db.tables:
                if t.table_id == exe.insert_table:
                    table = t
                    if table.num_rows >= MAX_ROWS:
                        (
                            exe.commit()
                            if self.rng.choice([True, False])
                            else exe.rollback()
                        )
                        table = None
                    break
            else:
                exe.commit() if self.rng.choice([True, False]) else exe.rollback()
        if not table:
            tables = [table for table in exe.db.tables if table.num_rows < MAX_ROWS]
            if not tables:
                return False
            table = self.rng.choice(tables)
        column_names = ", ".join(column.name(True) for column in table.columns)
        column_values = []
        max_rows = min(100, MAX_ROWS - table.num_rows)
        for i in range(self.rng.randrange(1, max_rows + 1)):
            column_values.append(
                ", ".join(column.value(self.rng, True) for column in table.columns)
            )
        all_column_values = ", ".join(f"({v})" for v in column_values)
        query = f"INSERT INTO {table} ({column_names}) VALUES {all_column_values}"
        returning_exprs = []
        if self.rng.choice([True, False]):
            returning_exprs.append("0")
        elif self.rng.choice([True, False]):
            returning_exprs.append("*")
        else:
            returning_exprs.append(column_names)
        if returning_exprs:
            query += f" RETURNING {', '.join(returning_exprs)}"
        exe.execute(query, http=Http.RANDOM)
        table.num_rows += len(column_values)
        exe.insert_table = table.table_id
        return True

class SourceInsertAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            sources = [
                source
                for source in exe.db.kafka_sources + exe.db.postgres_sources
                if source.num_rows < MAX_ROWS
            ]
            if not sources:
                return False
            source = self.rng.choice(sources)
        with source.lock:
            if source not in [*exe.db.kafka_sources, *exe.db.postgres_sources]:
                return False
            transaction = next(source.generator)
            for row_list in transaction.row_lists:
                for row in row_list.rows:
                    if row.operation == Operation.INSERT:
                        source.num_rows += 1
                    elif row.operation == Operation.DELETE:
                        source.num_rows -= 1
            source.executor.run(transaction, logging_exe=exe)
        return True

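# Updates a random column of the insert-transaction table (or a random table
# when no insert transaction is open), matching rows on the table's first
# column.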
class UpdateAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "canceling statement due to statement timeout",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        table = None
        if exe.insert_table is not None:
            for t in exe.db.tables:
                if t.table_id == exe.insert_table:
                    table = t
                    break
        if not table:
            table = self.rng.choice(exe.db.tables)
        column1 = table.columns[0]
        column2 = self.rng.choice(table.columns)
        query = f"UPDATE {table} SET {column2.name(True)} = {column2.value(self.rng, True)} WHERE "
        if column1.data_type == TextTextMap:
            query += f"map_length({column1.name(True)}) = map_length({column1.value(self.rng, True)})"
        else:
            query += f"{column1.name(True)} = {column1.value(self.rng, True)}"
        exe.execute(query, http=Http.RANDOM)
        exe.insert_table = table.table_id
        return True

class DeleteAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "canceling statement due to statement timeout",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        table = self.rng.choice(exe.db.tables)
        query = f"DELETE FROM {table}"
        if self.rng.random() < 0.95:
            query += " WHERE true"
            # TODO: Generic expression generator
            for column in table.columns:
                if column.data_type == TextTextMap:
                    query += f" AND map_length({column.name(True)}) = map_length({column.value(self.rng, True)})"
                else:
                    query += (
                        f" AND {column.name(True)} = {column.value(self.rng, True)}"
                    )
        exe.execute(query, http=Http.RANDOM)
        exe.commit()
        result = exe.cur.rowcount
        table.num_rows -= result
        return True

class CommentAction(Action):
    def run(self, exe: Executor) -> bool:
        table = self.rng.choice(exe.db.tables)
        if self.rng.choice([True, False]):
            column = self.rng.choice(table.columns)
            query = f"COMMENT ON COLUMN {column} IS '{Text.random_value(self.rng)}'"
        else:
            query = f"COMMENT ON TABLE {table} IS '{Text.random_value(self.rng)}'"
        exe.execute(query, http=Http.RANDOM)
        return True

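# Creates an index over all columns of a random object in random ASC/DESC
# order; the index name embeds a hash of the column list because the raw
# column names may exceed the identifier length limit.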
class CreateIndexAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "already exists",  # TODO: Investigate
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        if len(exe.db.indexes) >= MAX_INDEXES:
            return False
        obj = self.rng.choice(exe.db.db_objects())
        columns = self.rng.sample(obj.columns, len(obj.columns))
        columns_str = "_".join(column.name() for column in columns)
        # columns_str may exceed 255 characters, so hash it into a positive number instead
        index = Index(f"idx_{obj.name()}_{abs(hash(columns_str))}")
        index_elems = []
        for column in columns:
            order = self.rng.choice(["ASC", "DESC"])
            index_elems.append(f"{column.name(True)} {order}")
        index_str = ", ".join(index_elems)
        query = f"CREATE INDEX {index} ON {obj} ({index_str})"
        exe.execute(query, http=Http.RANDOM)
        with exe.db.lock:
            exe.db.indexes.add(index)
        return True

class DropIndexAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.indexes:
                return False
            index = self.rng.choice(list(exe.db.indexes))
        with index.lock:
            if index not in exe.db.indexes:
                return False
            query = f"DROP INDEX {index}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.indexes.remove(index)
        return True

class CreateTableAction(Action):
    def run(self, exe: Executor) -> bool:
        if len(exe.db.tables) >= MAX_TABLES:
            return False
        table_id = exe.db.table_id
        exe.db.table_id += 1
        schema = self.rng.choice(exe.db.schemas)
        with schema.lock:
            if schema not in exe.db.schemas:
                return False
            table = Table(self.rng, table_id, schema)
            table.create(exe)
            exe.db.tables.append(table)
        return True

class DropTableAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.tables) <= 2:
                return False
            table = self.rng.choice(exe.db.tables)
        with table.lock:
            # Was dropped while we were acquiring lock
            if table not in exe.db.tables:
                return False
            query = f"DROP TABLE {table}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.tables.remove(table)
        return True

class RenameTableAction(Action):
    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            if not exe.db.tables:
                return False
            table = self.rng.choice(exe.db.tables)
        with table.lock:
            old_name = str(table)
            table.rename += 1
            try:
                exe.execute(
                    f"ALTER TABLE {old_name} RENAME TO {identifier(table.name())}",
                    # http=Http.RANDOM, # Fails, see https://buildkite.com/materialize/nightly/builds/7362#018ecc56-787f-4cc2-ac54-1c8437af164b
                )
            except:
                table.rename -= 1
                raise
        return True

class AlterTableAddColumnAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.tables:
                return False
            if exe.db.flags.get("enable_alter_table_add_column", "FALSE") != "TRUE":
                return False
            table = self.rng.choice(exe.db.tables)
        with table.lock:
            # Allow adding a few more columns than the max for additional coverage.
            if len(table.columns) >= MAX_COLUMNS + 3:
                return False
            # TODO(alter_table): Support adding non-nullable columns with a default value.
            new_column = Column(
                self.rng, len(table.columns), self.rng.choice(DATA_TYPES), table
            )
            new_column.raw_name = f"{new_column.raw_name}-altered"
            new_column.nullable = True
            new_column.default = None
            exe.execute(f"ALTER TABLE {str(table)} ADD COLUMN {new_column.create()}")
            table.columns.append(new_column)
        return True

class RenameViewAction(Action):
    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            if not exe.db.views:
                return False
            view = self.rng.choice(exe.db.views)
        with view.lock:
            if view not in exe.db.views:
                return False
            old_name = str(view)
            view.rename += 1
            try:
                exe.execute(
                    f"ALTER {'MATERIALIZED VIEW' if view.materialized else 'VIEW'} {old_name} RENAME TO {identifier(view.name())}",
                    # http=Http.RANDOM, # Fails, see https://buildkite.com/materialize/nightly/builds/7362#018ecc56-787f-4cc2-ac54-1c8437af164b
                )
            except:
                view.rename -= 1
                raise
        return True

class RenameSinkAction(Action):
    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            if not exe.db.kafka_sinks:
                return False
            sink = self.rng.choice(exe.db.kafka_sinks)
        with sink.lock:
            if sink not in exe.db.kafka_sinks:
                return False
            old_name = str(sink)
            sink.rename += 1
            try:
                exe.execute(
                    f"ALTER SINK {old_name} RENAME TO {identifier(sink.name())}",
                    # http=Http.RANDOM, # Fails
                )
            except:
                sink.rename -= 1
                raise
        return True

class AlterKafkaSinkFromAction(Action):
    def run(self, exe: Executor) -> bool:
        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
            # Does not work reliably with kills, see database-issues#8421
            return False
        with exe.db.lock:
            if not exe.db.kafka_sinks:
                return False
            sink = self.rng.choice(exe.db.kafka_sinks)
        with sink.lock, sink.base_object.lock:
            if sink not in exe.db.kafka_sinks:
                return False
            old_object = sink.base_object
            if sink.key != "":
                # key requires same column names, low chance of even having that
                return False
            elif sink.format in ["FORMAT BYTES", "FORMAT TEXT"]:
                # single column formats
                objs = [
                    o
                    for o in exe.db.db_objects_without_views()
                    if len(o.columns) == 1
                    and o.columns[0].data_type == old_object.columns[0].data_type
                ]
            elif sink.format in ["FORMAT JSON"]:
                # We should be able to format all data types as JSON, and they have no
                # particular backwards-compatibility requirements.
                objs = [o for o in exe.db.db_objects_without_views()]
            else:
                # Avro schema migration checking can be quite strict, and we need to be not only
                # compatible with the latest object's schema but all previous schemas.
                # Only allow a conservative case for now: where all types and names match.
                objs = []
                old_cols = {c.name(True): c.data_type for c in old_object.columns}
                for o in exe.db.db_objects_without_views():
                    if isinstance(old_object, WebhookSource):
                        continue
                    if isinstance(o, WebhookSource):
                        continue
                    new_cols = {c.name(True): c.data_type for c in o.columns}
                    if old_cols == new_cols:
                        objs.append(o)
            if not objs:
                return False
            sink.base_object = self.rng.choice(objs)
            try:
                exe.execute(
                    f"ALTER SINK {sink} SET FROM {sink.base_object}",
                    # http=Http.RANDOM, # Fails
                )
            except:
                sink.base_object = old_object
                raise
        return True

class CreateDatabaseAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.dbs) >= MAX_DBS:
                return False
            db_id = exe.db.db_id
            exe.db.db_id += 1
            db = DB(exe.db.seed, db_id)
            db.create(exe)
            exe.db.dbs.append(db)
        return True

class DropDatabaseAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "cannot be dropped with RESTRICT while it contains schemas",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.dbs) <= 1:
                return False
            db = self.rng.choice(exe.db.dbs)
        with db.lock:
            # Was dropped while we were acquiring lock
            if db not in exe.db.dbs:
                return False
            query = f"DROP DATABASE {db} RESTRICT"
            exe.execute(query, http=Http.RANDOM)
            exe.db.dbs.remove(db)
        return True

class CreateSchemaAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.schemas) >= MAX_SCHEMAS:
                return False
            schema_id = exe.db.schema_id
            exe.db.schema_id += 1
            schema = Schema(self.rng.choice(exe.db.dbs), schema_id)
            schema.create(exe)
            exe.db.schemas.append(schema)
        return True

class DropSchemaAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "cannot be dropped without CASCADE while it contains objects",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.schemas) <= 1:
                return False
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock:
            # Was dropped while we were acquiring lock
            if schema not in exe.db.schemas:
                return False
            query = f"DROP SCHEMA {schema}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.schemas.remove(schema)
        return True

class RenameSchemaAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "ambiguous reference to schema named"  # see https://github.com/MaterializeInc/materialize/pull/22551#pullrequestreview-1691876923
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock:
            if schema not in exe.db.schemas:
                return False
            old_name = str(schema)
            schema.rename += 1
            try:
                exe.execute(
                    f"ALTER SCHEMA {old_name} RENAME TO {identifier(schema.name())}",
                    # http=Http.RANDOM, # Fails
                )
            except:
                schema.rename -= 1
                raise
        return True

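# Swaps two schemas within one database, either atomically via ALTER SCHEMA
# ... SWAP WITH, or as a three-step rename through tmp_schema inside an
# explicit transaction.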
class SwapSchemaAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "object state changed while transaction was in progress",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            db = self.rng.choice(exe.db.dbs)
            schemas = [
                schema for schema in exe.db.schemas if schema.db.db_id == db.db_id
            ]
            if len(schemas) < 2:
                return False
            schema_ids = sorted(self.rng.sample(range(0, len(schemas)), 2))
            schema1 = schemas[schema_ids[0]]
            schema2 = schemas[schema_ids[1]]
        with schema1.lock, schema2.lock:
            if schema1 not in exe.db.schemas:
                return False
            if schema2 not in exe.db.schemas:
                return False
            if self.rng.choice([True, False]):
                exe.execute(
                    f"ALTER SCHEMA {schema1} SWAP WITH {identifier(schema2.name())}"
                )
            else:
                exe.cur.connection.autocommit = False
                try:
                    exe.execute(f"ALTER SCHEMA {schema1} RENAME TO tmp_schema")
                    exe.execute(
                        f"ALTER SCHEMA {schema2} RENAME TO {identifier(schema1.name())}"
                    )
                    exe.execute(
                        f"ALTER SCHEMA tmp_schema RENAME TO {identifier(schema2.name())}"
                    )
                    exe.commit()
                finally:
                    try:
                        exe.cur.connection.autocommit = True
                    except:
                        exe.reconnect_next = True
            schema1.schema_id, schema2.schema_id = schema2.schema_id, schema1.schema_id
            schema1.rename, schema2.rename = schema2.rename, schema1.rename
        return True

class TransactionIsolationAction(Action):
    def run(self, exe: Executor) -> bool:
        level = self.rng.choice(["SERIALIZABLE", "STRICT SERIALIZABLE"])
        exe.set_isolation(level)
        return True

class CommitRollbackAction(Action):
    def run(self, exe: Executor) -> bool:
        if not exe.action_run_since_last_commit_rollback:
            return False
        if self.rng.random() < 0.7:
            exe.commit()
        else:
            exe.rollback()
        exe.action_run_since_last_commit_rollback = False
        return True

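# Randomly flips system flags via ALTER SYSTEM SET on a separate mz_system
# connection: flags_with_values maps each interesting flag to the values
# worth testing, while uninteresting_flags lists flags deliberately left
# alone.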
class FlipFlagsAction(Action):
    def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
    ):
        super().__init__(rng, composition)
        BOOLEAN_FLAG_VALUES = ["TRUE", "FALSE"]
        self.flags_with_values: dict[str, list[str]] = dict()
        self.flags_with_values["persist_blob_target_size"] = (
            # 1 MiB, 16 MiB, 128 MiB
            ["1048576", "16777216", "134217728"]
        )
        for flag in ["catalog", "source", "snapshot", "txn"]:
            self.flags_with_values[f"persist_use_critical_since_{flag}"] = (
                BOOLEAN_FLAG_VALUES
            )
        self.flags_with_values["persist_claim_unclaimed_compactions"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["persist_optimize_ignored_data_fetch"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["enable_variadic_left_join_lowering"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["enable_eager_delta_joins"] = BOOLEAN_FLAG_VALUES
        self.flags_with_values["persist_batch_structured_key_lower_len"] = [
            "0",
            "1",
            "512",
            "1000",
            "50000",
        ]
        self.flags_with_values["persist_batch_max_run_len"] = [
            "2",
            "3",
            "4",
            "16",
            "1000",
        ]
        self.flags_with_values["persist_compaction_memory_bound_bytes"] = [
            # 64 MiB, 1 * 128 MiB, 4 * 128 MiB, 8 * 128 MiB
            "67108864",
            "134217728",
            "536870912",
            "1073741824",
        ]
        self.flags_with_values["persist_part_decode_format"] = [
            "row_with_validate",
            "arrow",
        ]
        self.flags_with_values["persist_encoding_enable_dictionary"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["persist_validate_part_bounds_on_read"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["persist_validate_part_bounds_on_write"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["compute_apply_column_demands"] = BOOLEAN_FLAG_VALUES
        self.flags_with_values["enable_alter_table_add_column"] = BOOLEAN_FLAG_VALUES
        self.flags_with_values["enable_compute_active_dataflow_cancelation"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["enable_compute_peek_response_stash"] = (
            BOOLEAN_FLAG_VALUES
        )
        self.flags_with_values["compute_peek_response_stash_threshold_bytes"] = [
            "0",  # "force enabled"
            "1048576",  # 1 MiB, an in-between value
            "314572800",  # 300 MiB, the production value
        ]
        # If you are adding a new config flag in Materialize, consider using it
        # here instead of just marking it as uninteresting to silence the
        # linter. parallel-workload randomly flips the flags in
        # `flags_with_values` while running. If a new flag has interesting
        # behavior, you should add it. Feature flags which turn on/off
        # externally visible features should not be flipped.
        self.uninteresting_flags: list[str] = [
            "enable_mz_join_core",
            "enable_compute_correction_v2",
            "enable_compute_mv_append_smearing",
            "linear_join_yielding",
            "enable_lgalloc",
            "enable_lgalloc_eager_reclamation",
            "lgalloc_background_interval",
            "lgalloc_file_growth_dampener",
            "lgalloc_local_buffer_bytes",
            "lgalloc_slow_clear_bytes",
            "lgalloc_limiter_interval",
            "lgalloc_limiter_usage_factor",
            "lgalloc_limiter_usage_bias",
            "lgalloc_limiter_burst_factor",
            "memory_limiter_interval",
            "memory_limiter_usage_factor",
            "memory_limiter_usage_bias",
            "memory_limiter_burst_factor",
            "enable_columnation_lgalloc",
            "enable_columnar_lgalloc",
            "compute_server_maintenance_interval",
            "compute_dataflow_max_inflight_bytes",
            "compute_dataflow_max_inflight_bytes_cc",
            "compute_flat_map_fuel",
            "consolidating_vec_growth_dampener",
            "compute_hydration_concurrency",
            "copy_to_s3_parquet_row_group_file_ratio",
            "copy_to_s3_arrow_builder_buffer_ratio",
            "copy_to_s3_multipart_part_size_bytes",
            "enable_compute_replica_expiration",
            "compute_replica_expiration_offset",
            "enable_compute_render_fueled_as_specific_collection",
            "enable_compute_temporal_bucketing",
            "compute_temporal_bucketing_summary",
            "enable_compute_logical_backpressure",
            "compute_logical_backpressure_max_retained_capabilities",
            "compute_logical_backpressure_inflight_slack",
            "persist_fetch_semaphore_cost_adjustment",
            "persist_fetch_semaphore_permit_adjustment",
            "persist_pubsub_client_enabled",
            "persist_pubsub_push_diff_enabled",
            "persist_pubsub_same_process_delegate_enabled",
            "persist_pubsub_connect_attempt_timeout",
            "persist_pubsub_request_timeout",
            "persist_pubsub_connect_max_backoff",
            "persist_pubsub_client_sender_channel_size",
            "persist_pubsub_client_receiver_channel_size",
            "persist_pubsub_server_connection_channel_size",
            "persist_pubsub_state_cache_shard_ref_channel_size",
            "persist_pubsub_reconnect_backoff",
            "persist_batch_delete_enabled",
            "persist_encoding_compression_format",
            "persist_batch_max_runs",
            "persist_inline_writes_single_max_bytes",
            "persist_inline_writes_total_max_bytes",
            "persist_write_combine_inline_writes",
            "storage_source_decode_fuel",
            "persist_reader_lease_duration",
            "persist_consensus_connection_pool_max_size",
            "persist_consensus_connection_pool_max_wait",
            "persist_consensus_connection_pool_ttl",
            "persist_consensus_connection_pool_ttl_stagger",
            "crdb_connect_timeout",
            "crdb_tcp_user_timeout",
            "use_global_txn_cache_source",
            "persist_batch_builder_max_outstanding_parts",
            "persist_compaction_heuristic_min_inputs",
            "persist_compaction_heuristic_min_parts",
            "persist_compaction_heuristic_min_updates",
            "persist_gc_blob_delete_concurrency_limit",
            "persist_state_versions_recent_live_diffs_limit",
            "persist_usage_state_fetch_concurrency_limit",
            "persist_blob_operation_timeout",
            "persist_blob_operation_attempt_timeout",
            "persist_blob_connect_timeout",
            "persist_blob_read_timeout",
            "persist_stats_audit_percent",
            "persist_stats_collection_enabled",
            "persist_stats_filter_enabled",
            "persist_stats_budget_bytes",
            "persist_stats_untrimmable_columns_equals",
            "persist_stats_untrimmable_columns_prefix",
            "persist_stats_untrimmable_columns_suffix",
            "persist_catalog_force_compaction_fuel",
            "persist_catalog_force_compaction_wait",
            "persist_expression_cache_force_compaction_fuel",
            "persist_expression_cache_force_compaction_wait",
            "persist_blob_cache_mem_limit_bytes",
            "persist_blob_cache_scale_with_threads",
            "persist_blob_cache_scale_factor_bytes",
            "persist_claim_compaction_percent",
            "persist_claim_compaction_min_version",
            "persist_next_listen_batch_retryer_fixed_sleep",
            "persist_next_listen_batch_retryer_initial_backoff",
            "persist_next_listen_batch_retryer_multiplier",
            "persist_next_listen_batch_retryer_clamp",
            "persist_rollup_threshold",
            "persist_rollup_fallback_threshold_ms",
            "persist_rollup_use_active_rollup",
            "persist_gc_fallback_threshold_ms",
            "persist_gc_use_active_gc",
            "persist_gc_min_versions",
            "persist_gc_max_versions",
            "persist_compaction_minimum_timeout",
            "persist_compaction_use_most_recent_schema",
            "persist_compaction_check_process_flag",
            "balancerd_sigterm_connection_wait",
            "balancerd_sigterm_listen_wait",
            "balancerd_inject_proxy_protocol_header_http",
            "balancerd_log_filter",
            "balancerd_opentelemetry_filter",
            "balancerd_log_filter_defaults",
            "balancerd_opentelemetry_filter_defaults",
            "balancerd_sentry_filters",
            "persist_enable_s3_lgalloc_cc_sizes",
            "persist_enable_s3_lgalloc_noncc_sizes",
            "persist_enable_arrow_lgalloc_cc_sizes",
            "persist_enable_arrow_lgalloc_noncc_sizes",
            "controller_past_generation_replica_cleanup_retry_interval",
            "enable_0dt_deployment_sources",
            "wallclock_lag_recording_interval",
            "enable_wallclock_lag_histogram_collection",
            "wallclock_lag_histogram_period_interval",
            "enable_timely_zero_copy",
            "enable_timely_zero_copy_lgalloc",
            "timely_zero_copy_limit",
            "arrangement_exert_proportionality",
            "txn_wal_apply_ensure_schema_match",
            "persist_txns_data_shard_retryer_initial_backoff",
            "persist_txns_data_shard_retryer_multiplier",
            "persist_txns_data_shard_retryer_clamp",
            "storage_cluster_shutdown_grace_period",
            "storage_dataflow_delay_sources_past_rehydration",
            "storage_dataflow_suspendable_sources",
            "storage_downgrade_since_during_finalization",
            "replica_metrics_history_retention_interval",
            "wallclock_lag_history_retention_interval",
            "wallclock_global_lag_histogram_retention_interval",
            "kafka_client_id_enrichment_rules",
            "kafka_poll_max_wait",
            "kafka_default_metadata_fetch_interval",
            "kafka_default_aws_privatelink_endpoint_identification_algorithm",
            "kafka_buffered_event_resize_threshold_elements",
            "mysql_replication_heartbeat_interval",
            "mysql_offset_known_interval",
            "postgres_fetch_slot_resume_lsn_interval",
            "pg_offset_known_interval",
            "pg_schema_validation_interval",
            "storage_enforce_external_addresses",
            "storage_upsert_prevent_snapshot_buffering",
            "storage_rocksdb_use_merge_operator",
            "storage_upsert_max_snapshot_batch_buffering",
            "storage_rocksdb_cleanup_tries",
            "storage_suspend_and_restart_delay",
            "storage_sink_snapshot_frontier",
            "storage_reclock_to_latest",
            "storage_use_continual_feedback_upsert",
            "storage_server_maintenance_interval",
            "storage_sink_progress_search",
            "storage_sink_ensure_topic_config",
            "ore_overflowing_behavior",
            "sql_server_max_lsn_wait",
            "sql_server_snapshot_progress_report_interval",
            "sql_server_cdc_poll_interval",
            "sql_server_cdc_cleanup_change_table",
            "sql_server_cdc_cleanup_change_table_max_deletes",
            "sql_server_offset_known_interval",
            "allow_user_sessions",
            "enable_0dt_deployment",
            "with_0dt_deployment_max_wait",
            "with_0dt_deployment_ddl_check_interval",
            "enable_0dt_deployment_panic_after_timeout",
            "enable_0dt_caught_up_check",
            "with_0dt_caught_up_check_allowed_lag",
            "with_0dt_caught_up_check_cutoff",
            "enable_statement_lifecycle_logging",
            "enable_introspection_subscribes",
            "plan_insights_notice_fast_path_clusters_optimize_duration",
            "enable_continual_task_builtins",
            "enable_expression_cache",
            "enable_multi_replica_sources",
            "enable_password_auth",
            "constraint_based_timestamp_selection",
            "persist_fast_path_order",
            "mz_metrics_lgalloc_map_refresh_interval",
            "mz_metrics_lgalloc_refresh_interval",
            "mz_metrics_rusage_refresh_interval",
            "compute_peek_stash_num_batches",
            "compute_peek_stash_batch_size",
            "compute_peek_response_stash_batch_max_runs",
            "compute_peek_response_stash_read_batch_size_bytes",
            "compute_peek_response_stash_read_memory_budget_bytes",
            "persist_enable_incremental_compaction",
            "storage_statistics_retention_duration",
        ]
    def run(self, exe: Executor) -> bool:
        flag_name = self.rng.choice(list(self.flags_with_values.keys()))
        # TODO: Remove when database-issues#8352 is fixed
        if exe.db.scenario == Scenario.ZeroDowntimeDeploy and flag_name.startswith(
            "persist_use_critical_since_"
        ):
            return False
        flag_value = self.rng.choice(self.flags_with_values[flag_name])
        conn = None
        try:
            conn = self.create_system_connection(exe)
            self.flip_flag(conn, flag_name, flag_value)
            exe.db.flags[flag_name] = flag_value
            return True
        except OperationalError:
            if conn is not None:
                conn.close()
            # ignore it
            return False
        except Exception as e:
            raise QueryError(str(e), "FlipFlags")

    def create_system_connection(
        self, exe: Executor, num_attempts: int = 10
    ) -> Connection:
        try:
            conn = psycopg.connect(
                host=exe.db.host,
                port=exe.db.ports[
                    "mz_system" if exe.mz_service == "materialized" else "mz_system2"
                ],
                user="mz_system",
                dbname="materialize",
            )
            conn.autocommit = True
            return conn
        except:
            if num_attempts == 0:
                raise
            else:
                time.sleep(1)
                return self.create_system_connection(exe, num_attempts - 1)

    def flip_flag(self, conn: Connection, flag_name: str, flag_value: str) -> None:
        with conn.cursor() as cur:
            cur.execute(
                f"ALTER SYSTEM SET {flag_name} = {flag_value};".encode(),
            )

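# Creates a view over one or two random non-view base objects in a random
# schema.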
class CreateViewAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.views) >= MAX_VIEWS:
                return False
            view_id = exe.db.view_id
            exe.db.view_id += 1
            # Don't use views for now since LIMIT 1 and statement_timeout are
            # not effective yet at preventing long-running queries and OoMs.
            base_object = self.rng.choice(exe.db.db_objects_without_views())
            base_object2: DBObject | None = self.rng.choice(
                exe.db.db_objects_without_views()
            )
            if self.rng.choice([True, False]) or base_object2 == base_object:
                base_object2 = None
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock:
            if schema not in exe.db.schemas:
                return False
            view = View(
                self.rng,
                view_id,
                base_object,
                base_object2,
                schema,
            )
            view.create(exe)
            exe.db.views.append(view)
        return True

class DropViewAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.views:
                return False
            view = self.rng.choice(exe.db.views)
        with view.lock:
            # Was dropped while we were acquiring lock
            if view not in exe.db.views:
                return False
            if view.materialized:
                query = f"DROP MATERIALIZED VIEW {view}"
            else:
                query = f"DROP VIEW {view}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.views.remove(view)
        return True



class CreateRoleAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.roles) >= MAX_ROLES:
                return False
            role_id = exe.db.role_id
            exe.db.role_id += 1
            role = Role(role_id)
            role.create(exe)
            exe.db.roles.append(role)
        return True


class DropRoleAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "cannot be dropped because some objects depend on it",
            "current role cannot be dropped",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.roles:
                return False
            role = self.rng.choice(exe.db.roles)
        with role.lock:
            # Was dropped while we were acquiring lock
            if role not in exe.db.roles:
                return False
            query = f"DROP ROLE {role}"
            try:
                exe.execute(query, http=Http.RANDOM)
            except QueryError as e:
                # expected, see database-issues#6156
                if (
                    exe.db.scenario not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
                    or "unknown role" not in e.msg
                ):
                    raise e
            exe.db.roles.remove(role)
        return True


class CreateClusterAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.clusters) >= MAX_CLUSTERS:
                return False
            cluster_id = exe.db.cluster_id
            exe.db.cluster_id += 1
            cluster = Cluster(
                cluster_id,
                managed=self.rng.choice([True, False]),
                size=self.rng.choice(["1", "2"]),
                replication_factor=self.rng.choice([1, 2]),
                introspection_interval=self.rng.choice(["0", "1s", "10s"]),
            )
            cluster.create(exe)
            exe.db.clusters.append(cluster)
        return True


class DropClusterAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            # cannot drop cluster "..." because other objects depend on it
            "because other objects depend on it",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.clusters) <= 1:
                return False
            # Keep cluster 0 with 1 replica for sources/sinks
            cluster = exe.db.clusters[self.rng.randrange(1, len(exe.db.clusters))]
        with cluster.lock:
            # Was dropped while we were acquiring lock
            if cluster not in exe.db.clusters:
                return False
            query = f"DROP CLUSTER {cluster}"
            try:
                exe.execute(query, http=Http.RANDOM)
            except QueryError as e:
                # expected, see database-issues#6156
                if (
                    exe.db.scenario not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
                    or "unknown cluster" not in e.msg
                ):
                    raise e
            exe.db.clusters.remove(cluster)
        return True
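

# Swaps the names of two random clusters, either atomically with
# ALTER ... SWAP WITH, or as three RENAMEs through a temporary name inside an
# explicit transaction.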
class SwapClusterAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "object state changed while transaction was in progress",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        if exe.db.scenario != Scenario.Rename:
            return False
        with exe.db.lock:
            if len(exe.db.clusters) < 2:
                return False
            cluster_ids = sorted(self.rng.sample(range(0, len(exe.db.clusters)), 2))
            cluster1 = exe.db.clusters[cluster_ids[0]]
            cluster2 = exe.db.clusters[cluster_ids[1]]
        with cluster1.lock, cluster2.lock:
            if cluster1 not in exe.db.clusters:
                return False
            if cluster2 not in exe.db.clusters:
                return False
            if self.rng.choice([True, False]):
                exe.execute(
                    f"ALTER CLUSTER {cluster1} SWAP WITH {identifier(cluster2.name())}",
                    # http=Http.RANDOM, # Fails, see https://buildkite.com/materialize/nightly/builds/7362#018ecc56-787f-4cc2-ac54-1c8437af164b
                )
            else:
                exe.cur.connection.autocommit = False
                try:
                    exe.execute(f"ALTER CLUSTER {cluster1} RENAME TO tmp_cluster")
                    exe.execute(
                        f"ALTER CLUSTER {cluster2} RENAME TO {identifier(cluster1.name())}"
                    )
                    exe.execute(
                        f"ALTER CLUSTER tmp_cluster RENAME TO {identifier(cluster2.name())}"
                    )
                    exe.commit()
                finally:
                    try:
                        exe.cur.connection.autocommit = True
                    except:
                        exe.reconnect_next = True
            cluster1.cluster_id, cluster2.cluster_id = (
                cluster2.cluster_id,
                cluster1.cluster_id,
            )
            cluster1.rename, cluster2.rename = cluster2.rename, cluster1.rename
        return True


class SetClusterAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "SET cluster cannot be called in an active transaction",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.clusters:
                return False
            cluster = self.rng.choice(exe.db.clusters)
        query = f"SET CLUSTER = {cluster}"
        exe.execute(query, http=Http.RANDOM)
        return True


class CreateClusterReplicaAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            # Keep cluster 0 with 1 replica for sources/sinks
            unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed]
            if not unmanaged_clusters:
                return False
            cluster = self.rng.choice(unmanaged_clusters)
            cluster.replica_id += 1
        with cluster.lock:
            # Was dropped or became managed while we were acquiring lock
            if cluster not in exe.db.clusters or cluster.managed:
                return False
            replica = ClusterReplica(
                cluster.replica_id,
                size=self.rng.choice(["1", "2"]),
                cluster=cluster,
            )
            replica.create(exe)
            cluster.replicas.append(replica)
        return True


class DropClusterReplicaAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            # Keep cluster 0 with 1 replica for sources/sinks
            unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed]
            if not unmanaged_clusters:
                return False
            cluster = self.rng.choice(unmanaged_clusters)
            # Avoid "has no replicas available to service request" error
            if len(cluster.replicas) <= 1:
                return False
            replica = self.rng.choice(cluster.replicas)
        with cluster.lock, replica.lock:
            # Was dropped while we were acquiring lock
            if replica not in cluster.replicas:
                return False
            if cluster not in exe.db.clusters:
                return False
            # Avoid "has no replicas available to service request" error
            if len(cluster.replicas) <= 1:
                return False
            query = f"DROP CLUSTER REPLICA {cluster}.{replica}"
            try:
                exe.execute(query, http=Http.RANDOM)
            except QueryError as e:
                # expected, see database-issues#6156
                if (
                    exe.db.scenario not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
                    or "has no CLUSTER REPLICA named" not in e.msg
                ):
                    raise e
            cluster.replicas.remove(replica)
        return True
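

# GRANT/REVOKE run against a randomly chosen role and table/view. Under the
# Kill and ZeroDowntimeDeploy scenarios the role may already be gone on the
# server side; an "unknown role" error then means our local list is stale, so
# the role is dropped from it instead of failing the workload.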
class GrantPrivilegesAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.roles:
                return False
            role = self.rng.choice(exe.db.roles)
            privilege = self.rng.choice(["SELECT", "INSERT", "UPDATE", "ALL"])
            tables_views: list[DBObject] = [*exe.db.tables, *exe.db.views]
            table = self.rng.choice(tables_views)
        with table.lock, role.lock:
            if table not in [*exe.db.tables, *exe.db.views]:
                return False
            if role not in exe.db.roles:
                return False
            query = f"GRANT {privilege} ON {table} TO {role}"
            try:
                exe.execute(query, http=Http.RANDOM)
            except QueryError as e:
                # expected, see database-issues#6156
                if (
                    exe.db.scenario not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
                    or "unknown role" not in e.msg
                ):
                    raise e
                exe.db.roles.remove(role)
        return True


class RevokePrivilegesAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.roles:
                return False
            role = self.rng.choice(exe.db.roles)
            privilege = self.rng.choice(["SELECT", "INSERT", "UPDATE", "ALL"])
            tables_views: list[DBObject] = [*exe.db.tables, *exe.db.views]
            table = self.rng.choice(tables_views)
        with table.lock, role.lock:
            if table not in [*exe.db.tables, *exe.db.views]:
                return False
            if role not in exe.db.roles:
                return False
            query = f"REVOKE {privilege} ON {table} FROM {role}"
            try:
                exe.execute(query, http=Http.RANDOM)
            except QueryError as e:
                # expected, see database-issues#6156
                if (
                    exe.db.scenario not in (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
                    or "unknown role" not in e.msg
                ):
                    raise e
                exe.db.roles.remove(role)
        return True
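

# Tears down the current WebSocket and pg connections and reconnects,
# optionally as a randomly chosen role. Under ZeroDowntimeDeploy the target
# service may have changed, so the reconnect loops flip between
# "materialized" and "materialized2" until one accepts the connection.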
# TODO: Should factor this out so it can easily be used without an action
class ReconnectAction(Action):
    def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
        random_role: bool = True,
    ):
        super().__init__(rng, composition)
        self.random_role = random_role

    def run(self, exe: Executor) -> bool:
        exe.mz_service = "materialized"
        exe.log("reconnecting")
        host = exe.db.host
        with exe.db.lock:
            if self.random_role and exe.db.roles:
                user = self.rng.choice(
                    ["materialize", str(self.rng.choice(exe.db.roles))]
                )
            else:
                user = "materialize"

        conn = exe.cur.connection
        if exe.ws and exe.use_ws:
            try:
                exe.ws.close()
            except:
                pass
        try:
            exe.cur.close()
        except:
            pass
        try:
            conn.close()
        except:
            pass

        NUM_ATTEMPTS = 20
        if exe.ws:
            for i in range(
                NUM_ATTEMPTS
                if exe.db.scenario != Scenario.ZeroDowntimeDeploy
                else 1000000
            ):
                exe.ws = websocket.WebSocket()
                try:
                    ws_conn_id, ws_secret_key = ws_connect(
                        exe.ws,
                        host,
                        exe.db.ports[
                            "http" if exe.mz_service == "materialized" else "http2"
                        ],
                        user,
                    )
                except Exception as e:
                    if exe.db.scenario == Scenario.ZeroDowntimeDeploy:
                        exe.mz_service = (
                            "materialized2"
                            if exe.mz_service == "materialized"
                            else "materialized"
                        )
                        continue
                    if i < NUM_ATTEMPTS - 1:
                        time.sleep(1)
                        continue
                    raise QueryError(str(e), "WS connect")
                if exe.use_ws:
                    exe.pg_pid = ws_conn_id
                break

        for i in range(
            NUM_ATTEMPTS if exe.db.scenario != Scenario.ZeroDowntimeDeploy else 1000000
        ):
            try:
                # Look up the port each attempt since the target service may
                # have been flipped above
                conn = psycopg.connect(
                    host=host,
                    port=exe.db.ports[exe.mz_service],
                    user=user,
                    dbname="materialize",
                )
                conn.autocommit = exe.autocommit
                cur = conn.cursor()
                exe.cur = cur
                exe.set_isolation("SERIALIZABLE")
                cur.execute("SELECT pg_backend_pid()")
                if not exe.use_ws:
                    exe.pg_pid = cur.fetchall()[0][0]
            except Exception as e:
                if exe.db.scenario == Scenario.ZeroDowntimeDeploy:
                    exe.mz_service = (
                        "materialized2"
                        if exe.mz_service == "materialized"
                        else "materialized"
                    )
                    continue
                if i < NUM_ATTEMPTS - 1 and (
                    "server closed the connection unexpectedly" in str(e)
                    or "Can't create a connection to host" in str(e)
                    or "Connection refused" in str(e)
                ):
                    time.sleep(1)
                    continue
                raise QueryError(str(e), "connect")
            else:
                break
        return True
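

# Cancels a query running on a random worker's backend via pg_cancel_backend.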
class CancelAction(Action):
    workers: list["Worker"]

    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "must be a member of",
        ] + super().errors_to_ignore(exe)

    def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
        workers: list["Worker"],
    ):
        super().__init__(rng, composition)
        self.workers = workers

    def run(self, exe: Executor) -> bool:
        pid = self.rng.choice(
            [worker.exe.pg_pid for worker in self.workers if worker.exe and worker.exe.pg_pid != -1]  # type: ignore
        )
        worker = None
        for i in range(len(self.workers)):
            worker_exe = self.workers[i].exe
            if worker_exe and worker_exe.pg_pid == pid:
                worker = f"worker_{i}"
                break
        assert worker
        exe.execute(
            f"SELECT pg_cancel_backend({pid})",
            extra_info=f"Canceling {worker}",
            http=Http.RANDOM,
        )
        time.sleep(self.rng.uniform(0.1, 10))
        return True
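

# Kills the materialized container and restarts it, possibly with mutated
# system parameters, then gives the workload time to reconnect.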
class KillAction(Action):
    def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
        azurite: bool,
        sanity_restart: bool,
        system_param_fn: Callable[[dict[str, str]], dict[str, str]] = lambda x: x,
    ):
        super().__init__(rng, composition)
        self.system_param_fn = system_param_fn
        self.system_parameters: dict[str, str] = {}
        self.azurite = azurite
        self.sanity_restart = sanity_restart

    def run(self, exe: Executor) -> bool:
        assert self.composition
        self.composition.kill("materialized")
        self.system_parameters = self.system_param_fn(self.system_parameters)
        with self.composition.override(
            Materialized(
                restart="on-failure",
                # TODO: Retry with toxiproxy on azurite
                external_blob_store=True,
                blob_store_is_azure=self.azurite,
                external_metadata_store="toxiproxy",
                ports=["6975:6875", "6976:6876", "6977:6877"],
                sanity_restart=self.sanity_restart,
                additional_system_parameter_defaults=self.system_parameters,
                metadata_store="cockroach",
                default_replication_factor=2,
            )
        ):
            self.composition.up("materialized", detach=True)
        time.sleep(self.rng.uniform(60, 120))
        return True
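

# Performs a blue/green style deploy: brings up the standby service for the
# next deploy generation on alternate ports, waits until it is ready to
# promote, promotes it, and waits until it has become the leader.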
class ZeroDowntimeDeployAction(Action):
    def __init__(
        self,
        rng: random.Random,
        composition: Composition | None,
        azurite: bool,
        sanity_restart: bool,
    ):
        super().__init__(rng, composition)
        self.azurite = azurite
        self.sanity_restart = sanity_restart
        self.deploy_generation = 0

    def run(self, exe: Executor) -> bool:
        assert self.composition
        self.deploy_generation += 1

        if self.deploy_generation % 2 == 0:
            mz_service = "materialized"
            ports = ["6975:6875", "6976:6876", "6977:6877"]
        else:
            mz_service = "materialized2"
            ports = ["7075:6875", "7076:6876", "7077:6877"]
        print(f"Deploying generation {self.deploy_generation} on {mz_service}")

        with self.composition.override(
            Materialized(
                name=mz_service,
                # TODO: Retry with toxiproxy on azurite
                external_blob_store=True,
                blob_store_is_azure=self.azurite,
                external_metadata_store="toxiproxy",
                ports=ports,
                sanity_restart=self.sanity_restart,
                deploy_generation=self.deploy_generation,
                system_parameter_defaults=get_default_system_parameters(
                    zero_downtime=True
                ),
                restart="on-failure",
                healthcheck=LEADER_STATUS_HEALTHCHECK,
                metadata_store="cockroach",
                default_replication_factor=2,
            ),
        ):
            self.composition.up(mz_service, detach=True)
            self.composition.await_mz_deployment_status(
                DeploymentStatus.READY_TO_PROMOTE, mz_service
            )
            self.composition.promote_mz(mz_service)
            self.composition.await_mz_deployment_status(
                DeploymentStatus.IS_LEADER, mz_service
            )
        time.sleep(self.rng.uniform(60, 120))
        return True
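

# Backs up the CockroachDB metadata, then immediately restores from that
# backup: CRDB is backed up into a fresh minio bucket, materialized is
# killed, defaultdb is restored, and persistcli restore-blob reconciles the
# blob store with the restored consensus state.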
# TODO: Don't restore immediately, keep copy of Database objects
class BackupRestoreAction(Action):
    composition: Composition
    db: Database
    num: int

    def __init__(
        self, rng: random.Random, composition: Composition | None, db: Database
    ):
        super().__init__(rng, composition)
        self.db = db
        self.num = 0
        assert self.composition

    def run(self, exe: Executor) -> bool:
        self.num += 1
        time.sleep(self.rng.uniform(10, 240))

        with self.db.lock:
            # Backup
            self.composition.exec("mc", "mc", "mb", f"persist/crdb-backup{self.num}")
            self.composition.exec(
                "cockroach",
                "cockroach",
                "sql",
                "--insecure",
                "-e",
                f"""
                CREATE EXTERNAL CONNECTION backup_bucket{self.num} AS 's3://persist/crdb-backup{self.num}?AWS_ENDPOINT=http://minio:9000/&AWS_REGION=minio&AWS_ACCESS_KEY_ID=minioadmin&AWS_SECRET_ACCESS_KEY=minioadmin';
                BACKUP INTO 'external://backup_bucket{self.num}';
                """,
            )
            self.composition.kill("materialized")

            # Restore
            self.composition.exec(
                "cockroach",
                "cockroach",
                "sql",
                "--insecure",
                "-e",
                f"""
                DROP DATABASE defaultdb;
                RESTORE DATABASE defaultdb FROM LATEST IN 'external://backup_bucket{self.num}';
                SELECT shard, min(sequence_number), max(sequence_number)
                FROM consensus.consensus GROUP BY 1 ORDER BY 2 DESC, 3 DESC, 1 ASC LIMIT 32;
                """,
            )
            self.composition.run(
                "persistcli",
                "admin",
                "--commit",
                "restore-blob",
                f"--blob-uri={minio_blob_uri()}",
                "--consensus-uri=postgres://root@cockroach:26257?options=--search_path=consensus",
            )
            self.composition.up("materialized")

        return True


class CreateWebhookSourceAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.webhook_sources) >= MAX_WEBHOOK_SOURCES:
                return False
            webhook_source_id = exe.db.webhook_source_id
            exe.db.webhook_source_id += 1
            cluster = self.rng.choice(exe.db.clusters)
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock, cluster.lock:
            if schema not in exe.db.schemas:
                return False
            if cluster not in exe.db.clusters:
                return False
            source = WebhookSource(webhook_source_id, cluster, schema, self.rng)
            source.create(exe)
            exe.db.webhook_sources.append(source)
        return True


class DropWebhookSourceAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.webhook_sources:
                return False
            source = self.rng.choice(exe.db.webhook_sources)
        with source.lock:
            # Was dropped while we were acquiring lock
            if source not in exe.db.webhook_sources:
                return False
            query = f"DROP SOURCE {source}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.webhook_sources.remove(source)
        return True


class CreateKafkaSourceAction(Action):
    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.kafka_sources) >= MAX_KAFKA_SOURCES:
                return False
            source_id = exe.db.kafka_source_id
            exe.db.kafka_source_id += 1
            cluster = self.rng.choice(exe.db.clusters)
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock, cluster.lock:
            if schema not in exe.db.schemas:
                return False
            if cluster not in exe.db.clusters:
                return False
            try:
                source = KafkaSource(
                    source_id,
                    cluster,
                    schema,
                    exe.db.ports,
                    self.rng,
                )
                source.create(exe)
                exe.db.kafka_sources.append(source)
            except:
                if exe.db.scenario not in (
                    Scenario.Kill,
                    Scenario.ZeroDowntimeDeploy,
                ):
                    raise
        return True


class DropKafkaSourceAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.kafka_sources:
                return False
            source = self.rng.choice(exe.db.kafka_sources)
        with source.lock:
            # Was dropped while we were acquiring lock
            if source not in exe.db.kafka_sources:
                return False
            query = f"DROP SOURCE {source}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.kafka_sources.remove(source)
            source.executor.mz_conn.close()
        return True


class CreateMySqlSourceAction(Action):
    def run(self, exe: Executor) -> bool:
        # See database-issues#6881, not expected to work
        if exe.db.scenario == Scenario.BackupRestore:
            return False

        with exe.db.lock:
            if len(exe.db.mysql_sources) >= MAX_MYSQL_SOURCES:
                return False
            source_id = exe.db.mysql_source_id
            exe.db.mysql_source_id += 1
            schema = self.rng.choice(exe.db.schemas)
            cluster = self.rng.choice(exe.db.clusters)
        with schema.lock, cluster.lock:
            if schema not in exe.db.schemas:
                return False
            if cluster not in exe.db.clusters:
                return False
            try:
                source = MySqlSource(
                    source_id,
                    cluster,
                    schema,
                    exe.db.ports,
                    self.rng,
                )
                source.create(exe)
                exe.db.mysql_sources.append(source)
            except:
                if exe.db.scenario not in (
                    Scenario.Kill,
                    Scenario.ZeroDowntimeDeploy,
                ):
                    raise
        return True


class DropMySqlSourceAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.mysql_sources:
                return False
            source = self.rng.choice(exe.db.mysql_sources)
        with source.lock:
            # Was dropped while we were acquiring lock
            if source not in exe.db.mysql_sources:
                return False
            query = f"DROP SOURCE {source.executor.source}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.mysql_sources.remove(source)
            source.executor.mz_conn.close()
        return True


class CreatePostgresSourceAction(Action):
    def run(self, exe: Executor) -> bool:
        # See database-issues#6881, not expected to work
        if exe.db.scenario == Scenario.BackupRestore:
            return False

        with exe.db.lock:
            if len(exe.db.postgres_sources) >= MAX_POSTGRES_SOURCES:
                return False
            source_id = exe.db.postgres_source_id
            exe.db.postgres_source_id += 1
            schema = self.rng.choice(exe.db.schemas)
            cluster = self.rng.choice(exe.db.clusters)
        with schema.lock, cluster.lock:
            if schema not in exe.db.schemas:
                return False
            if cluster not in exe.db.clusters:
                return False
            try:
                source = PostgresSource(
                    source_id,
                    cluster,
                    schema,
                    exe.db.ports,
                    self.rng,
                )
                source.create(exe)
                exe.db.postgres_sources.append(source)
            except:
                if exe.db.scenario not in (
                    Scenario.Kill,
                    Scenario.ZeroDowntimeDeploy,
                ):
                    raise
        return True


class DropPostgresSourceAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.postgres_sources:
                return False
            source = self.rng.choice(exe.db.postgres_sources)
        with source.lock:
            # Was dropped while we were acquiring lock
            if source not in exe.db.postgres_sources:
                return False
            query = f"DROP SOURCE {source.executor.source}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.postgres_sources.remove(source)
            source.executor.mz_conn.close()
        return True


class CreateKafkaSinkAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "BYTES format with non-encodable type",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if len(exe.db.kafka_sinks) >= MAX_KAFKA_SINKS:
                return False
            sink_id = exe.db.kafka_sink_id
            exe.db.kafka_sink_id += 1
            cluster = self.rng.choice(exe.db.clusters)
            schema = self.rng.choice(exe.db.schemas)
        with schema.lock, cluster.lock:
            if schema not in exe.db.schemas:
                return False
            if cluster not in exe.db.clusters:
                return False
            sink = KafkaSink(
                sink_id,
                cluster,
                schema,
                self.rng.choice(exe.db.db_objects_without_views()),
                self.rng,
            )
            sink.create(exe)
            exe.db.kafka_sinks.append(sink)
        return True


class DropKafkaSinkAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        return [
            "still depended upon by",
        ] + super().errors_to_ignore(exe)

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.kafka_sinks:
                return False
            sink = self.rng.choice(exe.db.kafka_sinks)
        with sink.lock:
            # Was dropped while we were acquiring lock
            if sink not in exe.db.kafka_sinks:
                return False
            query = f"DROP SINK {sink}"
            exe.execute(query, http=Http.RANDOM)
            exe.db.kafka_sinks.remove(sink)
        return True
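

# Sends a random payload to a random webhook source over HTTP, including any
# headers the source expects; connection errors are tolerated in scenarios
# where Materialize may be down or mid-promotion.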
class HttpPostAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        result = super().errors_to_ignore(exe)
        if exe.db.scenario == Scenario.Rename:
            result.extend(["404: no object was found at the path"])
        return result

    def run(self, exe: Executor) -> bool:
        with exe.db.lock:
            if not exe.db.webhook_sources:
                return False
            sources = [
                source
                for source in exe.db.webhook_sources
                if source.num_rows < MAX_ROWS
            ]
            if not sources:
                return False
            source = self.rng.choice(sources)
        with source.lock:
            # Was dropped while we were acquiring lock
            if source not in exe.db.webhook_sources:
                return False

            url = f"http://{exe.db.host}:{exe.db.ports['http' if exe.mz_service == 'materialized' else 'http2']}/api/webhook/{urllib.parse.quote(source.schema.db.name(), safe='')}/{urllib.parse.quote(source.schema.name(), safe='')}/{urllib.parse.quote(source.name(), safe='')}"

            payload = source.body_format.to_data_type().random_value(self.rng)

            # Copy so we don't keep appending to the source's own header list
            header_fields = list(source.explicit_include_headers)
            if source.include_headers:
                header_fields.extend(["x-event-type", "signature", "x-mz-api-key"])

            headers = {
                header: (
                    f"{datetime.datetime.now()}"
                    if header == "timestamp"
                    else f'"{Text.random_value(self.rng)}"'.encode()
                )
                for header in self.rng.sample(header_fields, len(header_fields))
            }

            headers_strs = [f"{key}: {value}" for key, value in headers.items()]
            log = f"POST {url} Headers: {', '.join(headers_strs)} Body: {payload.encode('utf-8')}"
            exe.log(log)
            try:
                source.num_rows += 1
                result = requests.post(url, data=payload.encode(), headers=headers)
                if result.status_code != 200:
                    raise QueryError(f"{result.status_code}: {result.text}", log)
            except requests.exceptions.ConnectionError:
                # Expected when Mz is killed
                if exe.db.scenario not in (
                    Scenario.Kill,
                    Scenario.BackupRestore,
                    Scenario.ZeroDowntimeDeploy,
                ):
                    raise
            except QueryError as e:
                # expected, see database-issues#6156
                if exe.db.scenario not in (
                    Scenario.Kill,
                    Scenario.ZeroDowntimeDeploy,
                ) or ("404: no object was found at the path" not in e.msg):
                    raise e
        return True


class StatisticsAction(Action):
    def run(self, exe: Executor) -> bool:
        for typ, objs in [
            ("tables", exe.db.tables),
            ("views", exe.db.views),
            ("kafka_sources", exe.db.kafka_sources),
            ("postgres_sources", exe.db.postgres_sources),
            ("webhook_sources", exe.db.webhook_sources),
        ]:
            counts = []
            for t in objs:
                exe.execute(f"SELECT count(*) FROM {t}")
                counts.append(str(exe.cur.fetchall()[0][0]))
            print(f"{typ}: {' '.join(counts)}")
        time.sleep(10)
        return True
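

# An ActionList bundles action classes with relative selection weights and
# records whether the worker running it should operate in autocommit mode.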
class ActionList:
    action_classes: list[type[Action]]
    weights: list[float]
    autocommit: bool

    def __init__(
        self, action_classes_weights: list[tuple[type[Action], int]], autocommit: bool
    ):
        self.action_classes = [action[0] for action in action_classes_weights]
        self.weights = [action[1] for action in action_classes_weights]
        self.autocommit = autocommit
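

# For illustration only (the actual worker loop lives elsewhere): a driver
# would typically sample an action class in proportion to its weight, e.g.
# with something like
#
#   action_class = rng.choices(
#       action_list.action_classes, weights=action_list.weights
#   )[0]
#
# The construction arguments vary per action class (see CancelAction and
# KillAction above), so this sketch is not the actual driver code.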

read_action_list = ActionList(
    [
        (SelectAction, 100),
        (SelectOneAction, 1),
        # (SQLsmithAction, 30),  # Questionable use
        (CopyToS3Action, 100),
        # (SetClusterAction, 1),  # SET cluster cannot be called in an active transaction
        (CommitRollbackAction, 30),
        (ReconnectAction, 1),
        (FlipFlagsAction, 2),
    ],
    autocommit=False,
)

fetch_action_list = ActionList(
    [
        (FetchAction, 30),
        # (SetClusterAction, 1),  # SET cluster cannot be called in an active transaction
        (ReconnectAction, 1),
        (FlipFlagsAction, 2),
    ],
    autocommit=False,
)

write_action_list = ActionList(
    [
        (InsertAction, 50),
        (SelectOneAction, 1),  # can be mixed with writes
        # (SetClusterAction, 1),  # SET cluster cannot be called in an active transaction
        (HttpPostAction, 5),
        (CommitRollbackAction, 10),
        (ReconnectAction, 1),
        (SourceInsertAction, 5),
        (FlipFlagsAction, 2),
    ],
    autocommit=False,
)

dml_nontrans_action_list = ActionList(
    [
        (DeleteAction, 10),
        (UpdateAction, 10),
        (InsertReturningAction, 10),
        (CommentAction, 5),
        (SetClusterAction, 1),
        (ReconnectAction, 1),
        (FlipFlagsAction, 2),
        # (TransactionIsolationAction, 1),
    ],
    autocommit=True,  # deletes can't be inside of transactions
)

ddl_action_list = ActionList(
    [
        (CreateIndexAction, 2),
        (DropIndexAction, 2),
        (CreateTableAction, 2),
        (DropTableAction, 2),
        (CreateViewAction, 8),
        (DropViewAction, 8),
        (CreateRoleAction, 2),
        (DropRoleAction, 2),
        (CreateClusterAction, 2),
        (DropClusterAction, 2),
        (SwapClusterAction, 10),
        (CreateClusterReplicaAction, 4),
        (DropClusterReplicaAction, 4),
        (SetClusterAction, 1),
        (CreateWebhookSourceAction, 2),
        (DropWebhookSourceAction, 2),
        (CreateKafkaSinkAction, 4),
        (DropKafkaSinkAction, 4),
        (CreateKafkaSourceAction, 4),
        (DropKafkaSourceAction, 4),
        # TODO: Reenable when database-issues#8237 is fixed
        # (CreateMySqlSourceAction, 4),
        # (DropMySqlSourceAction, 4),
        (CreatePostgresSourceAction, 4),
        (DropPostgresSourceAction, 4),
        (GrantPrivilegesAction, 4),
        (RevokePrivilegesAction, 1),
        (ReconnectAction, 1),
        (CreateDatabaseAction, 1),
        (DropDatabaseAction, 1),
        (CreateSchemaAction, 1),
        (DropSchemaAction, 1),
        (RenameSchemaAction, 10),
        (RenameTableAction, 10),
        (RenameViewAction, 10),
        (RenameSinkAction, 10),
        (SwapSchemaAction, 10),
        (FlipFlagsAction, 2),
        # TODO: Reenable when database-issues#8813 is fixed.
        # (AlterTableAddColumnAction, 10),
        (AlterKafkaSinkFromAction, 8),
        # (TransactionIsolationAction, 1),
    ],
    autocommit=True,
)

action_lists = [
    read_action_list,
    fetch_action_list,
    write_action_list,
    dml_nontrans_action_list,
    ddl_action_list,
]