# error.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. class ParseError(Check):
  13. def initialize(self) -> Testdrive:
  14. return Testdrive(
  15. dedent(
  16. """
  17. > CREATE TABLE parse_error_table (f1 STRING);
  18. > CREATE MATERIALIZED VIEW parse_error_view AS SELECT f1::INTEGER FROM parse_error_table;
  19. > INSERT INTO parse_error_table VALUES ('123');
  20. """
  21. )
  22. )
  23. def manipulate(self) -> list[Testdrive]:
  24. return [
  25. Testdrive(s)
  26. for s in [
  27. "> INSERT INTO parse_error_table VALUES ('abc'), ('234');",
  28. "> INSERT INTO parse_error_table VALUES ('345'), ('klm');",
  29. ]
  30. ]
  31. def validate(self) -> Testdrive:
  32. return Testdrive(
  33. dedent(
  34. """
  35. ! SELECT * FROM parse_error_view;
  36. contains: invalid input syntax for type integer
  37. """
  38. )
  39. )
  40. class ParseHexError(Check):
  41. def initialize(self) -> Testdrive:
  42. return Testdrive(
  43. dedent(
  44. """
  45. > CREATE TABLE parse_hex_error_table (f1 STRING);
  46. > CREATE MATERIALIZED VIEW parse_hex_error_view AS SELECT decode(f1, 'hex') FROM parse_hex_error_table;
  47. > INSERT INTO parse_hex_error_table VALUES ('aa');
  48. """
  49. )
  50. )
  51. def manipulate(self) -> list[Testdrive]:
  52. return [
  53. Testdrive(s)
  54. for s in [
  55. "> INSERT INTO parse_hex_error_table VALUES ('bb'), ('xx');",
  56. "> INSERT INTO parse_hex_error_table VALUES ('yy'), ('cc');",
  57. ]
  58. ]
  59. def validate(self) -> Testdrive:
  60. return Testdrive(
  61. dedent(
  62. """
  63. ! SELECT * FROM parse_hex_error_view;
  64. contains: invalid hexadecimal digit
  65. """
  66. )
  67. )
  68. class DataflowErrorRetraction(Check):
  69. def initialize(self) -> Testdrive:
  70. return Testdrive(
  71. dedent(
  72. """
  73. > CREATE TABLE dataflow_error_retraction_table (f1 STRING);
  74. > CREATE MATERIALIZED VIEW dataflow_error_retraction_view AS SELECT f1::INTEGER FROM dataflow_error_retraction_table;
  75. > INSERT INTO dataflow_error_retraction_table VALUES ('123');
  76. > INSERT INTO dataflow_error_retraction_table VALUES ('abc');
  77. > INSERT INTO dataflow_error_retraction_table VALUES ('klm');
  78. > INSERT INTO dataflow_error_retraction_table VALUES ('234');
  79. ! SELECT * FROM dataflow_error_retraction_view;
  80. contains: invalid input syntax for type integer
  81. """
  82. )
  83. )
  84. def manipulate(self) -> list[Testdrive]:
  85. return [
  86. Testdrive(s)
  87. for s in [
  88. dedent(
  89. """
  90. > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'abc'
  91. """
  92. ),
  93. dedent(
  94. """
  95. > DELETE FROM dataflow_error_retraction_table WHERE f1 = 'klm'
  96. """
  97. ),
  98. ]
  99. ]
  100. def validate(self) -> Testdrive:
  101. return Testdrive(
  102. dedent(
  103. """
  104. > SELECT * FROM dataflow_error_retraction_view;
  105. 123
  106. 234
  107. """
  108. )
  109. )
  110. def schemas() -> str:
  111. return dedent(
  112. """
  113. $ set schema-f1={
  114. "type" : "record",
  115. "name" : "test",
  116. "fields" : [
  117. {"name":"f1", "type":"string"}
  118. ]
  119. }
  120. $ set schema-f2={
  121. "type" : "record",
  122. "name" : "test",
  123. "fields" : [
  124. {"name":"f2", "type":"int"}
  125. ]
  126. }
  127. """
  128. )
  129. @externally_idempotent(False)
  130. class DecodeError(Check):
  131. def initialize(self) -> Testdrive:
  132. return Testdrive(
  133. schemas()
  134. + dedent(
  135. """
  136. $ kafka-create-topic topic=decode-error
  137. $ kafka-ingest format=avro topic=decode-error schema=${schema-f1} repeat=1
  138. {"f1": "A"}
  139. > CREATE SOURCE decode_error_src
  140. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-${testdrive.seed}')
  141. > CREATE TABLE decode_error FROM SOURCE decode_error_src (REFERENCE "testdrive-decode-error-${testdrive.seed}")
  142. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  143. ENVELOPE NONE
  144. """
  145. )
  146. )
  147. def manipulate(self) -> list[Testdrive]:
  148. return [
  149. Testdrive(schemas() + dedent(s))
  150. for s in [
  151. """
  152. # {"f2": 123456789}, no publish
  153. $ kafka-ingest format=bytes topic=decode-error repeat=1
  154. \\x00\x00\x00\x00\x01\xaa\xb4\xde\x75
  155. """,
  156. """
  157. $ kafka-ingest format=bytes topic=decode-error repeat=1
  158. ABCD
  159. """,
  160. ]
  161. ]
  162. def validate(self) -> Testdrive:
  163. return Testdrive(
  164. dedent(
  165. """
  166. ! SELECT * FROM decode_error
  167. contains: Decode error
  168. """
  169. )
  170. )
  171. class DecodeErrorUpsertValue(Check):
  172. def initialize(self) -> Testdrive:
  173. return Testdrive(
  174. dedent(
  175. """
  176. $ kafka-create-topic topic=decode-error-upsert-value
  177. $ set schema={
  178. "type" : "record",
  179. "name" : "test",
  180. "fields" : [
  181. {"name":"f1", "type":"int"}
  182. ]
  183. }
  184. $ kafka-ingest format=avro topic=decode-error-upsert-value key-format=bytes key-terminator=: schema=${schema}
  185. key0: {"f1": 1}
  186. key1: {"f1": 2}
  187. key2: {"f1": 3}
  188. > CREATE CLUSTER decode_error_upsert_value_cluster SIZE '1';
  189. > CREATE SOURCE decode_error_upsert_value_src
  190. IN CLUSTER decode_error_upsert_value_cluster
  191. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-value-${testdrive.seed}')
  192. > CREATE TABLE decode_error_upsert_value FROM SOURCE decode_error_upsert_value_src (REFERENCE "testdrive-decode-error-upsert-value-${testdrive.seed}")
  193. KEY FORMAT TEXT
  194. VALUE FORMAT AVRO USING SCHEMA '${schema}'
  195. ENVELOPE UPSERT
  196. $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
  197. key1: garbage
  198. ! SELECT * FROM decode_error_upsert_value
  199. contains: avro deserialization error
  200. """
  201. )
  202. )
  203. def manipulate(self) -> list[Testdrive]:
  204. return [
  205. Testdrive(
  206. dedent(
  207. """
  208. $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
  209. key2: garbage2
  210. ! SELECT * FROM decode_error_upsert_value
  211. contains: avro deserialization error
  212. """
  213. )
  214. ),
  215. Testdrive(
  216. dedent(
  217. """
  218. # Ingest valid avro, but with an incompatible schema
  219. $ set schema-string={
  220. "type" : "record",
  221. "name" : "test",
  222. "fields" : [
  223. {"name":"f1", "type":"string"}
  224. ]
  225. }
  226. $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=avro schema=${schema-string} confluent-wire-format=false
  227. key3: {"f1": "garbage3"}
  228. ! SELECT * FROM decode_error_upsert_value
  229. contains: avro deserialization error
  230. """,
  231. )
  232. ),
  233. ]
  234. def validate(self) -> Testdrive:
  235. return Testdrive(
  236. dedent(
  237. """
  238. # Retract all the garbage and confirm the source is now operational
  239. $ kafka-ingest topic=decode-error-upsert-value key-format=bytes key-terminator=: format=bytes
  240. key1:
  241. key2:
  242. key3:
  243. > SELECT f1 FROM decode_error_upsert_value
  244. 1
  245. """
  246. )
  247. )
  248. class DecodeErrorUpsertKey(Check):
  249. def initialize(self) -> Testdrive:
  250. return Testdrive(
  251. dedent(
  252. """
  253. $ kafka-create-topic topic=decode-error-upsert-key
  254. $ set key-schema={
  255. "type" : "record",
  256. "name" : "test",
  257. "fields" : [
  258. {"name":"f1", "type":"int"}
  259. ]
  260. }
  261. $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema}
  262. {"f1": 1} value1
  263. {"f1": 2} value2
  264. > CREATE SOURCE decode_error_upsert_key_src
  265. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-decode-error-upsert-key-${testdrive.seed}')
  266. > CREATE TABLE decode_error_upsert_key FROM SOURCE decode_error_upsert_key_src (REFERENCE "testdrive-decode-error-upsert-key-${testdrive.seed}")
  267. KEY FORMAT AVRO USING SCHEMA '${key-schema}'
  268. VALUE FORMAT BYTES
  269. ENVELOPE UPSERT
  270. $ kafka-ingest topic=decode-error-upsert-key key-format=bytes key-terminator=: format=bytes
  271. garbage1: value3
  272. ! SELECT * FROM decode_error_upsert_key
  273. contains: avro deserialization error
  274. """
  275. )
  276. )
  277. def manipulate(self) -> list[Testdrive]:
  278. return [
  279. Testdrive(
  280. dedent(
  281. """
  282. # Retract existing garbage
  283. $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
  284. garbage1:
  285. # And introduce a new one -- valid avro, but with an incompatible schema
  286. $ set key-schema-string={
  287. "type" : "record",
  288. "name" : "test",
  289. "fields" : [
  290. {"name":"f1", "type":"string"}
  291. ]
  292. }
  293. $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
  294. {"f1": "garbage2"} value4
  295. ! SELECT * FROM decode_error_upsert_key
  296. contains: avro deserialization error
  297. """
  298. )
  299. ),
  300. Testdrive(
  301. dedent(
  302. """
  303. # Retract existing garbage and introduce a new one
  304. $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
  305. garbage3: value4
  306. $ set key-schema-string={
  307. "type" : "record",
  308. "name" : "test",
  309. "fields" : [
  310. {"name":"f1", "type":"string"}
  311. ]
  312. }
  313. $ kafka-ingest topic=decode-error-upsert-key key-format=avro format=bytes key-schema=${key-schema-string} confluent-wire-format=false
  314. {"f1": "garbage2"}
  315. ! SELECT * FROM decode_error_upsert_key
  316. contains: avro deserialization error
  317. """,
  318. )
  319. ),
  320. ]
  321. def validate(self) -> Testdrive:
  322. return Testdrive(
  323. dedent(
  324. """
  325. # Retract any remaining garbage
  326. $ kafka-ingest topic=decode-error-upsert-key key-format=bytes format=bytes key-terminator=:
  327. garbage3:
  328. # Source should return to operational status
  329. > SELECT f1 FROM decode_error_upsert_key
  330. 1
  331. 2
  332. """
  333. )
  334. )