# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart
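
# Nearly every source below gets its own storage cluster, so raise the
# cluster limit before creating them.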
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

# Test support for Avro sources without using the Confluent Schema Registry.
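# The key schema identifies rows by "a". The value schema mimics a Debezium
# MySQL envelope: "before"/"after" row unions, an "op" field, and a "source"
# block with binlog coordinates. The row itself exercises a Debezium JSON
# column, an enum, reuse of a named type, and nested records.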

$ set key-schema={
    "type": "record",
    "name": "Key",
    "fields": [{"name": "a", "type": "long"}]
  }

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"},
              {
                "name": "json",
                "type": {
                  "connect.name": "io.debezium.data.Json",
                  "type": "string"
                }
              },
              {
                "name": "c",
                "type": {
                  "type": "enum",
                  "name": "Bool",
                  "symbols": ["True", "False", "FileNotFound"]
                }
              },
              {"name": "d", "type": "Bool"},
              {
                "name": "e",
                "type": ["null", {
                  "type": "record",
                  "name": "nested_data_1",
                  "fields": [
                    {"name": "n1_a", "type": "long"},
                    {
                      "name": "n1_b",
                      "type": ["null", "double", {
                        "type": "record",
                        "name": "nested_data_2",
                        "fields": [
                          {"name": "n2_a", "type": "long"},
                          {"name": "n2_b", "type": "int"}
                        ]
                      }]
                    }
                  ]
                }]
              },
              {"name": "f", "type": ["null", "nested_data_2"]}
            ]
          },
          "null"
        ]
      },
      {"name": "after", "type": ["row", "null"]},
      {"name": "op", "type": "string"},
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            },
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "boolean",
                  "connect.default": false
                },
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

$ kafka-create-topic topic=data partitions=1
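
# Union values in the records below use the Avro JSON encoding: non-null
# branches are wrapped in an object keyed by the branch's type name, e.g.
# {"double": 86.5} or {"nested_data_2": {...}}.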
$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 1, "b": 1, "json": "null", "c": "True", "d": "False", "e": {"nested_data_1": {"n1_a": 42, "n1_b": {"double": 86.5}}}, "f": null}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"boolean": false}}, "op": "c"}
{"before": null, "after": {"row": {"a": 2, "b": 3, "json": "{\"hello\": \"world\"}", "c": "False", "d": "FileNotFound", "e": {"nested_data_1": {"n1_a": 43, "n1_b": {"nested_data_2": {"n2_a": 44, "n2_b": -1}}}}, "f": {"nested_data_2": {"n2_a": 45, "n2_b": -2}}}}, "source": {"file": "binlog", "pos": 1, "row": 0, "snapshot": {"boolean": false}}, "op": "c"}
{"before": null, "after": {"row": {"a": -1, "b": 7, "json": "[1, 2, 3]", "c": "FileNotFound", "d": "True", "e": null, "f": null}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
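
# No sources have been created yet, so only the header is expected.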
> SHOW SOURCES
name type cluster comment
--------------------------------

# [btv] uncomment if we bring back classic debezium mode
# ! CREATE SOURCE fast_forwarded
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (START OFFSET=[2], TOPIC 'testdrive-data-${testdrive.seed}')
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${schema}'
#   ENVELOPE DEBEZIUM
# contains:START OFFSET is not supported with ENVELOPE DEBEZIUM

# Test an Avro source without a Debezium envelope.
$ set non-dbz-schema={
    "type": "record",
    "name": "cpx",
    "fields": [
      {"name": "a", "type": "long"},
      {"name": "b", "type": "long"}
    ]
  }

$ kafka-create-topic topic=non-dbz-data partitions=1

$ kafka-ingest format=avro topic=non-dbz-data schema=${non-dbz-schema} timestamp=1
{"a": 1, "b": 2}
{"a": 2, "b": 3}
> CREATE CLUSTER non_dbz_data_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data
  IN CLUSTER non_dbz_data_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data
a b
---
1 2
2 3

# Test INCLUDE metadata.
> CREATE CLUSTER non_dbz_data_metadata_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_metadata
  IN CLUSTER non_dbz_data_metadata_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  INCLUDE PARTITION, OFFSET
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_metadata
a b partition offset
--------------------
1 2 0 0
2 3 0 1
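
# The same metadata columns can be given explicit names with AS.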
> CREATE CLUSTER non_dbz_data_metadata_named_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_metadata_named
  IN CLUSTER non_dbz_data_metadata_named_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  INCLUDE PARTITION AS part, OFFSET AS mzo
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_metadata_named
a b part mzo
--------------
1 2 0 0
2 3 0 1

# Test an Avro source without a Debezium envelope starting at specified partition offsets.
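# START OFFSET lists one starting offset per partition, in partition order;
# partitions beyond the end of the list start at offset 0.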

$ kafka-create-topic topic=non-dbz-data-multi-partition partitions=2

$ kafka-ingest format=avro topic=non-dbz-data-multi-partition schema=${non-dbz-schema} timestamp=1 partition=1
{"a": 4, "b": 1}

$ kafka-ingest format=avro topic=non-dbz-data-multi-partition schema=${non-dbz-schema} timestamp=1 partition=0
{"a": 1, "b": 2}

> CREATE CLUSTER non_dbz_data_multi_partition_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition
  IN CLUSTER non_dbz_data_multi_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition
a b
-----
4 1

> CREATE CLUSTER non_dbz_data_multi_partition_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_2
  IN CLUSTER non_dbz_data_multi_partition_2_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[0,0], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_2
a b
-----
1 2
4 1

> CREATE CLUSTER non_dbz_data_multi_partition_fast_forwarded_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_fast_forwarded
  IN CLUSTER non_dbz_data_multi_partition_fast_forwarded_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[0,1], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_fast_forwarded
a b
----
1 2

> CREATE CLUSTER non_dbz_data_multi_partition_fast_forwarded_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_fast_forwarded_2
  IN CLUSTER non_dbz_data_multi_partition_fast_forwarded_2_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1,0], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_fast_forwarded_2
a b
----
4 1

# Test an Avro source without a Debezium envelope with specified offsets and varying partition numbers.
$ kafka-create-topic topic=non-dbz-data-varying-partition partitions=1

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=0
{"a": 5, "b": 6}

> CREATE CLUSTER non_dbz_data_varying_partition_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_varying_partition
  IN CLUSTER non_dbz_data_varying_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}',
    START OFFSET=[1]
  )
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE
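
# The only record so far is at offset 0, before the configured start offset,
# so the source is initially empty.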
> SELECT * FROM non_dbz_data_varying_partition

$ kafka-add-partitions topic=non-dbz-data-varying-partition total-partitions=2

# Reading data that's ingested to a new partition takes longer than the default timeout.
$ set-sql-timeout duration=180s

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=1
{"a": 7, "b": 8}
{"a": 9, "b": 10}

# Because the start offset for any new partitions will be 0, the first record sent to the new
# partition will be included.
> SELECT * FROM non_dbz_data_varying_partition
a b
-----
7 8
9 10

> CREATE CLUSTER non_dbz_data_varying_partition_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_varying_partition_2
  IN CLUSTER non_dbz_data_varying_partition_2_cluster
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}',
    START OFFSET=[1,1]
  )
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

$ kafka-add-partitions topic=non-dbz-data-varying-partition total-partitions=3

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=2
{"a": 11, "b": 12}

# Because the start offset for any new partitions will be 0, the first record sent to the new
# partition will be included.
> SELECT * FROM non_dbz_data_varying_partition_2
a b
-----
9 10
11 12

$ set-sql-timeout duration=default

# Source with new-style three-valued "snapshot".
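# Here "snapshot" is a string enum allowing "true", "last", and "false", as
# emitted by newer Debezium versions, rather than the boolean used above.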

$ set new-dbz-schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          },
          "null"
        ]
      },
      {"name": "after", "type": ["row", "null"]},
      {"name": "op", "type": "string"},
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.parameters": {
                    "allowed": "true,last,false"
                  },
                  "connect.default": "false",
                  "connect.name": "io.debezium.data.Enum"
                },
                "null"
              ],
              "default": "false"
            },
            {
              "name": "file",
              "type": "string"
            },
            {
              "name": "pos",
              "type": "long"
            },
            {
              "name": "row",
              "type": "int"
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

$ kafka-create-topic topic=new-dbz-data partitions=1

# We don't do anything sensible yet for snapshot "true" or "last", so just test that those are ingested.
# [btv] uncomment if we bring back classic debezium mode
# $ kafka-ingest format=avro topic=new-dbz-data key-format=avro key-schema=${key-schema} schema=${new-dbz-schema} timestamp=1
# {"a": 9} {"before": null, "after": {"row":{"a": 9, "b": 10}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "true"}}, "op": "r"}
# {"a": 11} {"before": null, "after": {"row":{"a": 11, "b": 11}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "last"}}, "op": "r"}
# {"a": 14} {"before": null, "after": {"row":{"a": 14, "b": 6}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": null}, "op": "c"}
# {"a": 1} {"before": null, "after": {"row":{"a": 1, "b": 1}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": 2} {"before": null, "after": {"row":{"a": 2, "b": 3}}, "source": {"file": "binlog", "pos": 1, "row": 0, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": -1} {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": -1} {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"string": "false"}}, "op": "c"}
# > CREATE SOURCE new_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-new-dbz-data-${testdrive.seed}')
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${new-dbz-schema}'
#   ENVELOPE DEBEZIUM
# > SELECT * FROM new_dbz
# a b
# ---
# 9 10
# 11 11
# 14 6
# 2 3
# -1 7
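
# Recursive Avro schemas are rejected when the source is created.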
$ kafka-create-topic topic=ignored partitions=1

! CREATE SOURCE recursive
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-ignored-${testdrive.seed}')
  FORMAT AVRO USING SCHEMA '{"type":"record","name":"a","fields":[{"name":"f","type":["a","null"]}]}'
contains:validating avro schema: Recursive types are not supported: .a

$ set key-schema={"type": "string"}

$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "a", "type": "string"}]}

$ kafka-create-topic topic=non-subset-key

$ kafka-ingest format=avro topic=non-subset-key key-format=avro key-schema=${key-schema} schema=${value-schema}
"asdf" {"a": "asdf"}

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );
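
# The key schema is a bare string rather than a record that is a subset of
# the value, and with ENVELOPE NONE the key is not decoded into columns.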
> CREATE CLUSTER non_subset_key_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_subset_key
  IN CLUSTER non_subset_key_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-subset-key-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE

> SELECT * FROM non_subset_key
a
---
"asdf"

# Test that Postgres-style sources can be ingested.
$ set pg-dbz-schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          },
          "null"
        ]
      },
      {"name": "after", "type": ["row", "null"]},
      {"name": "op", "type": "string"},
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "whatever",
          "fields": [
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.parameters": {
                    "allowed": "true,last,false"
                  },
                  "connect.default": "false",
                  "connect.name": "io.debezium.data.Enum"
                },
                "null"
              ],
              "default": "false"
            },
            {
              "name": "lsn",
              "type": ["long", "null"]
            },
            {
              "name": "sequence",
              "type": ["string", "null"]
            }
          ]
        }
      }
    ]
  }

# $ kafka-create-topic topic=pg-dbz-data partitions=1
# # The third and fourth records will be skipped, since `sequence` has gone backwards.
# $ kafka-ingest format=avro topic=pg-dbz-data schema=${pg-dbz-schema} timestamp=1
# {"before": null, "after": {"row":{"a": 1, "b": 1}}, "source": {"lsn": {"long": 1}, "sequence": {"string": "[\"1\", \"1\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": 2, "b": 3}}, "source": {"lsn": {"long": 2}, "sequence": {"string": "[\"1\", \"2\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"lsn": {"long": 0}, "sequence": {"string": "[\"0\", \"1\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": 4, "b": 5}}, "source": {"lsn": {"long": 2}, "sequence": {"string": "[\"1\", \"2\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# > CREATE SOURCE pg_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-pg-dbz-data-${testdrive.seed}')
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${pg-dbz-schema}'
#   ENVELOPE DEBEZIUM
# > SELECT * FROM pg_dbz
# a b
# ---
# 1 1
# 2 3

# Test that SQL Server-style sources can be ingested.
# $ set ms-dbz-schema={
#     "connect.name": "com.materialize.test.Envelope",
#     "fields": [
#       {
#         "default": null,
#         "name": "before",
#         "type": [
#           "null",
#           {
#             "connect.name": "com.materialize.test.Value",
#             "fields": [
#               {
#                 "name": "a",
#                 "type": "int"
#               },
#               {
#                 "name": "b",
#                 "type": "int"
#               }
#             ],
#             "name": "Value",
#             "type": "record"
#           }
#         ]
#       },
#       {
#         "default": null,
#         "name": "after",
#         "type": [
#           "null",
#           "Value"
#         ]
#       },
#       {"name": "op", "type": "string"},
#       {
#         "name": "source",
#         "type": {
#           "connect.name": "io.debezium.connector.sqlserver.Source",
#           "fields": [
#             {
#               "default": "false",
#               "name": "snapshot",
#               "type": [
#                 {
#                   "connect.default": "false",
#                   "connect.name": "io.debezium.data.Enum",
#                   "connect.parameters": {
#                     "allowed": "true,last,false"
#                   },
#                   "connect.version": 1,
#                   "type": "string"
#                 },
#                 "null"
#               ]
#             },
#             {
#               "default": null,
#               "name": "change_lsn",
#               "type": [
#                 "null",
#                 "string"
#               ]
#             },
#             {
#               "default": null,
#               "name": "sequence",
#               "type": [
#                 "null",
#                 "string"
#               ]
#             },
#             {
#               "default": null,
#               "name": "event_serial_no",
#               "type": [
#                 "null",
#                 "long"
#               ]
#             }
#           ],
#           "name": "Source",
#           "namespace": "io.debezium.connector.sqlserver",
#           "type": "record"
#         }
#       }
#     ],
#     "name": "Envelope",
#     "namespace": "com.materialize.test",
#     "type": "record"
#   }

# $ kafka-create-topic topic=ms-dbz-data partitions=1
# # The third record will be skipped, since `change_lsn` has gone backwards.
# $ kafka-ingest format=avro topic=ms-dbz-data schema=${ms-dbz-schema} timestamp=1
# {"before": null, "after": {"Value":{"a": 1, "b": 1}}, "source": {"change_lsn": {"string": "00000025:00000728:001b"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"Value":{"a": 2, "b": 3}}, "source": {"change_lsn": {"string": "00000025:00000728:001c"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"Value":{"a": -1, "b": 7}}, "source": {"change_lsn": {"string": "00000025:00000728:001a"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}
# > CREATE SOURCE ms_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-ms-dbz-data-${testdrive.seed}')
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${ms-dbz-schema}'
#   ENVELOPE DEBEZIUM
# > SELECT * FROM ms_dbz
# a b
# ---
# 1 1
# 2 3
# > CREATE SOURCE ms_dbz_uncommitted
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (ISOLATION LEVEL = 'read_uncommitted', TOPIC 'testdrive-ms-dbz-data-${testdrive.seed}')
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${ms-dbz-schema}'
#   ENVELOPE DEBEZIUM
# > SELECT * FROM ms_dbz_uncommitted
# a b
# ---
# 1 1
# 2 3