sink.py 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, disabled, externally_idempotent
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. from materialize.checks.executors import Executor
  14. from materialize.mz_version import MzVersion
  15. def schemas() -> str:
  16. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
def schemas_null() -> str:
    """Return testdrive ``$ set`` directives for an Avro key/value schema
    pair whose value fields are unions with null, so ``f1`` (string) and
    ``f2`` (long) are both nullable.
    """
    return dedent(
        """
        $ set keyschema={
        "type": "record",
        "name": "Key",
        "fields": [
        {"name": "key1", "type": "string"}
        ]
        }
        $ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
        {"name":"f1", "type":["null", "string"]},
        {"name":"f2", "type":["long", "null"]}
        ]
        }
        """
    )
@externally_idempotent(False)
class SinkUpsert(Check):
    """Basic Check on sinks from an upsert source"""

    def initialize(self) -> Testdrive:
        """Seed the Kafka topic with U2/D2/U3/D3 keyed batches, then create
        the upsert source, an aggregating materialized view over it, and the
        first Debezium sink (sink_sink1)."""
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                > CREATE SOURCE sink_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-${testdrive.seed}')
                > CREATE TABLE sink_source FROM SOURCE sink_source_src (REFERENCE "testdrive-sink-source-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                > CREATE MATERIALIZED VIEW sink_source_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v, COUNT(*) AS c FROM sink_source GROUP BY LEFT(key1, 2), LEFT(f1, 1);
                > CREATE SINK sink_sink1 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Two phases: each re-grants privileges (the checks framework may
        run phases as different versions/users), ingests an insert/update/
        delete batch, and creates one additional sink."""
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                {"key1": "D2${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink2 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                {"key1": "D3${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink3 FROM sink_source_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest each sink topic with ENVELOPE NONE and check that the
        net sum of 'after' minus 'before' counts reproduces the view."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize
                > SELECT * FROM sink_source_view;
                I2 B 1000
                I3 C 1000
                U2 B 1000
                U3 C 1000
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_view1_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink1')
                > CREATE TABLE sink_view1 FROM SOURCE sink_view1_src (REFERENCE "sink-sink1")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view2_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink2')
                > CREATE TABLE sink_view2 FROM SOURCE sink_view2_src (REFERENCE "sink-sink2")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view3_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink3')
                > CREATE TABLE sink_view3 FROM SOURCE sink_view3_src (REFERENCE "sink-sink3")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view1
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view1
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000
                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view2
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view2
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000
                > SELECT l_v, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v, (after).l_k, (after).c FROM sink_view3
                    UNION ALL
                    SELECT (before).l_v, (before).l_k, -(before).c FROM sink_view3
                  ) GROUP BY l_v, l_k
                  HAVING SUM(c) > 0;
                B I2 1000
                B U2 1000
                C I3 1000
                C U3 1000
                > DROP SOURCE sink_view1_src CASCADE;
                > DROP SOURCE sink_view2_src CASCADE;
                > DROP SOURCE sink_view3_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class SinkTables(Check):
    """Sink and re-ingest a large transaction from a table source"""

    def initialize(self) -> Testdrive:
        """Create a 100k-row table (each row ~1KB), an indexed view over it,
        and a Debezium sink on a dedicated size-4 cluster."""
        return Testdrive(
            schemas()
            + dedent(
                """
                > CREATE TABLE sink_large_transaction_table (f1 INTEGER, f2 TEXT, PRIMARY KEY (f1));
                > CREATE DEFAULT INDEX ON sink_large_transaction_table;
                > INSERT INTO sink_large_transaction_table SELECT generate_series, REPEAT('x', 1024) FROM generate_series(1, 100000);
                # Can be slow with a large transaction
                $ set-sql-timeout duration=240s
                > CREATE MATERIALIZED VIEW sink_large_transaction_view AS SELECT f1 - 1 AS f1 , f2 FROM sink_large_transaction_table;
                > CREATE CLUSTER sink_large_transaction_sink1_cluster SIZE '4';
                > CREATE SINK sink_large_transaction_sink1
                  IN CLUSTER sink_large_transaction_sink1_cluster
                  FROM sink_large_transaction_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Two phases, each a full-table UPDATE — a single transaction
        touching all 100k rows (x -> y, then y -> z)."""
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('y', 1024)
                """,
                """
                > UPDATE sink_large_transaction_table SET f2 = REPEAT('z', 1024)
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Verify the registered value schema, re-ingest the sink topic, and
        check before/after record counts and the net per-letter deltas.

        Expected counts: 5 envelope records per key (1 insert + 2 updates,
        updates emitting both before and after) gives 500000 total; 200000
        non-null 'before' rows and 300000 non-null 'after' rows.
        """
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                $ schema-registry-verify schema-type=avro subject=testdrive-sink-large-transaction-sink-${testdrive.seed}-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"f1","type":"int"},{"name":"f2","type":["null","string"]}]}]},{"name":"after","type":["null","row"]}]}
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_large_transaction_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-large-transaction-sink-${testdrive.seed}')
                > CREATE TABLE sink_large_transaction_source FROM SOURCE sink_large_transaction_source_src (REFERENCE "testdrive-sink-large-transaction-sink-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                # Can be slow with a large transaction
                $ set-sql-timeout duration=240s
                > CREATE MATERIALIZED VIEW sink_large_transaction_view2
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (before).f1, (before).f2 FROM sink_large_transaction_source
                  )
                > CREATE MATERIALIZED VIEW sink_large_transaction_view3
                  AS
                  SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
                  FROM (
                    SELECT (after).f1, (after).f2 FROM sink_large_transaction_source
                  )
                > CREATE MATERIALIZED VIEW sink_large_transaction_view4
                  AS
                  SELECT LEFT(f2, 1), SUM(c)
                  FROM (
                    SELECT (after).f2, COUNT(*) AS c FROM sink_large_transaction_source GROUP BY (after).f2
                    UNION ALL
                    SELECT (before).f2, -COUNT(*) AS c FROM sink_large_transaction_source GROUP BY (before).f2
                  )
                  GROUP BY f2
                > SELECT * FROM sink_large_transaction_view2
                500000 200000 100000 0 99999
                > SELECT * FROM sink_large_transaction_view3
                500000 300000 100000 0 99999
                > SELECT * FROM sink_large_transaction_view4
                <null> -100000
                x 0
                y 0
                z 100000
                > DROP SOURCE sink_large_transaction_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class SinkNullDefaults(Check):
    """Check on an Avro sink with NULL DEFAULTS"""

    def initialize(self) -> Testdrive:
        """Seed a topic using the nullable-field schemas, create the upsert
        source, an aggregating view, and the first NULL DEFAULTS sink."""
        return Testdrive(
            schemas_null()
            + dedent(
                """
                $ kafka-create-topic topic=sink-source-null
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                > CREATE SOURCE sink_source_null_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-null-${testdrive.seed}')
                > CREATE TABLE sink_source_null FROM SOURCE sink_source_null_src (REFERENCE "testdrive-sink-source-null-${testdrive.seed}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                > CREATE MATERIALIZED VIEW sink_source_null_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_null GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100;
                > CREATE SINK sink_sink_null1 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Two phases: each re-grants privileges, ingests the same
        I2/U2/D2 batch (the repeat is intentional — upserting identical
        key/value pairs is a no-op and validate's expectations rely on it),
        and creates one more NULL DEFAULTS sink."""
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink_null2 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source-null key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink_null3 FROM sink_source_null_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Verify that all three sinks registered schemas with
        "default":null on every nullable field, then re-ingest each sink
        topic and check the net before/after deltas against the view."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                $ schema-registry-verify schema-type=avro subject=sink-sink-null1-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-null2-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-null3-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null},{"name":"l_v2","type":["null","long"],"default":null},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_null_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize
                > SELECT * FROM sink_source_null_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_view_null1_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null1')
                > CREATE TABLE sink_view_null1 FROM SOURCE sink_view_null1_src (REFERENCE "sink-sink-null1")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view_null2_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null2')
                > CREATE TABLE sink_view_null2 FROM SOURCE sink_view_null2_src (REFERENCE "sink-sink-null2")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view_null3_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-null3')
                > CREATE TABLE sink_view_null3 FROM SOURCE sink_view_null3_src (REFERENCE "sink-sink-null3")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_null3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_null3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > DROP SOURCE sink_view_null1_src CASCADE;
                > DROP SOURCE sink_view_null2_src CASCADE;
                > DROP SOURCE sink_view_null3_src CASCADE;
                """
            )
        )
  475. @externally_idempotent(False)
  476. class SinkComments(Check):
  477. """Check on an Avro sink with comments"""
  478. def initialize(self) -> Testdrive:
  479. return Testdrive(
  480. schemas_null()
  481. + dedent(
  482. """
  483. $ kafka-create-topic topic=sink-sourcecomments
  484. $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
  485. {"key1": "U2${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
  486. $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
  487. {"key1": "D2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
  488. $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
  489. {"key1": "U3${kafka-ingest.iteration}"} {"f1": {"string": "A${kafka-ingest.iteration}"}, "f2": null}
  490. $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
  491. {"key1": "D3${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
  492. > CREATE SOURCE sink_source_comments_src
  493. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-source-comments-${testdrive.seed}')
  494. > CREATE TABLE sink_source_comments FROM SOURCE sink_source_comments_src (REFERENCE "testdrive-sink-source-comments-${testdrive.seed}")
  495. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  496. ENVELOPE UPSERT
  497. > CREATE MATERIALIZED VIEW sink_source_comments_view AS SELECT LEFT(key1, 2) as l_k, LEFT(f1, 1) AS l_v1, f2 / 100 AS l_v2, COUNT(*) AS c FROM sink_source_comments GROUP BY LEFT(key1, 2), LEFT(f1, 1), f2 / 100
  498. > COMMENT ON MATERIALIZED VIEW sink_source_comments_view IS 'comment on view sink_source_comments_view'
  499. > CREATE SINK sink_sink_comments1 FROM sink_source_comments_view
  500. INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
  501. KEY (l_v2) NOT ENFORCED
  502. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  503. ( NULL DEFAULTS,
  504. DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
  505. VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
  506. KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
  507. )
  508. ENVELOPE DEBEZIUM
  509. """
  510. )
  511. )
    def manipulate(self) -> list[Testdrive]:
        """Two phases: each re-grants privileges, ingests the same I2/U2/D2
        batch (intentionally repeated — re-upserting identical key/value
        pairs leaves the view unchanged), and creates one more sink with the
        same DOC ON comment options."""
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink_comments2 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
                """
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ kafka-ingest format=avro key-format=avro topic=sink-source-comments key-schema=${keyschema} schema=${schema} repeat=1000
                {"key1": "I2${kafka-ingest.iteration}"} {"f1": {"string": "B${kafka-ingest.iteration}"}, "f2": null}
                {"key1": "U2${kafka-ingest.iteration}"} {"f1": null, "f2": {"long": ${kafka-ingest.iteration}}}
                {"key1": "D2${kafka-ingest.iteration}"}
                > CREATE SINK sink_sink_comments3 FROM sink_source_comments_view
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                  KEY (l_v2) NOT ENFORCED
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ( NULL DEFAULTS,
                    DOC ON COLUMN sink_source_comments_view.l_v1 = 'doc on l_v1',
                    VALUE DOC ON COLUMN sink_source_comments_view.l_v2 = 'value doc on l_v2',
                    KEY DOC ON COLUMN sink_source_comments_view.l_v2 = 'key doc on l_v2'
                  )
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]
    def validate(self) -> Testdrive:
        """Verify the comment-propagating sinks.

        Checks that the Avro key/value schemas registered in the schema
        registry carry the expected ``doc`` attributes (from the view comment
        and the KEY/VALUE DOC ON options), then re-ingests each sink topic and
        aggregates before/after records to confirm the topic contents match
        the view.
        """
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-key
                {"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_v2","type":["null","long"],"default":null,"doc":"key doc on l_v2"}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments1-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments2-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ schema-registry-verify schema-type=avro subject=sink-sink-comments3-value
                {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","doc":"comment on view sink_source_comments_view","fields":[{"name":"l_k","type":"string"},{"name":"l_v1","type":["null","string"],"default":null,"doc":"doc on l_v1"},{"name":"l_v2","type":["null","long"],"default":null,"doc":"value doc on l_v2"},{"name":"c","type":"long"}]}],"default":null},{"name":"after","type":["null","row"],"default":null}]}
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT SELECT ON sink_source_comments_view TO materialize
                GRANT USAGE ON CONNECTION kafka_conn TO materialize
                GRANT USAGE ON CONNECTION csr_conn TO materialize
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize
                > SELECT * FROM sink_source_comments_view;
                D3 <null> 0 100
                D3 <null> 1 100
                D3 <null> 2 100
                D3 <null> 3 100
                D3 <null> 4 100
                D3 <null> 5 100
                D3 <null> 6 100
                D3 <null> 7 100
                D3 <null> 8 100
                D3 <null> 9 100
                I2 B <null> 1000
                U2 <null> 0 100
                U2 <null> 1 100
                U2 <null> 2 100
                U2 <null> 3 100
                U2 <null> 4 100
                U2 <null> 5 100
                U2 <null> 6 100
                U2 <null> 7 100
                U2 <null> 8 100
                U2 <null> 9 100
                U3 A <null> 1000
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_view_comments1_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments1')
                > CREATE TABLE sink_view_comments1 FROM SOURCE sink_view_comments1_src (REFERENCE "sink-sink-comments1")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view_comments2_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments2')
                > CREATE TABLE sink_view_comments2 FROM SOURCE sink_view_comments2_src (REFERENCE "sink-sink-comments2")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > CREATE SOURCE sink_view_comments3_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-sink-comments3')
                > CREATE TABLE sink_view_comments3 FROM SOURCE sink_view_comments3_src (REFERENCE "sink-sink-comments3")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                # Validate the sink by aggregating all the 'before' and 'after' records using SQL
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments1
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments1
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments2
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments2
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > SELECT l_v1, l_v2, l_k, SUM(c)
                  FROM (
                    SELECT (after).l_v1, (after).l_v2, (after).l_k, (after).c FROM sink_view_comments3
                    UNION ALL
                    SELECT (before).l_v1, (before).l_v2, (before).l_k, -(before).c FROM sink_view_comments3
                  ) GROUP BY l_v1, l_v2, l_k
                  HAVING SUM(c) > 0;
                <null> 0 D3 100
                <null> 0 U2 100
                <null> 1 D3 100
                <null> 1 U2 100
                <null> 2 D3 100
                <null> 2 U2 100
                <null> 3 D3 100
                <null> 3 U2 100
                <null> 4 D3 100
                <null> 4 U2 100
                <null> 5 D3 100
                <null> 5 U2 100
                <null> 6 D3 100
                <null> 6 U2 100
                <null> 7 D3 100
                <null> 7 U2 100
                <null> 8 D3 100
                <null> 8 U2 100
                <null> 9 D3 100
                <null> 9 U2 100
                A <null> U3 1000
                B <null> I2 1000
                > DROP SOURCE sink_view_comments1_src CASCADE;
                > DROP SOURCE sink_view_comments2_src CASCADE;
                > DROP SOURCE sink_view_comments3_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class SinkAutoCreatedTopicConfig(Check):
    """Check on a sink with auto-created topic configuration.

    Creates three sinks whose Kafka topics are auto-created with a
    ``TOPIC CONFIG`` override (``cleanup.policy = compact``) and verifies the
    topics were created with that configuration.
    """

    def initialize(self) -> Testdrive:
        """Create the source table and the first sink with a TOPIC CONFIG."""
        return Testdrive(
            schemas_null()
            + dedent(
                """
                > CREATE TABLE sink_config_table (f1 int)
                > INSERT INTO sink_config_table VALUES (1);
                > CREATE SINK sink_config1 FROM sink_config_table
                  INTO KAFKA CONNECTION kafka_conn (
                    TOPIC 'sink-config1',
                    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
                  )
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Create two more sinks (sink_config2/3) with the same TOPIC CONFIG."""
        return [
            Testdrive(schemas_null() + dedent(s))
            for s in [
                """
                > INSERT INTO sink_config_table VALUES (2);
                > CREATE SINK sink_config2 FROM sink_config_table
                  INTO KAFKA CONNECTION kafka_conn (
                    TOPIC 'sink-config2',
                    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
                  )
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
                """
                > INSERT INTO sink_config_table VALUES (3);
                > CREATE SINK sink_config3 FROM sink_config_table
                  INTO KAFKA CONNECTION kafka_conn (
                    TOPIC 'sink-config3',
                    TOPIC CONFIG MAP['cleanup.policy' => 'compact']
                  )
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Verify each auto-created topic has the requested cleanup.policy."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                $ kafka-verify-topic sink=materialize.public.sink_config1 partition-count=1 topic-config={"cleanup.policy": "compact"}
                $ kafka-verify-topic sink=materialize.public.sink_config2 partition-count=1 topic-config={"cleanup.policy": "compact"}
                $ kafka-verify-topic sink=materialize.public.sink_config3 partition-count=1 topic-config={"cleanup.policy": "compact"}
                # TODO: Reenable this check when kafka-verify-data can deal with validate being run twice
                # $ kafka-verify-data format=avro sink=materialize.public.sink_config1 sort-messages=true
                # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}
                # $ kafka-verify-data format=avro sink=materialize.public.sink_config2 sort-messages=true
                # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}
                # $ kafka-verify-data format=avro sink=materialize.public.sink_config3 sort-messages=true
                # {{"before": null, "after": {{"row":{{"f1": 1}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 2}}}}}}
                # {{"before": null, "after": {{"row":{{"f1": 3}}}}}}
                """
            )
        )
@externally_idempotent(False)
class AlterSink(Check):
    """Check ALTER SINK.

    Repoints a sink from table_alter1 to table_alter2 and then table_alter3
    via ALTER SINK ... SET FROM, and verifies that only the rows inserted
    into the currently attached table appear in the sink topic.
    """

    def initialize(self) -> Testdrive:
        """Create three tables and a sink initially reading table_alter1."""
        return Testdrive(
            dedent(
                """
                > CREATE TABLE table_alter1 (x int, y string)
                > CREATE TABLE table_alter2 (x int, y string)
                > CREATE TABLE table_alter3 (x int, y string)
                > CREATE SINK sink_alter FROM table_alter1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'alter-sink')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                > INSERT INTO table_alter1 VALUES (0, 'a')
                > INSERT INTO table_alter2 VALUES (1, 'b')
                > INSERT INTO table_alter3 VALUES (2, 'c')
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Repoint the sink twice; the status-history row count growing past
        its pre-ALTER value confirms the sink restarted after each ALTER."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter';
                > ALTER SINK sink_alter SET FROM table_alter2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter';
                true
                > INSERT INTO table_alter1 VALUES (10, 'aa')
                > INSERT INTO table_alter2 VALUES (11, 'bb')
                > INSERT INTO table_alter3 VALUES (12, 'cc')
                """,
                """
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter';
                > ALTER SINK sink_alter SET FROM table_alter3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter';
                true
                > INSERT INTO table_alter1 VALUES (100, 'aaa')
                > INSERT INTO table_alter2 VALUES (101, 'bbb')
                > INSERT INTO table_alter3 VALUES (102, 'ccc')
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic and check exactly one row per phase made
        it through: (0,'a') from table1, (11,'bb') from table2, (102,'ccc')
        from table3."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_alter_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'alter-sink')
                > CREATE TABLE sink_alter_source FROM SOURCE sink_alter_source_src (REFERENCE "alter-sink")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).x, (after).y FROM sink_alter_source
                true 0 a
                true 11 bb
                true 102 ccc
                > DROP SOURCE sink_alter_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class AlterSinkMv(Check):
    """Check ALTER SINK with materialized views."""

    def _can_run(self, e: Executor) -> bool:
        # Only run on versions newer than v0.134.0-dev.
        return self.base_version > MzVersion.parse_mz("v0.134.0-dev")

    def initialize(self) -> Testdrive:
        """Create the first materialized view and a sink reading from it."""
        return Testdrive(
            dedent(
                """
                > CREATE TABLE table_alter_mv1 (a INT);
                > INSERT INTO table_alter_mv1 VALUES (0)
                > CREATE MATERIALIZED VIEW mv_alter1 AS SELECT * FROM table_alter_mv1
                > CREATE SINK sink_alter_mv FROM mv_alter1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-mv')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Twice repoint the sink at a newly created materialized view and
        confirm via mz_sink_status_history that the sink restarted."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE TABLE table_alter_mv2 (a INT);
                > CREATE MATERIALIZED VIEW mv_alter2 AS SELECT * FROM table_alter_mv2
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_mv';
                > ALTER SINK sink_alter_mv SET FROM mv_alter2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_mv';
                true
                > INSERT INTO table_alter_mv1 VALUES (10)
                > INSERT INTO table_alter_mv2 VALUES (11)
                """,
                """
                > CREATE TABLE table_alter_mv3 (a INT);
                > CREATE MATERIALIZED VIEW mv_alter3 AS SELECT * FROM table_alter_mv3
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_mv';
                > ALTER SINK sink_alter_mv SET FROM mv_alter3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_mv';
                true
                > INSERT INTO table_alter_mv1 VALUES (100)
                > INSERT INTO table_alter_mv2 VALUES (101)
                > INSERT INTO table_alter_mv3 VALUES (102)
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic and check that only the row inserted into
        the MV attached at the time of each insert appears: 0, 11, 102."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                > CREATE SOURCE sink_alter_mv_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-mv')
                > CREATE TABLE sink_alter_mv_source FROM SOURCE sink_alter_mv_source_src (REFERENCE "sink-alter-mv")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).a FROM sink_alter_mv_source
                true 0
                true 11
                true 102
                > DROP SOURCE sink_alter_mv_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
@disabled("due to database-issues#8982")
class AlterSinkLGSource(Check):
    """Check ALTER SINK with a load generator source."""

    def _can_run(self, e: Executor) -> bool:
        # Only run on versions newer than v0.134.0-dev.
        return self.base_version > MzVersion.parse_mz("v0.134.0-dev")

    def initialize(self) -> Testdrive:
        """Create a COUNTER load generator source and a sink reading it."""
        return Testdrive(
            dedent(
                """
                > CREATE SOURCE lg1 FROM LOAD GENERATOR COUNTER (UP TO 2);
                > CREATE SINK sink_alter_lg FROM lg1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-lg')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Twice repoint the sink at a new COUNTER source with a higher UP TO
        bound, confirming via mz_sink_status_history that it restarted."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE SOURCE lg2 FROM LOAD GENERATOR COUNTER (UP TO 4, TICK INTERVAL '5s');
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_lg';
                > ALTER SINK sink_alter_lg SET FROM lg2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_lg';
                true
                """,
                """
                > CREATE SOURCE lg3 FROM LOAD GENERATOR COUNTER (UP TO 6, TICK INTERVAL '5s');
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_lg';
                > ALTER SINK sink_alter_lg SET FROM lg3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_lg';
                true
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic and expect counter values 1 through 6."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                > CREATE SOURCE sink_alter_lg_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-lg')
                > CREATE TABLE sink_alter_lg_source FROM SOURCE sink_alter_lg_source_src (REFERENCE "sink-alter-lg")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).counter FROM sink_alter_lg_source
                true 1
                true 2
                true 3
                true 4
                true 5
                true 6
                > DROP SOURCE sink_alter_lg_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
@disabled(
    "manual sleep is impossible to get right, this check has to be reworked so as not to flake CI"
)
class AlterSinkPgSource(Check):
    """Check ALTER SINK with a postgres source."""

    def _can_run(self, e: Executor) -> bool:
        # Only run on versions newer than v0.134.0-dev.
        return self.base_version > MzVersion.parse_mz("v0.134.0-dev")

    def initialize(self) -> Testdrive:
        """Set up a Postgres publication, a Materialize Postgres source for
        pg_table1, and a sink reading the replicated table."""
        return Testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres3 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres3 WITH replication;
                DROP PUBLICATION IF EXISTS pg;
                CREATE PUBLICATION pg FOR ALL TABLES;
                DROP TABLE IF EXISTS pg_table1;
                CREATE TABLE pg_table1 (f1 INT);
                ALTER TABLE pg_table1 REPLICA IDENTITY FULL;
                > CREATE SECRET pgpass3 AS 'postgres'
                > CREATE CONNECTION pg_conn1 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres3,
                  PASSWORD SECRET pgpass3
                > CREATE SOURCE pg_source1
                  FROM POSTGRES CONNECTION pg_conn1
                  (PUBLICATION 'pg')
                > CREATE TABLE pg1 FROM SOURCE pg_source1 (REFERENCE pg_table1)
                > CREATE SINK sink_alter_pg FROM pg1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-pg')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO pg_table1 VALUES (1);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Twice repoint the sink at a new replicated Postgres table, then
        insert into the new table after a (flaky) fixed sleep."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DROP TABLE IF EXISTS pg_table2;
                CREATE TABLE pg_table2 (f1 INT);
                ALTER TABLE pg_table2 REPLICA IDENTITY FULL;
                > CREATE SOURCE pg_source2
                  FROM POSTGRES CONNECTION pg_conn1
                  (PUBLICATION 'pg')
                > CREATE TABLE pg2 FROM SOURCE pg_source2 (REFERENCE pg_table2)
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_pg';
                > ALTER SINK sink_alter_pg SET FROM pg2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_pg';
                true
                # Still needs to sleep some before the sink is updated
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="30s"
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO pg_table2 VALUES (2);
                """,
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                DROP TABLE IF EXISTS pg_table3;
                CREATE TABLE pg_table3 (f1 INT);
                ALTER TABLE pg_table3 REPLICA IDENTITY FULL;
                > CREATE SOURCE pg_source3
                  FROM POSTGRES CONNECTION pg_conn1
                  (PUBLICATION 'pg')
                > CREATE TABLE pg3 FROM SOURCE pg_source3 (REFERENCE pg_table3)
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_pg';
                > ALTER SINK sink_alter_pg SET FROM pg3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_pg';
                true
                # Still needs to sleep some before the sink is updated
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="30s"
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO pg_table3 VALUES (3);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic and expect one row per phase: 1, 2, 3."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                > CREATE SOURCE sink_alter_pg_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-pg')
                > CREATE TABLE sink_alter_pg_source FROM SOURCE sink_alter_pg_source_src (REFERENCE "sink-alter-pg")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).f1 FROM sink_alter_pg_source
                true 1
                true 2
                true 3
                > DROP SOURCE sink_alter_pg_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class AlterSinkWebhook(Check):
    """Check ALTER SINK with webhook sources."""

    def _can_run(self, e: Executor) -> bool:
        # Only run on versions newer than v0.134.0-dev.
        return self.base_version > MzVersion.parse_mz("v0.134.0-dev")

    def initialize(self) -> Testdrive:
        """Create a webhook source (on a dedicated cluster, replication
        factor depending on version) and a sink reading from it."""
        return Testdrive(
            dedent(
                """
                >[version>=14700] CREATE CLUSTER sink_webhook_cluster SIZE '1', REPLICATION FACTOR 2;
                >[version<14700] CREATE CLUSTER sink_webhook_cluster SIZE '1', REPLICATION FACTOR 1;
                > CREATE SOURCE webhook_alter1 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
                > CREATE SINK sink_alter_wh FROM webhook_alter1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-wh')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                $ webhook-append database=materialize schema=public name=webhook_alter1
                1
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Twice repoint the sink at a new webhook source, confirm the sink
        restarted via mz_sink_status_history, then append a new body."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE SOURCE webhook_alter2 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
                > ALTER SINK sink_alter_wh SET FROM webhook_alter2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
                true
                $ webhook-append database=materialize schema=public name=webhook_alter2
                2
                """,
                """
                > CREATE SOURCE webhook_alter3 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
                > ALTER SINK sink_alter_wh SET FROM webhook_alter3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
                true
                $ webhook-append database=materialize schema=public name=webhook_alter3
                3
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic; expect one body per webhook source."""
        return Testdrive(
            dedent(
                """
                # Can be slow in 0dt upgrade scenarios
                $ set-sql-timeout duration=480s
                > CREATE SOURCE sink_alter_wh_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-wh')
                > CREATE TABLE sink_alter_wh_source FROM SOURCE sink_alter_wh_source_src (REFERENCE "sink-alter-wh")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).body FROM sink_alter_wh_source
                true 1
                true 2
                true 3
                > DROP SOURCE sink_alter_wh_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class AlterSinkOrder(Check):
    """Check ALTER SINK with a table created after the sink, see incident 131."""

    def initialize(self) -> Testdrive:
        """Create the first table and a sink reading from it."""
        return Testdrive(
            dedent(
                """
                > CREATE TABLE table_alter_order1 (x int, y string)
                > CREATE SINK sink_alter_order FROM table_alter_order1
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'alter-sink-order')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE DEBEZIUM
                > INSERT INTO table_alter_order1 VALUES (0, 'a')
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Create each new target table only after the sink exists, then
        repoint the sink to it and insert into all tables."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE TABLE table_alter_order2 (x int, y string)
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_order';
                > ALTER SINK sink_alter_order SET FROM table_alter_order2;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_order';
                true
                > INSERT INTO table_alter_order2 VALUES (1, 'b')
                > INSERT INTO table_alter_order1 VALUES (10, 'aa')
                > INSERT INTO table_alter_order2 VALUES (11, 'bb')
                """,
                """
                > CREATE TABLE table_alter_order3 (x int, y string)
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_order';
                > ALTER SINK sink_alter_order SET FROM table_alter_order3;
                > SELECT COUNT(*) > ${running_count} FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_order';
                true
                > INSERT INTO table_alter_order3 VALUES (2, 'c')
                > INSERT INTO table_alter_order3 VALUES (12, 'cc')
                > INSERT INTO table_alter_order1 VALUES (100, 'aaa')
                > INSERT INTO table_alter_order2 VALUES (101, 'bbb')
                > INSERT INTO table_alter_order3 VALUES (102, 'ccc')
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic and check only the rows inserted into the
        currently attached table appear in it."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_alter_order_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'alter-sink-order')
                > CREATE TABLE sink_alter_order_source FROM SOURCE sink_alter_order_source_src (REFERENCE "alter-sink-order")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE NONE
                > SELECT before IS NULL, (after).x, (after).y FROM sink_alter_order_source
                true 0 a
                true 1 b
                true 2 c
                true 11 bb
                true 12 cc
                true 102 ccc
                > DROP SOURCE sink_alter_order_source_src CASCADE;
                """
            )
        )
@externally_idempotent(False)
class SinkFormat(Check):
    """Check SINK with KEY FORMAT and VALUE FORMAT."""

    def initialize(self) -> Testdrive:
        """Create a keyed table and an UPSERT sink using TEXT key format and
        JSON value format."""
        return Testdrive(
            schemas()
            + dedent(
                """
                > CREATE TABLE sink_format_table (f1 INTEGER, f2 TEXT, f3 INT, PRIMARY KEY (f1));
                > CREATE DEFAULT INDEX ON sink_format_table;
                > INSERT INTO sink_format_table VALUES (1, 'A', 10);
                > CREATE CLUSTER sink_format_sink1_cluster SIZE '1';
                > CREATE SINK sink_format_sink1
                  IN CLUSTER sink_format_sink1_cluster
                  FROM sink_format_table
                  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-format-sink-${testdrive.seed}')
                  KEY (f1)
                  KEY FORMAT TEXT
                  VALUE FORMAT JSON
                  ENVELOPE UPSERT;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Insert one more row per phase into the sinked table."""
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                > INSERT INTO sink_format_table VALUES (2, 'B', 20);
                """,
                """
                > INSERT INTO sink_format_table VALUES (3, 'C', 30);
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Re-ingest the sink topic with matching TEXT/JSON formats and check
        all three rows are present."""
        return Testdrive(
            dedent(
                """
                $ set-sql-timeout duration=60s
                # We check the contents of the sink topics by re-ingesting them.
                > CREATE SOURCE sink_format_source_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-format-sink-${testdrive.seed}')
                > CREATE TABLE sink_format_source FROM SOURCE sink_format_source_src (REFERENCE "testdrive-sink-format-sink-${testdrive.seed}")
                  KEY FORMAT TEXT
                  VALUE FORMAT JSON
                  ENVELOPE UPSERT
                > SELECT key, data->>'f2', data->>'f3' FROM sink_format_source
                1 A 10
                2 B 20
                3 C 30
                > DROP SOURCE sink_format_source_src CASCADE;
                """
            )
        )
  1266. @externally_idempotent(False)
  1267. class SinkPartitionByDebezium(Check):
  1268. """Check SINK with ENVELOPE DEBEZIUM and PARTITION BY"""
  1269. def initialize(self) -> Testdrive:
  1270. return Testdrive(
  1271. schemas()
  1272. + dedent(
  1273. """
  1274. > CREATE TABLE sink_partition_by_debezium_table (f1 INTEGER, f2 TEXT, PRIMARY KEY (f1));
  1275. > CREATE DEFAULT INDEX ON sink_partition_by_debezium_table;
  1276. > INSERT INTO sink_partition_by_debezium_table SELECT generate_series, REPEAT('x', 1024) FROM generate_series(1, 100000);
  1277. > CREATE MATERIALIZED VIEW sink_partition_by_debezium_view AS SELECT f1 - 1 AS f1 , f2 FROM sink_partition_by_debezium_table;
  1278. > CREATE CLUSTER sink_partition_by_debezium_sink1_cluster SIZE '4';
  1279. > CREATE SINK sink_partition_by_debezium_sink1
  1280. IN CLUSTER sink_partition_by_debezium_sink1_cluster
  1281. FROM sink_partition_by_debezium_view
  1282. INTO KAFKA CONNECTION kafka_conn (
  1283. TOPIC 'testdrive-sink-partition-by-debezium-sink-${testdrive.seed}',
  1284. TOPIC PARTITION COUNT 4,
  1285. PARTITION BY f1
  1286. )
  1287. KEY (f1) NOT ENFORCED
  1288. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  1289. ENVELOPE DEBEZIUM;
  1290. """
  1291. )
  1292. )
  1293. def manipulate(self) -> list[Testdrive]:
  1294. return [
  1295. Testdrive(schemas() + dedent(s))
  1296. for s in [
  1297. """
  1298. > UPDATE sink_partition_by_debezium_table SET f2 = REPEAT('y', 1024)
  1299. """,
  1300. """
  1301. > UPDATE sink_partition_by_debezium_table SET f2 = REPEAT('z', 1024)
  1302. """,
  1303. ]
  1304. ]
  1305. def validate(self) -> Testdrive:
  1306. return Testdrive(
  1307. dedent(
  1308. """
  1309. # Can be slow in 0dt upgrade scenarios
  1310. $ set-sql-timeout duration=480s
  1311. $ schema-registry-verify schema-type=avro subject=testdrive-sink-partition-by-debezium-sink-${testdrive.seed}-value
  1312. {"type":"record","name":"envelope","fields":[{"name":"before","type":["null",{"type":"record","name":"row","fields":[{"name":"f1","type":"int"},{"name":"f2","type":["null","string"]}]}]},{"name":"after","type":["null","row"]}]}
  1313. # We check the contents of the sink topics by re-ingesting them.
  1314. > CREATE SOURCE sink_partition_by_debezium_source_src
  1315. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-partition-by-debezium-sink-${testdrive.seed}')
  1316. > CREATE TABLE sink_partition_by_debezium_source
  1317. FROM SOURCE sink_partition_by_debezium_source_src (REFERENCE "testdrive-sink-partition-by-debezium-sink-${testdrive.seed}")
  1318. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  1319. ENVELOPE NONE
  1320. > CREATE MATERIALIZED VIEW sink_partition_by_debezium_view2
  1321. AS
  1322. SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
  1323. FROM (
  1324. SELECT (before).f1, (before).f2 FROM sink_partition_by_debezium_source
  1325. )
  1326. > CREATE MATERIALIZED VIEW sink_partition_by_debezium_view3
  1327. AS
  1328. SELECT COUNT(*) AS c1 , COUNT(f1) AS c2, COUNT(DISTINCT f1) AS c3 , MIN(f1), MAX(f1)
  1329. FROM (
  1330. SELECT (after).f1, (after).f2 FROM sink_partition_by_debezium_source
  1331. )
  1332. > CREATE MATERIALIZED VIEW sink_partition_by_debezium_view4
  1333. AS
  1334. SELECT LEFT(f2, 1), SUM(c)
  1335. FROM (
  1336. SELECT (after).f2, COUNT(*) AS c FROM sink_partition_by_debezium_source GROUP BY (after).f2
  1337. UNION ALL
  1338. SELECT (before).f2, -COUNT(*) AS c FROM sink_partition_by_debezium_source GROUP BY (before).f2
  1339. )
  1340. GROUP BY f2
  1341. > SELECT * FROM sink_partition_by_debezium_view2
  1342. 300000 200000 100000 0 99999
  1343. > SELECT * FROM sink_partition_by_debezium_view3
  1344. 300000 300000 100000 0 99999
  1345. > SELECT * FROM sink_partition_by_debezium_view4
  1346. <null> -100000
  1347. x 0
  1348. y 0
  1349. z 100000
  1350. > DROP SOURCE sink_partition_by_debezium_source_src CASCADE;
  1351. # TODO: kafka-verify-data when it can deal with being run twice, to check the actual partitioning
  1352. """
  1353. )
  1354. )