kafka-avro-debezium-transaction.td

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

# Test support for Avro sources without using the Confluent Schema Registry.
#
# This test is broken.
# See: https://github.com/MaterializeInc/database-issues/issues/3892

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          },
          "null"
        ]
      },
      { "name": "op", "type": "string" },
      { "name": "after", "type": ["row", "null"] },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      },
      {
        "name": "transaction",
        "type": {
          "type": "record",
          "name": "Transaction",
          "namespace": "whatever",
          "fields": [
            {
              "name": "total_order",
              "type": ["long", "null"]
            },
            {
              "name": "id",
              "type": "string"
            }
          ]
        }
      }
    ]
  }
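
# The schema above is a hand-rolled Debezium-style envelope: each value has
# `before`/`after` row images, an `op` code, MySQL-flavored `source`
# coordinates (file/pos/row), and a `transaction` record whose `id` ties the
# event to the transaction metadata topic defined next. For reference, a
# minimal conforming value (field values illustrative) looks like:
#
#   {"before": null, "after": {"row": {"a": 1, "b": 2}}, "op": "c",
#    "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"boolean": false}},
#    "transaction": {"total_order": null, "id": "1"}}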

$ set txschema={
    "type": "record",
    "name": "TransactionMetadataValue",
    "namespace": "io.debezium.connector.common",
    "fields": [
      {"name": "status", "type": "string"},
      {"name": "id", "type": "string"},
      {
        "name": "event_count",
        "type": ["null", "long"],
        "default": null
      },
      {
        "name": "data_collections",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "ConnectDefault",
              "namespace": "io.confluent.connect.Avro",
              "fields": [
                {"name": "data_collection", "type": "string"},
                {"name": "event_count", "type": "long"}
              ]
            }
          }
        ],
        "default": null
      }
    ],
    "connect.name": "io.debezium.connector.common.TransactionMetadataValue"
  }
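
# This mirrors the schema Debezium uses for its transaction metadata topic:
# a BEGIN event opens each transaction, and an END event reports, per data
# collection (topic), how many change events the transaction produced. A
# sketch of a BEGIN/END pair for a two-collection transaction (ids and
# counts illustrative):
#
#   {"status": "BEGIN", "id": "42", "event_count": null, "data_collections": null}
#   {"status": "END", "id": "42", "event_count": {"long": 3},
#    "data_collections": {"array": [
#      {"data_collection": "topic-a", "event_count": 2},
#      {"data_collection": "topic-b", "event_count": 1}]}}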

$ set txschema-bad-schema={
    "type": "record",
    "name": "TransactionMetadataValue",
    "namespace": "io.debezium.connector.common",
    "fields": [
      {"name": "status", "type": "string"},
      {
        "name": "id",
        "type": ["null", "string"]
      },
      {
        "name": "event_count",
        "type": ["null", "long"],
        "default": null
      },
      {
        "name": "data_collections",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "ConnectDefault",
              "namespace": "io.confluent.connect.Avro",
              "fields": [
                {"name": "data_collection", "type": "string"},
                {"name": "event_count", "type": "long"}
              ]
            }
          }
        ],
        "default": null
      }
    ],
    "connect.name": "io.debezium.connector.common.TransactionMetadataValue"
  }
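
# Identical to ${txschema} except that `id` is declared as the nullable
# union below; Materialize requires a non-nullable string id, so sources
# built on this schema should be rejected (exercised at the end of this
# file):
#
#   {"name": "id", "type": ["null", "string"]}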

$ kafka-create-topic topic=data-txdata

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE data_txdata
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-txdata-${testdrive.seed}')

> CREATE TABLE data_txdata_tbl FROM SOURCE data_txdata (REFERENCE "testdrive-data-txdata-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${txschema}'
  ENVELOPE NONE
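
# The metadata topic itself is read with ENVELOPE NONE: its records are
# plain BEGIN/END events rather than before/after change events, and
# Materialize only interprets them through the TRANSACTION METADATA clauses
# of the data sources created below.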

$ kafka-ingest format=avro topic=data-txdata schema=${txschema} timestamp=4
{"status": "BEGIN", "id": "1", "event_count": null, "data_collections": null}
{"status": "END", "id": "1", "event_count": {"long": 4}, "data_collections": {"array": [{"event_count": 3, "data_collection": "testdrive-data-${testdrive.seed}"}, {"event_count": 1, "data_collection": "testdrive-data2-${testdrive.seed}"}]}}

$ kafka-create-topic topic=data

$ kafka-create-topic topic=data2

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 1, "b": 1}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "1"}}
{"before": null, "after": {"row": {"a": 2, "b": 3}}, "source": {"file": "binlog", "pos": 1, "row": 0, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "1"}}

#
# Create sources that use the inline schema, sharing data_txdata as their
# transaction metadata source.
#

> CREATE SOURCE data_schema_inline
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')

> CREATE TABLE data_schema_inline_tbl FROM SOURCE data_schema_inline (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE DEBEZIUM (
    TRANSACTION METADATA (SOURCE data_txdata, COLLECTION 'testdrive-data-${testdrive.seed}')
  )

> CREATE SOURCE data2_schema_inline
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data2-${testdrive.seed}')

> CREATE TABLE data2_schema_inline_tbl FROM SOURCE data2_schema_inline (REFERENCE "testdrive-data2-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE DEBEZIUM (
    TRANSACTION METADATA (SOURCE data_txdata, COLLECTION 'testdrive-data2-${testdrive.seed}')
  )
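
# The general shape of the clause used above (a sketch; bracketed names are
# placeholders): a Debezium-enveloped table names the shared metadata source
# plus the collection string that identifies its topic inside the END
# events' `data_collections` array:
#
#   CREATE TABLE <tbl> FROM SOURCE <src> (REFERENCE "<data-topic>")
#     FORMAT AVRO USING SCHEMA '<envelope-schema>'
#     ENVELOPE DEBEZIUM (
#       TRANSACTION METADATA (SOURCE <metadata-src>, COLLECTION '<data-topic>')
#     )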

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 4, "b": 5}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "1"}}

# Note that this should still work even if data2 (which shares the
# transaction metadata source) isn't able to progress!
> SELECT a, b FROM data_schema_inline_tbl
a b
-----
1 1
2 3
4 5

$ kafka-ingest format=avro topic=data-txdata schema=${txschema} timestamp=2
{"status": "BEGIN", "id": "5", "event_count": null, "data_collections": null}
{"status": "END", "id": "5", "event_count": {"long": 1}, "data_collections": {"array": [{"event_count": 1, "data_collection": "testdrive-data-${testdrive.seed}"}]}}

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 8, "b": 9}}, "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "5"}}

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE SINK data_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM data_schema_inline_tbl
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-sink-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
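
# The sink re-emits data_schema_inline_tbl as Debezium-style change events
# via the schema registry; for these inserts we expect values shaped roughly
# like {"before": null, "after": {"row": {"a": ..., "b": ...}}}, which is
# what the kafka-verify-data checks below assert.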

# Check that repeated Debezium messages are skipped.
$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 4, "b": 5}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "1"}}
{"before": null, "after": {"row": {"a": 8, "b": 9}}, "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "5"}}

> SELECT a, b FROM data_schema_inline_tbl
a b
----
1 1
2 3
4 5
8 9
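
# The duplicated events for (4, 5) and (8, 9) were skipped: each row still
# appears exactly once. (8, 9) shows up at all because transaction "5" was
# closed by the END event ingested above.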

# Now do data2.
$ kafka-ingest format=avro topic=data2 schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 101, "b": 101}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "1"}}

> SELECT a, b FROM data2_schema_inline_tbl
a b
---------
101 101

$ set-regex match=\d{13} replacement=<TIMESTAMP>

$ kafka-verify-data format=avro sink=materialize.public.data_sink sort-messages=true
{"before": null, "after": {"row": {"a": 1, "b": 1}}}
{"before": null, "after": {"row": {"a": 2, "b": 3}}}
{"before": null, "after": {"row": {"a": 4, "b": 5}}}

$ kafka-verify-data format=avro sink=materialize.public.data_sink sort-messages=true
{"before": null, "after": {"row": {"a": 8, "b": 9}}}

#
# Test reading from the source when the transaction metadata and the data
# topic don't line up.
#

# We want the next message to have a different timestamp.
> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE data_schema_inline_tbl WITH (snapshot = false, progress = true)

> FETCH 1 c
mz_timestamp mz_progressed mz_diff a b
-------------------------------------------------
<TIMESTAMP> true <null> <null> <null>

> COMMIT
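
# SUBSCRIBE with snapshot = false and progress = true yields a progress row
# (mz_progressed = true, data columns null) only once the table's frontier
# has advanced, so fetching one row here in effect waits until subsequent
# ingests are assigned a later timestamp.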

$ kafka-ingest format=avro topic=data-txdata schema=${txschema} timestamp=2
{"status": "BEGIN", "id": "7", "event_count": null, "data_collections": null}
{"status": "END", "id": "7", "event_count": {"long": 1}, "data_collections": {"array": [{"event_count": 1, "data_collection": "testdrive-data-${testdrive.seed}"}]}}

> SELECT a, b FROM data_schema_inline_tbl
a b
-----
1 1
2 3
4 5
8 9

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 2, "b": 7}}, "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "7"}}

> SELECT a, b FROM data_schema_inline_tbl
a b
-----
1 1
2 3
4 5
8 9
2 7

# We want the next message to have a different timestamp.
> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE data_schema_inline_tbl WITH (snapshot = false, progress = true)

> FETCH 1 c
mz_timestamp mz_progressed mz_diff a b
-------------------------------------------------
<TIMESTAMP> true <null> <null> <null>

> COMMIT

# The data event for transaction "9" arrives before its metadata, so it must
# not become visible yet.
$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 3, "b": 9}}, "source": {"file": "binlog4", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c", "transaction": {"total_order": null, "id": "9"}}

> SELECT a, b FROM data_schema_inline_tbl
a b
-----
1 1
2 3
4 5
8 9
2 7

$ kafka-ingest format=avro topic=data-txdata schema=${txschema} timestamp=2
{"status": "BEGIN", "id": "9", "event_count": null, "data_collections": null}
{"status": "END", "id": "9", "event_count": {"long": 1}, "data_collections": {"array": [{"event_count": 1, "data_collection": "testdrive-data-${testdrive.seed}"}]}}

> SELECT a, b FROM data_schema_inline_tbl
a b
-----
1 1
2 3
4 5
8 9
2 7
3 9

$ unset-regex

# Reingest the sink's output to verify that transaction boundaries were
# preserved.
> CREATE SOURCE data_sink_reingest
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-sink-${testdrive.seed}')

> CREATE TABLE data_sink_reingest_tbl FROM SOURCE data_sink_reingest (REFERENCE "testdrive-data-sink-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE

> SELECT after::text FROM data_sink_reingest_tbl ORDER BY transaction ASC
(1,1)
(2,3)
(4,5)
(8,9)
(2,7)
(3,9)

> SELECT COUNT(*) AS event_count FROM data_sink_reingest_tbl GROUP BY transaction ORDER BY transaction ASC
3
1
1
1
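
# Grouping by the `transaction` column recovers the original transaction
# sizes: transaction "1" contributed three events ((1,1), (2,3), (4,5)) and
# transactions "5", "7", and "9" one event each, matching the 3/1/1/1 counts
# above.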

#
# Test validation of the TRANSACTION METADATA specification.
#

$ kafka-create-topic topic=data-txdata-bad-schema

$ kafka-create-topic topic=data-bad-schema

> CREATE SOURCE data_txdata_bad_schema
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-txdata-bad-schema-${testdrive.seed}')

> CREATE TABLE data_txdata_bad_schema_tbl FROM SOURCE data_txdata_bad_schema (REFERENCE "testdrive-data-txdata-bad-schema-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${txschema-bad-schema}'
  ENVELOPE NONE

$ kafka-ingest format=avro topic=data-txdata-bad-schema schema=${txschema-bad-schema} timestamp=4
{"status": "BEGIN", "id": null, "event_count": null, "data_collections": null}
{"status": "END", "id": {"string": "1"}, "event_count": {"long": 3}, "data_collections": {"array": [{"event_count": 3, "data_collection": "testdrive-data-${testdrive.seed}"}]}}

> CREATE SOURCE data_schema_inline_with_bad_schema_tx_metadata
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-bad-schema-${testdrive.seed}')

! CREATE TABLE data_schema_inline_with_bad_schema_tx_metadata_tbl FROM SOURCE data_schema_inline_with_bad_schema_tx_metadata (REFERENCE "testdrive-data-bad-schema-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE DEBEZIUM (
    TRANSACTION METADATA (
      SOURCE data_txdata_bad_schema,
      COLLECTION 'testdrive-data-bad-schema-${testdrive.seed}'
    )
  )
contains:'id' column must be of type non-nullable string

# The metadata source must actually be a source; pointing TRANSACTION
# METADATA at a sink is rejected.
> CREATE SOURCE data_schema_inline_with_sink
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')

! CREATE TABLE data_schema_inline_with_sink_tbl FROM SOURCE data_schema_inline_with_sink (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${schema}'
  ENVELOPE DEBEZIUM (
    TRANSACTION METADATA (SOURCE data_sink, COLLECTION 'testdrive-data-${testdrive.seed}')
  )
contains:provided TRANSACTION METADATA SOURCE materialize.public.data_sink is not a source