# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

# Tests that hit the Confluent Schema Registry.

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Verify the error message is useful when a schema is not present in the
# registry.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

$ kafka-create-topic topic=noexist partitions=1

> CREATE SOURCE noexist
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-noexist-${testdrive.seed}')

! CREATE TABLE noexist_tbl FROM SOURCE noexist (REFERENCE "testdrive-noexist-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
contains:No value schema found

# Debezium-style envelope schema, v1: rows carry a single column "a".
$ set schema-v1={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"}
            ]
          },
          "null"
        ]
      },
      { "name": "op", "type": "string" },
      { "name": "after", "type": ["row", "null"] },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

# Schema v2: adds column "b" with a default, a backwards-compatible evolution.
$ set schema-v2={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long", "default": 42}
            ]
          },
          "null"
        ]
      },
      { "name": "op", "type": "string" },
      { "name": "after", "type": ["row", "null"] },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

$ kafka-create-topic topic=data

$ set valid-key-schema={
    "type": "record",
    "name": "Key",
    "fields": [
      {"name": "a", "type": "long"}
    ]
  }

$ kafka-ingest format=avro topic=data key-format=avro key-schema=${valid-key-schema} schema=${schema-v1} timestamp=1
{"a": 1} {"before": null, "after": {"row": {"a": 1}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE SOURCE data_v1
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')

> CREATE TABLE data_v1_tbl FROM SOURCE data_v1 (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * FROM data_v1_tbl
a
---
1

# Ingest data under the evolved schema; a reader resolved against v1 should
# still see only column "a", while a v2 reader also sees "b".
$ kafka-ingest format=avro topic=data key-format=avro key-schema=${valid-key-schema} schema=${schema-v2} timestamp=3
{"a": 2} {"before": null, "after": {"row": {"a": 2, "b": -1}}, "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE SOURCE data_v2
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')

> CREATE TABLE data_v2_tbl FROM SOURCE data_v2 (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * FROM data_v1_tbl
a
---
1
2

> SELECT * FROM data_v2_tbl
a b
----
1 42
2 -1

# [btv] uncomment if we bring back classic debezium mode
# $ kafka-ingest topic=data timestamp=5
#   format=avro schema=${schema-v1} key-format=avro key-schema=${valid-key-schema}
# {"a": 1} {"before": null, "after": {"row": {"a": 1}}, "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}
#
# > CREATE SOURCE data_v3
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
#
# > CREATE TABLE data_v3_tbl FROM SOURCE data_v3 (REFERENCE "testdrive-data-${testdrive.seed}")
#   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#   ENVELOPE DEBEZIUM
#
# > SELECT * FROM data_v3_tbl
# a b
# ----
# 1 42
# 1 42
# 2 -1
#
# # Make sure this query gives WRONG results,
# # which should prove that we are respecting primary
# # key information (and optimizing by transforming
# # a reduce on a primary key to a map)
# > SELECT a, count(*) FROM data_v3_tbl
#   GROUP BY a
# a count
# -------
# 1 1
# 1 1
# 2 1