# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1

# Test complex reordered/resolved schemas.
# The schemas are mostly a copy and paste from test_complex_resolutions
# in mod avro. The code duplication is warranted because making that
# test exercise the new decode/deserialize API would
# invert the dependency structure, requiring mod avro to depend
# on core materialize.
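#
# Relative to the writer schema, the reader schema below reorders the fields,
# drops f3 and f4, adds a nested record f0 with a record-level default,
# reorders the variants of the f1 union, reorders the f2 enum symbols (adding
# "Jokers" and a default of "Diamonds"), and widens f5 from a plain long to a
# union with a new fixed variant. Avro schema resolution matches fields by
# name, so the decoder must cope with all of these changes at read time.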

$ set writer-schema={
    "name": "some_record",
    "type": "record",
    "fields": [
        {
            "name": "f5",
            "type": "long"
        },
        {
            "name": "f4",
            "type": [
                "long",
                {
                    "name": "variant1",
                    "type": "fixed",
                    "size": 1
                },
                "null",
                {
                    "name": "variant2",
                    "type": "fixed",
                    "size": 2
                }
            ]
        },
        {
            "name": "f3",
            "type": "long"
        },
        {
            "name": "f2",
            "type": {
                "type": "enum",
                "symbols": ["Clubs", "Diamonds", "Hearts", "Spades"],
                "name": "Suit"
            }
        },
        {
            "name": "f1",
            "type": [
                {
                    "name": "variant3",
                    "type": "fixed",
                    "size": 3
                },
                "long",
                {
                    "name": "variant4",
                    "type": "fixed",
                    "size": 4
                }
            ]
        },
        {
            "name": "f6",
            "type": {
                "type": "map",
                "values": "long"
            }
        }
    ]
  }

$ set reader-schema={
    "name": "some_record",
    "type": "record",
    "fields": [
        {
            "name": "f0",
            "type": {
                "type": "record",
                "name": "f0_value",
                "fields": [
                    {
                        "name": "f0_0",
                        "type": "long"
                    },
                    {
                        "name": "f0_1",
                        "type": [
                            {
                                "type": "enum",
                                "symbols": ["foo", "bar", "blah"],
                                "name": "some_enum"
                            },
                            "null"
                        ]
                    }
                ]
            },
            "default": {"f0_1": "bar", "f0_0": 7777}
        },
        {
            "name": "f1",
            "type": [
                {
                    "name": "variant4",
                    "type": "fixed",
                    "size": 4
                },
                {
                    "name": "variant3",
                    "type": "fixed",
                    "size": 3
                },
                "long"
            ]
        },
        {
            "name": "f2",
            "type": {
                "type": "enum",
                "symbols": ["Hearts", "Spades", "Diamonds", "Clubs", "Jokers"],
                "name": "Suit",
                "default": "Diamonds"
            }
        },
        {
            "name": "f5",
            "type": [
                {
                    "name": "extra_variant",
                    "type": "fixed",
                    "size": 10
                },
                "long"
            ]
        },
        {
            "name": "f6",
            "type": {
                "type": "map",
                "values": "long"
            }
        }
    ]
  }

$ kafka-create-topic topic=avro-data

$ kafka-ingest format=avro topic=avro-data schema=${writer-schema} timestamp=1
{ "f5": 1234, "f4": {"variant1": [0]}, "f3": 2345, "f2": "Diamonds", "f1": {"variant4": [0, 1, 2, 3]}, "f6": {"hello": 123, "another": 2144} }

$ kafka-ingest format=avro topic=avro-data schema=${reader-schema} timestamp=1
{ "f0": {"f0_0": 9999, "f0_1": null}, "f1": {"long": 3456}, "f2": "Jokers", "f5": {"extra_variant": [0,1,2,3,4,5,6,7,8,9]}, "f6": {"key": 8372} }

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER avro_data_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE avro_data
  IN CLUSTER avro_data_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-${testdrive.seed}')

> CREATE TABLE avro_data_tbl FROM SOURCE avro_data (REFERENCE "testdrive-avro-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
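
# Multi-variant unions are flattened into one column per branch: f1's three
# branches surface as f11 (variant4), f12 (variant3), and f13 (long), and
# f5's two branches as f51 (extra_variant) and f52 (long). The message
# written with the writer schema has no f0, so it decodes to the reader
# schema's default record (7777,bar).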

> SELECT f0::text, f11, f12, f13, f2, f51, f52
  FROM avro_data_tbl
f0 f11 f12 f13 f2 f51 f52
---
(7777,bar) \x00\x01\x02\x03 <null> <null> Diamonds <null> 1234
(9999,) <null> <null> 3456 Jokers \x00\x01\x02\x03\x04\x05\x06\x07\x08\t <null>

# Testdrive prepares statements before they are executed.
# Because maps are a non-standard Postgres type, we can't SELECT them directly.
# TODO@jldlaughlin: Update this test case when we have a binary encoding for maps.
> SELECT f6 -> 'hello' FROM avro_data_tbl
<null>
123
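
# The map value is extracted with the -> operator: the writer-schema message
# supplies f6 = {"hello": 123, "another": 2144}, yielding 123, while the
# reader-schema message has no "hello" key, yielding <null>.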

$ kafka-create-topic topic=avro-data-no-registry

$ kafka-ingest format=avro topic=avro-data-no-registry schema=${reader-schema} confluent-wire-format=false timestamp=1
{ "f0": {"f0_0": 9999, "f0_1": null}, "f1": {"long": 3456}, "f2": "Jokers", "f5": {"extra_variant": [0,1,2,3,4,5,6,7,8,9]}, "f6": {"key": 8372} }

> CREATE CLUSTER avro_data_no_registry_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE avro_data_no_registry
  IN CLUSTER avro_data_no_registry_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-data-no-registry-${testdrive.seed}')

> CREATE TABLE avro_data_no_registry_tbl FROM SOURCE avro_data_no_registry (REFERENCE "testdrive-avro-data-no-registry-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${reader-schema}' (CONFLUENT WIRE FORMAT = false)

> SELECT f2
  FROM avro_data_no_registry_tbl
Jokers

# Test decoding of corrupted messages
$ kafka-create-topic topic=avro-corrupted-values

$ kafka-ingest format=bytes topic=avro-corrupted-values timestamp=1
garbage

> CREATE CLUSTER avro_corrupted_values_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE avro_corrupted_values
  IN CLUSTER avro_corrupted_values_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-corrupted-values-${testdrive.seed}')

> CREATE TABLE avro_corrupted_values_tbl FROM SOURCE avro_corrupted_values (REFERENCE "testdrive-avro-corrupted-values-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${reader-schema}'

! SELECT f2 FROM avro_corrupted_values_tbl
contains:Decode error: avro deserialization error: wrong Confluent-style avro serialization magic: expected 0, got 103 (original text: garbage, original bytes: "67617262616765")
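
# The first byte of "garbage" is 'g' (0x67 = 103), but Confluent-framed
# messages must start with the magic byte 0, so decoding fails before any
# Avro data is read.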

# Test decoding of corrupted messages without magic byte
$ kafka-create-topic topic=avro-corrupted-values2

$ kafka-ingest format=bytes topic=avro-corrupted-values2 timestamp=1
garbage

> CREATE CLUSTER avro_corrupted_values2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE avro_corrupted_values2
  IN CLUSTER avro_corrupted_values2_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-avro-corrupted-values2-${testdrive.seed}')

> CREATE TABLE avro_corrupted_values2_tbl FROM SOURCE avro_corrupted_values2 (REFERENCE "testdrive-avro-corrupted-values2-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${reader-schema}' (CONFLUENT WIRE FORMAT = false)

! SELECT f2 FROM avro_corrupted_values2_tbl
contains:Decode error: Decoding error: Expected non-negative integer, got -49
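
# With the wire-format check disabled, decoding gets further: Avro longs are
# zigzag varints, so 'g' (0x67 = 103) presumably decodes to -52 for f0_0, and
# the next byte 'a' (0x61 = 97) is then read as the f0_1 union branch index,
# zigzag-decoding to -49, which is rejected because branch indexes must be
# non-negative.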