retain_history.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import re
  10. from textwrap import dedent
  11. from materialize.checks.actions import Testdrive
  12. from materialize.checks.checks import Check, disabled
  13. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
# This duration needs to be long enough for running all scenarios and the CI build!
# The value is interpolated into the RETAIN HISTORY FOR '...' clause of every
# materialized view and source created by the checks in this file, and into the
# SHOW CREATE text that RetainHistoryOnMv.validate expects.
RETAIN_HISTORY_DURATION = "60m"
  16. def schemas() -> str:
  17. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
# This check is wrapped in @disabled; the reason string is the authors' own
# explanation and is kept verbatim.
@disabled(
    "database-issues#7310 and compaction not predicable and now() not appropriate while mz_now() not applicable"
)
class RetainHistoryOnMv(Check):
    """Exercise RETAIN HISTORY on materialized views through AS OF queries.

    The scripts record wall-clock ``now()`` values in the ``time_for_mv``
    table (one row per ``time_index``, 0 through 11) in between changes to
    ``retain_history_table``. ``validate`` later loads those values into
    testdrive variables and queries every materialized view ``AS OF`` each
    recorded timestamp.

    NOTE(review): in the embedded testdrive scripts the continuation lines of
    multi-line statements carry no extra indentation and there are no blank
    lines between commands. testdrive is presumably sensitive to both, so
    confirm the intended layout before re-enabling this check.
    """

    def initialize(self) -> Testdrive:
        """Create the tables and the first materialized view.

        Creates ``time_for_mv`` and ``retain_history_table``, inserts three
        rows into the latter, creates ``retain_history_mv1`` with
        ``RETAIN HISTORY FOR RETAIN_HISTORY_DURATION`` and records time
        indices 0 through 3. Each recording is preceded by a one-second sleep.
        """
        return Testdrive(
            dedent(
                f"""
                > CREATE TABLE time_for_mv (time_index INT, t TIMESTAMP);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (0, now());
                > CREATE TABLE retain_history_table (key INT, value INT);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (1, now());
                > INSERT INTO retain_history_table VALUES (1, 100), (2, 200), (3, 300);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (2, now());
                > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                SELECT * FROM retain_history_table;
                > SELECT count(*) FROM retain_history_mv1;
                3
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (3, now());
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation scripts, each dedented on its own.

        The first script adds 10 to the value of key 1, inserts key 4,
        deletes key 3 and creates ``retain_history_mv2`` on the table
        (time indices 4 through 7). The second script creates
        ``retain_history_mv3`` on top of ``retain_history_mv2``, adds 1 to
        every value, inserts key 5 and deletes key 4 (time indices 8
        through 11).
        """
        return [
            Testdrive(dedent(s))
            for s in [
                f"""
                > UPDATE retain_history_table SET value = value + 10 WHERE key = 1;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (4, now());
                > INSERT INTO retain_history_table VALUES (4, 400);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (5, now());
                > DELETE FROM retain_history_table WHERE key = 3;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (6, now());
                > CREATE MATERIALIZED VIEW retain_history_mv2 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                SELECT * FROM retain_history_table;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (7, now());
                """,
                f"""
                > CREATE MATERIALIZED VIEW retain_history_mv3 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                SELECT * FROM retain_history_mv2;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (8, now());
                > UPDATE retain_history_table SET value = value + 1;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (9, now());
                > INSERT INTO retain_history_table VALUES (5, 500);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (10, now());
                > DELETE FROM retain_history_table WHERE key = 4;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (11, now());
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Build one script that checks history, definition and plan.

        The script is assembled from four snippets that are interpolated
        into a single f-string before ``dedent`` is applied, so all snippets
        must share the same indentation:

        1. ``set-from-sql`` commands loading ``time0`` .. ``time11`` from
           ``time_for_mv``;
        2. the same ``AS OF`` expectations, rendered once per materialized
           view;
        3. the expected ``SHOW CREATE`` text of ``retain_history_mv1``;
        4. the expected ``EXPLAIN OPTIMIZED PLAN`` output for a select on
           ``retain_history_mv1``.

        NOTE(review): the expectations are identical for all three views,
        including timestamps recorded before ``retain_history_mv2`` and
        ``retain_history_mv3`` were created; confirm this is intended.
        """
        # Plain (non-f) string: "${...}" would not be needed here, the
        # variables are only defined, not referenced.
        time_definitions = """
            $ set-from-sql var=time0
            SELECT t::STRING FROM time_for_mv WHERE time_index = 0
            $ set-from-sql var=time1
            SELECT t::STRING FROM time_for_mv WHERE time_index = 1
            $ set-from-sql var=time2
            SELECT t::STRING FROM time_for_mv WHERE time_index = 2
            $ set-from-sql var=time3
            SELECT t::STRING FROM time_for_mv WHERE time_index = 3
            $ set-from-sql var=time4
            SELECT t::STRING FROM time_for_mv WHERE time_index = 4
            $ set-from-sql var=time5
            SELECT t::STRING FROM time_for_mv WHERE time_index = 5
            $ set-from-sql var=time6
            SELECT t::STRING FROM time_for_mv WHERE time_index = 6
            $ set-from-sql var=time7
            SELECT t::STRING FROM time_for_mv WHERE time_index = 7
            $ set-from-sql var=time8
            SELECT t::STRING FROM time_for_mv WHERE time_index = 8
            $ set-from-sql var=time9
            SELECT t::STRING FROM time_for_mv WHERE time_index = 9
            $ set-from-sql var=time10
            SELECT t::STRING FROM time_for_mv WHERE time_index = 10
            $ set-from-sql var=time11
            SELECT t::STRING FROM time_for_mv WHERE time_index = 11
            """
        # One copy of the expectations per materialized view; only the view
        # name varies. "${{timeN}}" renders as "${timeN}", leaving the
        # substitution to testdrive rather than to Python.
        content_validations = "\n".join(
            f"""
            ! SELECT * FROM {mv_name} AS OF '${{time0}}'::TIMESTAMP; -- time0 (nothing exists)
            contains: is not valid for all inputs
            ! SELECT count(*) FROM {mv_name} AS OF '${{time1}}'::TIMESTAMP; -- time1 (table created)
            contains: is not valid for all inputs
            > SELECT * FROM {mv_name} AS OF '${{time2}}'::TIMESTAMP; -- time2 (table populated)
            1 100
            2 200
            3 300
            > SELECT * FROM {mv_name} AS OF '${{time3}}'::TIMESTAMP; -- time3 (mv1 created)
            1 100
            2 200
            3 300
            > SELECT * FROM {mv_name} AS OF '${{time4}}'::TIMESTAMP; -- time4 (table updated in manipulate#1)
            1 110
            2 200
            3 300
            > SELECT * FROM {mv_name} AS OF '${{time5}}'::TIMESTAMP; -- time5 (table updated in manipulate#1)
            1 110
            2 200
            3 300
            4 400
            > SELECT * FROM {mv_name} AS OF '${{time6}}'::TIMESTAMP; -- time6 (table updated in manipulate#1)
            1 110
            2 200
            4 400
            > SELECT * FROM {mv_name} AS OF '${{time7}}'::TIMESTAMP; -- time7 (mv2 created in manipulate#1)
            1 110
            2 200
            4 400
            > SELECT * FROM {mv_name} AS OF '${{time8}}'::TIMESTAMP; -- time8 (mv3 created in manipulate#2)
            1 110
            2 200
            4 400
            > SELECT * FROM {mv_name} AS OF '${{time9}}'::TIMESTAMP; -- time9 (table updated in manipulate#2)
            1 111
            2 201
            4 401
            > SELECT * FROM {mv_name} AS OF '${{time10}}'::TIMESTAMP; -- time10 (table updated in manipulate#2)
            1 111
            2 201
            4 401
            5 500
            > SELECT * FROM {mv_name} AS OF '${{time11}}'::TIMESTAMP; -- time11 (table updated in manipulate#2)
            1 111
            2 201
            5 500
            > SELECT * FROM {mv_name};
            1 111
            2 201
            5 500
            """
            for mv_name in [
                "retain_history_mv1",
                "retain_history_mv2",
                "retain_history_mv3",
            ]
        )
        # The doubled backslashes render as \" in the script, i.e. escaped
        # quotes inside the double-quoted expected value.
        definition_validations = f"""
            > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv1);
            "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv1\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\""
            """
        other_validations = """
            ? EXPLAIN OPTIMIZED PLAN AS TEXT FOR SELECT * FROM retain_history_mv1
            Explained Query:
            ReadStorage materialize.public.retain_history_mv1
            Target cluster: quickstart
            """
        # dedent runs after interpolation, on the fully assembled script.
        return Testdrive(
            dedent(
                f"""
                {time_definitions}
                {content_validations}
                {definition_validations}
                {other_validations}
                """
            )
        )
# This check is wrapped in @disabled; the reason string is the authors' own
# explanation and is kept verbatim.
@disabled(
    "database-issues#7310 and compaction not predicable and now() not appropriate while mz_now() not applicable"
)
class RetainHistoryOnKafkaSource(Check):
    """Exercise RETAIN HISTORY on a Kafka source through AS OF queries.

    The scripts record wall-clock ``now()`` values in the ``time_for_source``
    table (one row per ``time_index``, 0 through 7) in between ingestions into
    the ``retain-history`` topic. ``validate`` later loads those values into
    testdrive variables and queries ``retain_history_source`` ``AS OF`` each
    recorded timestamp.

    NOTE(review): in the embedded testdrive scripts the continuation lines of
    multi-line statements carry no extra indentation and there are no blank
    lines between commands. testdrive is presumably sensitive to both, so
    confirm the intended layout before re-enabling this check.
    """

    def initialize(self) -> Testdrive:
        """Create the topic, ingest the initial data and create the source.

        The script is prefixed with ``schemas()``. It creates
        ``time_for_source`` and the ``retain-history`` topic, ingests four and
        then five keyed Avro records (``K<iteration>`` -> ``A<iteration>``),
        creates ``retain_history_source_src`` with
        ``RETAIN HISTORY FOR RETAIN_HISTORY_DURATION`` and the upsert table
        ``retain_history_source`` on top of it, and records time indices 0
        through 4.
        """
        return Testdrive(
            schemas()
            + dedent(
                f"""
                > CREATE TABLE time_for_source (time_index INT, t TIMESTAMP);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (0, now());
                $ kafka-create-topic topic=retain-history
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (1, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=4
                {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (2, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=5
                {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (3, now());
                > CREATE SOURCE retain_history_source_src
                FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-retain-history-${{testdrive.seed}}')
                WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}')
                > CREATE TABLE retain_history_source FROM SOURCE retain_history_source_src (REFERENCE "testdrive-retain-history-${{testdrive.seed}}")
                FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                ENVELOPE UPSERT
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (4, now());
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two ingestion scripts, each prefixed with ``schemas()``.

        The first script ingests two records with ``B`` values (time index
        5). The second ingests six records with ``C`` values (time index 6)
        and then one record with a ``D`` value (time index 7).
        """
        # These are plain strings, not f-strings, so "${...}" and the JSON
        # braces are written without doubling.
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=2
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (5, now());
                """,
                """
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=6
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (6, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=1
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "D${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (7, now());
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Load ``time0`` .. ``time7`` and query the source table at each.

        The script ends with a query of the current contents of
        ``retain_history_source``.

        NOTE(review): the expectations for ``time0`` through ``time4`` are the
        same five rows (``K0``-``K4`` with ``A`` values), although the
        trailing SQL comments describe earlier states such as
        "nothing exists"; confirm this is intended.
        """
        return Testdrive(
            dedent(
                """
                $ set-from-sql var=time0
                SELECT t::STRING FROM time_for_source WHERE time_index = 0
                $ set-from-sql var=time1
                SELECT t::STRING FROM time_for_source WHERE time_index = 1
                $ set-from-sql var=time2
                SELECT t::STRING FROM time_for_source WHERE time_index = 2
                $ set-from-sql var=time3
                SELECT t::STRING FROM time_for_source WHERE time_index = 3
                $ set-from-sql var=time4
                SELECT t::STRING FROM time_for_source WHERE time_index = 4
                $ set-from-sql var=time5
                SELECT t::STRING FROM time_for_source WHERE time_index = 5
                $ set-from-sql var=time6
                SELECT t::STRING FROM time_for_source WHERE time_index = 6
                $ set-from-sql var=time7
                SELECT t::STRING FROM time_for_source WHERE time_index = 7
                > SELECT * FROM retain_history_source AS OF '${time0}'::TIMESTAMP; -- time0 (nothing exists)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time1}'::TIMESTAMP; -- time1 (topic created)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time2}'::TIMESTAMP; -- time2 (added data to topic)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time3}'::TIMESTAMP; -- time3 (further added data to topic)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time4}'::TIMESTAMP; -- time4 (created source)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time5}'::TIMESTAMP; -- time5 (updated data in topic in manipulate#1)
                K0 B0
                K1 B1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time6}'::TIMESTAMP; -- time6 (updated data in topic in manipulate#2)
                K0 C0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                > SELECT * FROM retain_history_source AS OF '${time7}'::TIMESTAMP; -- time7 (updated data in topic again in manipulate#2)
                K0 D0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                > SELECT * FROM retain_history_source;
                K0 D0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                """
            )
        )
  342. def remove_target_cluster_from_explain(sql: str) -> str:
  343. return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)