# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_clusters = 20

# Tests for the `START TIMESTAMP` configuration option, which is resolved to a
# start offset for each partition when the source is created.
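
# As a minimal sketch of the shape being tested (illustrative only, not executed
# here; `my_conn` and `my_topic` are placeholder names):
#
#   CREATE SOURCE my_source
#     FROM KAFKA CONNECTION my_conn (START TIMESTAMP=1622666300000, TOPIC 'my_topic')
#
# The value is milliseconds since the Unix epoch; for each partition it is
# resolved to the earliest offset whose message timestamp is >= the value.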

#
# Errors
#

$ kafka-create-topic topic=t0

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

! CREATE SOURCE missing_topic
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=1, TOPIC 'missing_topic')
contains:Topic does not exist

! CREATE SOURCE pick_one
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=1, START OFFSET=[1], TOPIC 'testdrive-t0-${testdrive.seed}')
contains:cannot specify START TIMESTAMP and START OFFSET at same time

! CREATE SOURCE not_a_number
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP="not_a_number", TOPIC 'testdrive-t0-${testdrive.seed}')
contains:invalid START TIMESTAMP: cannot use value as number

#
# Append-Only
#

$ kafka-create-topic topic=t1 partitions=3

$ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=1 partition=0
apple:apple
banana:banana

$ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=2 partition=1
cherry:cherry
date:date
eggfruit:eggfruit

$ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=3 partition=1
fig:fig

$ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=4 partition=2
grape:grape
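
# Given the ingests above, the layout of topic t1 is:
#   partition 0: apple (ts=1, offset 0), banana (ts=1, offset 1)
#   partition 1: cherry (ts=2, offset 0), date (ts=2, offset 1), eggfruit (ts=2, offset 2), fig (ts=3, offset 3)
#   partition 2: grape (ts=4, offset 0)
# Each source below should start, in every partition, at the first offset whose
# timestamp is >= its START TIMESTAMP; the SELECTs further down verify that.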

> CREATE CLUSTER append_time_offset_0_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_0
  IN CLUSTER append_time_offset_0_cluster
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=0, TOPIC 'testdrive-t1-${testdrive.seed}')

> CREATE TABLE append_time_offset_0_tbl FROM SOURCE append_time_offset_0 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER append_time_offset_1_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_1
  IN CLUSTER append_time_offset_1_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=1,
    TOPIC 'testdrive-t1-${testdrive.seed}'
  )

> CREATE TABLE append_time_offset_1_tbl FROM SOURCE append_time_offset_1 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER append_time_offset_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_2
  IN CLUSTER append_time_offset_2_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=2,
    TOPIC 'testdrive-t1-${testdrive.seed}'
  )

> CREATE TABLE append_time_offset_2_tbl FROM SOURCE append_time_offset_2 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER append_time_offset_3_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_3
  IN CLUSTER append_time_offset_3_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=3,
    TOPIC 'testdrive-t1-${testdrive.seed}'
  )

> CREATE TABLE append_time_offset_3_tbl FROM SOURCE append_time_offset_3 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER append_time_offset_4_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_4
  IN CLUSTER append_time_offset_4_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=4,
    TOPIC 'testdrive-t1-${testdrive.seed}'
  )

> CREATE TABLE append_time_offset_4_tbl FROM SOURCE append_time_offset_4 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER append_time_offset_5_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE append_time_offset_5
  IN CLUSTER append_time_offset_5_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=5,
    TOPIC 'testdrive-t1-${testdrive.seed}'
  )

> CREATE TABLE append_time_offset_5_tbl FROM SOURCE append_time_offset_5 (REFERENCE "testdrive-t1-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> SELECT * FROM append_time_offset_0_tbl
text      offset
-------------------
apple     0
banana    1
cherry    0
date      1
eggfruit  2
fig       3
grape     0

> SELECT * FROM append_time_offset_1_tbl
text      offset
-------------------
apple     0
banana    1
cherry    0
date      1
eggfruit  2
fig       3
grape     0

> SELECT * FROM append_time_offset_2_tbl
text      offset
-------------------
cherry    0
date      1
eggfruit  2
fig       3
grape     0

> SELECT * FROM append_time_offset_3_tbl
text      offset
-------------------
fig       3
grape     0

> SELECT * FROM append_time_offset_4_tbl
text      offset
-------------------
grape     0

> SELECT * FROM append_time_offset_5_tbl
text      offset
-------------------

$ kafka-add-partitions topic=t1 total-partitions=4

$ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=5 partition=3
hazelnut:hazelnut

$ set-sql-timeout duration=60s

> SELECT * FROM append_time_offset_5_tbl
text      offset
-------------------
hazelnut  0
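
# The running source picks up the partition that was added after it was created:
# hazelnut lands in the new partition at offset 0 and shows up even in the
# START TIMESTAMP=5 source, as verified above.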

#
# Upsert
#

$ kafka-create-topic topic=t2 partitions=3

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=1 partition=0
apple:apple
banana:banana

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=1 partition=0
apple:

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=2 partition=1
cherry:cherry
date:date
eggfruit:eggfruit

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=3 partition=1
cherry:
fig:fig

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=4 partition=2
grape:grape
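
# The records with an empty value after the key terminator (`apple:` and
# `cherry:`) act as deletes for their keys under ENVELOPE UPSERT, so neither
# key appears in the results below, regardless of the start timestamp.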

> CREATE CLUSTER upsert_time_offset_0_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_0
  IN CLUSTER upsert_time_offset_0_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP=0,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_0_tbl FROM SOURCE upsert_time_offset_0 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_time_offset_1_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_1
  IN CLUSTER upsert_time_offset_1_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP 1,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_1_tbl FROM SOURCE upsert_time_offset_1 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_time_offset_2_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_2
  IN CLUSTER upsert_time_offset_2_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP 2,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_2_tbl FROM SOURCE upsert_time_offset_2 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_time_offset_3_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_3
  IN CLUSTER upsert_time_offset_3_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP 3,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_3_tbl FROM SOURCE upsert_time_offset_3 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_time_offset_4_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_4
  IN CLUSTER upsert_time_offset_4_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP 4,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_4_tbl FROM SOURCE upsert_time_offset_4 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> CREATE CLUSTER upsert_time_offset_5_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_offset_5
  IN CLUSTER upsert_time_offset_5_cluster
  FROM KAFKA CONNECTION kafka_conn (
    START TIMESTAMP 5,
    TOPIC 'testdrive-t2-${testdrive.seed}'
  )

> CREATE TABLE upsert_time_offset_5_tbl FROM SOURCE upsert_time_offset_5 (REFERENCE "testdrive-t2-${testdrive.seed}")
  KEY FORMAT TEXT VALUE FORMAT TEXT
  INCLUDE OFFSET
  ENVELOPE UPSERT

> SELECT * FROM upsert_time_offset_0_tbl
key       text      offset
-----------------------------
banana    banana    1
date      date      1
eggfruit  eggfruit  2
fig       fig       4
grape     grape     0

> SELECT * FROM upsert_time_offset_1_tbl
key       text      offset
-----------------------------
banana    banana    1
date      date      1
eggfruit  eggfruit  2
fig       fig       4
grape     grape     0

> SELECT * FROM upsert_time_offset_2_tbl
key       text      offset
-----------------------------
date      date      1
eggfruit  eggfruit  2
fig       fig       4
grape     grape     0

> SELECT * FROM upsert_time_offset_3_tbl
key       text      offset
-----------------------------
fig       fig       4
grape     grape     0

> SELECT * FROM upsert_time_offset_4_tbl
key       text      offset
-----------------------------
grape     grape     0

> SELECT * FROM upsert_time_offset_5_tbl
key       text      offset
-----------------------------

$ kafka-add-partitions topic=t2 total-partitions=4

$ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=5 partition=3
hazelnut:hazelnut

# It takes a while for new partitions to be consumed...
$ set-sql-timeout duration=60s

> SELECT * FROM upsert_time_offset_5_tbl
key       text      offset
-----------------------------
hazelnut  hazelnut  0

#
# Relative timestamps
#
# These are mainly smoke tests: we cannot currently make stronger assertions
# because tests have no control over the system time.
#
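
# A negative START TIMESTAMP is resolved relative to the current system time.
# As a rough worked example: 946,100,000,000 ms / (1000 * 60 * 60 * 24 * 365) is
# about 30 years, so START TIMESTAMP=-946100000000 below starts roughly 30 years
# in the past, while START TIMESTAMP=-1 starts effectively "now".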

$ kafka-create-topic topic=t3 partitions=1

$ kafka-ingest format=bytes topic=t3 timestamp=1
apple

# Timestamp for June 2021
$ kafka-ingest format=bytes topic=t3 timestamp=1622666300000
banana

# Timestamp for June 2099
$ kafka-ingest format=bytes topic=t3 timestamp=4084108700000
cherry

> CREATE CLUSTER relative_time_offset_30_years_ago_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE relative_time_offset_30_years_ago
  IN CLUSTER relative_time_offset_30_years_ago_cluster
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-946100000000, TOPIC 'testdrive-t3-${testdrive.seed}')

> CREATE TABLE relative_time_offset_30_years_ago_tbl FROM SOURCE relative_time_offset_30_years_ago (REFERENCE "testdrive-t3-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> CREATE CLUSTER relative_time_offset_today_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE relative_time_offset_today
  IN CLUSTER relative_time_offset_today_cluster
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-1, TOPIC 'testdrive-t3-${testdrive.seed}')

> CREATE TABLE relative_time_offset_today_tbl FROM SOURCE relative_time_offset_today (REFERENCE "testdrive-t3-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

> SELECT * FROM relative_time_offset_30_years_ago_tbl
text     offset
-------------------
banana   1
cherry   2

> SELECT * FROM relative_time_offset_today_tbl
text     offset
-------------------
cherry   2

# Make sure that we don't fetch any messages that we don't want to fetch.

$ kafka-create-topic topic=t4 partitions=1

$ kafka-ingest format=bytes topic=t4 timestamp=1
apple
pie

# A time offset of -1 starts reading from the end of the topic: the negative
# value is resolved relative to the current system time, so no existing message
# has a timestamp at or after it.

> CREATE CLUSTER verify_no_fetch_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE verify_no_fetch
  IN CLUSTER verify_no_fetch_cluster
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-1, TOPIC 'testdrive-t4-${testdrive.seed}')

> CREATE TABLE verify_no_fetch_tbl FROM SOURCE verify_no_fetch (REFERENCE "testdrive-t4-${testdrive.seed}")
  FORMAT TEXT
  INCLUDE OFFSET

#
# UPSERT + AVRO
#

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "id", "type": "long"},
              {"name": "creature", "type": "string"}
            ]
          },
          "null"
        ]
      },
      {
        "name": "after",
        "type": ["row", "null"]
      }
    ]
  }
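
# Each value carries a Debezium-style envelope: `before` holds the prior state
# of the row (null for inserts) and `after` holds the new state (null for
# deletes), which is what ENVELOPE DEBEZIUM below consumes.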

$ kafka-create-topic topic=dbzupsert

$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}}

$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=2
{"id": 1} {"before": {"row": {"id": 1, "creature": "lizard"}}, "after": {"row": {"id": 1, "creature": "dino"}}}

$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=3
{"id": 2} {"before": null, "after": {"row": {"id": 2, "creature": "archeopteryx"}}}
{"id": 2} {"before": {"row": {"id": 2, "creature": "archeopteryx"}}, "after": {"row": {"id": 2, "creature": "velociraptor"}}}

# test duplicates
$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=4
{"id": 3} {"before": {"row": {"id": 3, "creature": "protoceratops"}}, "after": {"row": {"id": 3, "creature": "triceratops"}}}
{"id": 3} {"before": {"row": {"id": 3, "creature": "protoceratops"}}, "after": {"row": {"id": 3, "creature": "triceratops"}}}

# test removal and reinsertion
$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=5
{"id": 4} {"before": null, "after": {"row": {"id": 4, "creature": "moros"}}}

$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=6
{"id": 4} {"before": {"row": {"id": 4, "creature": "trex"}}, "after": null}

$ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=7
{"id": 4} {"before": {"row": {"id": 4, "creature": "trex"}}, "after": {"row": {"id": 4, "creature": "chicken"}}}

> CREATE CLUSTER upsert_time_skip_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE upsert_time_skip
  IN CLUSTER upsert_time_skip_cluster
  FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=6, TOPIC 'testdrive-dbzupsert-${testdrive.seed}')

> CREATE TABLE upsert_time_skip_tbl FROM SOURCE upsert_time_skip (REFERENCE "testdrive-dbzupsert-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
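
# START TIMESTAMP=6 skips everything ingested at timestamps 1 through 5, so the
# source only sees the delete at timestamp 6 and the reinsertion at timestamp 7;
# id 4 ("chicken") should therefore be the only row returned below.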

> SELECT * FROM upsert_time_skip_tbl
id  creature
-----------
4   chicken