# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

# Test support for Avro sources without using the Confluent Schema Registry.

$ set key-schema={
    "type": "record",
    "name": "Key",
    "fields": [{"name": "a", "type": "long"}]
  }

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"},
              {
                "name": "json",
                "type": {
                  "connect.name": "io.debezium.data.Json",
                  "type": "string"
                }
              },
              {
                "name": "c",
                "type": {
                  "type": "enum",
                  "name": "Bool",
                  "symbols": ["True", "False", "FileNotFound"]
                }
              },
              {"name": "d", "type": "Bool"},
              {
                "name": "e",
                "type": ["null", {
                  "type": "record",
                  "name": "nested_data_1",
                  "fields": [
                    {"name": "n1_a", "type": "long"},
                    {
                      "name": "n1_b",
                      "type": ["null", "double", {
                        "type": "record",
                        "name": "nested_data_2",
                        "fields": [
                          {"name": "n2_a", "type": "long"},
                          {"name": "n2_b", "type": "int"}
                        ]
                      }]
                    }
                  ]
                }]
              },
              {"name": "f", "type": ["null", "nested_data_2"]}
            ]
          },
          "null"
        ]
      },
      { "name": "after", "type": ["row", "null"] },
      { "name": "op", "type": "string" },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {"name": "file", "type": "string"},
            {"name": "pos", "type": "long"},
            {"name": "row", "type": "int"},
            {
              "name": "snapshot",
              "type": [
                {"type": "boolean", "connect.default": false},
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

$ kafka-create-topic topic=data partitions=1

$ kafka-ingest format=avro topic=data schema=${schema} timestamp=1
{"before": null, "after": {"row": {"a": 1, "b": 1, "json": "null", "c": "True", "d": "False", "e": {"nested_data_1": {"n1_a": 42, "n1_b": {"double": 86.5}}}, "f": null}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"boolean": false}}, "op": "c"}
{"before": null, "after": {"row": {"a": 2, "b": 3, "json": "{\"hello\": \"world\"}", "c": "False", "d": "FileNotFound", "e": {"nested_data_1": {"n1_a": 43, "n1_b":{"nested_data_2": {"n2_a": 44, "n2_b": -1}}}}, "f": {"nested_data_2": {"n2_a": 45, "n2_b": -2}}}}, "source": {"file": "binlog", "pos": 1, "row": 0, "snapshot": {"boolean": false}}, "op": "c"}
{"before": null, "after": {"row": {"a": -1, "b": 7, "json": "[1, 2, 3]", "c": "FileNotFound", "d": "True", "e": null, "f": null}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"boolean": false}}, "op": "c"}

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
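
# No sources have been created yet.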
> SHOW SOURCES
name type cluster comment
--------------------------------

# [btv] uncomment if we bring back classic debezium mode
# ! CREATE SOURCE fast_forwarded
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (START OFFSET=[2], TOPIC 'testdrive-data-${testdrive.seed}')
# ! CREATE TABLE fast_forwarded_tbl FROM SOURCE fast_forwarded (REFERENCE "testdrive-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${schema}'
#   ENVELOPE DEBEZIUM
# contains:START OFFSET is not supported with ENVELOPE DEBEZIUM

# Test an Avro source without a Debezium envelope.

$ set non-dbz-schema={
    "type": "record",
    "name": "cpx",
    "fields": [
      {"name": "a", "type": "long"},
      {"name": "b", "type": "long"}
    ]
  }

$ kafka-create-topic topic=non-dbz-data partitions=1

$ kafka-ingest format=avro topic=non-dbz-data schema=${non-dbz-schema} timestamp=1
{"a": 1, "b": 2}
{"a": 2, "b": 3}

> CREATE CLUSTER non_dbz_data_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data
  IN CLUSTER non_dbz_data_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')

> CREATE TABLE non_dbz_data_tbl FROM SOURCE non_dbz_data (REFERENCE "testdrive-non-dbz-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_tbl
a b
---
1 2
2 3

# Test INCLUDE metadata.

> CREATE CLUSTER non_dbz_data_metadata_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_metadata
  IN CLUSTER non_dbz_data_metadata_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')

> CREATE TABLE non_dbz_data_metadata_tbl FROM SOURCE non_dbz_data_metadata (REFERENCE "testdrive-non-dbz-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  INCLUDE PARTITION, OFFSET
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_metadata_tbl
a b partition offset
--------------------
1 2 0 0
2 3 0 1
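
# The included metadata columns can be renamed with AS.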
> CREATE CLUSTER non_dbz_data_metadata_named_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_metadata_named
  IN CLUSTER non_dbz_data_metadata_named_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')

> CREATE TABLE non_dbz_data_metadata_named_tbl FROM SOURCE non_dbz_data_metadata_named (REFERENCE "testdrive-non-dbz-data-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  INCLUDE PARTITION AS part, OFFSET AS mzo
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_metadata_named_tbl
a b part mzo
--------------
1 2 0 0
2 3 0 1

# Test an Avro source without a Debezium envelope starting at specified partition offsets.

$ kafka-create-topic topic=non-dbz-data-multi-partition partitions=2

$ kafka-ingest format=avro topic=non-dbz-data-multi-partition schema=${non-dbz-schema} timestamp=1 partition=1
{"a": 4, "b": 1}

$ kafka-ingest format=avro topic=non-dbz-data-multi-partition schema=${non-dbz-schema} timestamp=1 partition=0
{"a": 1, "b": 2}
> CREATE CLUSTER non_dbz_data_multi_partition_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition
  IN CLUSTER non_dbz_data_multi_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')

> CREATE TABLE non_dbz_data_multi_partition_tbl FROM SOURCE non_dbz_data_multi_partition (REFERENCE "testdrive-non-dbz-data-multi-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_tbl
a b
-----
4 1
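
# START OFFSET=[0,0] reads both partitions from the beginning.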
> CREATE CLUSTER non_dbz_data_multi_partition_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_2
  IN CLUSTER non_dbz_data_multi_partition_2_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[0,0], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')

> CREATE TABLE non_dbz_data_multi_partition_2_tbl FROM SOURCE non_dbz_data_multi_partition_2 (REFERENCE "testdrive-non-dbz-data-multi-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_2_tbl
a b
-----
1 2
4 1
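
# START OFFSET=[0,1] skips partition 1's record at offset 0.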
> CREATE CLUSTER non_dbz_data_multi_partition_fast_forwarded_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_fast_forwarded
  IN CLUSTER non_dbz_data_multi_partition_fast_forwarded_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[0,1], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')

> CREATE TABLE non_dbz_data_multi_partition_fast_forwarded_tbl FROM SOURCE non_dbz_data_multi_partition_fast_forwarded (REFERENCE "testdrive-non-dbz-data-multi-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_fast_forwarded_tbl
a b
----
1 2
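
# START OFFSET=[1,0] skips partition 0's record at offset 0.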
> CREATE CLUSTER non_dbz_data_multi_partition_fast_forwarded_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_multi_partition_fast_forwarded_2
  IN CLUSTER non_dbz_data_multi_partition_fast_forwarded_2_cluster
  FROM KAFKA CONNECTION kafka_conn (START OFFSET=[1,0], TOPIC 'testdrive-non-dbz-data-multi-partition-${testdrive.seed}')

> CREATE TABLE non_dbz_data_multi_partition_fast_forwarded_2_tbl FROM SOURCE non_dbz_data_multi_partition_fast_forwarded_2 (REFERENCE "testdrive-non-dbz-data-multi-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

> SELECT * FROM non_dbz_data_multi_partition_fast_forwarded_2_tbl
a b
----
4 1

# Test an Avro source without a Debezium envelope with specified offsets and varying partition numbers.

$ kafka-create-topic topic=non-dbz-data-varying-partition partitions=1

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=0
{"a": 5, "b": 6}

> CREATE CLUSTER non_dbz_data_varying_partition_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_varying_partition
  IN CLUSTER non_dbz_data_varying_partition_cluster
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}',
    START OFFSET=[1]
  )

> CREATE TABLE non_dbz_data_varying_partition_tbl FROM SOURCE non_dbz_data_varying_partition (REFERENCE "testdrive-non-dbz-data-varying-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE
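
# The start offset of 1 is past the only record in partition 0, so the result is empty.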
> SELECT * FROM non_dbz_data_varying_partition_tbl

$ kafka-add-partitions topic=non-dbz-data-varying-partition total-partitions=2

# Reading data that's ingested to a new partition takes longer than the default timeout.
$ set-sql-timeout duration=180s

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=1
{"a": 7, "b": 8}
{"a": 9, "b": 10}

# Because the start offset for any new partitions will be 0, the first record sent to the new
# partition will be included.
> SELECT * FROM non_dbz_data_varying_partition_tbl
a b
-----
7 8
9 10

> CREATE CLUSTER non_dbz_data_varying_partition_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_dbz_data_varying_partition_2
  IN CLUSTER non_dbz_data_varying_partition_2_cluster
  FROM KAFKA CONNECTION kafka_conn (
    TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}',
    START OFFSET=[1,1]
  )

> CREATE TABLE non_dbz_data_varying_partition_2_tbl FROM SOURCE non_dbz_data_varying_partition_2 (REFERENCE "testdrive-non-dbz-data-varying-partition-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
  ENVELOPE NONE

$ kafka-add-partitions topic=non-dbz-data-varying-partition total-partitions=3

$ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz-schema} timestamp=1 partition=2
{"a": 11, "b": 12}

# Because the start offset for any new partitions will be 0, the first record sent to the new
# partition will be included.
> SELECT * FROM non_dbz_data_varying_partition_2_tbl
a b
-----
9 10
11 12

$ set-sql-timeout duration=default

# Source with new-style three-valued "snapshot".
$ set new-dbz-schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          },
          "null"
        ]
      },
      { "name": "after", "type": ["row", "null"] },
      { "name": "op", "type": "string" },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.parameters": {
                    "allowed": "true,last,false"
                  },
                  "connect.default": "false",
                  "connect.name": "io.debezium.data.Enum"
                },
                "null"
              ],
              "default": "false"
            },
            {"name": "file", "type": "string"},
            {"name": "pos", "type": "long"},
            {"name": "row", "type": "int"}
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }

$ kafka-create-topic topic=new-dbz-data partitions=1

# We don't do anything sensible yet for snapshot "true" or "last", so just test that those are ingested.
# [btv] uncomment if we bring back classic debezium mode
# $ kafka-ingest format=avro topic=new-dbz-data key-format=avro key-schema=${key-schema} schema=${new-dbz-schema} timestamp=1
# {"a": 9} {"before": null, "after": {"row":{"a": 9, "b": 10}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "true"}}, "op": "r"}
# {"a": 11} {"before": null, "after": {"row":{"a": 11, "b": 11}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "last"}}, "op": "r"}
# {"a": 14} {"before": null, "after": {"row":{"a": 14, "b": 6}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": null}, "op": "c"}
# {"a": 1} {"before": null, "after": {"row":{"a": 1, "b": 1}}, "source": {"file": "binlog", "pos": 0, "row": 0, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": 2} {"before": null, "after": {"row":{"a": 2, "b": 3}}, "source": {"file": "binlog", "pos": 1, "row": 0, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": -1} {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"string": "false"}}, "op": "c"}
# {"a": -1} {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"file": "binlog", "pos": 1, "row": 1, "snapshot": {"string": "false"}}, "op": "c"}
# > CREATE SOURCE new_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-new-dbz-data-${testdrive.seed}')
# > CREATE TABLE new_dbz_tbl FROM SOURCE new_dbz (REFERENCE "testdrive-new-dbz-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${new-dbz-schema}'
#   ENVELOPE DEBEZIUM
# > SELECT * FROM new_dbz_tbl
# a b
# ---
# 9 10
# 11 11
# 14 6
# 2 3
# -1 7

$ kafka-create-topic topic=ignored partitions=1
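
# Recursive Avro value schemas are rejected.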
> CREATE SOURCE recursive
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-ignored-${testdrive.seed}')

! CREATE TABLE recursive_tbl FROM SOURCE recursive (REFERENCE "testdrive-ignored-${testdrive.seed}")
  FORMAT AVRO USING SCHEMA '{"type":"record","name":"a","fields":[{"name":"f","type":["a","null"]}]}'
contains:validating avro schema: Recursive types are not supported: .a
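
# Test a key whose schema is not a subset of the value schema.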
$ set key-schema={"type": "string"}
$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "a", "type": "string"}]}

$ kafka-create-topic topic=non-subset-key

$ kafka-ingest format=avro topic=non-subset-key key-format=avro key-schema=${key-schema} schema=${value-schema}
"asdf" {"a": "asdf"}

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  URL '${testdrive.schema-registry-url}'
  );

> CREATE CLUSTER non_subset_key_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE non_subset_key
  IN CLUSTER non_subset_key_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-subset-key-${testdrive.seed}')

> CREATE TABLE non_subset_key_tbl FROM SOURCE non_subset_key (REFERENCE "testdrive-non-subset-key-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NONE

> SELECT * FROM non_subset_key_tbl
a
---
"asdf"

# Test that Postgres-style sources can be ingested.
$ set pg-dbz-schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "a", "type": "long"},
              {"name": "b", "type": "long"}
            ]
          },
          "null"
        ]
      },
      { "name": "after", "type": ["row", "null"] },
      { "name": "op", "type": "string" },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "whatever",
          "fields": [
            {
              "name": "snapshot",
              "type": [
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.parameters": {
                    "allowed": "true,last,false"
                  },
                  "connect.default": "false",
                  "connect.name": "io.debezium.data.Enum"
                },
                "null"
              ],
              "default": "false"
            },
            {
              "name": "lsn",
              "type": ["long", "null"]
            },
            {
              "name": "sequence",
              "type": ["string", "null"]
            }
          ]
        }
      }
    ]
  }
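
# (Disabled, like the other ENVELOPE DEBEZIUM stanzas above.)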
# $ kafka-create-topic topic=pg-dbz-data partitions=1

# # The third and fourth records will be skipped, since `sequence` has gone backwards.
# $ kafka-ingest format=avro topic=pg-dbz-data schema=${pg-dbz-schema} timestamp=1
# {"before": null, "after": {"row":{"a": 1, "b": 1}}, "source": {"lsn": {"long": 1}, "sequence": {"string": "[\"1\", \"1\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": 2, "b": 3}}, "source": {"lsn": {"long": 2}, "sequence": {"string": "[\"1\", \"2\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": -1, "b": 7}}, "source": {"lsn": {"long": 0}, "sequence": {"string": "[\"0\", \"1\"]"}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"row":{"a": 4, "b": 5}}, "source": {"lsn": {"long": 2}, "sequence": {"string": "[\"1\", \"2\"]"}, "snapshot": {"string": "false"}}, "op": "c"}

# > CREATE SOURCE pg_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-pg-dbz-data-${testdrive.seed}')

# > CREATE TABLE pg_dbz_tbl FROM SOURCE pg_dbz (REFERENCE "testdrive-pg-dbz-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${pg-dbz-schema}'
#   ENVELOPE DEBEZIUM

# > SELECT * FROM pg_dbz_tbl
# a b
# ---
# 1 1
# 2 3

# Test that SQL Server-style sources can be ingested.
# $ set ms-dbz-schema={
#     "connect.name": "com.materialize.test.Envelope",
#     "fields": [
#       {
#         "default": null,
#         "name": "before",
#         "type": [
#           "null",
#           {
#             "connect.name": "com.materialize.test.Value",
#             "fields": [
#               {"name": "a", "type": "int"},
#               {"name": "b", "type": "int"}
#             ],
#             "name": "Value",
#             "type": "record"
#           }
#         ]
#       },
#       {
#         "default": null,
#         "name": "after",
#         "type": ["null", "Value"]
#       },
#       { "name": "op", "type": "string" },
#       {
#         "name": "source",
#         "type": {
#           "connect.name": "io.debezium.connector.sqlserver.Source",
#           "fields": [
#             {
#               "default": "false",
#               "name": "snapshot",
#               "type": [
#                 {
#                   "connect.default": "false",
#                   "connect.name": "io.debezium.data.Enum",
#                   "connect.parameters": {
#                     "allowed": "true,last,false"
#                   },
#                   "connect.version": 1,
#                   "type": "string"
#                 },
#                 "null"
#               ]
#             },
#             {
#               "default": null,
#               "name": "change_lsn",
#               "type": ["null", "string"]
#             },
#             {
#               "default": null,
#               "name": "sequence",
#               "type": ["null", "string"]
#             },
#             {
#               "default": null,
#               "name": "event_serial_no",
#               "type": ["null", "long"]
#             }
#           ],
#           "name": "Source",
#           "namespace": "io.debezium.connector.sqlserver",
#           "type": "record"
#         }
#       }
#     ],
#     "name": "Envelope",
#     "namespace": "com.materialize.test",
#     "type": "record"
#   }

# $ kafka-create-topic topic=ms-dbz-data partitions=1

# # The third record will be skipped, since `lsn` has gone backwards.
# $ kafka-ingest format=avro topic=ms-dbz-data schema=${ms-dbz-schema} timestamp=1
# {"before": null, "after": {"Value":{"a": 1, "b": 1}}, "source": {"change_lsn": {"string": "00000025:00000728:001b"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"Value":{"a": 2, "b": 3}}, "source": {"change_lsn": {"string": "00000025:00000728:001c"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}
# {"before": null, "after": {"Value":{"a": -1, "b": 7}}, "source": {"change_lsn": {"string": "00000025:00000728:001a"}, "sequence": null, "event_serial_no": {"long": 1}, "snapshot": {"string": "false"}}, "op": "c"}

# > CREATE SOURCE ms_dbz
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-ms-dbz-data-${testdrive.seed}')

# > CREATE TABLE ms_dbz_tbl FROM SOURCE ms_dbz (REFERENCE "testdrive-ms-dbz-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${ms-dbz-schema}'
#   ENVELOPE DEBEZIUM

# > SELECT * FROM ms_dbz_tbl
# a b
# ---
# 1 1
# 2 3

# > CREATE SOURCE ms_dbz_uncommitted
#   IN CLUSTER ${arg.single-replica-cluster}
#   FROM KAFKA CONNECTION kafka_conn (ISOLATION LEVEL = 'read_uncommitted', TOPIC 'testdrive-ms-dbz-data-${testdrive.seed}')

# > CREATE TABLE ms_dbz_uncommitted_tbl FROM SOURCE ms_dbz_uncommitted (REFERENCE "testdrive-ms-dbz-data-${testdrive.seed}")
#   KEY FORMAT AVRO USING SCHEMA '${key-schema}'
#   VALUE FORMAT AVRO USING SCHEMA '${ms-dbz-schema}'
#   ENVELOPE DEBEZIUM

# > SELECT * FROM ms_dbz_uncommitted_tbl
# a b
# ---
# 1 1
# 2 3