  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. $ set-arg-default single-replica-cluster=quickstart
  11. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  12. ALTER SYSTEM SET max_clusters = 20
  13. # Tests for `START TIMESTAMP` configuration which resolves a start offset
  14. # during creation of the source.
  15. #
  16. # Errors
  17. #
  18. $ kafka-create-topic topic=t0
  19. > CREATE CONNECTION kafka_conn
  20. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  21. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  22. URL '${testdrive.schema-registry-url}'
  23. );
  24. ! CREATE SOURCE missing_topic
  25. IN CLUSTER ${arg.single-replica-cluster}
  26. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=1, TOPIC 'missing_topic')
  27. FORMAT TEXT
  28. INCLUDE OFFSET
  29. contains:Topic does not exist
  30. ! CREATE SOURCE pick_one
  31. IN CLUSTER ${arg.single-replica-cluster}
  32. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=1, START OFFSET=[1], TOPIC 'testdrive-t0-${testdrive.seed}')
  33. FORMAT TEXT
  34. INCLUDE OFFSET
  35. contains:cannot specify START TIMESTAMP and START OFFSET at same time
  36. ! CREATE SOURCE not_a_number
  37. IN CLUSTER ${arg.single-replica-cluster}
  38. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP="not_a_number", TOPIC 'testdrive-t0-${testdrive.seed}')
  39. FORMAT TEXT
  40. INCLUDE OFFSET
  41. contains:invalid START TIMESTAMP: cannot use value as number
  42. #
  43. # Append-Only
  44. #
  45. $ kafka-create-topic topic=t1 partitions=3
  46. $ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=1 partition=0
  47. apple:apple
  48. banana:banana
  49. $ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=2 partition=1
  50. cherry:cherry
  51. date:date
  52. eggfruit:eggfruit
  53. $ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=3 partition=1
  54. fig:fig
  55. $ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=4 partition=2
  56. grape:grape
  57. > CREATE CLUSTER append_time_offset_0_cluster SIZE '${arg.default-storage-size}';
  58. > CREATE SOURCE append_time_offset_0
  59. IN CLUSTER append_time_offset_0_cluster
  60. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=0, TOPIC 'testdrive-t1-${testdrive.seed}')
  61. FORMAT TEXT
  62. INCLUDE OFFSET
  63. > CREATE CLUSTER append_time_offset_1_cluster SIZE '${arg.default-storage-size}';
  64. > CREATE SOURCE append_time_offset_1
  65. IN CLUSTER append_time_offset_1_cluster
  66. FROM KAFKA CONNECTION kafka_conn (
  67. START TIMESTAMP=1,
  68. TOPIC 'testdrive-t1-${testdrive.seed}'
  69. )
  70. FORMAT TEXT
  71. INCLUDE OFFSET
  72. > CREATE CLUSTER append_time_offset_2_cluster SIZE '${arg.default-storage-size}';
  73. > CREATE SOURCE append_time_offset_2
  74. IN CLUSTER append_time_offset_2_cluster
  75. FROM KAFKA CONNECTION kafka_conn (
  76. START TIMESTAMP=2,
  77. TOPIC 'testdrive-t1-${testdrive.seed}'
  78. )
  79. FORMAT TEXT
  80. INCLUDE OFFSET
  81. > CREATE CLUSTER append_time_offset_3_cluster SIZE '${arg.default-storage-size}';
  82. > CREATE SOURCE append_time_offset_3
  83. IN CLUSTER append_time_offset_3_cluster
  84. FROM KAFKA CONNECTION kafka_conn (
  85. START TIMESTAMP=3,
  86. TOPIC 'testdrive-t1-${testdrive.seed}'
  87. )
  88. FORMAT TEXT
  89. INCLUDE OFFSET
  90. > CREATE CLUSTER append_time_offset_4_cluster SIZE '${arg.default-storage-size}';
  91. > CREATE SOURCE append_time_offset_4
  92. IN CLUSTER append_time_offset_4_cluster
  93. FROM KAFKA CONNECTION kafka_conn (
  94. START TIMESTAMP=4,
  95. TOPIC 'testdrive-t1-${testdrive.seed}'
  96. )
  97. FORMAT TEXT
  98. INCLUDE OFFSET
  99. > CREATE CLUSTER append_time_offset_5_cluster SIZE '${arg.default-storage-size}';
  100. > CREATE SOURCE append_time_offset_5
  101. IN CLUSTER append_time_offset_5_cluster
  102. FROM KAFKA CONNECTION kafka_conn (
  103. START TIMESTAMP=5,
  104. TOPIC 'testdrive-t1-${testdrive.seed}'
  105. )
  106. FORMAT TEXT
  107. INCLUDE OFFSET
  108. > SELECT * FROM append_time_offset_0
  109. text offset
  110. -------------------
  111. apple 0
  112. banana 1
  113. cherry 0
  114. date 1
  115. eggfruit 2
  116. fig 3
  117. grape 0
  118. > SELECT * FROM append_time_offset_1
  119. text offset
  120. -------------------
  121. apple 0
  122. banana 1
  123. cherry 0
  124. date 1
  125. eggfruit 2
  126. fig 3
  127. grape 0
  128. > SELECT * FROM append_time_offset_2
  129. text offset
  130. -------------------
  131. cherry 0
  132. date 1
  133. eggfruit 2
  134. fig 3
  135. grape 0
  136. > SELECT * FROM append_time_offset_3
  137. text offset
  138. -------------------
  139. fig 3
  140. grape 0
  141. > SELECT * FROM append_time_offset_4
  142. text offset
  143. -------------------
  144. grape 0
  145. > SELECT * FROM append_time_offset_5
  146. text offset
  147. -------------------
  148. $ kafka-add-partitions topic=t1 total-partitions=4
  149. $ kafka-ingest format=bytes topic=t1 key-format=bytes key-terminator=: timestamp=5 partition=3
  150. hazelnut:hazelnut
  151. $ set-sql-timeout duration=60s
  152. > SELECT * FROM append_time_offset_5
  153. text offset
  154. -------------------
  155. hazelnut 0
  156. #
  157. # Upsert
  158. #
  159. $ kafka-create-topic topic=t2 partitions=3
  160. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=1 partition=0
  161. apple:apple
  162. banana:banana
  163. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=1 partition=0
  164. apple:
  165. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=2 partition=1
  166. cherry:cherry
  167. date:date
  168. eggfruit:eggfruit
  169. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=3 partition=1
  170. cherry:
  171. fig:fig
  172. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=4 partition=2
  173. grape:grape
  174. > CREATE CLUSTER upsert_time_offset_0_cluster SIZE '${arg.default-storage-size}';
  175. > CREATE SOURCE upsert_time_offset_0
  176. IN CLUSTER upsert_time_offset_0_cluster
  177. FROM KAFKA CONNECTION kafka_conn (
  178. START TIMESTAMP=0,
  179. TOPIC 'testdrive-t2-${testdrive.seed}'
  180. )
  181. KEY FORMAT TEXT VALUE FORMAT TEXT
  182. INCLUDE OFFSET
  183. ENVELOPE UPSERT
  184. > CREATE CLUSTER upsert_time_offset_1_cluster SIZE '${arg.default-storage-size}';
  185. > CREATE SOURCE upsert_time_offset_1
  186. IN CLUSTER upsert_time_offset_1_cluster
  187. FROM KAFKA CONNECTION kafka_conn (
  188. START TIMESTAMP 1,
  189. TOPIC 'testdrive-t2-${testdrive.seed}'
  190. )
  191. KEY FORMAT TEXT VALUE FORMAT TEXT
  192. INCLUDE OFFSET
  193. ENVELOPE UPSERT
  194. > CREATE CLUSTER upsert_time_offset_2_cluster SIZE '${arg.default-storage-size}';
  195. > CREATE SOURCE upsert_time_offset_2
  196. IN CLUSTER upsert_time_offset_2_cluster
  197. FROM KAFKA CONNECTION kafka_conn (
  198. START TIMESTAMP 2,
  199. TOPIC 'testdrive-t2-${testdrive.seed}'
  200. )
  201. KEY FORMAT TEXT VALUE FORMAT TEXT
  202. INCLUDE OFFSET
  203. ENVELOPE UPSERT
  204. > CREATE CLUSTER upsert_time_offset_3_cluster SIZE '${arg.default-storage-size}';
  205. > CREATE SOURCE upsert_time_offset_3
  206. IN CLUSTER upsert_time_offset_3_cluster
  207. FROM KAFKA CONNECTION kafka_conn (
  208. START TIMESTAMP 3,
  209. TOPIC 'testdrive-t2-${testdrive.seed}'
  210. )
  211. KEY FORMAT TEXT VALUE FORMAT TEXT
  212. INCLUDE OFFSET
  213. ENVELOPE UPSERT
  214. > CREATE CLUSTER upsert_time_offset_4_cluster SIZE '${arg.default-storage-size}';
  215. > CREATE SOURCE upsert_time_offset_4
  216. IN CLUSTER upsert_time_offset_4_cluster
  217. FROM KAFKA CONNECTION kafka_conn (
  218. START TIMESTAMP 4,
  219. TOPIC 'testdrive-t2-${testdrive.seed}'
  220. )
  221. KEY FORMAT TEXT VALUE FORMAT TEXT
  222. INCLUDE OFFSET
  223. ENVELOPE UPSERT
  224. > CREATE CLUSTER upsert_time_offset_5_cluster SIZE '${arg.default-storage-size}';
  225. > CREATE SOURCE upsert_time_offset_5
  226. IN CLUSTER upsert_time_offset_5_cluster
  227. FROM KAFKA CONNECTION kafka_conn (
  228. START TIMESTAMP 5,
  229. TOPIC 'testdrive-t2-${testdrive.seed}'
  230. )
  231. KEY FORMAT TEXT VALUE FORMAT TEXT
  232. INCLUDE OFFSET
  233. ENVELOPE UPSERT
  234. > SELECT * FROM upsert_time_offset_0
  235. key text offset
  236. -----------------------------
  237. banana banana 1
  238. date date 1
  239. eggfruit eggfruit 2
  240. fig fig 4
  241. grape grape 0
  242. > SELECT * FROM upsert_time_offset_1
  243. key text offset
  244. -----------------------------
  245. banana banana 1
  246. date date 1
  247. eggfruit eggfruit 2
  248. fig fig 4
  249. grape grape 0
  250. > SELECT * FROM upsert_time_offset_2
  251. key text offset
  252. -----------------------------
  253. date date 1
  254. eggfruit eggfruit 2
  255. fig fig 4
  256. grape grape 0
  257. > SELECT * FROM upsert_time_offset_3
  258. key text offset
  259. -----------------------------
  260. fig fig 4
  261. grape grape 0
  262. > SELECT * FROM upsert_time_offset_4
  263. key text offset
  264. -----------------------------
  265. grape grape 0
  266. > SELECT * FROM upsert_time_offset_5
  267. key text offset
  268. -----------------------------
  269. $ kafka-add-partitions topic=t2 total-partitions=4
  270. $ kafka-ingest format=bytes topic=t2 key-format=bytes key-terminator=: timestamp=5 partition=3
  271. hazelnut:hazelnut
  272. # It takes a while for new partitions to be consumed...
  273. $ set-sql-timeout duration=60s
  274. > SELECT * FROM upsert_time_offset_5
  275. key text offset
  276. -----------------------------
  277. hazelnut hazelnut 0
  278. #
  279. # Relative timestamps
  280. #
  281. # These tests are mainly meant as smoke tests. We can't do good tests currently,
  282. # because we cannot control system time in tests.
  283. #
  284. $ kafka-create-topic topic=t3 partitions=1
  285. $ kafka-ingest format=bytes topic=t3 timestamp=1
  286. apple
  287. # Timestamp for June 2021
  288. $ kafka-ingest format=bytes topic=t3 timestamp=1622666300000
  289. banana
  290. # Timestamp for June 2099
  291. $ kafka-ingest format=bytes topic=t3 timestamp=4084108700000
  292. cherry
  293. > CREATE CLUSTER relative_time_offset_30_years_ago_cluster SIZE '${arg.default-storage-size}';
  294. > CREATE SOURCE relative_time_offset_30_years_ago
  295. IN CLUSTER relative_time_offset_30_years_ago_cluster
  296. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-946100000000, TOPIC 'testdrive-t3-${testdrive.seed}')
  297. FORMAT TEXT
  298. INCLUDE OFFSET
  299. > CREATE CLUSTER relative_time_offset_today_cluster SIZE '${arg.default-storage-size}';
  300. > CREATE SOURCE relative_time_offset_today
  301. IN CLUSTER relative_time_offset_today_cluster
  302. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-1, TOPIC 'testdrive-t3-${testdrive.seed}')
  303. FORMAT TEXT
  304. INCLUDE OFFSET
  305. > SELECT * FROM relative_time_offset_30_years_ago
  306. text offset
  307. -------------------
  308. banana 1
  309. cherry 2
  310. > SELECT * FROM relative_time_offset_today
  311. text offset
  312. -------------------
  313. cherry 2
  314. # Make sure that we don't fetch any messages that we don't want to fetch
  315. $ kafka-create-topic topic=t4 partitions=1
  316. $ kafka-ingest format=bytes topic=t4 timestamp=1
  317. apple
  318. pie
  319. # A time offset of -1 specifies that we want to start from the end of the topic
  320. # (negative offsets are relative from the end).
  321. > CREATE CLUSTER verify_no_fetch_cluster SIZE '${arg.default-storage-size}';
  322. > CREATE SOURCE verify_no_fetch
  323. IN CLUSTER verify_no_fetch_cluster
  324. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=-1, TOPIC 'testdrive-t4-${testdrive.seed}')
  325. FORMAT TEXT
  326. INCLUDE OFFSET
  327. #
  328. # UPSERT + AVRO
  329. #
  330. $ set keyschema={
  331. "type": "record",
  332. "name": "Key",
  333. "fields": [
  334. {"name": "id", "type": "long"}
  335. ]
  336. }
  337. $ set schema={
  338. "type" : "record",
  339. "name" : "envelope",
  340. "fields" : [
  341. {
  342. "name": "before",
  343. "type": [
  344. {
  345. "name": "row",
  346. "type": "record",
  347. "fields": [
  348. {
  349. "name": "id",
  350. "type": "long"
  351. },
  352. {
  353. "name": "creature",
  354. "type": "string"
  355. }]
  356. },
  357. "null"
  358. ]
  359. },
  360. {
  361. "name": "after",
  362. "type": ["row", "null"]
  363. }
  364. ]
  365. }
  366. $ kafka-create-topic topic=dbzupsert
  367. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
  368. {"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
  369. {"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
  370. {"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}}
  371. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=2
  372. {"id": 1} {"before": {"row": {"id": 1, "creature": "lizard"}}, "after": {"row": {"id": 1, "creature": "dino"}}}
  373. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=3
  374. {"id": 2} {"before": null, "after": {"row": {"id": 2, "creature": "archeopteryx"}}}
  375. {"id": 2} {"before": {"row": {"id": 2, "creature": "archeopteryx"}}, "after": {"row": {"id": 2, "creature": "velociraptor"}}}
  376. # test duplicates
  377. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=4
  378. {"id": 3} {"before": {"row": {"id": 3, "creature": "protoceratops"}}, "after": {"row": {"id": 3, "creature": "triceratops"}}}
  379. {"id": 3} {"before": {"row": {"id": 3, "creature": "protoceratops"}}, "after": {"row": {"id": 3, "creature": "triceratops"}}}
  380. # test removal and reinsertion
  381. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=5
  382. {"id": 4} {"before": null, "after": {"row": {"id": 4, "creature": "moros"}}}
  383. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=6
  384. {"id": 4} {"before": {"row": {"id": 4, "creature": "trex"}}, "after": null}
  385. $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschema} schema=${schema} timestamp=7
  386. {"id": 4} {"before": {"row": {"id": 4, "creature": "trex"}}, "after": {"row": {"id": 4, "creature": "chicken"}}}
  387. > CREATE CLUSTER upsert_time_skip_cluster SIZE '${arg.default-storage-size}';
  388. > CREATE SOURCE upsert_time_skip
  389. IN CLUSTER upsert_time_skip_cluster
  390. FROM KAFKA CONNECTION kafka_conn (START TIMESTAMP=6, TOPIC 'testdrive-dbzupsert-${testdrive.seed}')
  391. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  392. ENVELOPE DEBEZIUM
  393. > SELECT * FROM upsert_time_skip
  394. id creature
  395. -----------
  396. 4 chicken