# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

# Tests that hit the Confluent Schema Registry.

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

# Verify the error message is useful when a schema is not present in the
# registry.

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

$ kafka-create-topic topic=noexist partitions=1

! CREATE SOURCE noexist
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-noexist-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
contains:No value schema found
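
# The topic exists, but nothing has been published to the registry under the
# topic's value subject, so the error names the missing value schema rather
# than failing opaquely.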

$ set schema-v1={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"}
            ]
          },
          "null"
        ]
      },
      { "name": "op", "type": "string" },
      { "name": "after", "type": ["row", "null"] },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }
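
# schema-v1 follows the Debezium envelope shape: each message carries the row
# state "before" and "after" a change, the operation kind in "op", and binlog
# position metadata in "source".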

$ set schema-v2={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long", "default": 42}
            ]
          },
          "null"
        ]
      },
      { "name": "op", "type": "string" },
      { "name": "after", "type": ["row", "null"] },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }
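
# schema-v2 is a backward-compatible evolution of schema-v1: it adds a field
# "b" with a default of 42, so records written with schema-v1 can still be
# read under v2, with b filled in as 42.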

$ kafka-create-topic topic=data

$ set valid-key-schema={
    "type": "record",
    "name": "Key",
    "fields": [
      {"name": "a", "type": "long"}
    ]
  }
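
# Ingest a record with schema-v1. Note the Avro JSON encoding of unions:
# {"row": {...}} and {"boolean": false} name the union branch that holds the
# value.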

$ kafka-ingest format=avro topic=data key-format=avro key-schema=${valid-key-schema} schema=${schema-v1} timestamp=1
{"a": 1} {"before": null, "after": {"row": {"a": 1}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE SOURCE data_v1
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * FROM data_v1
a
---
1

$ kafka-ingest format=avro topic=data key-format=avro key-schema=${valid-key-schema} schema=${schema-v2} timestamp=3
{"a": 2} {"before": null, "after": {"row": {"a": 2, "b": -1}}, "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE SOURCE data_v2
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * FROM data_v1
a
---
1
2

> SELECT * FROM data_v2
a b
----
1 42
2 -1
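
# data_v2 resolves both writer schemas against schema-v2: the record written
# with schema-v1 has no "b", so the default 42 is filled in, while the second
# record carries the explicit value -1. data_v1 continues to expose only
# column a.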

# [btv] uncomment if we bring back classic debezium mode
# $ kafka-ingest format=avro topic=data key-format=avro key-schema=${valid-key-schema} schema=${schema-v1} timestamp=5
# {"a": 1} {"before": null, "after": {"row": {"a": 1}}, "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}
#
# > CREATE SOURCE data_v3
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
#   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
#   ENVELOPE DEBEZIUM
#
# > SELECT * FROM data_v3
# a b
# ----
# 1 42
# 1 42
# 2 -1
#
# # Make sure this query gives WRONG results,
# # which should prove that we are respecting primary
# # key information (and optimizing by transforming
# # a reduce on a primary key to a map)
# > SELECT a, count(*) FROM data_v3
#   GROUP BY a
# a count
# -------
# 1 1
# 1 1
# 2 1
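#
# # (With the duplicate a=1 ingested and a declared as the key, the optimizer
# # presumably rewrites the reduce into a per-row map, emitting one output row
# # per input row instead of a true count, hence the two "1 1" rows.)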