# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Functional tests which require separate clusterd containers (instead of the
usual clusterd included in the materialized container).
"""

import json
import random
import re
import time
from collections.abc import Callable
from copy import copy
from datetime import datetime, timedelta
from statistics import quantiles
from textwrap import dedent
from threading import Thread

import psycopg
import requests
from psycopg import Cursor
from psycopg.errors import (
    DatabaseError,
    InternalError_,
    OperationalError,
    QueryCanceled,
)

from materialize import buildkite, ui
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.localstack import Localstack
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.minio import Minio
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.postgres import (
    CockroachOrPostgresMetadata,
    Postgres,
)
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.toxiproxy import Toxiproxy
from materialize.mzcompose.services.zookeeper import Zookeeper
from materialize.util import PropagatingThread
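
# mzcompose discovers the `SERVICES` list and the `workflow_*` functions in
# this file by convention; each workflow below can be run individually (e.g.
# `./mzcompose run test-smoke`, with underscores mapped to dashes).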
SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Localstack(),
    Clusterd(name="clusterd1", workers=2),
    Clusterd(name="clusterd2", workers=2),
    Clusterd(name="clusterd3", workers=2),
    Clusterd(name="clusterd4", workers=2),
    Mz(app_password=""),
    Minio(),
    Materialized(
        # We use mz_panic() in some test scenarios, so environmentd must stay up.
        propagate_crashes=False,
        external_metadata_store=True,
        additional_system_parameter_defaults={
            "unsafe_enable_unsafe_functions": "true",
            "unsafe_enable_unorchestrated_cluster_replicas": "true",
        },
    ),
    CockroachOrPostgresMetadata(),
    Postgres(),
    Redpanda(),
    Toxiproxy(),
    Testdrive(
        volume_workdir="../testdrive:/workdir/testdrive",
        volumes_extra=[".:/workdir/smoke"],
        materialize_params={"cluster": "cluster1"},
    ),
]


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    def process(name: str) -> None:
        # incident-70 and refresh-mv-restart are slow, run in separate CI step
        # concurrent-connections is too flaky
        # TODO: Reenable test-memory-limiter when database-issues/9502 is fixed
        if name in (
            "default",
            "test-incident-70",
            "test-concurrent-connections",
            "test-refresh-mv-restart",
            "test-memory-limiter",
        ):
            return
        with c.test_case(name):
            c.workflow(name)
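
    # Shard the workflow list so that parallel Buildkite jobs each run a
    # disjoint subset of the tests.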
    files = buildkite.shard_list(list(c.workflows.keys()), lambda workflow: workflow)
    c.test_parts(files, process)


def workflow_test_smoke(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run testdrive in a variety of cluster configurations."""

    parser.add_argument(
        "glob",
        nargs="*",
        default=["smoke/*.td"],
        help="run against the specified files",
    )
    args = parser.parse_args()

    c.down(destroy_volumes=True)
    with c.override(
        Clusterd(
            name="clusterd1",
            workers=2,
            process_names=["clusterd1", "clusterd2"],
        ),
        Clusterd(
            name="clusterd2",
            workers=2,
            process_names=["clusterd1", "clusterd2"],
        ),
        Clusterd(
            name="clusterd3",
            workers=2,
            process_names=["clusterd3", "clusterd4"],
        ),
        Clusterd(
            name="clusterd4",
            workers=2,
            process_names=["clusterd3", "clusterd4"],
        ),
    ):
        c.up("zookeeper", "kafka", "schema-registry", "localstack")
        c.up("materialized")

        # Create a cluster and verify that tests pass.
        c.up("clusterd1")
        c.up("clusterd2")

        # Make sure cluster1 is owned by the system so it doesn't get dropped
        # between testdrive runs.
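        # Port convention used throughout this file (readable from the address
        # lists below): 2100 = storagectl, 2101 = computectl, 2102 = compute,
        # 2103 = storage.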
        c.sql(
            """
            ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;
            DROP CLUSTER IF EXISTS cluster1 CASCADE;
            CREATE CLUSTER cluster1 REPLICAS (
                replica1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100', 'clusterd2:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103', 'clusterd2:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101', 'clusterd2:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102', 'clusterd2:2102'],
                    WORKERS 2
                )
            );
            GRANT ALL ON CLUSTER cluster1 TO materialize;
            """,
            port=6877,
            user="mz_system",
        )
        c.run_testdrive_files(*args.glob)

        # Add a replica to that cluster and verify that tests still pass.
        c.up("clusterd3")
        c.up("clusterd4")
        c.sql(
            """
            ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;
            CREATE CLUSTER REPLICA cluster1.replica2
                STORAGECTL ADDRESSES ['clusterd3:2100', 'clusterd4:2100'],
                STORAGE ADDRESSES ['clusterd3:2103', 'clusterd4:2103'],
                COMPUTECTL ADDRESSES ['clusterd3:2101', 'clusterd4:2101'],
                COMPUTE ADDRESSES ['clusterd3:2102', 'clusterd4:2102'],
                WORKERS 2;
            """,
            port=6877,
            user="mz_system",
        )
        c.run_testdrive_files(*args.glob)

        # Kill one of the nodes in the first replica of the compute cluster and
        # verify that tests still pass.
        c.kill("clusterd1")
        c.run_testdrive_files(*args.glob)

        # Leave only replica 2 up and verify that tests still pass.
        c.sql("DROP CLUSTER REPLICA cluster1.replica1", port=6877, user="mz_system")
        c.run_testdrive_files(*args.glob)

        c.sql("DROP CLUSTER cluster1 CASCADE", port=6877, user="mz_system")


def workflow_test_github_3553(c: Composition) -> None:
    """Test that clients do not wait indefinitely for a crashed resource."""

    c.down(destroy_volumes=True)
    c.up("materialized")

    c.sql(
        """
        CREATE TABLE IF NOT EXISTS log_table (f1 TEXT);
        CREATE TABLE IF NOT EXISTS panic_table (f1 TEXT);
        INSERT INTO panic_table VALUES ('forced panic');
        """
    )
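
    # The mz_panic() call crash-loops the cluster, so the INSERT below can
    # never complete; statement_timeout should cancel the query instead of
    # letting the client hang forever.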
    start_time = time.time()
    try:
        c.sql(
            """
            SET statement_timeout = '1 s';
            -- Crash loop the cluster.
            INSERT INTO log_table SELECT mz_unsafe.mz_panic(f1) FROM panic_table;
            """
        )
    except QueryCanceled as e:
        # Ensure we received the correct error message
        assert "statement timeout" in str(e)
        # Ensure the statement_timeout setting is ~honored
        elapsed = time.time() - start_time
        assert elapsed < 2, f"statement_timeout not respected ({elapsed=})"
    else:
        raise RuntimeError("unexpected success in test_github_3553")

    # Ensure we can select from tables after cancellation.
    c.sql("SELECT * FROM log_table;")


def workflow_test_github_4443(c: Composition) -> None:
    """
    Test that compute command history does not leak peek commands.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4443.
    """

    c.down(destroy_volumes=True)
    with c.override(Clusterd(name="clusterd1", workers=1)):
        c.up("materialized", "clusterd1")

        # helper function to get command history metrics
        def find_command_history_metrics(c: Composition) -> tuple[int, int, int, int]:
            controller_metrics = c.exec(
                "materialized", "curl", "localhost:6878/metrics", capture=True
            ).stdout
            replica_metrics = c.exec(
                "clusterd1", "curl", "localhost:6878/metrics", capture=True
            ).stdout
            metrics = controller_metrics + replica_metrics

            controller_command_count, controller_command_count_found = 0, False
            controller_dataflow_count, controller_dataflow_count_found = 0, False
            replica_command_count, replica_command_count_found = 0, False
            replica_dataflow_count, replica_dataflow_count_found = 0, False
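
            # The metrics endpoints emit Prometheus text exposition format:
            # each sample is a line of the form `<name>{<labels>} <value>`, so
            # `metric.split()[1]` below extracts the sample value.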
            for metric in metrics.splitlines():
                if (
                    metric.startswith("mz_compute_controller_history_command_count")
                    and 'instance_id="u2"' in metric
                ):
                    controller_command_count += int(metric.split()[1])
                    controller_command_count_found = True
                elif (
                    metric.startswith("mz_compute_controller_history_dataflow_count")
                    and 'instance_id="u2"' in metric
                ):
                    controller_dataflow_count += int(metric.split()[1])
                    controller_dataflow_count_found = True
                elif metric.startswith("mz_compute_replica_history_command_count"):
                    replica_command_count += int(metric.split()[1])
                    replica_command_count_found = True
                elif metric.startswith("mz_compute_replica_history_dataflow_count"):
                    replica_dataflow_count += int(metric.split()[1])
                    replica_dataflow_count_found = True

            assert (
                controller_command_count_found
            ), "command count not found in controller metrics"
            assert (
                controller_dataflow_count_found
            ), "dataflow count not found in controller metrics"
            assert (
                replica_command_count_found
            ), "command count not found in replica metrics"
            assert (
                replica_dataflow_count_found
            ), "dataflow count not found in replica metrics"

            return (
                controller_command_count,
                controller_dataflow_count,
                replica_command_count,
                replica_dataflow_count,
            )

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        # Set up a cluster with an indexed table and an unindexed one.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 1
            ));
            SET cluster = cluster1;
            -- table for fast-path peeks
            CREATE TABLE t (a int);
            CREATE DEFAULT INDEX ON t;
            INSERT INTO t VALUES (42);
            -- table for slow-path peeks
            CREATE TABLE t2 (a int);
            INSERT INTO t2 VALUES (84);
            -- Wait for the cluster to be ready.
            SELECT * FROM t;
            SELECT * FROM t2;
            """
        )

        # Wait a bit to let the metrics refresh.
        time.sleep(2)

        # Obtain initial history size and dataflow count.
        # Dataflow count can plausibly be more than 1, if compaction is delayed.
        (
            controller_command_count,
            controller_dataflow_count,
            replica_command_count,
            replica_dataflow_count,
        ) = find_command_history_metrics(c)
        assert controller_command_count > 0, "controller history cannot be empty"
        assert (
            controller_dataflow_count > 0
        ), "at least one dataflow expected in controller history"
        assert (
            controller_dataflow_count < 5
        ), "more dataflows than expected in controller history"
        assert replica_command_count > 0, "replica history cannot be empty"
        assert (
            replica_dataflow_count > 0
        ), "at least one dataflow expected in replica history"
        assert (
            replica_dataflow_count < 5
        ), "more dataflows than expected in replica history"

        # execute 400 fast- and slow-path peeks
        for _ in range(20):
            c.sql(
                """
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                SELECT * FROM t;
                SELECT * FROM t2;
                """
            )

        # Wait a bit to let the metrics refresh.
        time.sleep(2)

        # Check that history size and dataflow count are well-behaved.
        # Dataflow count can plausibly be more than 1, if compaction is delayed.
        (
            controller_command_count,
            controller_dataflow_count,
            replica_command_count,
            replica_dataflow_count,
        ) = find_command_history_metrics(c)
        assert (
            controller_command_count < 100
        ), "controller history grew more than expected after peeks"
        assert (
            controller_dataflow_count > 0
        ), "at least one dataflow expected in controller history"
        assert (
            controller_dataflow_count < 5
        ), "more dataflows than expected in controller history"
        assert (
            replica_command_count < 100
        ), "replica history grew more than expected after peeks"
        assert (
            replica_dataflow_count > 0
        ), "at least one dataflow expected in replica history"
        assert (
            replica_dataflow_count < 5
        ), "more dataflows than expected in replica history"


def workflow_test_github_4444(c: Composition) -> None:
    """
    Test that compute reconciliation does not produce empty frontiers.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4444.
    """

    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )

    # Set up a dataflow on clusterd.
    c.sql(
        """
        CREATE CLUSTER cluster1 REPLICAS (replica1 (
            STORAGECTL ADDRESSES ['clusterd1:2100'],
            STORAGE ADDRESSES ['clusterd1:2103'],
            COMPUTECTL ADDRESSES ['clusterd1:2101'],
            COMPUTE ADDRESSES ['clusterd1:2102'],
            WORKERS 2
        ));
        SET cluster = cluster1;
        CREATE TABLE t (a int);
        CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
        -- wait for the dataflow to be ready
        SELECT * FROM mv;
        """
    )

    # Restart environmentd to trigger a reconciliation on clusterd.
    c.kill("materialized")
    c.up("materialized")

    print("Sleeping to wait for frontier updates")
    time.sleep(10)
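
    # EXPLAIN TIMESTAMP AS JSON returns a "determination" object whose "since"
    # and "upper" antichains each contain exactly one element here; the
    # single-element unpacking below doubles as an assertion of that shape.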
    def extract_frontiers(output: str) -> tuple[int, int]:
        j = json.loads(output)
        (upper,) = j["determination"]["upper"]["elements"]
        (since,) = j["determination"]["since"]["elements"]
        return (since, upper)

    # Verify that there are no empty frontiers.
    output = c.sql_query("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM mv")
    mv_since, mv_upper = extract_frontiers(output[0][0])
    output = c.sql_query("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM t")
    t_since, t_upper = extract_frontiers(output[0][0])

    assert mv_since, "mv has empty since frontier"
    assert mv_upper, "mv has empty upper frontier"
    assert t_since, "t has empty since frontier"
    assert t_upper, "t has empty upper frontier"


def workflow_test_github_4545(c: Composition) -> None:
    """
    Test that querying introspection sources on a replica does not
    crash other replicas in the same cluster that have introspection disabled.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4545.
    """

    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")
    c.up("clusterd2")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
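
    # INTROSPECTION INTERVAL 0 disables introspection dataflows on the
    # logging_off replica, so introspection sources may only be queried on
    # logging_on.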
    c.sql(
        """
        CREATE CLUSTER cluster1 REPLICAS (
            logging_on (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 2
            ),
            logging_off (
                STORAGECTL ADDRESSES ['clusterd2:2100'],
                STORAGE ADDRESSES ['clusterd2:2103'],
                COMPUTECTL ADDRESSES ['clusterd2:2101'],
                COMPUTE ADDRESSES ['clusterd2:2102'],
                WORKERS 2,
                INTROSPECTION INTERVAL 0
            )
        );
        SET cluster = cluster1;

        -- query the introspection sources on the replica with logging enabled
        SET cluster_replica = logging_on;
        SELECT * FROM mz_introspection.mz_active_peeks, mz_introspection.mz_compute_exports;

        -- verify that the other replica has not crashed and still responds
        SET cluster_replica = logging_off;
        SELECT * FROM mz_tables, mz_sources;
        """
    )


def workflow_test_github_4587(c: Composition) -> None:
    """
    Test that triggering reconciliation does not wedge the
    mz_compute_frontiers_per_worker introspection source.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4587.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                logging_on (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );
            """
        )

        # verify that we can query the introspection source
        c.testdrive(
            input=dedent(
                """
                > SET cluster = cluster1;
                > SELECT 1 FROM mz_introspection.mz_compute_frontiers_per_worker LIMIT 1;
                1
                """
            )
        )

        # Restart environmentd to trigger a reconciliation on clusterd.
        c.kill("materialized")
        c.up("materialized")

        # verify again that we can query the introspection source
        c.testdrive(
            input=dedent(
                """
                > SET cluster = cluster1;
                > SELECT 1 FROM mz_introspection.mz_compute_frontiers_per_worker LIMIT 1;
                1
                """
            )
        )

        c.sql(
            """
            SET cluster = cluster1;
            -- now let's give it another go with user-defined objects
            CREATE TABLE t (a int);
            CREATE DEFAULT INDEX ON t;
            INSERT INTO t VALUES (42);
            """
        )
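
        # Leave a SUBSCRIBE open in an uncommitted transaction; the
        # environmentd restart below forces reconciliation to drop it.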
        cursor = c.sql_cursor()
        cursor.execute("SET cluster = cluster1;")
        cursor.execute("BEGIN;")
        cursor.execute("DECLARE c CURSOR FOR SUBSCRIBE t;")
        cursor.execute("FETCH ALL c;")

        # Restart environmentd to trigger yet another reconciliation on clusterd.
        c.kill("materialized")
        c.up("materialized")

        # Verify yet again that we can query the introspection source and now the table.
        # The subscribe should have been dropped during reconciliation, so we expect
        # to not find a frontier entry for it.
        c.testdrive(
            input=dedent(
                """
                > SET cluster = cluster1;
                > SELECT 1 FROM mz_introspection.mz_compute_frontiers_per_worker LIMIT 1;
                1
                > SELECT * FROM t;
                42
                """
            )
        )


def workflow_test_github_4433(c: Composition) -> None:
    """
    Test that a reduce collation over a source with an invalid accumulation does not
    panic, but rather logs errors, when soft assertions are turned off.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4433.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Clusterd(
            name="clusterd1",
            environment_extra=[
                "MZ_SOFT_ASSERTIONS=0",
            ],
            workers=2,
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})
        c.up("clusterd1")

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            "ALTER SYSTEM SET enable_repeat_row = true;",
            port=6877,
            user="mz_system",
        )

        # set up a test cluster and run a testdrive regression script
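        # repeat_row(diff) emits each input row `diff` times; a negative diff
        # yields retractions, which lets the test construct a collection with
        # invalid (net-negative) multiplicities.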
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );

            -- Set up data for the test.
            SET cluster = cluster1;
            CREATE TABLE base (data bigint, diff bigint);
            CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
            INSERT INTO base VALUES (1, 1);
            INSERT INTO base VALUES (1, -1), (1, -1);

            -- Create a materialized view to ensure non-monotonic rendering.
            -- Note that we employ a query hint below to hit the case of not yet
            -- generating a SQL-level error, given the partial fix to bucketed
            -- aggregates introduced in PR materialize#17918.
            CREATE MATERIALIZED VIEW sum_and_max AS
                SELECT SUM(data), MAX(data) FROM data
                OPTIONS (AGGREGATE INPUT GROUP SIZE = 1);
            """
        )
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;

                # Run a query that would generate a panic before the fix.
                ! SELECT * FROM sum_and_max;
                contains:Non-positive accumulation in ReduceMinsMaxes
                """
            )
        )

        # ensure that an error was put into the logs
        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert "Non-positive accumulation in ReduceMinsMaxes" in c1.stdout


def workflow_test_github_4966(c: Composition) -> None:
    """
    Test that an accumulable reduction over a source with an invalid accumulation not only
    emits errors to the logs when soft assertions are turned off, but also produces a clean
    query-level error.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/4966.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        # set up a test cluster and run a testdrive regression script
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );
            """
        )
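
        # Testdrive syntax used below: `>` runs a statement and checks its
        # output, `!` expects an error whose message matches the `contains:`
        # line, `$ postgres-execute` runs SQL over a separate connection, and
        # a `[version>=...]` guard restricts an action to matching versions.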
        c.testdrive(
            dedent(
                """
                $[version>=5500] postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_repeat_row = true;

                # Set up data for the test.
                > SET cluster = cluster1;
                > CREATE TABLE base (data float, diff bigint);
                > CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
                > INSERT INTO base VALUES (1.00, 1);
                > INSERT INTO base VALUES (1.01, -1);

                # The query below would not fail previously, but now should produce
                # a SQL-level error that is observable by users.
                ! SELECT SUM(data) FROM data;
                contains:Invalid data in source, saw net-zero records for key

                # It should be possible to fix the data in the source and make the error
                # go away.
                > INSERT INTO base VALUES (1.01, 1);
                > SELECT SUM(data) FROM data;
                1
                """
            )
        )

        # ensure that an error was put into the logs
        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert (
            "Net-zero records with non-zero accumulation in ReduceAccumulable"
            in c1.stdout
        )


def workflow_test_github_5087(c: Composition) -> None:
    """
    Test that sum aggregations over uint2 and uint4 types do not produce a panic
    when soft assertions are turned off, but rather a SQL-level error when faced
    with invalid accumulations due to too many retractions in a source. Additionally,
    we verify that in these cases, an adequate error message is written to the logs.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/5087.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            "ALTER SYSTEM SET enable_repeat_row = true;",
            port=6877,
            user="mz_system",
        )

        # set up a test cluster and run a testdrive regression script
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );

            -- Set up data for the test.
            SET cluster = cluster1;
            CREATE TABLE base (data2 uint2, data4 uint4, data8 uint8, diff bigint);
            CREATE MATERIALIZED VIEW data AS
                SELECT data2, data4, data8
                FROM base, repeat_row(diff);
            CREATE MATERIALIZED VIEW sum_types AS
                SELECT SUM(data2) AS sum2, SUM(data4) AS sum4, SUM(data8) AS sum8
                FROM data;
            INSERT INTO base VALUES (1, 1, 1, 1);
            INSERT INTO base VALUES (1, 1, 1, -1), (1, 1, 1, -1);
            CREATE MATERIALIZED VIEW constant_sums AS
                SELECT SUM(data2) AS sum2, SUM(data4) AS sum4, SUM(data8) AS sum8
                FROM (
                    SELECT * FROM (
                        VALUES (1::uint2, 1::uint4, 1::uint8, 1),
                               (1::uint2, 1::uint4, 1::uint8, -1),
                               (1::uint2, 1::uint4, 1::uint8, -1)
                    ) AS base (data2, data4, data8, diff),
                    repeat_row(diff)
                );
            """
        )
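
        # constant_sums is defined over a VALUES clause, so its invalid
        # multiplicities are visible at optimization time and trip constant
        # folding rather than the rendered dataflow.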
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;

                # Run queries that would generate panics before the fix.
                ! SELECT SUM(data2) FROM data;
                contains:Invalid data in source, saw negative accumulation with unsigned type for key
                ! SELECT SUM(data4) FROM data;
                contains:Invalid data in source, saw negative accumulation with unsigned type for key
                ! SELECT * FROM constant_sums;
                contains:constant folding encountered reduce on collection with non-positive multiplicities

                # The following statement verifies that the behavior introduced in PR materialize#6122
                # is now rectified, i.e., instead of wrapping to a negative number, we produce
                # an error upon seeing invalid multiplicities.
                ! SELECT SUM(data8) FROM data;
                contains:Invalid data in source, saw negative accumulation with unsigned type for key

                # Test repairs
                > INSERT INTO base VALUES (1, 1, 1, 1), (1, 1, 1, 1);
                > SELECT SUM(data2) FROM data;
                1
                > SELECT SUM(data4) FROM data;
                1
                > SELECT SUM(data8) FROM data;
                1

                # Ensure that the output types for uint sums are unaffected.
                > SELECT c.name, c.type
                  FROM mz_materialized_views mv
                  JOIN mz_columns c USING (id)
                  WHERE mv.name = 'sum_types'
                  ORDER BY c.type, c.name;
                sum8 numeric
                sum2 uint8
                sum4 uint8
                > SELECT c.name, c.type
                  FROM mz_materialized_views mv
                  JOIN mz_columns c USING (id)
                  WHERE mv.name = 'constant_sums'
                  ORDER BY c.type, c.name;
                sum8 numeric
                sum2 uint8
                sum4 uint8

                # Test wraparound behaviors
                > INSERT INTO base VALUES (1, 1, 1, -1);
                >[version<14000] INSERT INTO base VALUES (2, 2, 2, 9223372036854775807);
                >[version<14000] SELECT sum(data2) FROM data;
                18446744073709551614
                >[version<14000] SELECT sum(data4) FROM data;
                18446744073709551614
                >[version<14000] SELECT sum(data8) FROM data;
                18446744073709551614
                > INSERT INTO base VALUES (1, 1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1);
                # This causes a panic starting with v0.140.0, but not before.
                >[version<14000] SELECT SUM(data2) FROM data;
                1
                # This causes a panic starting with v0.140.0, but not before.
                >[version<14000] SELECT SUM(data4) FROM data;
                1
                # This causes a panic starting with v0.140.0, but not before.
                >[version<14000] SELECT SUM(data8) FROM data;
                18446744073709551617
                """
            )
        )

        # ensure that an error was put into the logs
        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert "Invalid negative unsigned aggregation in ReduceAccumulable" in c1.stdout


def workflow_test_github_5086(c: Composition) -> None:
    """
    Test that a bucketed hierarchical reduction over a source with an invalid accumulation
    produces a clean error when an arrangement hierarchy is built, in addition to logging
    an error, when soft assertions are turned off.

    This is a partial regression test for https://github.com/MaterializeInc/database-issues/issues/5086.
    The checks here are extended by opting into a smaller group size with a query hint (e.g.,
    OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)) in workflow test-github-4433. This scenario was
    initially not covered, but eventually got supported as well.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Clusterd(
            name="clusterd1",
            environment_extra=[
                "MZ_SOFT_ASSERTIONS=0",
            ],
            workers=2,
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            "ALTER SYSTEM SET enable_repeat_row = true;",
            port=6877,
            user="mz_system",
        )

        # set up a test cluster and run a testdrive regression script
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );

            -- Set up data for the test.
            SET cluster = cluster1;
            CREATE TABLE base (data bigint, diff bigint);
            CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
            INSERT INTO base VALUES (1, 1);
            INSERT INTO base VALUES (1, -1), (1, -1);

            -- Create materialized views to ensure non-monotonic rendering.
            CREATE MATERIALIZED VIEW max_data AS
                SELECT MAX(data) FROM data;
            CREATE MATERIALIZED VIEW max_group_by_data AS
                SELECT data, MAX(data) FROM data GROUP BY data;
            """
        )
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;

                # The query below would return a null previously, but now fails cleanly.
                ! SELECT * FROM max_data;
                contains:Invalid data in source, saw non-positive accumulation for key
                ! SELECT * FROM max_group_by_data;
                contains:Invalid data in source, saw non-positive accumulation for key

                # Repairing the error must be possible.
                > INSERT INTO base VALUES (1, 2), (2, 1);
                > SELECT * FROM max_data;
                2
                > SELECT * FROM max_group_by_data;
                1 1
                2 2
                """
            )
        )

        # ensure that an error was put into the logs
        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert "Non-positive accumulation in MinsMaxesHierarchical" in c1.stdout
        assert "Negative accumulation in ReduceMinsMaxes" not in c1.stdout


def workflow_test_github_5831(c: Composition) -> None:
    """
    Test that a monotonic one-shot SELECT will perform consolidation without error on
    valid data. We introduce data that results in a multiset and compute min/max. In a
    monotonic one-shot evaluation strategy, we must consolidate and subsequently assert
    monotonicity.

    This is a regression test for https://github.com/MaterializeInc/database-issues/issues/5831,
    where we observed a performance regression caused by a correctness issue. Here, we
    validate that the underlying correctness issue has been fixed.
    """

    c.down(destroy_volumes=True)
    with c.override(
        Clusterd(
            name="clusterd1",
            environment_extra=[
                "MZ_PERSIST_COMPACTION_DISABLED=true",
            ],
            workers=4,
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            "ALTER SYSTEM SET enable_repeat_row = true;",
            port=6877,
            user="mz_system",
        )

        # set up a test cluster and run a testdrive regression script
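        # Note: persist compaction is disabled via the Clusterd override above,
        # plausibly so that the retractions inserted below are not consolidated
        # away in storage before the one-shot SELECT runs.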
  913. c.sql(
  914. """
  915. CREATE CLUSTER cluster1 REPLICAS (
  916. r1 (
  917. STORAGECTL ADDRESSES ['clusterd1:2100'],
  918. STORAGE ADDRESSES ['clusterd1:2103'],
  919. COMPUTECTL ADDRESSES ['clusterd1:2101'],
  920. COMPUTE ADDRESSES ['clusterd1:2102'],
  921. WORKERS 4
  922. )
  923. );
  924. -- Set data for test up.
  925. SET cluster = cluster1;
  926. CREATE TABLE base (data bigint, diff bigint);
  927. CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
  928. INSERT INTO base VALUES (1, 6);
  929. INSERT INTO base VALUES (1, -3), (1, -2);
  930. INSERT INTO base VALUES (2, 3), (2, 2);
  931. INSERT INTO base VALUES (2, -1), (2, -1);
  932. INSERT INTO base VALUES (3, 3), (3, 2);
  933. INSERT INTO base VALUES (3, -3), (3, -2);
  934. INSERT INTO base VALUES (4, 1), (4, 2);
  935. INSERT INTO base VALUES (4, -1), (4, -2);
  936. INSERT INTO base VALUES (5, 5), (5, 6);
  937. INSERT INTO base VALUES (5, -5), (5, -6);
  938. """
  939. )
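        # For reference, the multiplicities produced above (an illustrative
        # tally derived from the INSERTs): data=1 accumulates 6-3-2 = 1,
        # data=2 accumulates 3+2-1-1 = 3, and data in (3, 4, 5) cancels to 0
        # and vanishes. The multiset is thus {1 x1, 2 x3}, so every min/max
        # query below must return "1 2".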
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;
                # Computing min/max with a monotonic one-shot SELECT requires
                # consolidation. We test here that consolidation works correctly,
                # since we assert monotonicity right after consolidating.
                # Note that we employ a cursor to avoid testdrive retries.
                # Hash functions used for exchanges in consolidation may be
                # nondeterministic and produce the correct output by chance.
                > BEGIN
                > DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
                > FETCH ALL cur;
                1 2
                > COMMIT;
                # To reduce the chance of an (un)lucky streak of the hash function,
                # let's do the same a few times.
                > BEGIN
                > DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
                > FETCH ALL cur;
                1 2
                > COMMIT;
                > BEGIN
                > DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
                > FETCH ALL cur;
                1 2
                > COMMIT;
                > BEGIN
                > DECLARE cur CURSOR FOR SELECT min(data), max(data) FROM data;
                > FETCH ALL cur;
                1 2
                > COMMIT;
                """
            )
        )


def workflow_test_single_time_monotonicity_enforcers(c: Composition) -> None:
    """
    Test that a monotonic one-shot SELECT where a single-time monotonicity enforcer is present
    can process a subsequent computation where consolidation can be turned off without error.
    We introduce data that results in a multiset, process these data with an enforcer, and then
    compute min/max subsequently. In a monotonic one-shot evaluation strategy, we can toggle the
    must_consolidate flag off for min/max due to the enforcer, but still use internally an
    ensure_monotonic operator to subsequently assert monotonicity.

    Note that Constant is already checked as an enforcer in
    test/transform/relax_must_consolidate.slt, so we focus on TopK, Reduce, Get, and Threshold
    here. This test conservatively employs cursors to avoid testdrive's behavior of performing
    repetitions to see if the output matches.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(
            name="clusterd1",
            environment_extra=[
                "MZ_PERSIST_COMPACTION_DISABLED=true",
            ],
            workers=4,
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})
        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )
        c.sql(
            "ALTER SYSTEM SET enable_repeat_row = true;",
            port=6877,
            user="mz_system",
        )

        # Set up a test cluster and run a testdrive regression script.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 4
                )
            );
            -- Set up data for the test.
            SET cluster = cluster1;
            CREATE TABLE base (data bigint, diff bigint);
            CREATE MATERIALIZED VIEW data AS SELECT data FROM base, repeat_row(diff);
            INSERT INTO base VALUES (1, 6);
            INSERT INTO base VALUES (1, -3), (1, -2);
            INSERT INTO base VALUES (2, 3), (2, 2);
            INSERT INTO base VALUES (2, -1), (2, -1);
            INSERT INTO base VALUES (3, 3), (3, 2);
            INSERT INTO base VALUES (3, -3), (3, -2);
            INSERT INTO base VALUES (4, 1), (4, 2);
            INSERT INTO base VALUES (4, -1), (4, -2);
            INSERT INTO base VALUES (5, 5), (5, 6);
            INSERT INTO base VALUES (5, -5), (5, -6);
            """
        )
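        # As in the test above, the data materializes to the multiset
        # {1 x1, 2 x3} (illustrative tally: 1 -> 6-3-2 = 1, 2 -> 3+2-1-1 = 3,
        # while 3, 4, and 5 all cancel to 0). The expected outputs below
        # follow from that: TopK sees min 1 / max 2; reduced_data contains
        # (evenodd=1, data=1) and (evenodd=0, data=6), so the UNION ALL
        # branch yields min 1 / max 6; and the EXCEPT ALL branch keeps
        # {2 x3}, yielding min 2 / max 2.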
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;
                # Check TopK as an enforcer
                > BEGIN
                > DECLARE cur CURSOR FOR
                  SELECT MIN(data), MAX(data)
                  FROM (SELECT data FROM data ORDER BY data LIMIT 5);
                > FETCH ALL cur;
                1 2
                > COMMIT;
                # Check Get and Reduce as enforcers
                > CREATE VIEW reduced_data AS
                  SELECT data % 2 AS evenodd, SUM(data) AS data
                  FROM data GROUP BY data % 2;
                > BEGIN
                > DECLARE cur CURSOR FOR
                  SELECT MIN(data), MAX(data)
                  FROM (
                    SELECT * FROM reduced_data WHERE evenodd + 1 = 1
                    UNION ALL
                    SELECT * FROM reduced_data WHERE data + 1 = 2);
                > FETCH ALL cur;
                1 6
                > COMMIT;
                # Check Threshold as enforcer
                > BEGIN
                > DECLARE cur CURSOR FOR
                  SELECT MIN(data), MAX(data)
                  FROM (
                    SELECT * FROM data WHERE data % 2 = 0
                    EXCEPT ALL
                    SELECT * FROM data WHERE data + 1 = 2);
                > FETCH ALL cur;
                2 2
                > COMMIT;
                """
            )
        )


def workflow_test_github_7645(c: Composition) -> None:
    """Regression test for database-issues#7645"""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True, consistent_seed=True),
    ):
        c.up(
            "materialized",
        )

        c.run_testdrive_files("github-7645/01-create-source.td")

        latency = c.sql_query(
            """
            SELECT
                (u.rehydration_latency)::text
            FROM mz_sources s
            JOIN mz_internal.mz_source_statistics u ON s.id = u.id
            WHERE s.name IN ('count')
            """
        )[0][0]

        c.kill("materialized")
        c.up("materialized")

        c.run_testdrive_files(
            f"--var=rehydration-latency={latency}",
            "github-7645/02-after-environmentd-restart.td",
        )


def workflow_test_upsert(c: Composition) -> None:
    """Test creating upsert sources and continuing to ingest them after a restart."""
    with c.override(
        Testdrive(default_timeout="30s", no_reset=True, consistent_seed=True),
    ):
        c.down(destroy_volumes=True)
        c.up("materialized", "zookeeper", "kafka", "schema-registry")

        c.run_testdrive_files("upsert/01-create-sources.td")

        # Sleep to make sure the errors have made it to persist.
        # This isn't necessary for correctness,
        # as we should be able to crash at any point and re-start.
        # But if we don't sleep here, then we might be ingesting the errored
        # records in the new process, and so we won't actually be testing
        # the ability to retract error values that make it to persist.
        print("Sleeping for ten seconds")
        time.sleep(10)
        c.exec("materialized", "bash", "-c", "kill -9 `pidof clusterd`")
        c.run_testdrive_files("upsert/02-after-clusterd-restart.td")


def workflow_test_remote_storage(c: Composition) -> None:
    """Test creating sources in a remote clusterd process."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True, consistent_seed=True),
        Clusterd(
            name="clusterd1",
            workers=4,
            process_names=["clusterd1", "clusterd2"],
        ),
        Clusterd(
            name="clusterd2",
            workers=4,
            process_names=["clusterd1", "clusterd2"],
        ),
    ):
        c.up(
            "materialized",
            "clusterd1",
            "clusterd2",
            "zookeeper",
            "kafka",
            "schema-registry",
        )

        c.run_testdrive_files("storage/01-create-sources.td")

        c.kill("materialized")
        c.up("materialized")
        c.kill("clusterd1")
        c.up("clusterd1")
        c.up("clusterd2")
        c.run_testdrive_files("storage/02-after-environmentd-restart.td")

        # Just kill one of the clusterds and make sure we can recover.
        # `clusterd2` will die on its own.
        c.kill("clusterd1")
        c.run_testdrive_files("storage/03-while-clusterd-down.td")

        # Bring back both clusterds.
        c.up("clusterd1")
        c.up("clusterd2")
        c.run_testdrive_files("storage/04-after-clusterd-restart.td")


def workflow_test_drop_quickstart_cluster(c: Composition) -> None:
    """Test that the quickstart cluster can be dropped"""
    c.down(destroy_volumes=True)
    c.up("materialized")

    c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
    c.sql(
        "CREATE CLUSTER quickstart REPLICAS (quickstart (SIZE '1'))",
        user="mz_system",
        port=6877,
    )


def workflow_test_resource_limits(c: Composition) -> None:
    """Test resource limits in Materialize."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(),
        Materialized(),
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files("resources/resource-limits.td")


def workflow_pg_snapshot_resumption(c: Composition) -> None:
    """Test PostgreSQL snapshot resumption."""
    c.down(destroy_volumes=True)

    with c.override(
        # Start postgres for the pg source
        Testdrive(no_reset=True),
        Clusterd(
            name="clusterd1",
            environment_extra=["FAILPOINTS=pg_snapshot_failure=return"],
            workers=4,
        ),
    ):
        c.up("materialized", "postgres", "clusterd1")

        c.run_testdrive_files("pg-snapshot-resumption/01-configure-postgres.td")
        c.run_testdrive_files("pg-snapshot-resumption/02-create-sources.td")
        c.run_testdrive_files("pg-snapshot-resumption/03-ensure-source-down.td")

        # Temporarily disabled because it is timing out.
        # TODO: Reenable when https://github.com/MaterializeInc/database-issues/issues/4145 is fixed
        # # clusterd should crash
        # c.run_testdrive_files("pg-snapshot-resumption/04-while-clusterd-down.td")

        with c.override(
            # turn off the failpoint
            Clusterd(name="clusterd1", workers=4)
        ):
            c.up("clusterd1")
            c.run_testdrive_files("pg-snapshot-resumption/05-verify-data.td")


def workflow_sink_failure(c: Composition) -> None:
    """Test specific sink failure scenarios"""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
        # Activate a failpoint that makes Kafka sink creation fail.
        Clusterd(
            name="clusterd1",
            environment_extra=["FAILPOINTS=kafka_sink_creation_error=return"],
            workers=4,
        ),
    ):
        c.up("materialized", "zookeeper", "kafka", "schema-registry", "clusterd1")

        c.run_testdrive_files("sink-failure/01-configure-sinks.td")
        c.run_testdrive_files("sink-failure/02-ensure-sink-down.td")

        with c.override(
            # turn off the failpoint
            Clusterd(name="clusterd1", workers=4)
        ):
            c.up("clusterd1")
            c.run_testdrive_files("sink-failure/03-verify-data.td")


def workflow_test_bootstrap_vars(c: Composition) -> None:
    """Test that default system variable values can be passed via CLI options."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
        Materialized(
            options=[
                "--system-parameter-default=allowed_cluster_replica_sizes='1', '2', 'oops'"
            ],
        ),
    ):
        c.up("materialized")
        c.run_testdrive_files("resources/bootstrapped-system-vars.td")

    with c.override(
        Testdrive(no_reset=True),
        Materialized(
            additional_system_parameter_defaults={
                "allowed_cluster_replica_sizes": "'1', '2', 'oops'"
            },
        ),
    ):
        c.up("materialized")
        c.run_testdrive_files("resources/bootstrapped-system-vars.td")


def workflow_test_system_table_indexes(c: Composition) -> None:
    """Test system table indexes."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(),
        Materialized(),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})
        c.testdrive(
            input=dedent(
                """
                $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
                SET CLUSTER TO DEFAULT;
                CREATE VIEW v_mz_views AS SELECT \
                    id, \
                    oid, \
                    schema_id, \
                    name, \
                    definition, \
                    owner_id, \
                    privileges, \
                    create_sql, \
                    redacted_create_sql \
                FROM mz_views;
                CREATE DEFAULT INDEX ON v_mz_views;
                > SELECT id FROM mz_indexes WHERE id like 'u%';
                u2
                """
            )
        )
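        # An aside on the expected ID: this test assumes user object IDs are
        # allocated sequentially, so the view above takes `u1` and its
        # default index takes `u2`, which is the only user index and must
        # survive the restart below.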

        c.kill("materialized")

    with c.override(
        Testdrive(no_reset=True),
        Materialized(),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})
        c.testdrive(
            input=dedent(
                """
                > SELECT id FROM mz_indexes WHERE id like 'u%';
                u2
                """
            )
        )


def workflow_test_replica_targeted_subscribe_abort(c: Composition) -> None:
    """
    Test that a replica-targeted SUBSCRIBE is aborted when the target
    replica disconnects.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")
    c.up("clusterd2")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster1 CASCADE;
        CREATE CLUSTER cluster1 REPLICAS (
            replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 2
            ),
            replica2 (
                STORAGECTL ADDRESSES ['clusterd2:2100'],
                STORAGE ADDRESSES ['clusterd2:2103'],
                COMPUTECTL ADDRESSES ['clusterd2:2101'],
                COMPUTE ADDRESSES ['clusterd2:2102'],
                WORKERS 2
            )
        );
        CREATE TABLE t (a int);
        """
    )

    def drop_replica_with_delay() -> None:
        time.sleep(2)
        c.sql("DROP CLUSTER REPLICA cluster1.replica1;")

    dropper = Thread(target=drop_replica_with_delay)
    dropper.start()

    try:
        c.sql(
            """
            SET cluster = cluster1;
            SET cluster_replica = replica1;
            BEGIN;
            DECLARE c CURSOR FOR SUBSCRIBE t;
            FETCH c WITH (timeout = '5s');
            """
        )
    except InternalError_ as e:
        assert (
            e.diag.message_primary
            and "target replica failed or was dropped" in e.diag.message_primary
        ), e
    else:
        raise RuntimeError("SUBSCRIBE didn't return the expected error")

    dropper.join()

    def kill_replica_with_delay() -> None:
        time.sleep(2)
        c.kill("clusterd2")

    killer = Thread(target=kill_replica_with_delay)
    killer.start()

    try:
        c.sql(
            """
            SET cluster = cluster1;
            SET cluster_replica = replica2;
            BEGIN;
            DECLARE c CURSOR FOR SUBSCRIBE t;
            FETCH c WITH (timeout = '5s');
            """,
            reuse_connection=False,
        )
    except InternalError_ as e:
        assert (
            e.diag.message_primary
            and "target replica failed or was dropped" in e.diag.message_primary
        ), e
    else:
        raise RuntimeError("SUBSCRIBE didn't return the expected error")

    killer.join()


def workflow_test_replica_targeted_select_abort(c: Composition) -> None:
    """
    Test that a replica-targeted SELECT is aborted when the target
    replica disconnects.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")
    c.up("clusterd2")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.sql(
        """
        DROP CLUSTER IF EXISTS cluster1 CASCADE;
        CREATE CLUSTER cluster1 REPLICAS (
            replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 2
            ),
            replica2 (
                STORAGECTL ADDRESSES ['clusterd2:2100'],
                STORAGE ADDRESSES ['clusterd2:2103'],
                COMPUTECTL ADDRESSES ['clusterd2:2101'],
                COMPUTE ADDRESSES ['clusterd2:2102'],
                WORKERS 2
            )
        );
        CREATE TABLE t (a int);
        """
    )

    def drop_replica_with_delay() -> None:
        time.sleep(2)
        c.sql("DROP CLUSTER REPLICA cluster1.replica1;")

    dropper = Thread(target=drop_replica_with_delay)
    dropper.start()

    try:
        c.sql(
            """
            SET cluster = cluster1;
            SET cluster_replica = replica1;
            SELECT * FROM t AS OF 18446744073709551615;
            """
        )
    except InternalError_ as e:
        assert (
            e.diag.message_primary
            and "target replica failed or was dropped" in e.diag.message_primary
        ), e
    else:
        raise RuntimeError("SELECT didn't return the expected error")

    dropper.join()

    def kill_replica_with_delay() -> None:
        time.sleep(2)
        c.kill("clusterd2")

    killer = Thread(target=kill_replica_with_delay)
    killer.start()

    try:
        c.sql(
            """
            SET cluster = cluster1;
            SET cluster_replica = replica2;
            SELECT * FROM t AS OF 18446744073709551615;
            """
        )
    except InternalError_ as e:
        assert (
            e.diag.message_primary
            and "target replica failed or was dropped" in e.diag.message_primary
        ), e
    else:
        raise RuntimeError("SELECT didn't return the expected error")

    killer.join()


def workflow_test_compute_reconciliation_reuse(c: Composition) -> None:
    """
    Test that compute reconciliation reuses existing dataflows.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(name="clusterd1", workers=1),
        Clusterd(name="clusterd2", workers=1),
    ):
        c.up("materialized", "clusterd1", "clusterd2")

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        # Helper function to get reconciliation metrics for clusterd.
        def fetch_reconciliation_metrics(process: str) -> tuple[int, int]:
            metrics = c.exec(
                process, "curl", "localhost:6878/metrics", capture=True
            ).stdout

            reused = 0
            replaced = 0
            for metric in metrics.splitlines():
                if metric.startswith(
                    "mz_compute_reconciliation_reused_dataflows_count_total"
                ):
                    reused += int(metric.split()[1])
                elif metric.startswith(
                    "mz_compute_reconciliation_replaced_dataflows_count_total"
                ):
                    replaced += int(metric.split()[1])

            return reused, replaced
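        # The helper above tallies Prometheus exposition lines; a minimal
        # sketch of the input it expects (label sets and values assumed):
        #
        #   mz_compute_reconciliation_reused_dataflows_count_total{worker_id="0"} 15
        #   mz_compute_reconciliation_replaced_dataflows_count_total{worker_id="0"} 0
        #
        # `metric.split()[1]` picks the sample value after the metric name.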

        # Run a slow-path SELECT to allocate a transient ID. This ensures that
        # after the restart dataflows get different internal transient IDs
        # assigned, which is something we want reconciliation to be able to handle.
        c.sql("SELECT * FROM mz_views JOIN mz_indexes USING (id)")

        # Set up a cluster and a number of dataflows that can be reconciled.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 1
            ));
            SET cluster = cluster1;
            -- index on table
            CREATE TABLE t1 (a int);
            CREATE DEFAULT INDEX ON t1;
            -- index on index
            CREATE VIEW v1 AS SELECT a + 1 AS a FROM t1;
            CREATE DEFAULT INDEX ON v1;
            -- index on index on index
            CREATE VIEW v2 AS SELECT a + 1 AS a FROM v1;
            CREATE DEFAULT INDEX ON v2;
            -- materialized view on table
            CREATE TABLE t2 (a int);
            CREATE MATERIALIZED VIEW mv1 AS SELECT a + 1 AS a FROM t2;
            -- materialized view on index
            CREATE MATERIALIZED VIEW mv2 AS SELECT a + 1 AS a FROM t1;
            -- materialized view on index on index
            CREATE MATERIALIZED VIEW mv3 AS SELECT a + 1 AS a FROM v1;
            -- materialized view on index on index on index
            CREATE MATERIALIZED VIEW mv4 AS SELECT a + 1 AS a FROM v2;
            -- REFRESH materialized view on table
            CREATE MATERIALIZED VIEW rmv1 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM t2;
            -- REFRESH materialized view on index
            CREATE MATERIALIZED VIEW rmv2 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM t1;
            -- REFRESH materialized view on index on index
            CREATE MATERIALIZED VIEW rmv3 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM v1;
            -- REFRESH materialized view on index on index on index
            CREATE MATERIALIZED VIEW rmv4 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM v2;
            -- REFRESH materialized view on materialized view
            CREATE MATERIALIZED VIEW rmv5 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM mv1;
            -- REFRESH materialized view on REFRESH materialized view
            CREATE MATERIALIZED VIEW rmv6 WITH (REFRESH EVERY '1m') AS SELECT a + 1 AS a FROM rmv1;
            -- materialized view on REFRESH materialized view
            CREATE MATERIALIZED VIEW mv5 AS SELECT a + 1 AS a FROM rmv1;
            -- index on REFRESH materialized view
            CREATE DEFAULT INDEX ON rmv1;
            """
        )

        # Replace the `mz_catalog_server` replica with an unorchestrated one so we
        # can test reconciliation of system indexes too.
        c.sql(
            """
            ALTER CLUSTER mz_catalog_server SET (MANAGED = false);
            DROP CLUSTER REPLICA mz_catalog_server.r1;
            CREATE CLUSTER REPLICA mz_catalog_server.r1 (
                STORAGECTL ADDRESSES ['clusterd2:2100'],
                STORAGE ADDRESSES ['clusterd2:2103'],
                COMPUTECTL ADDRESSES ['clusterd2:2101'],
                COMPUTE ADDRESSES ['clusterd2:2102'],
                WORKERS 1
            );
            """,
            port=6877,
            user="mz_system",
        )

        # Give the dataflows some time to make progress and get compacted.
        # This is done to trigger the bug described in database-issues#5113.
        time.sleep(10)

        # Restart environmentd to trigger a reconciliation.
        c.kill("materialized")
        c.up("materialized")

        # Perform queries to ensure reconciliation has finished.
        c.sql(
            """
            SET cluster = cluster1;
            SELECT * FROM v1; -- cluster1
            SHOW INDEXES; -- mz_catalog_server
            """
        )
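        # cluster1 hosts 15 compute dataflows: 4 indexes (on t1, v1, v2, and
        # rmv1) plus 11 materialized views (mv1-mv5 and rmv1-rmv6). All of
        # them should survive reconciliation unchanged.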
        reused, replaced = fetch_reconciliation_metrics("clusterd1")
        assert reused == 15 and replaced == 0, f"{reused=}, {replaced=}"

        reused, replaced = fetch_reconciliation_metrics("clusterd2")
        assert reused > 10 and replaced == 0, f"{reused=}, {replaced=}"


def workflow_test_compute_reconciliation_replace(c: Composition) -> None:
    """
    Test that compute reconciliation replaces changed dataflows, as well as
    dataflows transitively depending on them.

    Regression test for database-issues#8444.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(name="clusterd1", workers=1),
    ):
        c.up("materialized", "clusterd1")

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        # Helper function to get reconciliation metrics for clusterd.
        def fetch_reconciliation_metrics(process: str) -> tuple[int, int]:
            metrics = c.exec(
                process, "curl", "localhost:6878/metrics", capture=True
            ).stdout

            reused = 0
            replaced = 0
            for metric in metrics.splitlines():
                if metric.startswith(
                    "mz_compute_reconciliation_reused_dataflows_count_total"
                ):
                    reused += int(metric.split()[1])
                elif metric.startswith(
                    "mz_compute_reconciliation_replaced_dataflows_count_total"
                ):
                    replaced += int(metric.split()[1])

            return reused, replaced

        # Set up a cluster and a number of dataflows that can be reconciled.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 1
            ));
            SET cluster = cluster1;
            CREATE TABLE t (a int);
            CREATE INDEX idx ON t (a);
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
            CREATE VIEW v1 AS SELECT a + 1 AS b FROM t;
            CREATE INDEX idx1 ON v1 (b);
            CREATE VIEW v2 AS SELECT b + 1 AS c FROM v1;
            CREATE INDEX idx2 ON v2 (c);
            CREATE VIEW v3 AS SELECT c + 1 AS d FROM v2;
            CREATE INDEX idx3 ON v3 (d);
            SELECT * FROM v3;
            """
        )

        # Drop the index on the base table. Upon replanning this changes the
        # plans of the dataflows that read `t` through the index (`mv` and
        # `idx1`), which should cause reconciliation to replace them, as well
        # as the dataflows that transitively depend on them (`idx2` and `idx3`).
        c.sql("DROP INDEX idx")

        # Restart environmentd to trigger a replanning and reconciliation.
        c.kill("materialized")
        c.up("materialized")

        # Perform queries to ensure reconciliation has finished.
        c.sql(
            """
            SET cluster = cluster1;
            SELECT * FROM v3;
            """
        )
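        # All four remaining dataflows (mv, idx1, idx2, idx3) are affected,
        # directly or transitively, by the dropped index, so reconciliation
        # should replace each of them and reuse none.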
        reused, replaced = fetch_reconciliation_metrics("clusterd1")
        assert reused == 0 and replaced == 4, f"{reused=}, {replaced=}"


def workflow_test_compute_reconciliation_no_errors(c: Composition) -> None:
    """
    Test that no errors are logged during or after compute
    reconciliation.

    This is generally useful to find unknown issues, and specifically
    to verify that replicas don't send unexpected compute responses
    in the process of reconciliation.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )

    # Set up a cluster and a number of dataflows that can be reconciled.
    c.sql(
        """
        CREATE CLUSTER cluster1 REPLICAS (replica1 (
            STORAGECTL ADDRESSES ['clusterd1:2100'],
            STORAGE ADDRESSES ['clusterd1:2103'],
            COMPUTECTL ADDRESSES ['clusterd1:2101'],
            COMPUTE ADDRESSES ['clusterd1:2102'],
            WORKERS 2
        ));
        SET cluster = cluster1;
        -- index on table
        CREATE TABLE t1 (a int);
        CREATE DEFAULT INDEX on t1;
        -- index on view
        CREATE VIEW v AS SELECT a + 1 FROM t1;
        CREATE DEFAULT INDEX on v;
        -- materialized view on table
        CREATE TABLE t2 (a int);
        CREATE MATERIALIZED VIEW mv1 AS SELECT a + 1 FROM t2;
        -- materialized view on index
        CREATE MATERIALIZED VIEW mv2 AS SELECT a + 1 FROM t1;
        """
    )

    # Set up a subscribe dataflow that will be dropped during reconciliation.
    cursor = c.sql_cursor()
    cursor.execute("SET cluster = cluster1")
    cursor.execute("INSERT INTO t1 VALUES (1)")
    cursor.execute("BEGIN")
    cursor.execute("DECLARE c CURSOR FOR SUBSCRIBE t1")
    cursor.execute("FETCH 1 c")

    # Perform a query to ensure dataflows have been installed.
    c.sql(
        """
        SET cluster = cluster1;
        SELECT * FROM t1, v, mv1, mv2;
        """
    )

    # We don't have much control over compute reconciliation from here. We
    # drop a dataflow and immediately kill environmentd, in hopes of maybe
    # provoking an interesting race that way.
    c.sql("DROP MATERIALIZED VIEW mv2")

    # Restart environmentd to trigger a reconciliation.
    c.kill("materialized")
    c.up("materialized")

    # Perform a query to ensure reconciliation has finished.
    c.sql(
        """
        SET cluster = cluster1;
        SELECT * FROM v;
        """
    )

    # Verify the absence of logged errors.
    for service in ("materialized", "clusterd1"):
        p = c.invoke("logs", service, capture=True)
        for line in p.stdout.splitlines():
            assert " ERROR " not in line, f"found ERROR in service {service}: {line}"


def workflow_test_drop_during_reconciliation(c: Composition) -> None:
    """
    Test that dropping storage and compute objects during reconciliation works.

    Regression test for database-issues#8399.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "unsafe_enable_unsafe_functions": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            },
        ),
        Clusterd(
            name="clusterd1",
            environment_extra=[
                # Disable GRPC host checking. We are connecting through a
                # proxy, so the host in the request URI doesn't match
                # clusterd's fqdn.
                "CLUSTERD_GRPC_HOST=",
            ],
        ),
        Testdrive(
            no_reset=True,
            default_timeout="30s",
        ),
    ):
        c.up(
            "materialized",
            "clusterd1",
            "toxiproxy",
            {"name": "testdrive", "persistent": True},
        )

        # Set up toxi-proxies for clusterd GRPC endpoints.
        toxi_url = "http://toxiproxy:8474/proxies"
        for port in (2100, 2101):
            c.testdrive(
                dedent(
                    f"""
                    $ http-request method=POST url={toxi_url} content-type=application/json
                    {{
                      "name": "clusterd_{port}",
                      "listen": "0.0.0.0:{port}",
                      "upstream": "clusterd1:{port}"
                    }}
                    """
                )
            )

        # Set up a cluster with storage and compute objects that can be dropped
        # during reconciliation.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['toxiproxy:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['toxiproxy:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 1
            ));
            SET cluster = cluster1;
            CREATE SOURCE s FROM LOAD GENERATOR COUNTER;
            CREATE DEFAULT INDEX on s;
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM s;
            """
        )
        # Wait for objects to be installed on the cluster.
        c.sql("SELECT * FROM mv")

        # Sever the connection between envd and clusterd.
        for port in (2100, 2101):
            c.testdrive(
                dedent(
                    f"""
                    $ http-request method=POST url={toxi_url}/clusterd_{port} content-type=application/json
                    {{"enabled": false}}
                    """
                )
            )

        # Drop all objects installed on the cluster.
        c.sql("DROP SOURCE s CASCADE")

        # Restore the connection between envd and clusterd, causing a
        # reconciliation.
        for port in (2100, 2101):
            c.testdrive(
                dedent(
                    f"""
                    $ http-request method=POST url={toxi_url}/clusterd_{port} content-type=application/json
                    {{"enabled": true}}
                    """
                )
            )

        # Confirm the cluster is still healthy and the compute objects have
        # been dropped. We can't verify the dropping of storage objects due to
        # the lack of introspection for storage dataflows.
        c.testdrive(
            dedent(
                """
                > SET cluster = cluster1;
                > SELECT * FROM mz_introspection.mz_compute_exports WHERE export_id LIKE 'u%';
                """
            )
        )


def workflow_test_mz_subscriptions(c: Composition) -> None:
    """
    Test that in-progress subscriptions are reflected in
    mz_subscriptions.
    """
    c.down(destroy_volumes=True)
    c.up("materialized", "clusterd1")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )
    c.sql(
        """
        CREATE CLUSTER cluster1 REPLICAS (r (
            STORAGECTL ADDRESSES ['clusterd1:2100'],
            STORAGE ADDRESSES ['clusterd1:2103'],
            COMPUTECTL ADDRESSES ['clusterd1:2101'],
            COMPUTE ADDRESSES ['clusterd1:2102'],
            WORKERS 2
        ));
        CREATE TABLE t1 (a int);
        CREATE TABLE t2 (a int);
        CREATE TABLE t3 (a int);
        INSERT INTO t1 VALUES (1);
        INSERT INTO t2 VALUES (1);
        INSERT INTO t3 VALUES (1);
        """
    )

    def start_subscribe(table: str, cluster: str) -> Cursor:
        """Start a subscribe on the given table and cluster."""
        cursor = c.sql_cursor()
        cursor.execute(f"SET cluster = {cluster}".encode())
        cursor.execute("BEGIN")
        cursor.execute(f"DECLARE c CURSOR FOR SUBSCRIBE {table}".encode())
        cursor.execute("FETCH 1 c")
        return cursor

    def stop_subscribe(cursor: Cursor) -> None:
        """Stop a subscribe started with `start_subscribe`."""
        cursor.execute("ROLLBACK")

    def check_mz_subscriptions(expected: list) -> None:
        """
        Check that the expected subscribes exist in mz_subscriptions.

        We identify subscribes by user, cluster, and target table only.
        We explicitly don't check the `GlobalId`, as how that is
        allocated is an implementation detail and might change in the
        future.
        """
        output = c.sql_query(
            """
            SELECT r.name, c.name, t.name
            FROM mz_internal.mz_subscriptions s
            JOIN mz_internal.mz_sessions e ON (e.id = s.session_id)
            JOIN mz_roles r ON (r.id = e.role_id)
            JOIN mz_clusters c ON (c.id = s.cluster_id)
            JOIN mz_tables t ON (t.id = s.referenced_object_ids[1])
            ORDER BY s.created_at
            """
        )
        assert output == expected, f"expected: {expected}, got: {output}"

    subscribe1 = start_subscribe("t1", "quickstart")
    check_mz_subscriptions([("materialize", "quickstart", "t1")])

    subscribe2 = start_subscribe("t2", "cluster1")
    check_mz_subscriptions(
        [
            ("materialize", "quickstart", "t1"),
            ("materialize", "cluster1", "t2"),
        ]
    )

    stop_subscribe(subscribe1)
    check_mz_subscriptions([("materialize", "cluster1", "t2")])

    subscribe3 = start_subscribe("t3", "quickstart")
    check_mz_subscriptions(
        [
            ("materialize", "cluster1", "t2"),
            ("materialize", "quickstart", "t3"),
        ]
    )

    stop_subscribe(subscribe3)
    check_mz_subscriptions([("materialize", "cluster1", "t2")])

    stop_subscribe(subscribe2)
    check_mz_subscriptions([])


def workflow_test_mv_source_sink(c: Composition) -> None:
    """
    Test that a compute materialized view's "since" timestamp is at least as
    large as its source table's "since" timestamp.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/5676
    """
    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")

    c.sql(
        "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
        port=6877,
        user="mz_system",
    )

    # Set up a dataflow on clusterd.
    c.sql(
        """
        CREATE CLUSTER cluster1 REPLICAS (replica1 (
            STORAGECTL ADDRESSES ['clusterd1:2100'],
            STORAGE ADDRESSES ['clusterd1:2103'],
            COMPUTECTL ADDRESSES ['clusterd1:2101'],
            COMPUTE ADDRESSES ['clusterd1:2102'],
            WORKERS 2
        ));
        SET cluster = cluster1;
        """
    )

    def extract_since_ts(output: str) -> int:
        j = json.loads(output)
        (since,) = j["determination"]["since"]["elements"]
        return int(since)
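    # The helper above assumes EXPLAIN TIMESTAMP AS JSON output shaped
    # roughly like (sketch, irrelevant fields omitted):
    #   {"determination": {"since": {"elements": [1700000000000]}}}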

    cursor = c.sql_cursor()
    cursor.execute("CREATE TABLE t (a int)")

    # Verify that there are no empty frontiers.
    cursor.execute("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM t")
    t_since = extract_since_ts(cursor.fetchall()[0][0])

    cursor.execute("CREATE MATERIALIZED VIEW mv AS SELECT * FROM t")
    cursor.execute("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM mv")
    mv_since = extract_since_ts(cursor.fetchall()[0][0])

    assert (
        mv_since >= t_since
    ), f'"since" timestamp of mv ({mv_since}) is less than "since" timestamp of its source table ({t_since})'


def workflow_test_query_without_default_cluster(c: Composition) -> None:
    """Test queries without a default cluster in Materialize."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(),
        Materialized(),
    ):
        c.up("materialized", "postgres")
        c.run_testdrive_files(
            "query-without-default-cluster/query-without-default-cluster.td",
        )


def workflow_test_clusterd_death_detection(c: Composition) -> None:
    """
    Test that environmentd notices when a clusterd becomes disconnected.

    Regression test for https://github.com/MaterializeInc/database-issues/issues/6095
    """
    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(
            name="clusterd1",
            environment_extra=[
                # Disable GRPC host checking. We are connecting through a
                # proxy, so the host in the request URI doesn't match
                # clusterd's fqdn.
                "CLUSTERD_GRPC_HOST=",
            ],
        ),
    ):
        c.up(
            "materialized",
            "clusterd1",
            "toxiproxy",
            {"name": "testdrive", "persistent": True},
        )
        c.testdrive(
            input=dedent(
                """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true

                $ http-request method=POST url=http://toxiproxy:8474/proxies content-type=application/json
                {
                  "name": "clusterd1",
                  "listen": "0.0.0.0:2100",
                  "upstream": "clusterd1:2100",
                  "enabled": true
                }

                $ http-request method=POST url=http://toxiproxy:8474/proxies content-type=application/json
                {
                  "name": "clusterd2",
                  "listen": "0.0.0.0:2101",
                  "upstream": "clusterd1:2101",
                  "enabled": true
                }

                > CREATE CLUSTER cluster1 REPLICAS (replica1 (
                  STORAGECTL ADDRESSES ['toxiproxy:2100'],
                  STORAGE ADDRESSES ['clusterd1:2103'],
                  COMPUTECTL ADDRESSES ['toxiproxy:2101'],
                  COMPUTE ADDRESSES ['clusterd1:2102'],
                  WORKERS 1));

                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"

                $ http-request method=POST url=http://toxiproxy:8474/proxies/clusterd1/toxics content-type=application/json
                {
                  "name": "clusterd1",
                  "type": "timeout",
                  "attributes": {"timeout": 0}
                }

                $ http-request method=POST url=http://toxiproxy:8474/proxies/clusterd2/toxics content-type=application/json
                {
                  "name": "clusterd2",
                  "type": "timeout",
                  "attributes": {"timeout": 0}
                }
                """
            )
        )
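        # The `timeout` toxics above use timeout=0 which, per toxiproxy's
        # documented semantics (an assumption worth noting here), silently
        # black-holes all data without ever closing the connections, so
        # environmentd must notice the dead connections on its own.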

        # environmentd should detect the broken connection after a few
        # seconds; the same check also passes with c.kill("clusterd1").
        time.sleep(10)
        envd = c.invoke("logs", "materialized", capture=True)
        assert (
            "error reading a body from connection: stream closed because of a broken pipe"
            in envd.stdout
        )


class Metrics:
    metrics: dict[str, str]

    def __init__(self, raw: str) -> None:
        self.metrics = {}
        for line in raw.splitlines():
            key, value = line.split(maxsplit=1)
            self.metrics[key] = value

    def for_instance(self, id: str) -> "Metrics":
        new = copy(self)
        new.metrics = {
            k: v for k, v in self.metrics.items() if f'instance_id="{id}"' in k
        }
        return new

    def with_name(self, metric_name: str) -> dict[str, float]:
        items = {}
        for key, value in self.metrics.items():
            if key.startswith(metric_name):
                items[key] = float(value)
        return items

    def get_value(self, metric_name: str) -> float:
        metrics = self.with_name(metric_name)
        values = list(metrics.values())
        assert len(values) == 1
        return values[0]

    def get_summed_value(self, metric_name: str) -> float:
        metrics = self.with_name(metric_name)
        return sum(metrics.values())

    def get_command_count(self, metric: str, command_type: str) -> float:
        metrics = self.with_name(metric)
        values = [
            v for k, v in metrics.items() if f'command_type="{command_type}"' in k
        ]
        assert len(values) == 1
        return values[0]

    def get_response_count(self, metric: str, response_type: str) -> float:
        metrics = self.with_name(metric)
        values = [
            v for k, v in metrics.items() if f'response_type="{response_type}"' in k
        ]
        assert len(values) == 1
        return values[0]

    def get_replica_history_command_count(self, command_type: str) -> float:
        return self.get_command_count(
            "mz_compute_replica_history_command_count", command_type
        )

    def get_compute_controller_history_command_count(self, command_type: str) -> float:
        return self.get_command_count(
            "mz_compute_controller_history_command_count", command_type
        )

    def get_storage_controller_history_command_count(self, command_type: str) -> float:
        return self.get_command_count(
            "mz_storage_controller_history_command_count", command_type
        )

    def get_commands_total(self, command_type: str) -> float:
        return self.get_command_count("mz_compute_commands_total", command_type)

    def get_command_bytes_total(self, command_type: str) -> float:
        return self.get_command_count(
            "mz_compute_command_message_bytes_total", command_type
        )

    def get_responses_total(self, response_type: str) -> float:
        return self.get_response_count("mz_compute_responses_total", response_type)

    def get_response_bytes_total(self, response_type: str) -> float:
        return self.get_response_count(
            "mz_compute_response_message_bytes_total", response_type
        )

    def get_peeks_total(self, result: str) -> float:
        metrics = self.with_name("mz_compute_peeks_total")
        values = [v for k, v in metrics.items() if f'result="{result}"' in k]
        assert len(values) == 1
        return values[0]

    def get_wallclock_lag_count(self, collection_id: str) -> float | None:
        metrics = self.with_name("mz_dataflow_wallclock_lag_seconds_count")
        values = [
            v for k, v in metrics.items() if f'collection_id="{collection_id}"' in k
        ]
        assert len(values) <= 1
        return next(iter(values), None)

    def get_e2e_optimization_time(self, object_type: str) -> float:
        metrics = self.with_name("mz_optimizer_e2e_optimization_time_seconds_sum")
        values = [v for k, v in metrics.items() if f'object_type="{object_type}"' in k]
        assert len(values) == 1
        return values[0]

    def get_pgwire_message_processing_seconds(self, message_type: str) -> float:
        metrics = self.with_name("mz_pgwire_message_processing_seconds_sum")
        values = [
            v for k, v in metrics.items() if f'message_type="{message_type}"' in k
        ]
        assert len(values) == 1
        return values[0]

    def get_result_rows_first_to_last_byte_seconds(self, statement_type: str) -> float:
        metrics = self.with_name("mz_result_rows_first_to_last_byte_seconds_sum")
        values = [
            v for k, v in metrics.items() if f'statement_type="{statement_type}"' in k
        ]
        assert len(values) == 1
        return values[0]

    def get_last_command_received(self, server_name: str) -> float:
        metrics = self.with_name("mz_grpc_server_last_command_received")
        values = [v for k, v in metrics.items() if server_name in k]
        assert len(values) == 1
        return values[0]

    def get_compute_collection_count(self, type_: str, hydrated: str) -> float:
        metrics = self.with_name("mz_compute_collection_count")
        values = [
            v
            for k, v in metrics.items()
            if f'type="{type_}"' in k and f'hydrated="{hydrated}"' in k
        ]
        assert len(values) == 1
        return values[0]
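

# A minimal usage sketch for `Metrics` (the raw line below is an assumed
# example of Prometheus exposition format, not captured output):
#
#   raw = 'mz_compute_commands_total{instance_id="u2",command_type="peek"} 3'
#   m = Metrics(raw).for_instance("u2")
#   assert m.get_commands_total("peek") == 3.0
#
# Keys retain the full `name{labels}` prefix, which is why the getters
# filter by substring match on label fragments.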


def workflow_test_replica_metrics(c: Composition) -> None:
    """Test metrics exposed by replicas."""
    c.down(destroy_volumes=True)

    with c.override(Clusterd(name="clusterd1", workers=1)):
        c.up("materialized", "clusterd1")

        def fetch_metrics() -> Metrics:
            resp = c.exec(
                "clusterd1", "curl", "localhost:6878/metrics", capture=True
            ).stdout
            return Metrics(resp)

        c.sql(
            "ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true;",
            port=6877,
            user="mz_system",
        )

        metrics = fetch_metrics()

        # The cluster should report the time that the last command was
        # received as 0 until environmentd connects.
        assert metrics.get_last_command_received("compute") == 0
        assert metrics.get_last_command_received("storage") == 0

        before_connection_time = time.time()

        # Set up a cluster with a couple dataflows.
        c.sql(
            """
            CREATE CLUSTER cluster1 REPLICAS (replica1 (
                STORAGECTL ADDRESSES ['clusterd1:2100'],
                STORAGE ADDRESSES ['clusterd1:2103'],
                COMPUTECTL ADDRESSES ['clusterd1:2101'],
                COMPUTE ADDRESSES ['clusterd1:2102'],
                WORKERS 1
            ));
            SET cluster = cluster1;
            CREATE TABLE t (a int);
            INSERT INTO t SELECT generate_series(1, 10);
            CREATE INDEX idx ON t (a);
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
            SELECT * FROM t;
            SELECT * FROM mv;
            """
        )

        # This can take a few seconds, so don't request the metrics too quickly.
        time.sleep(2)

        # Check that expected metrics exist and have sensible values.
        metrics = fetch_metrics()

        count = metrics.get_replica_history_command_count("hello")
        assert count == 0, f"unexpected hello count: {count}"
        count = metrics.get_replica_history_command_count("create_instance")
        assert count == 1, f"unexpected create_instance count: {count}"
        count = metrics.get_replica_history_command_count("allow_compaction")
        assert count > 0, f"unexpected allow_compaction count: {count}"
        count = metrics.get_replica_history_command_count("create_dataflow")
        assert count > 0, f"unexpected create_dataflow count: {count}"
        count = metrics.get_replica_history_command_count("peek")
        assert count <= 2, f"unexpected peek count: {count}"
        count = metrics.get_replica_history_command_count("cancel_peek")
        assert count <= 2, f"unexpected cancel_peek count: {count}"
        count = metrics.get_replica_history_command_count("initialization_complete")
        assert count == 0, f"unexpected initialization_complete count: {count}"
        count = metrics.get_replica_history_command_count("update_configuration")
        assert count == 1, f"unexpected update_configuration count: {count}"

        count = metrics.get_value("mz_compute_replica_history_dataflow_count")
        assert count >= 2, f"unexpected dataflow count: {count}"

        maintenance = metrics.get_value("mz_arrangement_maintenance_seconds_total")
        assert (
            maintenance > 0
        ), f"unexpected arrangement maintenance time: {maintenance}"

        mv_correction_insertions = metrics.get_value(
            "mz_persist_sink_correction_insertions_total"
        )
        assert (
            mv_correction_insertions > 0
        ), f"unexpected persist sink correction insertions: {mv_correction_insertions}"
        mv_correction_cap_increases = metrics.get_value(
            "mz_persist_sink_correction_capacity_increases_total"
        )
        assert (
            mv_correction_cap_increases > 0
        ), f"unexpected persist sink correction capacity increases: {mv_correction_cap_increases}"
        mv_correction_max_len_per_worker = metrics.get_value(
            "mz_persist_sink_correction_max_per_sink_worker_len_updates"
        )
        assert (
            mv_correction_max_len_per_worker > 0
        ), f"unexpected persist max correction len per worker: {mv_correction_max_len_per_worker}"
        mv_correction_max_cap_per_worker = metrics.get_value(
            "mz_persist_sink_correction_max_per_sink_worker_capacity_updates"
        )
        assert (
            mv_correction_max_cap_per_worker > 0
        ), f"unexpected persist sink max correction capacity per worker: {mv_correction_max_cap_per_worker}"

        assert metrics.get_last_command_received("compute") >= before_connection_time

        count = metrics.get_compute_collection_count("log", "0")
        assert count == 0, "unexpected number of unhydrated log collections"
        count = metrics.get_compute_collection_count("log", "1")
        assert count > 0, "unexpected number of hydrated log collections"
        count = metrics.get_compute_collection_count("user", "0")
        assert count == 0, "unexpected number of unhydrated user collections"
        count = metrics.get_compute_collection_count("user", "1")
        assert count == 2, "unexpected number of hydrated user collections"

        # Check that collection metrics update when collections are dropped.
        c.sql(
            """
            DROP INDEX idx;
            DROP MATERIALIZED VIEW mv;
            """
        )
        time.sleep(2)

        metrics = fetch_metrics()

        count = metrics.get_compute_collection_count("user", "0")
        assert count == 0, "unexpected number of unhydrated user collections"
        count = metrics.get_compute_collection_count("user", "1")
        assert count == 0, "unexpected number of hydrated user collections"
  2244. def workflow_test_compute_controller_metrics(c: Composition) -> None:
  2245. """Test metrics exposed by the compute controller."""
  2246. c.down(destroy_volumes=True)
  2247. c.up("materialized", {"name": "testdrive", "persistent": True})
  2248. def fetch_metrics() -> Metrics:
  2249. resp = c.exec(
  2250. "materialized", "curl", "localhost:6878/metrics", capture=True
  2251. ).stdout
  2252. return Metrics(resp).for_instance("u2")
  2253. # Set up a cluster with a couple dataflows.
  2254. c.sql(
  2255. """
  2256. CREATE CLUSTER test MANAGED, SIZE '1';
  2257. SET cluster = test;
  2258. CREATE TABLE t (a int);
  2259. INSERT INTO t SELECT generate_series(1, 10);
  2260. CREATE INDEX idx ON t (a);
  2261. CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
  2262. SELECT * FROM t;
  2263. SELECT * FROM mv;
  2264. """
  2265. )
  2266. index_id = c.sql_query("SELECT id FROM mz_indexes WHERE name = 'idx'")[0][0]
  2267. mv_id = c.sql_query("SELECT id FROM mz_materialized_views WHERE name = 'mv'")[0][0]
  2268. # Wait a bit to let the controller refresh its metrics.
  2269. time.sleep(2)
  2270. # Check that expected metrics exist and have sensible values.
  2271. metrics = fetch_metrics()
  2272. # mz_compute_commands_total
  2273. count = metrics.get_commands_total("hello")
  2274. assert count == 1, f"got {count}"
  2275. count = metrics.get_commands_total("create_instance")
  2276. assert count == 1, f"got {count}"
  2277. count = metrics.get_commands_total("allow_compaction")
  2278. assert count > 0, f"got {count}"
  2279. count = metrics.get_commands_total("create_dataflow")
  2280. assert count >= 3, f"got {count}"
  2281. count = metrics.get_commands_total("peek")
  2282. assert count == 2, f"got {count}"
  2283. count = metrics.get_commands_total("cancel_peek")
  2284. assert count == 2, f"got {count}"
  2285. count = metrics.get_commands_total("initialization_complete")
  2286. assert count == 1, f"got {count}"
  2287. count = metrics.get_commands_total("update_configuration")
  2288. assert count == 1, f"got {count}"
  2289. # mz_compute_command_message_bytes_total
  2290. count = metrics.get_command_bytes_total("hello")
  2291. assert count > 0, f"got {count}"
  2292. count = metrics.get_command_bytes_total("create_instance")
  2293. assert count > 0, f"got {count}"
  2294. count = metrics.get_command_bytes_total("allow_compaction")
  2295. assert count > 0, f"got {count}"
  2296. count = metrics.get_command_bytes_total("create_dataflow")
  2297. assert count > 0, f"got {count}"
  2298. count = metrics.get_command_bytes_total("peek")
  2299. assert count > 0, f"got {count}"
  2300. count = metrics.get_command_bytes_total("cancel_peek")
  2301. assert count > 0, f"got {count}"
  2302. count = metrics.get_command_bytes_total("initialization_complete")
  2303. assert count > 0, f"got {count}"
  2304. count = metrics.get_command_bytes_total("update_configuration")
  2305. assert count > 0, f"got {count}"
  2306. # mz_compute_responses_total
  2307. count = metrics.get_responses_total("frontiers")
  2308. assert count > 0, f"got {count}"
  2309. count = metrics.get_responses_total("peek_response")
  2310. assert count == 2, f"got {count}"
  2311. count = metrics.get_responses_total("subscribe_response")
  2312. assert count > 0, f"got {count}"
  2313. # mz_compute_response_message_bytes_total
  2314. count = metrics.get_response_bytes_total("frontiers")
  2315. assert count > 0, f"got {count}"
  2316. count = metrics.get_response_bytes_total("peek_response")
  2317. assert count > 0, f"got {count}"
  2318. count = metrics.get_response_bytes_total("subscribe_response")
  2319. assert count > 0, f"got {count}"
  2320. count = metrics.get_value("mz_compute_controller_replica_count")
  2321. assert count == 1, f"got {count}"
  2322. count = metrics.get_value("mz_compute_controller_collection_count")
  2323. assert count > 0, f"got {count}"
  2324. count = metrics.get_value("mz_compute_controller_collection_unscheduled_count")
  2325. assert count == 0, f"got {count}"
  2326. count = metrics.get_value("mz_compute_controller_peek_count")
  2327. assert count == 0, f"got {count}"
  2328. count = metrics.get_value("mz_compute_controller_subscribe_count")
  2329. assert count > 0, f"got {count}"
  2330. count = metrics.get_value("mz_compute_controller_command_queue_size")
  2331. assert count < 10, f"got {count}"
    send_count = metrics.get_value("mz_compute_controller_response_send_count")
    assert send_count > 10, f"got {send_count}"
    recv_count = metrics.get_value("mz_compute_controller_response_recv_count")
    assert recv_count > 10, f"got {recv_count}"
    assert send_count - recv_count < 10, f"got {send_count}, {recv_count}"
    count = metrics.get_value("mz_compute_controller_hydration_queue_size")
    assert count == 0, f"got {count}"

    # mz_compute_controller_history_command_count
    count = metrics.get_compute_controller_history_command_count("hello")
    assert count == 1, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("create_instance")
    assert count == 1, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("allow_compaction")
    assert count > 0, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("create_dataflow")
    assert count > 0, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("peek")
    assert count <= 2, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("cancel_peek")
    assert count <= 2, f"got {count}"
    count = metrics.get_compute_controller_history_command_count(
        "initialization_complete"
    )
    assert count == 1, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("update_configuration")
    assert count == 1, f"got {count}"
    count = metrics.get_compute_controller_history_command_count("allow_writes")
    assert count == 1, f"got {count}"
    count = metrics.get_value("mz_compute_controller_history_dataflow_count")
    assert count >= 2, f"got {count}"

    # mz_compute_peeks_total
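    # Peek results may come back inline ("rows") or via the result stash
    # ("rows_stashed"), presumably depending on result size, so accept either
    # outcome by summing the two labels.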
    count = metrics.get_peeks_total("rows") + metrics.get_peeks_total("rows_stashed")
    assert count == 2, f"got {count}"
    count = metrics.get_peeks_total("error")
    assert count == 0, f"got {count}"
    count = metrics.get_peeks_total("canceled")
    assert count == 0, f"got {count}"

    count = metrics.get_value("mz_compute_controller_connected_replica_count")
    assert count == 1, f"got {count}"
    count = metrics.get_value("mz_compute_controller_replica_connects_total")
    assert count == 1, f"got {count}"
    duration = metrics.get_value(
        "mz_compute_controller_replica_connect_wait_time_seconds_total"
    )
    assert duration > 0, f"got {duration}"

    # mz_dataflow_wallclock_lag_seconds_count
    count = metrics.get_wallclock_lag_count(index_id)
    assert count, f"got {count}"
    count = metrics.get_wallclock_lag_count(mv_id)
    assert count, f"got {count}"

    # Drop the dataflows.
    c.sql(
        """
        DROP INDEX idx;
        DROP MATERIALIZED VIEW mv;
        """
    )

    # Wait for the controller to asynchronously drop the dataflows and update
    # metrics. We can inspect the controller's view of things in
    # `mz_frontiers`, which is updated at the same time as these metrics are.
    c.testdrive(
        input=dedent(
            """
            > SELECT *
              FROM mz_internal.mz_frontiers
              WHERE object_id LIKE 'u%'
            """
        )
    )

    # Check that the per-collection metrics have been cleaned up.
    metrics = fetch_metrics()
    assert metrics.get_wallclock_lag_count(index_id) is None
    assert metrics.get_wallclock_lag_count(mv_id) is None


def workflow_test_storage_controller_metrics(c: Composition) -> None:
    """Test metrics exposed by the storage controller."""
    c.down(destroy_volumes=True)
    c.up(
        "materialized",
        "kafka",
        "schema-registry",
        {"name": "testdrive", "persistent": True},
    )

    def fetch_metrics() -> Metrics:
        resp = c.exec(
            "materialized", "curl", "localhost:6878/metrics", capture=True
        ).stdout
        return Metrics(resp)

    # Set up a cluster with a couple storage objects.
    c.testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_alter_table_add_column = true

            > CREATE CLUSTER test SIZE '1'
            > SET cluster = test
            > CREATE TABLE t (a int)
            > INSERT INTO t VALUES (1)
            > CREATE TABLE t_alter (a int)
            > INSERT INTO t_alter VALUES (1)
            > ALTER TABLE t_alter ADD COLUMN b int
            > CREATE MATERIALIZED VIEW mv AS SELECT * FROM t
            > CREATE SOURCE src FROM LOAD GENERATOR COUNTER
            > CREATE CONNECTION kafka_conn
              TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
            > CREATE CONNECTION csr_conn
              TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}')
            > CREATE SINK snk FROM t
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk-${testdrive.seed}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM
            > SELECT count(*) > 0 FROM t
            true
            > SELECT count(*) > 0 FROM t_alter
            true
            > SELECT count(*) > 0 FROM mv
            true
            > SELECT count(*) > 0 FROM src
            true
            """
        )
    )
    table1_id = c.sql_query("SELECT id FROM mz_tables WHERE name = 't'")[0][0]
    table2_id = c.sql_query("SELECT id FROM mz_tables WHERE name = 't_alter'")[0][0]
    mv_id = c.sql_query("SELECT id FROM mz_materialized_views WHERE name = 'mv'")[0][0]
    source_id = c.sql_query("SELECT id FROM mz_sources WHERE name = 'src'")[0][0]
    sink_id = c.sql_query("SELECT id FROM mz_sinks WHERE name = 'snk'")[0][0]

    # Wait a bit to let the controller refresh its metrics.
    time.sleep(2)

    # Check that expected metrics exist and have sensible values.
    metrics = fetch_metrics()
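    # Assumption: "u2" is the ID of the `test` cluster created above; the
    # empty instance label selects metrics that aren't attributed to any
    # cluster (e.g. table and MV collections maintained by environmentd).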
    metrics_u2 = metrics.for_instance("u2")
    metrics_ux = metrics.for_instance("")

    count = metrics_u2.get_summed_value("mz_storage_messages_sent_bytes")
    assert count > 0, f"got {count}"
    count = metrics_u2.get_summed_value("mz_storage_messages_received_bytes")
    assert count > 0, f"got {count}"

    # mz_storage_controller_history_command_count
    count = metrics_u2.get_storage_controller_history_command_count("hello")
    assert count == 1, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count("allow_compaction")
    assert count > 0, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count("run_ingestions")
    assert count == 1, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count("run_sinks")
    assert count == 1, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count(
        "initialization_complete"
    )
    assert count == 1, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count(
        "update_configuration"
    )
    assert count == 1, f"got {count}"
    count = metrics_u2.get_storage_controller_history_command_count("allow_writes")
    assert count == 1, f"got {count}"

    count = metrics_u2.get_value("mz_compute_controller_connected_replica_count")
    assert count == 1, f"got {count}"
    count = metrics_u2.get_value("mz_compute_controller_replica_connects_total")
    assert count == 1, f"got {count}"
    duration = metrics_u2.get_value(
        "mz_compute_controller_replica_connect_wait_time_seconds_total"
    )
    assert duration > 0, f"got {duration}"

    # mz_dataflow_wallclock_lag_seconds_count
    count = metrics_ux.get_wallclock_lag_count(table1_id)
    assert count, f"got {count}"
    count = metrics_ux.get_wallclock_lag_count(table2_id)
    assert count, f"got {count}"
    count = metrics_ux.get_wallclock_lag_count(mv_id)
    assert count, f"got {count}"
    count = metrics_u2.get_wallclock_lag_count(source_id)
    assert count, f"got {count}"
    count = metrics_u2.get_wallclock_lag_count(sink_id)
    assert count, f"got {count}"
    # Drop the storage objects.
    c.sql(
        """
        DROP SINK snk;
        DROP SOURCE src;
        DROP MATERIALIZED VIEW mv;
        DROP TABLE t;
        DROP TABLE t_alter;
        """
    )
    # Wait a bit to let the controller refresh its metrics.
    time.sleep(2)

    # Check that the per-collection metrics have been cleaned up.
    metrics = fetch_metrics()
    metrics_u2 = metrics.for_instance("u2")
    metrics_ux = metrics.for_instance("")
    assert metrics_ux.get_wallclock_lag_count(table1_id) is None
    assert metrics_ux.get_wallclock_lag_count(table2_id) is None
    assert metrics_ux.get_wallclock_lag_count(mv_id) is None
    assert metrics_u2.get_wallclock_lag_count(source_id) is None
    assert metrics_u2.get_wallclock_lag_count(sink_id) is None


def workflow_test_optimizer_metrics(c: Composition) -> None:
    """Test metrics exposed by the optimizer."""
    c.down(destroy_volumes=True)
    c.up("materialized")

    def fetch_metrics() -> Metrics:
        resp = c.exec(
            "materialized", "curl", "localhost:6878/metrics", capture=True
        ).stdout
        return Metrics(resp)

    # Run optimizations for different object types.
    c.sql(
        """
        CREATE TABLE t (a int);
        -- view
        CREATE VIEW v AS SELECT a + 1 FROM t;
        -- index
        CREATE INDEX i ON t (a);
        -- materialized view
        CREATE MATERIALIZED VIEW m AS SELECT a + 1 FROM t;
        -- fast-path peek
        SELECT * FROM t;
        -- slow-path peek
        SELECT count(*) FROM t JOIN v ON (true);
        -- subscribe
        SUBSCRIBE (SELECT 1);
        """
    )

    # Check that expected metrics exist and have sensible values.
    metrics = fetch_metrics()

    # mz_optimizer_e2e_optimization_time_seconds
    duration = metrics.get_e2e_optimization_time("view")
    assert 0 < duration < 10, f"got {duration}"
    duration = metrics.get_e2e_optimization_time("index")
    assert 0 < duration < 10, f"got {duration}"
    duration = metrics.get_e2e_optimization_time("materialized_view")
    assert 0 < duration < 10, f"got {duration}"
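    # A fast-path peek is served straight from an existing index (or a
    # constant), while a slow-path peek has to install a temporary dataflow
    # first; the optimizer tracks the two under separate labels.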
    duration = metrics.get_e2e_optimization_time("peek:fast_path")
    assert 0 < duration < 10, f"got {duration}"
    duration = metrics.get_e2e_optimization_time("peek:slow_path")
    assert 0 < duration < 10, f"got {duration}"
    duration = metrics.get_e2e_optimization_time("subscribe")
    assert 0 < duration < 10, f"got {duration}"


def workflow_test_pgwire_metrics(c: Composition) -> None:
    """Test metrics collected in the Adapter frontend, i.e., `pgwire.rs`."""

    def fetch_metrics() -> Metrics:
        resp = c.exec(
            "materialized", "curl", "localhost:6878/metrics", capture=True
        ).stdout
        return Metrics(resp)

    c.down(destroy_volumes=True)
    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})

        c.sql(
            """
            CREATE TABLE t (a int);
            INSERT INTO t VALUES (7);
            SELECT * FROM t;
            CREATE INDEX i ON t (a);
            SELECT * FROM t;
            """
        )

        metrics = fetch_metrics()
        # `c.sql` above uses the Simple Query protocol.
        duration = metrics.get_pgwire_message_processing_seconds("query")
        assert 0 < duration < 10, f"got {duration}"

        # Testdrive uses the Extended Query protocol.
        c.testdrive(
            input=dedent(
                """
                > SELECT * FROM t;
                7
                """
            )
        )

        metrics = fetch_metrics()
        duration = metrics.get_pgwire_message_processing_seconds("parse")
        assert 0 < duration < 10, f"got {duration}"
        duration = metrics.get_pgwire_message_processing_seconds("bind")
        assert 0 < duration < 10, f"got {duration}"
        duration = metrics.get_pgwire_message_processing_seconds("execute")
        assert 0 < duration < 10, f"got {duration}"
        duration = metrics.get_value("mz_parse_seconds_sum")
        assert 0 < duration < 10, f"got {duration}"

        rrftlbs_select_1 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert 0 < rrftlbs_select_1 < 10, f"got {rrftlbs_select_1}"

        # We run a SELECT (as a Simple Query), and then expect the metric to have increased.
        c.sql(
            """
            SELECT * FROM t;
            """
        )
        metrics = fetch_metrics()
        rrftlbs_select_2 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert rrftlbs_select_2 > rrftlbs_select_1, f"got {rrftlbs_select_2}"

        # We run a SELECT (as an Extended Query via Testdrive), and then expect the metric to have increased.
        c.testdrive(
            input=dedent(
                """
                > SELECT * FROM t;
                7
                """
            )
        )
        metrics = fetch_metrics()
        rrftlbs_select_3 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert rrftlbs_select_3 > rrftlbs_select_2, f"got {rrftlbs_select_3}"

        c.sql(
            """
            INSERT INTO t VALUES (8);
            """
        )

        # Declare a cursor and fetch 1 row. The SELECT will have 2 result rows in total, so the metric should _not_
        # change after fetching just 1 row.
        c.sql(
            """
            BEGIN;
            DECLARE c1 CURSOR FOR (SELECT * FROM t);
            FETCH 1 c1;
            """
        )
        metrics = fetch_metrics()
        rrftlbs_select_4 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert (
            rrftlbs_select_4 == rrftlbs_select_3
        ), f"got {rrftlbs_select_4} vs. {rrftlbs_select_3}"

        # Still no change after one more FETCH, because we need to read _past_ the last row.
        # (This is a separate session.)
        c.sql(
            """
            BEGIN;
            DECLARE c2 CURSOR FOR (SELECT * FROM t);
            FETCH 1 c2;
            FETCH 1 c2;
            """
        )
        metrics = fetch_metrics()
        rrftlbs_select_5 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert (
            rrftlbs_select_5 == rrftlbs_select_4
        ), f"got {rrftlbs_select_5} vs. {rrftlbs_select_4}"

        # Now it should change, because we consume all the rows, and then try to consume one more, so the cursor ends.
        c.sql(
            """
            BEGIN;
            DECLARE c3 CURSOR FOR (SELECT * FROM t);
            FETCH 1 c3;
            FETCH 1 c3;
            FETCH 1 c3;
            """
        )
        metrics = fetch_metrics()
        rrftlbs_select_6 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert (
            rrftlbs_select_6 > rrftlbs_select_5
        ), f"got {rrftlbs_select_6} vs. {rrftlbs_select_5}"

        # FETCH ALL
        c.sql(
            """
            BEGIN;
            DECLARE c4 CURSOR FOR (SELECT * FROM t);
            FETCH ALL c4;
            """
        )
        metrics = fetch_metrics()
        rrftlbs_select_7 = metrics.get_result_rows_first_to_last_byte_seconds("select")
        assert (
            rrftlbs_select_7 > rrftlbs_select_6
        ), f"got {rrftlbs_select_7} vs. {rrftlbs_select_6}"

        # SUBSCRIBE should show up if it's on a constant collection.
        # We need two FETCHes, because the first one won't observe that there are no more rows, due to
        # `ExecuteTimeout::WaitOnce`.
        c.sql(
            """
            CREATE VIEW v1 AS SELECT 3;
            BEGIN;
            DECLARE c5 CURSOR FOR SUBSCRIBE (SELECT * FROM v1);
            FETCH ALL c5;
            FETCH ALL c5;
            """,
            reuse_connection=False,
        )
        metrics = fetch_metrics()
        rrftlbs_subscribe_1 = metrics.get_result_rows_first_to_last_byte_seconds(
            "subscribe"
        )
        assert 0 < rrftlbs_subscribe_1 < 10, f"got {rrftlbs_subscribe_1}"

        # ... and should increase
        c.sql(
            """
            BEGIN;
            DECLARE c6 CURSOR FOR SUBSCRIBE (SELECT * FROM v1);
            FETCH ALL c6;
            FETCH ALL c6;
            """,
            reuse_connection=False,
        )
        metrics = fetch_metrics()
        rrftlbs_subscribe_2 = metrics.get_result_rows_first_to_last_byte_seconds(
            "subscribe"
        )
        assert (
            rrftlbs_subscribe_2 > rrftlbs_subscribe_1
        ), f"got {rrftlbs_subscribe_2} vs. {rrftlbs_subscribe_1}"

        # Shouldn't increase for a non-const SUBSCRIBE, because there is no last row, ever.
        c.sql(
            """
            BEGIN;
            DECLARE c7 CURSOR FOR SUBSCRIBE (SELECT * FROM t);
            FETCH ALL c7;
            FETCH ALL c7 WITH (TIMEOUT = INTERVAL '1500 milliseconds');
            """,
            reuse_connection=False,
        )
        metrics = fetch_metrics()
        rrftlbs_subscribe_3 = metrics.get_result_rows_first_to_last_byte_seconds(
            "subscribe"
        )
        assert (
            rrftlbs_subscribe_3 == rrftlbs_subscribe_2
        ), f"got {rrftlbs_subscribe_3} vs. {rrftlbs_subscribe_2}"


def workflow_test_metrics_retention_across_restart(c: Composition) -> None:
    """
    Test that sinces of retained-metrics objects are held back across
    restarts of environmentd.
    """
    # There are two kinds of retained-metrics objects currently:
    # * tables (like `mz_cluster_replicas`)
    # * indexes (like `mz_cluster_replicas_ind`)
    # Generally, metrics tables are indexed in `mz_catalog_server` and
    # not indexed in the `default` cluster, so we can use that to
    # collect the `since` frontiers we want.

    def collect_sinces() -> tuple[int, int]:
        with c.sql_cursor() as cur:
            cur.execute("SET cluster = default;")
            cur.execute("EXPLAIN TIMESTAMP FOR SELECT * FROM mz_cluster_replicas;")
            explain = cur.fetchall()[0][0]
            table_since = parse_since_from_explain(explain)
        with c.sql_cursor() as cur:
            cur.execute("SET cluster = mz_catalog_server;")
            cur.execute("EXPLAIN TIMESTAMP FOR SELECT * FROM mz_cluster_replicas;")
            explain = cur.fetchall()[0][0]
            index_since = parse_since_from_explain(explain)
        return table_since, index_since

    def parse_since_from_explain(explain: str) -> int:
        since_line = re.compile(r"\s*read frontier:\[(?P<since>\d+) \(.+\)\]")
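        # Matches the EXPLAIN TIMESTAMP line that reports the read frontier,
        # which is shaped roughly like (assumed format):
        #   read frontier:[1690000000000 (2023-07-22 05:06:40.000)]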
        for line in explain.splitlines():
            if match := since_line.match(line):
                return int(match.group("since"))
        raise AssertionError(f"since not found in explain: {explain}")

    def validate_since(since: int, name: str) -> None:
        now = datetime.now()
        dt = datetime.fromtimestamp(since / 1000.0)
        diff = now - dt
        # This env was just created, so the since should be recent.
        assert (
            diff.days < 30
        ), f"{name} greater than expected (since={since}, diff={diff})"

    c.down(destroy_volumes=True)
    c.up("materialized")

    table_since1, index_since1 = collect_sinces()
    validate_since(table_since1, "table_since1")
    validate_since(index_since1, "index_since1")

    # Restart Materialize.
    c.kill("materialized")
    c.up("materialized")

    # The env has been up for less than 30d, so the since should not have
    # changed.
    table_since2, index_since2 = collect_sinces()
    assert (
        table_since1 == table_since2
    ), f"table sinces did not match ({table_since1} vs {table_since2})"
    assert (
        index_since1 == index_since2
    ), f"index sinces did not match ({index_since1} vs {index_since2})"


def find_proxy_port(logs: str, cluster_id: str, port_name: str) -> int | None:
    """Extract a proxy port from the given logs."""
    RE_PROXY_LINE = re.compile(
        rf".*INFO mz_orchestrator_process: cluster-{cluster_id}-replica-.*:"
        rf" {port_name} tcp proxy listening on 0.0.0.0:(?P<port>\d+)"
    )
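    # Matches orchestrator log lines shaped roughly like (hypothetical port):
    #   INFO mz_orchestrator_process: cluster-u2-replica-u3-0:
    #   internal-http tcp proxy listening on 0.0.0.0:40639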
    for line in logs.splitlines():
        if match := RE_PROXY_LINE.match(line):
            return int(match.group("port"))
    return None


def workflow_test_workload_class_in_metrics(c: Composition) -> None:
    """
    Test that setting the cluster workload class correctly reflects in metrics
    exposed by envd and clusterd.
    """
    RE_WORKLOAD_CLASS_LABEL = re.compile(r'workload_class="(?P<value>\w+)"')
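    # Matches the label pair in exposition lines such as (hypothetical metric
    # name):
    #   mz_some_metric{workload_class="production"} 1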

    c.down(destroy_volumes=True)
    c.up("materialized")

    # Create a cluster and wait for it to come up.
    c.sql(
        """
        CREATE CLUSTER test SIZE '1';
        SET cluster = test;
        SELECT * FROM mz_introspection.mz_dataflow_operators;
        """
    )

    # Find the internal-http port of the test cluster.
    cluster_id = c.sql_query("SELECT id FROM mz_clusters WHERE name = 'test'")[0][0]
    logs = c.invoke("logs", "materialized", capture=True).stdout
    clusterd_port = find_proxy_port(logs, cluster_id, "internal-http")

    def check_workload_class(expected: str | None):
        """
        Assert that metrics on both envd and clusterd are labeled with the
        given expected workload class.
        """
        # Sleep a bit to give workload class changes time to propagate.
        time.sleep(1)

        envd_metrics = c.exec(
            "materialized", "curl", "localhost:6878/metrics", capture=True
        ).stdout
        clusterd_metrics = c.exec(
            "materialized", "curl", f"localhost:{clusterd_port}/metrics", capture=True
        ).stdout

        envd_classes = {
            m.group("value") for m in RE_WORKLOAD_CLASS_LABEL.finditer(envd_metrics)
        }
        clusterd_classes = {
            m.group("value") for m in RE_WORKLOAD_CLASS_LABEL.finditer(clusterd_metrics)
        }

        if expected is None:
            assert (
                not envd_classes
            ), f"envd: expected no workload classes, found {envd_classes}"
            assert (
                not clusterd_classes
            ), f"clusterd: expected no workload classes, found {clusterd_classes}"
        else:
            assert envd_classes == {
                expected
            }, f"envd: expected workload class '{expected}', found {envd_classes}"
            assert clusterd_classes == {
                expected
            }, f"clusterd: expected workload class '{expected}', found {clusterd_classes}"

    check_workload_class(None)

    c.sql(
        "ALTER CLUSTER test SET (WORKLOAD CLASS 'production')",
        port=6877,
        user="mz_system",
    )
    check_workload_class("production")

    c.sql(
        "ALTER CLUSTER test SET (WORKLOAD CLASS 'staging')",
        port=6877,
        user="mz_system",
    )
    check_workload_class("staging")

    c.sql(
        "ALTER CLUSTER test RESET (WORKLOAD CLASS)",
        port=6877,
        user="mz_system",
    )
    check_workload_class(None)


def workflow_test_concurrent_connections(c: Composition) -> None:
    """
    Run many concurrent connections, measure their p50 and p99 latency, make
    sure database-issues#6537 does not regress.
    """
    num_conns = 2000
    p50_limit = 10.0
    p99_limit = 20.0

    runtimes: list[float] = [float("inf")] * num_conns

    def worker(c: Composition, i: int) -> None:
        start_time = time.time()
        c.sql("SELECT 1", print_statement=False)
        end_time = time.time()
        runtimes[i] = end_time - start_time

    c.down(destroy_volumes=True)
    c.up("materialized")

    c.sql(
        f"ALTER SYSTEM SET max_connections = {num_conns + 4};",
        port=6877,
        user="mz_system",
    )

    for i in range(3):
        threads = []
        for j in range(num_conns):
            thread = Thread(name=f"worker_{j}", target=worker, args=(c, j))
            threads.append(thread)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        p = quantiles(runtimes, n=100)
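        # `quantiles(..., n=100)` returns the 99 cut points between
        # percentiles, so p[49] is the p50 and p[98] is the p99.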
        print(
            f"min: {min(runtimes):.2f}s, p50: {p[49]:.2f}s, p99: {p[98]:.2f}s, max: {max(runtimes):.2f}s"
        )
        p50 = p[49]
        p99 = p[98]
        if p50 < p50_limit and p99 < p99_limit:
            return
        if i < 2:
            print("retry...")
            continue

        assert (
            p50 < p50_limit
        ), f"p50 is {p50:.2f}s, should be less than {p50_limit:.2f}s"
        assert (
            p99 < p99_limit
        ), f"p99 is {p99:.2f}s, should be less than {p99_limit:.2f}s"


def workflow_test_profile_fetch(c: Composition) -> None:
    """
    Test fetching memory and CPU profiles via the internal HTTP
    endpoint.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")
    c.up("clusterd1")

    envd_port = c.port("materialized", 6878)
    envd_url = f"http://localhost:{envd_port}/prof/"
    clusterd_port = c.port("clusterd1", 6878)
    clusterd_url = f"http://localhost:{clusterd_port}/"

    def test_post(data: dict[str, str], check: Callable[[int, str], None]) -> None:
        resp = requests.post(envd_url, data=data)
        check(resp.status_code, resp.text)
        resp = requests.post(clusterd_url, data=data)
        check(resp.status_code, resp.text)

    def test_get(path: str, check: Callable[[int, str], None]) -> None:
        resp = requests.get(envd_url + path)
        check(resp.status_code, resp.text)
        resp = requests.get(clusterd_url + path)
        check(resp.status_code, resp.text)

    def make_check(code: int, contents: str) -> Callable[[int, str], None]:
        def check(code_: int, text: str) -> None:
            assert code_ == code, f"expected {code}, got {code_}"
            assert contents in text, f"'{contents}' not found in text: {text}"

        return check

    def make_ok_check(contents: str) -> Callable[[int, str], None]:
        return make_check(200, contents)

    def check_profiling_disabled(code: int, text: str) -> None:
        check = make_check(403, "heap profiling not activated")
        check(code, text)

    # Test fetching heap profiles. Heap profiling should be activated by default.
    test_post({"action": "dump_jeheap"}, make_ok_check("heap_v2/"))
    test_post(
        {"action": "dump_sym_mzfg"}, make_ok_check("mz_fg_version: 1\nAllocated:")
    )
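    # The flamegraph endpoints below encode newlines as literal "\n" sequences
    # (hence the doubled backslashes in the expected contents), presumably
    # because the profile data is embedded as a string in the generated HTML.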
    test_post(
        {"action": "mem_fg"},
        make_ok_check("mz_fg_version: 1\\ndisplay_bytes: 1\\nAllocated:"),
    )
    test_get("heap", make_ok_check(""))

    # Test fetching CPU profiles. This disables memory profiling!
    test_post(
        {"action": "time_fg", "time_secs": "1", "hz": "1"},
        make_ok_check(
            "mz_fg_version: 1\\nSampling time (s): 1\\nSampling frequency (Hz): 1\\n"
        ),
    )

    # Deactivate memory profiling.
    test_post(
        {"action": "deactivate"},
        make_ok_check("Jemalloc profiling enabled but inactive"),
    )

    # Test that fetching heap profiles is forbidden.
    test_post({"action": "dump_jeheap"}, check_profiling_disabled)
    test_post({"action": "dump_sym_mzfg"}, check_profiling_disabled)
    test_post({"action": "mem_fg"}, check_profiling_disabled)
    test_get("heap", check_profiling_disabled)

    # Activate memory profiling again.
    test_post({"action": "activate"}, make_ok_check("Jemalloc profiling active"))

    # Test fetching heap profiles again.
    test_post({"action": "dump_jeheap"}, make_ok_check("heap_v2/"))
    test_post(
        {"action": "dump_sym_mzfg"}, make_ok_check("mz_fg_version: 1\nAllocated:")
    )
    test_post(
        {"action": "mem_fg"},
        make_ok_check("mz_fg_version: 1\\ndisplay_bytes: 1\\nAllocated:"),
    )
    test_get("heap", make_ok_check(""))

    # Test fetching CPU profiles again.
    test_post(
        {"action": "time_fg", "time_secs": "1", "hz": "1"},
        make_ok_check(
            "mz_fg_version: 1\\nSampling time (s): 1\\nSampling frequency (Hz): 1\\n"
        ),
    )


def workflow_test_incident_70(c: Composition) -> None:
    """
    Test incident-70.
    """
    num_conns = 1
    mv_count = 50
    persist_reader_lease_duration_in_sec = 10
    data_scale_factor = 10
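    # The scenario, as far as this reproduction goes: long-running cursors
    # repeatedly read from MVs over a sizeable TPC-H load while the persist
    # reader lease is much shorter than the test's runtime, so readers are
    # assumed to have to survive lease expirations.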

    with c.override(
        Materialized(
            external_metadata_store=True,
            external_blob_store=True,
            sanity_restart=False,
        ),
        Minio(setup_materialize=True),
    ):
        c.down(destroy_volumes=True)
        c.up("minio", "materialized")

        c.sql(
            f"ALTER SYSTEM SET max_connections = {num_conns + 5};",
            port=6877,
            user="mz_system",
        )
        c.sql(
            f"ALTER SYSTEM SET persist_reader_lease_duration = '{persist_reader_lease_duration_in_sec}s';",
            port=6877,
            user="mz_system",
        )

        mz_view_create_statements = []
        for i in range(mv_count):
            mz_view_create_statements.append(
                f"CREATE MATERIALIZED VIEW mv_lineitem_count_{i + 1} AS SELECT count(*) FROM lineitem;"
            )
        mz_view_create_statements_sql = "\n".join(mz_view_create_statements)

        c.sql(
            dedent(
                f"""
                CREATE SOURCE gen FROM LOAD GENERATOR TPCH (SCALE FACTOR {data_scale_factor});
                CREATE TABLE customer FROM SOURCE gen (REFERENCE customer);
                CREATE TABLE lineitem FROM SOURCE gen (REFERENCE lineitem);
                CREATE TABLE nation FROM SOURCE gen (REFERENCE nation);
                CREATE TABLE orders FROM SOURCE gen (REFERENCE orders);
                CREATE TABLE part FROM SOURCE gen (REFERENCE part);
                CREATE TABLE partsupp FROM SOURCE gen (REFERENCE partsupp);
                CREATE TABLE region FROM SOURCE gen (REFERENCE region);
                CREATE TABLE supplier FROM SOURCE gen (REFERENCE supplier);
                {mz_view_create_statements_sql}
                """
            )
        )

        start_time = datetime.now()
        end_time = start_time + timedelta(seconds=600)

        def worker(c: Composition, worker_index: int) -> None:
            print(f"Thread {worker_index} tries to acquire a cursor")
            cursor = c.sql_cursor()
            print(f"Thread {worker_index} got a cursor")

            iteration = 1
            while datetime.now() < end_time:
                if iteration % 20 == 0:
                    print(f"Thread {worker_index}, iteration {iteration}")
                cursor.execute("SELECT * FROM mv_lineitem_count_1;")
                iteration += 1
            print(f"Thread {worker_index} terminates before iteration {iteration}")

        threads = []
        for worker_index in range(num_conns):
            thread = Thread(
                name=f"worker_{worker_index}", target=worker, args=(c, worker_index)
            )
            threads.append(thread)

        for thread in threads:
            thread.start()
            # this is because of database-issues#6639
            time.sleep(0.2)

        for thread in threads:
            thread.join()


def workflow_test_github_cloud_7998(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """Regression test for MaterializeInc/cloud#7998."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
        Clusterd(name="clusterd1"),
        Materialized(),
    ):
        c.up("materialized")
        c.up("clusterd1")

        c.run_testdrive_files("github-cloud-7998/setup.td")

        # Make the compute cluster unavailable.
        c.kill("clusterd1")

        c.run_testdrive_files("github-cloud-7998/check.td")

        # Trigger an environment bootstrap.
        c.kill("materialized")
        c.up("materialized")

        c.run_testdrive_files("github-cloud-7998/check.td")

        # Run a second bootstrap check, just to be sure.
        c.kill("materialized")
        c.up("materialized")

        c.run_testdrive_files("github-cloud-7998/check.td")


def workflow_test_github_7000(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Regression test for database-issues#7000."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # Create an MV reading from an index. Make sure it doesn't produce its
        # snapshot by installing it in a cluster without replicas.
        c.sql(
            """
            CREATE CLUSTER test SIZE '1', REPLICATION FACTOR 0;
            SET cluster = test;
            CREATE TABLE t (a int);
            INSERT INTO t VALUES (1);
            CREATE DEFAULT INDEX ON t;
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
            """
        )

        # Verify that the MV's upper is zero, which is what caused the bug.
        # This ensures that the test doesn't break in the future because we
        # start initializing frontiers differently.
        c.testdrive(
            input=dedent(
                """
                > SELECT write_frontier
                  FROM mz_internal.mz_frontiers
                  JOIN mz_materialized_views ON (object_id = id)
                  WHERE name = 'mv'
                0
                """
            )
        )

        # Trigger an environment bootstrap, and see if envd comes up without
        # panicking.
        c.kill("materialized")
        c.up("materialized")


def workflow_statement_logging(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Statement logging tests need to run with 100% statement logging (as opposed to the default 1%)."""
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
        Materialized(),
    ):
        c.up("materialized")

        c.sql(
            """
            ALTER SYSTEM SET statement_logging_max_sample_rate = 1.0;
            ALTER SYSTEM SET statement_logging_default_sample_rate = 1.0;
            """,
            port=6877,
            user="mz_system",
        )
        c.run_testdrive_files("statement-logging/statement-logging.td")


def workflow_blue_green_deployment(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """Blue/Green Deployment testing, see https://www.notion.so/materialize/Testing-Plan-Blue-Green-Deployments-01528a1eec3b42c3a25d5faaff7a9bf9#f53b51b110b044859bf954afc771c63a"""
    c.down(destroy_volumes=True)

    running = True

    def selects():
        runtimes = []
        try:
            with c.sql_cursor() as cursor:
                while running:
                    total_runtime = 0
                    queries = [
                        "SELECT * FROM prod.counter_mv",
                        "SET CLUSTER = prod; SELECT max(counter) FROM counter",
                        "SELECT count(*) FROM prod.tpch_mv",
                    ]

                    for i, query in enumerate(queries):
                        start_time = time.time()
                        try:
                            cursor.execute(query.encode())
                        except DatabaseError as e:
                            # Expected
                            if "cached plan must not change result type" in str(e):
                                continue
                            raise e
                        if query.startswith("SET "):
                            cursor.nextset()
                        results = cursor.fetchone()
                        assert results
                        assert int(results[0]) > 0
                        runtime = time.time() - start_time
                        assert (
                            runtime < 5
                        ), f"query: {query}, runtime spiked to {runtime}"
                        total_runtime += runtime
                    runtimes.append(total_runtime)
        finally:
            print(f"Query runtimes: {runtimes}")

    def subscribe():
        cursor = c.sql_cursor()
        while running:
            try:
                cursor.execute("ROLLBACK")
                cursor.execute("BEGIN")
                cursor.execute(
                    "DECLARE subscribe CURSOR FOR SUBSCRIBE (SELECT * FROM prod.counter_mv)"
                )
                cursor.execute("FETCH ALL subscribe WITH (timeout='15s')")
                assert int(cursor.fetchall()[-1][2]) > 0
                cursor.execute("CLOSE subscribe")
            except DatabaseError as e:
                # Expected
                msg = str(e)
                if ("cached plan must not change result type" in msg) or (
                    "subscribe has been terminated because underlying relation" in msg
                ):
                    continue
                raise e

    with c.override(
        Testdrive(
            no_reset=True, default_timeout="300s"
        ),  # pending dataflows can take a while
        Clusterd(
            name="clusterd1",
            workers=1,
        ),
        Clusterd(
            name="clusterd2",
            workers=2,
            process_names=["clusterd2", "clusterd3"],
        ),
        Clusterd(
            name="clusterd3",
            workers=2,
            process_names=["clusterd2", "clusterd3"],
        ),
        Materialized(
            additional_system_parameter_defaults={
                "unsafe_enable_unsafe_functions": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            },
        ),
    ):
        c.up("materialized")
        c.up("clusterd1")
        c.up("clusterd2")
        c.up("clusterd3")

        c.run_testdrive_files("blue-green-deployment/setup.td")

        threads = [PropagatingThread(target=fn) for fn in (selects, subscribe)]
        for thread in threads:
            thread.start()
        time.sleep(10)  # some time to make sure the queries run fine

        try:
            c.run_testdrive_files("blue-green-deployment/deploy.td")
        finally:
            running = False
            for thread in threads:
                thread.join()


def workflow_test_subscribe_hydration_status(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """Test that hydration status tracking works for subscribe dataflows."""
    c.down(destroy_volumes=True)
    c.up("materialized", {"name": "testdrive", "persistent": True})

    # Start a subscribe.
    cursor = c.sql_cursor()
    cursor.execute("BEGIN")
    cursor.execute("DECLARE c CURSOR FOR SUBSCRIBE mz_tables")
    cursor.execute("FETCH 1 c")

    # Verify that the subscribe dataflow eventually shows as hydrated.
    c.testdrive(
        input=dedent(
            """
            > SET cluster = mz_catalog_server
            > SELECT DISTINCT h.time_ns IS NOT NULL
              FROM mz_internal.mz_subscriptions s,
                   unnest(s.referenced_object_ids) as sroi(id)
              JOIN mz_introspection.mz_compute_hydration_times_per_worker h ON h.export_id = s.id
              JOIN mz_tables t ON (t.id = sroi.id)
              WHERE t.name = 'mz_tables'
            true
            """
        )
    )

    # Cancel the subscribe.
    cursor.execute("ROLLBACK")

    # Verify that the subscribe's hydration status is removed.
    c.testdrive(
        input=dedent(
            """
            > SET cluster = mz_catalog_server
            > SELECT DISTINCT h.time_ns IS NOT NULL
              FROM mz_internal.mz_subscriptions s,
                   unnest(s.referenced_object_ids) as sroi(id)
              JOIN mz_introspection.mz_compute_hydration_times_per_worker h ON h.export_id = s.id
              JOIN mz_tables t ON (t.id = sroi.id)
              WHERE t.name = 'mz_tables'
            """
        )
    )


def workflow_cluster_drop_concurrent(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test that dropping a cluster will close already running queries against
    that cluster, both SELECTs and SUBSCRIBEs.
    """
    c.down(destroy_volumes=True)

    def select():
        with c.sql_cursor() as cursor:
            # This should hang instantly as the timestamp is far in the future,
            # until the cluster is dropped.
            cursor.execute("SELECT * FROM counter AS OF 18446744073709551615")

    def subscribe():
        cursor = c.sql_cursor()
        cursor.execute("BEGIN")
        cursor.execute("DECLARE subscribe CURSOR FOR SUBSCRIBE (SELECT * FROM counter)")
        # This should hang until the cluster is dropped.
        cursor.execute("FETCH ALL subscribe")

    with c.override(
        Testdrive(
            no_reset=True,
        ),
        Clusterd(name="clusterd1"),
        Materialized(),
    ):
        c.up("materialized")
        c.up("clusterd1")

        c.run_testdrive_files("cluster-drop-concurrent/setup.td")

        threads = [
            PropagatingThread(target=fn, name=name)
            for fn, name in ((select, "select"), (subscribe, "subscribe"))
        ]
        for thread in threads:
            thread.start()
        time.sleep(2)  # some time to make sure the queries are in progress

        try:
            c.run_testdrive_files("cluster-drop-concurrent/run.td")
        finally:
            for thread in threads:
                try:
                    thread.join(timeout=10)
                except InternalError_ as e:
                    assert 'query could not complete because relation "materialize.public.counter" was dropped' in str(
                        e
                    ) or 'subscribe has been terminated because underlying relation "materialize.public.counter" was dropped' in str(
                        e
                    )
            for thread in threads:
                assert not thread.is_alive(), f"Thread {thread.name} is still running"


def workflow_test_refresh_mv_warmup(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test REFRESH materialized view warmup behavior after envd restarts:
    1. Regression test for https://github.com/MaterializeInc/database-issues/issues/7574
       If an MV is past its last refresh, it shouldn't get rehydrated after a restart.
    2. Regression test for https://github.com/MaterializeInc/database-issues/issues/7543
       Bootstrapping should select an `as_of` for an MV dataflow in a way that allows it to warm up before its next
       refresh.
    """
    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "enable_refresh_every_mvs": "true",
            },
        ),
        Testdrive(no_reset=True),
    ):
        c.down(destroy_volumes=True)
        c.up("materialized", {"name": "testdrive", "persistent": True})

        c.testdrive(
            input=dedent(
                """
                > CREATE CLUSTER cluster12 SIZE '1';
                > SET cluster = cluster12;

                ## 1. Create a materialized view that has only one refresh, and takes at least a few seconds to hydrate.
                ## (Currently, it's ~2 seconds on a release build.)
                > CREATE TABLE t1 (a int);
                > INSERT INTO t1 VALUES (10000000);
                > CREATE MATERIALIZED VIEW mv1 WITH (REFRESH AT CREATION) AS
                  SELECT count(*) FROM (SELECT generate_series(1,a) FROM t1);

                # Let's wait for its initial refresh to complete.
                > SELECT * FROM mv1;
                10000000

                # This INSERT shouldn't be visible in mv1, because we are past its only refresh.
                > INSERT INTO t1 VALUES (10000001);
                ## 2. Create a materialized view that will have its first refresh immediately, but its next refresh is
                ## a long time away.
                > CREATE TABLE t2 (a int);
                > INSERT INTO t2 VALUES (100);
                > CREATE MATERIALIZED VIEW mv2 WITH (REFRESH EVERY '1 day') AS
                  SELECT count(*) FROM (SELECT generate_series(1,a) FROM t2);
                > SELECT * FROM mv2;
                100
                > INSERT INTO t2 VALUES (1000);
                """
            )
        )

        # Restart environmentd.
        c.kill("materialized")
        c.up("materialized")

        c.testdrive(
            input=dedent(
                """
                ## 1. We shouldn't have a dataflow for mv1.
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="500ms"
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="500ms"
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="500ms"
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="500ms"
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="500ms"
                > SELECT * FROM mz_introspection.mz_dataflows WHERE name = 'mv1';
                > SELECT * FROM mv1;
                10000000

                ## 2. Check that mv2's dataflow hydrates, even though we are a long time away from the next refresh.
                > SELECT hydrated
                  FROM mz_internal.mz_compute_hydration_statuses h JOIN mz_objects o ON (h.object_id = o.id)
                  WHERE name = 'mv2';
                true

                # Check that the next refresh hasn't happened yet.
                > INSERT INTO t2 VALUES (10000);
                > SELECT * FROM mv2;
                100
                """
            )
        )


def check_read_frontier_not_stuck(c: Composition, object_name: str):
    query = f"""
        SELECT f.read_frontier
        FROM mz_internal.mz_frontiers f
        JOIN mz_objects o ON o.id = f.object_id
        WHERE o.name = '{object_name}';
    """

    # Because `mz_frontiers` isn't a linearizable relation it's possible that
    # we need to wait a bit for the object's frontier to show up.
    result = c.sql_query(query)
    if not result:
        time.sleep(2)
        result = c.sql_query(query)
    before = int(result[0][0])

    time.sleep(3)
    after = int(c.sql_query(query)[0][0])

    assert (
        before < after
    ), f"read frontier of {object_name} is stuck, {before} >= {after}"


def workflow_test_refresh_mv_restart(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test REFRESH materialized views with envd restarts:
    1a. Restart just after a refresh, and then check after the next refresh that things still work.
    1b. Same as above, but turn off the replica before the restart, and then turn it back on after the restart.
    1c. Same as 1a, but with the MV reading from an index.
    1i. Same as 1a, but on an auto-scheduled cluster.
    1j. Same as 1a, but there is an index on the MV, and the refresh interval is large, so we are still before the next
        refresh after the restart.
    2a. Sleep through a refresh time after killing envd, then bring up envd and check that we recover.
    2b. Same as 2a, but manipulate replicas as in 1b.
    2c. Same as 2a, but with the MV reading from an index.
    2i. Same as 2a, but on an auto-scheduled cluster.
    2j. Same as 1j, but with the long sleep of 2a.
    3d. No replica while creating the MV, restart envd, and then create a replica.
    3e. Same as 3d, but with an MV reading from an index.
    3f. Same as 3d, but with an MV that has a last refresh.
    3g. Same as 3e, but with an MV that has a last refresh.
    3h. Same as 3d, but with an MV that has a short refresh interval, so we miss several refreshes by the time we get a
        replica.

    When querying MVs after the restarts, perform the same queries with all combinations of
    - SERIALIZABLE / STRICT SERIALIZABLE,
    - with / without indexes on MVs.

    After each of 1., 2., and 3., check that the input table's read frontier keeps advancing.

    Also do some sanity checks on introspection objects related to REFRESH MVs.

    Other tests involving REFRESH MVs and envd restarts:
    * workflow_test_github_8734
    * workflow_test_refresh_mv_warmup
    """

    def check_introspection():
        c.testdrive(
            input=dedent(
                """
                # Wait for introspection objects to be populated.
                > SELECT count(*) > 0 FROM mz_catalog.mz_materialized_views;
                true
                > SELECT count(*) > 0 FROM mz_internal.mz_materialized_view_refresh_strategies;
                true
                > SELECT count(*) > 0 FROM mz_internal.mz_materialized_view_refreshes;
                true

                # Check that no MV is missing from any of the introspection objects.
                # Note that only REFRESH MVs show up in `mz_materialized_view_refreshes`, which is ok, because we are
                # creating only REFRESH MVs in these tests.
                > SELECT *
                  FROM
                      mz_catalog.mz_materialized_views mv
                      FULL OUTER JOIN mz_internal.mz_materialized_view_refreshes mvr ON (mv.id = mvr.materialized_view_id)
                      FULL OUTER JOIN mz_internal.mz_materialized_view_refresh_strategies mvrs ON (mv.id = mvrs.materialized_view_id)
                  WHERE
                      mv.id IS NULL OR
                      mvr.materialized_view_id IS NULL OR
                      mvrs.materialized_view_id IS NULL;
                """
            )
        )

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "enable_refresh_every_mvs": "true",
                "enable_cluster_schedule_refresh": "true",
            },
        ),
        Testdrive(no_reset=True),
    ):
        # We'll issue the same SQL commands in 1. and 2. (the only difference is we make the restart slow with a sleep),
        # so save the SQL commands in `before_restart` and `after_restart`.
        before_restart = dedent(
            """
            > CREATE TABLE t (x int);
            > INSERT INTO t VALUES (100);

            > CREATE CLUSTER cluster_acj SIZE '1';
            > CREATE CLUSTER cluster_b SIZE '1';
            > CREATE CLUSTER cluster_auto_scheduled (SIZE '1', SCHEDULE = ON REFRESH);

            > CREATE MATERIALIZED VIEW mv_a
              IN CLUSTER cluster_acj
              WITH (REFRESH EVERY '20 sec' ALIGNED TO mz_now()::text::int8 + 2000) AS
              SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
            > CREATE MATERIALIZED VIEW mv_b
              IN CLUSTER cluster_b
              WITH (REFRESH EVERY '20 sec' ALIGNED TO mz_now()::text::int8 + 2000) AS
              SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
            > CREATE DEFAULT INDEX
              IN CLUSTER cluster_acj
              ON t;
            > CREATE MATERIALIZED VIEW mv_c
              IN CLUSTER cluster_acj
              WITH (REFRESH EVERY '20 sec' ALIGNED TO mz_now()::text::int8 + 2000) AS
              SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
            > CREATE MATERIALIZED VIEW mv_i
              IN CLUSTER cluster_auto_scheduled
              WITH (REFRESH EVERY '20 sec' ALIGNED TO mz_now()::text::int8 + 2000) AS
              SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
            > CREATE MATERIALIZED VIEW mv_j
              IN CLUSTER cluster_acj
              WITH (REFRESH EVERY '1000000 sec' ALIGNED TO mz_now()::text::int8 + 2000) AS
              SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);

            > CREATE CLUSTER serving SIZE '1';
            > CREATE CLUSTER serving_indexed SIZE '1';
            > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_a;
            > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_b;
            > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_c;
            > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_i;
            > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_j;

            # Let's wait for the MVs' initial refresh to complete.
            > SET cluster = 'serving';
            > SELECT * FROM mv_a;
            100
            > SELECT * FROM mv_b;
            100
            > SELECT * FROM mv_c;
            100
            > SELECT * FROM mv_i;
            100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101
            > SET cluster = 'serving_indexed';
            > SELECT * FROM mv_a;
            100
            > SELECT * FROM mv_b;
            100
            > SELECT * FROM mv_c;
            100
            > SELECT * FROM mv_i;
            100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101

            > INSERT INTO t VALUES (1000);

            > ALTER CLUSTER cluster_b SET (REPLICATION FACTOR 0);
            """
        )

        after_restart = dedent(
            """
            > ALTER CLUSTER cluster_b SET (REPLICATION FACTOR 2);

            > SET TRANSACTION_ISOLATION TO 'STRICT SERIALIZABLE';
            > SET cluster = 'serving';
            > SELECT * FROM mv_a;
            1100
            > SELECT * FROM mv_b;
            1100
            > SELECT * FROM mv_c;
            1100
            > SELECT * FROM mv_i;
            1100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101
            1001
            > SET cluster = 'serving_indexed';
            > SELECT * FROM mv_a;
            1100
            > SELECT * FROM mv_b;
            1100
            > SELECT * FROM mv_c;
            1100
            > SELECT * FROM mv_i;
            1100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101
            1001

            > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
            > SET cluster = 'serving';
            > SELECT * FROM mv_a;
            1100
            > SELECT * FROM mv_b;
            1100
            > SELECT * FROM mv_c;
            1100
            > SELECT * FROM mv_i;
            1100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101
            1001
            > SET cluster = 'serving_indexed';
            > SELECT * FROM mv_a;
            1100
            > SELECT * FROM mv_b;
            1100
            > SELECT * FROM mv_c;
            1100
            > SELECT * FROM mv_i;
            1100
            > SELECT * FROM mv_j;
            100
            > (SELECT * FROM mv_j)
              UNION
              (SELECT x + 1 FROM t);
            100
            101
            1001
            """
        )

        c.down(destroy_volumes=True)
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # 1. (quick restart)
        c.testdrive(input=before_restart)
        check_introspection()
        c.kill("materialized")
        c.up("materialized")
        check_introspection()
        c.testdrive(input=after_restart)
        check_read_frontier_not_stuck(c, "t")
        check_introspection()

        # Reset the testing context.
        c.down(destroy_volumes=True)
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # 2. (slow restart)
        c.testdrive(input=before_restart)
        check_introspection()
        c.kill("materialized")
        time.sleep(20)  # Sleep through the refresh interval of the above MVs.
        c.up("materialized")
        check_introspection()
        c.testdrive(input=after_restart)
        check_read_frontier_not_stuck(c, "t")
        check_introspection()

        # Reset the testing context.
        c.down(destroy_volumes=True)
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # 3.
        c.testdrive(
            input=dedent(
                """
                > CREATE TABLE t (x int);
                > INSERT INTO t VALUES (100);

                > CREATE CLUSTER cluster_defgh (SIZE '1', REPLICATION FACTOR 0);

                > CREATE MATERIALIZED VIEW mv_3h
                  IN CLUSTER cluster_defgh
                  WITH (REFRESH EVERY '1600 ms') AS
                  SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
                > CREATE MATERIALIZED VIEW mv_3d
                  IN CLUSTER cluster_defgh
                  WITH (REFRESH EVERY '100000 sec') AS
                  SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
                > CREATE MATERIALIZED VIEW mv_3f
                  IN CLUSTER cluster_defgh
                  WITH (REFRESH AT CREATION) AS
                  SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
                > CREATE DEFAULT INDEX
                  IN CLUSTER cluster_defgh
                  ON t;
                > CREATE MATERIALIZED VIEW mv_3e
                  IN CLUSTER cluster_defgh
                  WITH (REFRESH EVERY '100000 sec') AS
                  SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);
                > CREATE MATERIALIZED VIEW mv_3g
                  IN CLUSTER cluster_defgh
                  WITH (REFRESH AT CREATION) AS
                  SELECT count(*) FROM (SELECT generate_series(1,x) FROM t);

                > CREATE CLUSTER serving_indexed SIZE '1';
                > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_3d;
                > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_3e;
                > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_3f;
                > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_3g;
                > CREATE DEFAULT INDEX IN CLUSTER serving_indexed ON mv_3h;

                > INSERT INTO t VALUES (1000);
                """
            )
        )
        check_introspection()

        c.kill("materialized")
        c.up("materialized")
        check_introspection()

        c.testdrive(
            input=dedent(
                """
                > ALTER CLUSTER cluster_defgh SET (REPLICATION FACTOR 2);
                > CREATE CLUSTER serving SIZE '1';

                > SET TRANSACTION_ISOLATION TO 'STRICT SERIALIZABLE';
                > SET cluster = 'serving';
                > SELECT * FROM mv_3d
                100
                > SELECT * FROM mv_3e
                100
                > SELECT * FROM mv_3f
                100
                > SELECT * FROM mv_3g
                100
                > SELECT * FROM mv_3h;
                1100
                > INSERT INTO t VALUES (10000);
                > SELECT * FROM mv_3h;
                11100
                > SET cluster = 'serving_indexed';
                > SELECT * FROM mv_3d
                100
                > SELECT * FROM mv_3e
                100
                > SELECT * FROM mv_3f
                100
                > SELECT * FROM mv_3g
                100
                > SELECT * FROM mv_3h;
                11100
                > INSERT INTO t VALUES (10000);
                > SELECT * FROM mv_3h;
                21100

                > SET TRANSACTION_ISOLATION TO 'SERIALIZABLE';
                > SET cluster = 'serving';
                > SELECT * FROM mv_3d
                100
                > SELECT * FROM mv_3e
                100
                > SELECT * FROM mv_3f
                100
                > SELECT * FROM mv_3g
                100
                > SELECT * FROM mv_3h;
                21100
                > INSERT INTO t VALUES (10000);
                > SELECT * FROM mv_3h;
                31100
                > SET cluster = 'serving_indexed';
                > SELECT * FROM mv_3d
                100
                > SELECT * FROM mv_3e
                100
                > SELECT * FROM mv_3f
                100
                > SELECT * FROM mv_3g
                100
                > SELECT * FROM mv_3h;
                31100
                > INSERT INTO t VALUES (10000);
                > SELECT * FROM mv_3h;
                41100
                """
            )
        )
        check_read_frontier_not_stuck(c, "t")
        check_introspection()

        # Drop some MVs and check that this is reflected in the introspection objects.
        c.testdrive(
            input=dedent(
                """
                > DROP MATERIALIZED VIEW mv_3h;
                > DROP MATERIALIZED VIEW mv_3d;
                > SELECT * FROM mz_catalog.mz_materialized_views
                  WHERE name = 'mv_3h' OR name = 'mv_3d';
                """
            )
        )
        check_introspection()


def workflow_test_github_8734(c: Composition) -> None:
    """
    Tests that REFRESH MVs on paused clusters don't unnecessarily hold back
    compaction of their inputs after an envd restart.

    Regression test for database-issues#8734.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "enable_refresh_every_mvs": "true",
            },
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # Create a REFRESH MV and wait for it to refresh once, then take down
        # its cluster.
        c.sql(
            """
            CREATE TABLE t (a int);
            CREATE CLUSTER test SIZE '1';
            CREATE MATERIALIZED VIEW mv
                IN CLUSTER test
                WITH (REFRESH EVERY '60m')
                AS SELECT * FROM t;
            SELECT * FROM mv;
            ALTER CLUSTER test SET (REPLICATION FACTOR 0);
            """
        )
        check_read_frontier_not_stuck(c, "t")

        # Restart envd, then verify that the table's frontier still advances.
        c.kill("materialized")
        c.up("materialized")
        c.sql("SELECT * FROM mv")
        check_read_frontier_not_stuck(c, "t")


def workflow_test_github_7798(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Regression test for database-issues#7798."""
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "unsafe_enable_unsafe_functions": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            },
        ),
        Testdrive(
            no_reset=True,
            default_timeout="10s",
        ),
    ):

        def check_frontiers_advance():
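            # A SUBSCRIBE on mz_frontiers delivers an update whenever the
            # table's write frontier changes, so successfully FETCHing
            # several updates proves the frontier keeps advancing.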
            c.testdrive(
                dedent(
                    r"""
                    $ set-regex match=0|\d{13,20} replacement=<TIMESTAMP>

                    -- Run the frontier query once to make `tokio-postgres` collect type information.
                    -- If we don't do this it will inject SELECTs into the transaction, making it fail.
                    > SELECT write_frontier FROM mz_internal.mz_frontiers WHERE false

                    > BEGIN
                    > DECLARE c CURSOR FOR SUBSCRIBE (
                        SELECT write_frontier
                        FROM mz_internal.mz_frontiers
                        JOIN mz_tables ON (id = object_id)
                        WHERE name = 'lineitem'
                      )
                    > FETCH 1 c
                    <TIMESTAMP> 1 <TIMESTAMP>
                    > FETCH 2 c
                    <TIMESTAMP> 1 <TIMESTAMP>
                    <TIMESTAMP> -1 <TIMESTAMP>
                    > FETCH 2 c
                    <TIMESTAMP> 1 <TIMESTAMP>
                    <TIMESTAMP> -1 <TIMESTAMP>
                    > ROLLBACK
                    """
                )
            )

        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        # Create an unmanaged cluster that isn't restarted together with
        # materialized, and therein a source with subsources.
        c.sql(
            """
            CREATE CLUSTER source REPLICAS (
                replica1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );
            CREATE SOURCE lgtpch
                IN CLUSTER source
                FROM LOAD GENERATOR TPCH (SCALE FACTOR 0.001, TICK INTERVAL '1s');
            CREATE TABLE customer FROM SOURCE lgtpch (REFERENCE customer);
            CREATE TABLE lineitem FROM SOURCE lgtpch (REFERENCE lineitem);
            CREATE TABLE nation FROM SOURCE lgtpch (REFERENCE nation);
            CREATE TABLE orders FROM SOURCE lgtpch (REFERENCE orders);
            CREATE TABLE part FROM SOURCE lgtpch (REFERENCE part);
            CREATE TABLE partsupp FROM SOURCE lgtpch (REFERENCE partsupp);
            CREATE TABLE region FROM SOURCE lgtpch (REFERENCE region);
            CREATE TABLE supplier FROM SOURCE lgtpch (REFERENCE supplier);
            """,
        )
        check_frontiers_advance()

        # Restart envd to force a storage reconciliation.
        c.kill("materialized")
        c.up("materialized")
        check_frontiers_advance()


def workflow_test_http_race_condition(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    c.down(destroy_volumes=True)
    c.up("materialized")

    def worker() -> None:
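        # Issue SQL requests over HTTP with random client-side timeouts, so
        # that many requests are abandoned mid-flight. This exercises races
        # in the server's session cleanup.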
        end_time = time.time() + 60
        while time.time() < end_time:
            timeout = random.uniform(0.01, 1.0)
            rows = random.uniform(1, 10000)
            envd_port = c.port("materialized", 6876)
            try:
                result = requests.post(
                    f"http://localhost:{envd_port}/api/sql",
                    data=json.dumps(
                        {"query": f"select generate_series(1, {rows}::int8)"}
                    ),
                    headers={"content-type": "application/json"},
                    timeout=timeout,
                )
            except requests.exceptions.ReadTimeout:
                continue
            assert result.status_code == 200, result

    threads = []
    for j in range(100):
        thread = Thread(name=f"worker_{j}", target=worker)
        threads.append(thread)
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    cleanup_seconds = 120 if ui.env_is_truthy("CI_COVERAGE_ENABLED") else 30
    stopping_time = datetime.now() + timedelta(seconds=cleanup_seconds)
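    # Poll until all foreign sessions are gone; the loop's `else` branch runs
    # only if the deadline passes without a `break`.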
    while datetime.now() < stopping_time:
        result = c.sql_query(
            "SELECT * FROM mz_internal.mz_sessions WHERE connection_id <> pg_backend_pid()"
        )
        if not result:
            break
        print(
            f"There are supposed to be no sessions remaining, but there are:\n{result}"
        )
    else:
        raise RuntimeError(f"Sessions did not clean up after {cleanup_seconds}s")


def workflow_test_read_frontier_advancement(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests that in a set of dependent healthy compute collections all read
    frontiers keep advancing continually. This is to protect against
    regressions in downgrading read holds.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")

    # Create various dataflows on a cluster with multiple replicas, to also
    # test tracking of per-replica read holds.
    c.sql(
        """
        CREATE CLUSTER test SIZE '2-2', REPLICATION FACTOR 4;
        SET cluster = test;

        CREATE TABLE t1 (x int);
        CREATE TABLE t2 (y int);
        CREATE VIEW v1 AS SELECT * FROM t1 JOIN t2 ON (x = y);
        CREATE DEFAULT INDEX ON v1;
        CREATE VIEW v2 AS SELECT x + 1 AS a, y + 2 AS b FROM v1;
        CREATE DEFAULT INDEX ON v2;
        CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM v2;
        CREATE DEFAULT INDEX ON mv1;
        CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM mv1, t1;

        -- wait for dataflows to hydrate
        SELECT 1 FROM v1, v2, mv1, mv2;
        """,
    )

    # Run a subscribe in a different thread.
    def subscribe():
        cursor = c.sql_cursor()
        cursor.execute("SET cluster = test")
        cursor.execute("BEGIN")
        cursor.execute("DECLARE sub CURSOR FOR SUBSCRIBE (SELECT * FROM mv2)")
        try:
            # This should hang until the cluster is dropped. The timeout is
            # only a failsafe.
            cursor.execute("FETCH ALL sub WITH (timeout = '30s')")
        except DatabaseError as exc:
            assert (
                exc.diag.message_primary
                == 'subscribe has been terminated because underlying relation "materialize.public.mv2" was dropped'
            )

    subscribe_thread = Thread(target=subscribe)
    subscribe_thread.start()

    # Wait for the subscribe to start and frontier introspection to be refreshed.
    time.sleep(3)

    # Check that read frontiers advance.
    def collect_read_frontiers() -> dict[str, int]:
        output = c.sql_query(
            """
            SELECT object_id, read_frontier
            FROM mz_internal.mz_frontiers
            WHERE object_id LIKE 'u%'
            """
        )
        frontiers = {}
        for row in output:
            name, frontier = row
            frontiers[name] = int(frontier)
        return frontiers
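    # Sample the frontiers twice, a few seconds apart; every user collection's
    # read frontier must have strictly advanced between the two samples.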
    frontiers1 = collect_read_frontiers()
    time.sleep(3)
    frontiers2 = collect_read_frontiers()
    for id_ in frontiers1:
        a, b = frontiers1[id_], frontiers2[id_]
        assert a < b, f"read frontier of {id_} has not advanced: {a} -> {b}"

    # Drop the cluster to cancel the subscribe.
    c.sql("DROP CLUSTER test CASCADE")
    subscribe_thread.join()


def workflow_test_adhoc_system_indexes(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests that the system user can create ad-hoc system indexes and that they
    are handled normally by the system.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")

    # The system user should be able to create a new index on a catalog object
    # in the mz_catalog_server cluster.
    c.sql(
        """
        SET cluster = mz_catalog_server;
        CREATE INDEX mz_test_idx1 ON mz_tables (char_length(name));
        """,
        port=6877,
        user="mz_system",
    )
    output = c.sql_query(
        """
        SELECT i.id, o.name, c.name
        FROM mz_indexes i
        JOIN mz_objects o ON (i.on_id = o.id)
        JOIN mz_clusters c ON (i.cluster_id = c.id)
        WHERE i.name = 'mz_test_idx1'
        """
    )
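    # Even though the index lives on a system cluster, it is assigned a user
    # ID (`u1`), since it was created ad hoc rather than as a builtin.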
    assert output[0] == ("u1", "mz_tables", "mz_catalog_server"), output

    output = c.sql_query("EXPLAIN SELECT * FROM mz_tables WHERE char_length(name) = 9")
    assert "mz_test_idx1" in output[0][0], output
    output = c.sql_query("SELECT * FROM mz_tables WHERE char_length(name) = 9")
    assert len(output) > 0

    # The system user should be able to create a new index on an unstable
    # catalog object in the mz_catalog_server cluster if
    # `unsafe_enable_unstable_dependencies` is set.
    c.sql(
        """
        ALTER SYSTEM SET unsafe_enable_unstable_dependencies = on;
        SET cluster = mz_catalog_server;
        CREATE INDEX mz_test_idx2 ON mz_internal.mz_hydration_statuses (hydrated);
        ALTER SYSTEM SET unsafe_enable_unstable_dependencies = off;
        """,
        port=6877,
        user="mz_system",
    )
    output = c.sql_query(
        """
        SELECT i.id, o.name, c.name
        FROM mz_indexes i
        JOIN mz_objects o ON (i.on_id = o.id)
        JOIN mz_clusters c ON (i.cluster_id = c.id)
        WHERE i.name = 'mz_test_idx2'
        """
    )
    assert output[0] == ("u2", "mz_hydration_statuses", "mz_catalog_server"), output

    output = c.sql_query(
        "EXPLAIN SELECT * FROM mz_internal.mz_hydration_statuses WHERE hydrated"
    )
    assert "mz_test_idx2" in output[0][0]
    output = c.sql_query(
        "SELECT * FROM mz_internal.mz_hydration_statuses WHERE hydrated"
    )
    assert len(output) > 0

    # Make sure the new indexes survive a restart.
    c.kill("materialized")
    c.up("materialized")
    output = c.sql_query(
        """
        SELECT i.id, o.name, c.name
        FROM mz_indexes i
        JOIN mz_objects o ON (i.on_id = o.id)
        JOIN mz_clusters c ON (i.cluster_id = c.id)
        WHERE i.name LIKE 'mz_test_idx%'
        ORDER BY id
        """
    )
    assert output[0] == ("u1", "mz_tables", "mz_catalog_server"), output
    assert output[1] == ("u2", "mz_hydration_statuses", "mz_catalog_server"), output

    # Make sure the new indexes can be dropped again.
    c.sql(
        """
        DROP INDEX mz_test_idx1;
        DROP INDEX mz_internal.mz_test_idx2;
        """,
        port=6877,
        user="mz_system",
    )
    output = c.sql_query(
        """
        SELECT i.id, o.name, c.name
        FROM mz_indexes i
        JOIN mz_objects o ON (i.on_id = o.id)
        JOIN mz_clusters c ON (i.cluster_id = c.id)
        WHERE i.name LIKE 'mz_test_idx%'
        ORDER BY id
        """
    )
    assert not output, output


def workflow_test_mz_introspection_cluster_compat(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests that usages of the `mz_introspection` cluster and the
    `auto_route_introspection_queries` variable, which both have been renamed,
    are automatically translated to the new names.
    """
    c.down(destroy_volumes=True)
    c.up("materialized")

    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})

        # Setting variables through `SET <variable>`.
        c.testdrive(
            dedent(
                """
                > SHOW cluster
                quickstart
                > SET cluster = mz_introspection
                > SHOW cluster
                mz_catalog_server
                > SHOW auto_route_catalog_queries
                on
                > SET auto_route_introspection_queries = off
                > SHOW auto_route_catalog_queries
                off
                > RESET cluster
                > SHOW cluster
                quickstart
                > RESET auto_route_introspection_queries
                > SHOW auto_route_catalog_queries
                on
                """
            )
        )

        # Setting variables through `ALTER ROLE`.
        c.sql(
            """
            ALTER ROLE materialize SET cluster = mz_introspection;
            ALTER ROLE materialize SET auto_route_introspection_queries = off;
            """
        )
        c.testdrive(
            dedent(
                """
                > SHOW cluster
                mz_catalog_server
                > SHOW auto_route_catalog_queries
                off
                """
            )
        )
        c.sql(
            """
            ALTER ROLE materialize RESET cluster;
            ALTER ROLE materialize RESET auto_route_introspection_queries;
            """
        )
        c.testdrive(
            dedent(
                """
                > SHOW cluster
                quickstart
                > SHOW auto_route_catalog_queries
                on
                """
            )
        )

        # Setting variables through the connection string.
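        # `%3D` and `%20` are the URL-encoded `=` and space characters in the
        # `options` parameter.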
        port = c.default_port("materialized")
        url = (
            f"postgres://materialize@localhost:{port}?options="
            "--cluster%3Dmz_introspection%20"
            "--auto_route_introspection_queries%3Doff"
        )
        with psycopg.connect(url) as conn:
            with conn.cursor() as cur:
                cur.execute("SHOW cluster")
                row = cur.fetchone()
                assert row == ("mz_catalog_server",), row
                cur.execute("SHOW auto_route_catalog_queries")
                row = cur.fetchone()
                assert row == ("off",), row


def workflow_test_unified_introspection_during_replica_disconnect(
    c: Composition,
) -> None:
    """
    Test that unified introspection data collected for a replica remains
    available after the replica disconnects, until it sends updated
    introspection data.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "unsafe_enable_unsafe_functions": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            },
        ),
        Testdrive(
            no_reset=True,
            default_timeout="10s",
        ),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})

        # Set up an unorchestrated replica with a couple of dataflows.
        c.sql(
            """
            CREATE CLUSTER test REPLICAS (
                test (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 2
                )
            );
            SET cluster = test;
            CREATE TABLE t (a int);
            CREATE INDEX idx ON t (a);
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM t;
            """
        )
        output = c.sql_query("SELECT id FROM mz_cluster_replicas WHERE name = 'test'")
        replica_id = output[0][0]

        # Wait for the dataflows to be reported as hydrated.
        c.testdrive(
            dedent(
                f"""
                > SELECT o.name, h.time_ns IS NOT NULL
                  FROM mz_internal.mz_compute_hydration_times h
                  JOIN mz_objects o ON o.id = h.object_id
                  WHERE
                      h.replica_id = '{replica_id}' AND
                      h.object_id LIKE 'u%'
                idx true
                mv true
                """
            )
        )
        output = c.sql_query(
            f"""
            SELECT sum(time_ns)
            FROM mz_internal.mz_compute_hydration_times
            WHERE replica_id = '{replica_id}'
            """
        )
        previous_times = output[0][0]
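        # Remember the currently reported hydration times so we can later
        # detect when the restarted replica has sent a fresh set.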
        # Kill the replica and wait a bit for envd to notice.
        c.kill("clusterd1")
        time.sleep(5)

        # Verify that the hydration times are still queryable.
        c.testdrive(
            dedent(
                f"""
                > SELECT o.name, h.time_ns IS NOT NULL
                  FROM mz_internal.mz_compute_hydration_times h
                  JOIN mz_objects o ON o.id = h.object_id
                  WHERE
                      h.replica_id = '{replica_id}' AND
                      h.object_id LIKE 'u%'
                idx true
                mv true
                """
            )
        )

        # Restart the replica, then wait for it to report a new set of
        # hydration times.
        c.up("clusterd1")
        c.testdrive(
            dedent(
                f"""
                > SELECT sum(time_ns) != {previous_times}
                  FROM mz_internal.mz_compute_hydration_times
                  WHERE replica_id = '{replica_id}'
                true
                """
            )
        )


def workflow_test_zero_downtime_reconfigure(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests reconfiguring a managed cluster with zero downtime.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
    ):
        c.up(
            "materialized",
            "clusterd1",
            "zookeeper",
            "kafka",
            "schema-registry",
            {"name": "testdrive", "persistent": True},
        )
        c.testdrive(
            dedent(
                """
                $ kafka-create-topic topic=graceful-reconfig
                $ kafka-ingest topic=graceful-reconfig format=bytes key-format=bytes key-terminator=: repeat=1000
                key${kafka-ingest.iteration}:value${kafka-ingest.iteration}

                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_zero_downtime_cluster_reconfiguration = true;
                DROP CLUSTER IF EXISTS cluster1 CASCADE;
                DROP TABLE IF EXISTS t CASCADE;
                CREATE CLUSTER cluster1 (SIZE = '1');
                GRANT ALL ON CLUSTER cluster1 TO materialize;

                > SET CLUSTER = cluster1;
                > CREATE TABLE t (a int);
                > CREATE DEFAULT INDEX ON t;
                > INSERT INTO t VALUES (42);
                > CREATE CONNECTION kafka_conn
                  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
                > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${testdrive.schema-registry-url}'
                  )
                > CREATE SOURCE kafka_src
                  IN CLUSTER cluster1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-graceful-reconfig-${testdrive.seed}')
                > CREATE TABLE kafka_tbl
                  FROM SOURCE kafka_src (REFERENCE "testdrive-graceful-reconfig-${testdrive.seed}")
                  KEY FORMAT TEXT
                  VALUE FORMAT TEXT
                  ENVELOPE UPSERT
                """
            ),
        )
        replicas = c.sql_query(
            """
            SELECT mz_cluster_replicas.name
            FROM mz_cluster_replicas, mz_clusters
            WHERE mz_cluster_replicas.cluster_id = mz_clusters.id
                AND mz_clusters.name = 'cluster1';
            """
        )
        assert replicas == [
            ("r1",)
        ], f"Cluster should only have one replica prior to alter, found {replicas}"

        replicas = c.sql_query(
            """
            SELECT cr.name
            FROM mz_internal.mz_pending_cluster_replicas ur
            INNER JOIN mz_cluster_replicas cr ON cr.id = ur.id
            INNER JOIN mz_clusters c ON c.id = cr.cluster_id
            WHERE c.name = 'cluster1';
            """
        )
        assert (
            len(replicas) == 0
        ), f"Cluster should have no pending replicas prior to alter, found {replicas}"
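        # `WAIT FOR '10s'` keeps the old replica serving while the resized
        # replica warms up. Killing envd mid-alter below must roll the
        # reconfiguration back.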
        def zero_downtime_alter():
            try:
                c.sql(
                    """
                    ALTER CLUSTER cluster1 SET (SIZE = '2') WITH (WAIT FOR '10s')
                    """,
                    port=6877,
                    user="mz_system",
                )
            except OperationalError:
                # We expect the network connection to drop during this.
                pass

        # Run a reconfigure.
        thread = Thread(target=zero_downtime_alter)
        thread.start()
        time.sleep(3)

        # Validate that there is a pending replica.
        replicas = c.sql_query(
            """
            SELECT mz_cluster_replicas.name
            FROM mz_cluster_replicas, mz_clusters
            WHERE mz_cluster_replicas.cluster_id = mz_clusters.id
                AND mz_clusters.name = 'cluster1';
            """
        )
        assert replicas == [("r1",), ("r1-pending",)], replicas

        replicas = c.sql_query(
            """
            SELECT cr.name
            FROM mz_internal.mz_pending_cluster_replicas ur
            INNER JOIN mz_cluster_replicas cr ON cr.id = ur.id
            INNER JOIN mz_clusters c ON c.id = cr.cluster_id
            WHERE c.name = 'cluster1';
            """
        )
        assert (
            len(replicas) == 1
        ), "pending replica should be in mz_pending_cluster_replicas"

        # Restart environmentd.
        c.kill("materialized")
        c.up("materialized")

        # Ensure there is no pending replica.
        replicas = c.sql_query(
            """
            SELECT mz_cluster_replicas.name
            FROM mz_cluster_replicas, mz_clusters
            WHERE mz_cluster_replicas.cluster_id = mz_clusters.id
                AND mz_clusters.name = 'cluster1';
            """
        )
        assert replicas == [
            ("r1",)
        ], f"Expected one non-pending replica, found {replicas}"

        # Ensure the cluster config did not change.
        assert (
            c.sql_query(
                """
                SELECT size FROM mz_clusters WHERE name = 'cluster1';
                """
            )
            == [("1",)]
        )

        c.sql(
            """
            ALTER SYSTEM RESET enable_zero_downtime_cluster_reconfiguration;
            """,
            port=6877,
            user="mz_system",
        )


def workflow_crash_on_replica_expiration_mv(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests that clusterd crashes when a replica is set to expire.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(name="clusterd1", restart="on-failure"),
    ):
        offset = 20
        c.up("materialized")
        c.up("clusterd1")
        c.sql(
            f"""
            ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = 'true';
            ALTER SYSTEM SET compute_replica_expiration_offset = '{offset}s';
            DROP CLUSTER IF EXISTS test CASCADE;
            DROP TABLE IF EXISTS t CASCADE;
            CREATE CLUSTER test REPLICAS (
                test (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 1
                )
            );
            SET CLUSTER TO test;
            CREATE TABLE t (x int);
            INSERT INTO t VALUES (42);
            CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE x < 84;
            """,
            port=6877,
            user="mz_system",
        )
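        # Sleep past the expiration offset. The replica should have panicked
        # by now; since clusterd1 is configured with restart="on-failure", it
        # comes back up and the MV remains queryable.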
        c.sleep(offset + 10)
        results = c.sql_query(
            """
            SELECT * FROM mv;
            """,
            port=6877,
            user="mz_system",
        )
        assert results == [(42,)], f"Results mismatch: expected [42], found {results}"

        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert (
            "replica expired" in c1.stdout
        ), "unexpected success in crash-on-replica-expiration"


def workflow_crash_on_replica_expiration_index(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Tests that clusterd crashes when a replica is set to expire.
    """

    def fetch_metrics() -> Metrics:
        resp = c.exec(
            "clusterd1", "curl", "localhost:6878/metrics", capture=True
        ).stdout
        return Metrics(resp)

    c.down(destroy_volumes=True)

    with c.override(
        Clusterd(name="clusterd1", restart="on-failure"),
    ):
        offset = 20
        c.up("materialized")
        c.up("clusterd1")
        c.sql(
            f"""
            ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = 'true';
            ALTER SYSTEM SET compute_replica_expiration_offset = '{offset}s';
            DROP CLUSTER IF EXISTS test CASCADE;
            DROP TABLE IF EXISTS t CASCADE;
            CREATE CLUSTER test REPLICAS (
                test (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 1
                )
            );
            SET CLUSTER TO test;
            CREATE TABLE t (x int);
            INSERT INTO t VALUES (42);
            CREATE VIEW mv AS SELECT * FROM t WHERE x < 84;
            CREATE DEFAULT INDEX ON mv;
            """,
            port=6877,
            user="mz_system",
        )
        c.sleep(offset + 10)
        results = c.sql_query(
            """
            SELECT * FROM mv;
            """,
            port=6877,
            user="mz_system",
        )
        assert results == [(42,)], f"Results mismatch: expected [42], found {results}"

        c1 = c.invoke("logs", "clusterd1", capture=True)
        assert (
            "replica expired" in c1.stdout
        ), "unexpected success in crash-on-replica-expiration"

        # Wait a bit to let the controller refresh its metrics.
        time.sleep(2)

        # Check that expected metrics exist and have sensible values.
        metrics = fetch_metrics()
        expected_expiration_timestamp_sec = int(time.time())
        expiration_timestamp_sec = (
            metrics.get_value("mz_dataflow_replica_expiration_timestamp_seconds") / 1000
        )
        # Just ensure the expiration timestamp is within a reasonable range of now().
        assert (
            (expected_expiration_timestamp_sec - offset)
            < expiration_timestamp_sec
            < (expected_expiration_timestamp_sec + offset)
        ), f"expiration_timestamp: expected={expected_expiration_timestamp_sec}[{datetime.fromtimestamp(expected_expiration_timestamp_sec)}], got={expiration_timestamp_sec}[{datetime.fromtimestamp(expiration_timestamp_sec)}]"

        expiration_remaining = metrics.get_value(
            "mz_dataflow_replica_expiration_remaining_seconds"
        )
        # Ensure the remaining time is within the configured offset.
        offset = float(offset)
        assert (
            expiration_remaining < offset
        ), f"expiration_remaining: expected < {offset}s, got={expiration_remaining}"


def workflow_replica_expiration_creates_retraction_diffs_after_panic(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """
    Test that retraction diffs within the expiration time are generated after
    the replica expires and panics.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Testdrive(no_reset=True),
        Clusterd(name="clusterd1", restart="on-failure"),
    ):
        c.up("materialized", "clusterd1", {"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = 'true';
                ALTER SYSTEM SET compute_replica_expiration_offset = '50s';

                > DROP CLUSTER IF EXISTS test CASCADE;
                > DROP TABLE IF EXISTS events CASCADE;
                > CREATE CLUSTER test REPLICAS (
                    test (
                      STORAGECTL ADDRESSES ['clusterd1:2100'],
                      STORAGE ADDRESSES ['clusterd1:2103'],
                      COMPUTECTL ADDRESSES ['clusterd1:2101'],
                      COMPUTE ADDRESSES ['clusterd1:2102'],
                      WORKERS 1
                    )
                  );
                > SET CLUSTER TO test;
                > CREATE TABLE events (
                    content TEXT,
                    event_ts TIMESTAMP
                  );
                > CREATE VIEW events_view AS
                  SELECT event_ts, content
                  FROM events
                  WHERE mz_now() <= event_ts + INTERVAL '80s';
                > CREATE DEFAULT INDEX ON events_view;
                > INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x;

                # Retraction diffs are not generated yet.
                > SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes
                  WHERE name LIKE '%events_view_primary_idx';
                1000

                # Sleep until the replica expires.
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="60s"

                # Retraction diffs are now within the expiration time and should be generated.
                > SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes
                  WHERE name LIKE '%events_view_primary_idx';
                2000

                > DROP TABLE events CASCADE;
                > DROP CLUSTER test CASCADE;
                """
            )
        )


def workflow_test_constant_sink(c: Composition) -> None:
    """
    Test how we handle constant sinks.

    This test reflects the current behavior, though not the desired behavior,
    as described in database-issues#8842. Once we fix that issue, this can
    become a regression test.
    """
    c.down(destroy_volumes=True)

    with c.override(Testdrive(no_reset=True)):
        c.up(
            "materialized",
            "zookeeper",
            "kafka",
            "schema-registry",
            {"name": "testdrive", "persistent": True},
        )
        c.testdrive(
            dedent(
                """
                > CREATE CLUSTER test SIZE '1';
                > CREATE MATERIALIZED VIEW const IN CLUSTER test AS SELECT 1
                > CREATE CONNECTION kafka_conn
                  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
                > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
                  URL '${testdrive.schema-registry-url}'
                  )
                > CREATE SINK snk
                  IN CLUSTER test
                  FROM const
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-snk-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                > SELECT write_frontier
                  FROM mz_internal.mz_frontiers
                  JOIN mz_sinks ON id = object_id
                  WHERE name = 'snk' AND write_frontier IS NOT NULL
                > SELECT status
                  FROM mz_internal.mz_sink_statuses
                  WHERE name = 'snk'
                running
                """
            )
        )
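        # The sink's write frontier is reported and its status is "running";
        # the same must still hold after an envd restart.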
        c.kill("materialized")
        c.up("materialized")
        c.testdrive(
            dedent(
                """
                > SELECT write_frontier
                  FROM mz_internal.mz_frontiers
                  JOIN mz_sinks ON id = object_id
                  WHERE name = 'snk' AND write_frontier IS NOT NULL
                > SELECT status
                  FROM mz_internal.mz_sink_statuses
                  WHERE name = 'snk'
                running
                """
            )
        )


def workflow_test_lgalloc_limiter(c: Composition) -> None:
    """
    Test that the lgalloc disk usage limiter functions as expected.

    We run a workload whose disk usage is roughly known and then assert that
    it does, or does not, manage to hydrate with various limiter
    configurations.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "enable_lgalloc": "true",
                "enable_columnar_lgalloc": "true",
                "enable_columnation_lgalloc": "true",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
                "enable_compute_correction_v2": "true",
                "lgalloc_limiter_interval": "100ms",
            },
        ),
        Clusterd(
            name="clusterd1",
            # Announce an (unenforced) memory limit of 1 GiB. Disk limits are
            # derived from this memory limit.
            options=["--announce-memory-limit=1073741824"],
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})
        c.sql(
            """
            CREATE CLUSTER test REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 1
                )
            )
            """
        )
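        # The limiter's disk budget is the announced memory limit scaled by
        # `lgalloc_limiter_usage_factor`: with 1 GiB announced, a factor of 1
        # allows ~1 GiB of disk and a factor of 0.01 only ~10 MiB.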
        def setup_workload():
            """
            For our workload we use a large MV, which we obtain by performing
            a cross join. We make sure that the rows are large, so they spill
            to disk well.
            """
            c.sql(
                """
                DROP TABLE IF EXISTS t CASCADE;
                CREATE TABLE t (i int, x text);
                INSERT INTO t
                    SELECT generate_series, repeat('a', 100) FROM generate_series(1, 1000);
                CREATE MATERIALIZED VIEW mv IN CLUSTER test AS
                    SELECT t1.i i1, t1.x x1, t2.i i2, t2.x x2 FROM t t1, t t2;
                """
            )

        # Test 1: The MV should be able to hydrate with a disk limit of 1 GiB.
        c.sql(
            """
            ALTER SYSTEM SET lgalloc_limiter_usage_factor = 1;
            ALTER SYSTEM SET lgalloc_limiter_burst_factor = 0;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        c.up("clusterd1")
        c.testdrive("> SELECT count(*) FROM mv\n1000000")
        c.kill("clusterd1")

        # Test 2: The MV should be unable to hydrate with a disk limit of 10 MiB.
        c.sql(
            """
            ALTER SYSTEM SET lgalloc_limiter_usage_factor = 0.01;
            ALTER SYSTEM SET lgalloc_limiter_burst_factor = 0;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        c.up("clusterd1", wait=False)
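        # The limiter shuts the replica down when it exceeds its budget; the
        # test expects this to surface as container exit code 167.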
        for _ in range(100):
            time.sleep(1)
            ps = c.invoke("ps", "clusterd1", "-a", capture=True, silent=True).stdout
            if "Exited (167)" in ps:
                break
        else:
            raise RuntimeError("replica did not exit with code 167")

        # Test 3: The MV should be able to hydrate with a disk limit of 10 MiB
        # and a burst budget of 10 GiB-seconds.
        c.sql(
            """
            ALTER SYSTEM SET lgalloc_limiter_usage_factor = 0.01;
            ALTER SYSTEM SET lgalloc_limiter_burst_factor = 1000;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        # Force a reconnect to make sure the replica gets the new config immediately.
        # TODO(database-issues#9483): make this workaround unnecessary
        c.up("clusterd1")
        time.sleep(1)
        c.kill("clusterd1")
        c.up("clusterd1")
        c.testdrive("> SELECT count(*) FROM mv\n1000000")
        c.kill("clusterd1")


def workflow_test_memory_limiter(c: Composition) -> None:
    """
    Test that the memory limiter functions as expected.

    We run a workload whose memory usage is roughly known and then assert that
    it does, or does not, manage to hydrate with various limiter
    configurations.
    """
    c.down(destroy_volumes=True)

    with c.override(
        Materialized(
            additional_system_parameter_defaults={
                "enable_lgalloc": "false",
                "memory_limiter_interval": "100ms",
                "unsafe_enable_unorchestrated_cluster_replicas": "true",
            },
        ),
        Clusterd(
            name="clusterd1",
            # Announce an (unenforced) memory limit of 1 GiB. The limiter's
            # memory budget is derived from this limit.
            options=["--announce-memory-limit=1073741824"],
        ),
        Testdrive(no_reset=True),
    ):
        c.up("materialized", {"name": "testdrive", "persistent": True})
        c.sql(
            """
            CREATE CLUSTER test REPLICAS (
                r1 (
                    STORAGECTL ADDRESSES ['clusterd1:2100'],
                    STORAGE ADDRESSES ['clusterd1:2103'],
                    COMPUTECTL ADDRESSES ['clusterd1:2101'],
                    COMPUTE ADDRESSES ['clusterd1:2102'],
                    WORKERS 1
                )
            )
            """
        )

        def setup_workload():
            """
            For our workload we use a large MV, which we obtain by performing
            a cross join. We make sure that the rows are large, so they
            consume a fair amount of memory.
            """
            c.sql(
                """
                DROP TABLE IF EXISTS t CASCADE;
                CREATE TABLE t (i int, x text);
                INSERT INTO t
                    SELECT generate_series, repeat('a', 100) FROM generate_series(1, 1000);
                CREATE MATERIALIZED VIEW mv IN CLUSTER test AS
                    SELECT t1.i i1, t1.x x1, t2.i i2, t2.x x2 FROM t t1, t t2;
                """
            )
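
        # The cross join yields 1000 x 1000 = 1,000,000 rows, each carrying
        # two ~100-byte strings, so the hydrated MV needs a few hundred MiB:
        # comfortably within a 2 GiB budget, too much for ~205 MiB.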
        # Test 1: The MV should be able to hydrate with a memory limit of 2 GiB.
        c.sql(
            """
            ALTER SYSTEM SET memory_limiter_usage_factor = 2;
            ALTER SYSTEM SET memory_limiter_burst_factor = 0;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        c.up("clusterd1")
        c.testdrive("> SELECT count(*) FROM mv\n1000000")
        c.kill("clusterd1")

        # Test 2: The MV should be unable to hydrate with a memory limit of 205 MiB.
        c.sql(
            """
            ALTER SYSTEM SET memory_limiter_usage_factor = 0.20;
            ALTER SYSTEM SET memory_limiter_burst_factor = 0;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        c.up("clusterd1", wait=False)
        for _ in range(100):
            time.sleep(1)
            ps = c.invoke("ps", "clusterd1", "-a", capture=True, silent=True).stdout
            if "Exited (167)" in ps:
                break
        else:
            raise RuntimeError("replica did not exit with code 167")

        # Test 3: The MV should be able to hydrate with a memory limit of 1 GiB
        # and a burst budget of 10 GiB-seconds.
        c.sql(
            """
            ALTER SYSTEM SET memory_limiter_usage_factor = 1;
            ALTER SYSTEM SET memory_limiter_burst_factor = 10;
            """,
            port=6877,
            user="mz_system",
        )
        setup_workload()
        # Force a reconnect to make sure the replica gets the new config immediately.
        # TODO(database-issues#9483): make this workaround unnecessary
        c.up("clusterd1")
        time.sleep(1)
        c.kill("clusterd1")
        c.up("clusterd1")
        c.testdrive("> SELECT count(*) FROM mv\n1000000")
        c.kill("clusterd1")