mzcompose.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Test the retain history feature."""
  10. import time
  11. from datetime import datetime
  12. from textwrap import dedent
  13. from materialize.mzcompose.composition import Composition
  14. from materialize.mzcompose.services.materialized import Materialized
  15. from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
  16. from materialize.mzcompose.services.testdrive import Testdrive
  # Services this composition declares: the metadata store service, a
  # Materialize instance configured to use an external metadata store and to
  # propagate crashes, and a testdrive runner with a 5s default timeout.
  # NOTE(review): `no_reset=True` presumably keeps objects alive between
  # testdrive invocations, which the scenarios below rely on -- confirm.
  SERVICES = [
      CockroachOrPostgresMetadata(),
      Materialized(external_metadata_store=True, propagate_crashes=True),
      Testdrive(default_timeout="5s", no_reset=True),
  ]
  def workflow_default(c: Composition) -> None:
      """Start the services, then run each enabled retain-history scenario in order."""
      setup(c)

      scenarios = (
          run_test_with_mv_on_table,
          run_test_with_mv_on_table_with_altered_retention,
          run_test_with_mv_on_counter_source,
          run_test_with_counter_source,
          # TODO: database-issues#7310 needs to be fixed
          # run_test_gh_24479,
          run_test_with_index,
          run_test_consistency,
      )
      for scenario in scenarios:
          scenario(c)
  def setup(c: Composition) -> None:
      """Bring up `materialized` and a persistent `testdrive` service."""
      persistent_testdrive = {"name": "testdrive", "persistent": True}
      c.up("materialized", persistent_testdrive)
  def run_test_consistency(c: Composition) -> None:
      """Test that the catalog is consistent for the three types of retain
      histories (disabled, default, specified).

      An index is created, given two different explicit retention periods and
      finally reset; testdrive is invoked with `--consistency-checks=statement`.
      """
      script = dedent(
          """
          > CREATE TABLE testdrive_consistency_table (i INT);
          > CREATE INDEX testdrive_consistency_table_idx ON testdrive_consistency_table(i);
          > ALTER INDEX testdrive_consistency_table_idx SET (RETAIN HISTORY = FOR '1m')
          > ALTER INDEX testdrive_consistency_table_idx SET (RETAIN HISTORY = FOR '1000 hours')
          > ALTER INDEX testdrive_consistency_table_idx RESET (RETAIN HISTORY)
          """,
      )
      c.testdrive(script, args=["--consistency-checks=statement"])
  def run_test_with_mv_on_table(c: Composition) -> None:
      """Exercise `AS OF` reads on a chain of materialized views over a table.

      Three materialized views are stacked on `retain_history_table`, each with
      its own retention period. Timestamps are fetched from Materialize between
      batches of writes and later used in `AS OF` queries to check that

      * reads before the view existed are rejected,
      * reads inside the retention period return the historical contents,
      * reads on a view whose (short) retention period has passed are rejected,
        while the view stacked on top of it with a longer period still answers.
      """
      mv_on_mv1_retention_in_sec = 1
      mv_on_mv_on_mv1_retention_in_sec = 60

      # Taken before any object exists; reads at this time must fail.
      mz_time0 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              f"""
              > CREATE TABLE retain_history_table (key INT, value INT);

              > INSERT INTO retain_history_table VALUES (1, 100), (2, 200);

              > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '10s') AS
                SELECT * FROM retain_history_table;

              > CREATE MATERIALIZED VIEW retain_history_mv_on_mv1 WITH (RETAIN HISTORY FOR '{mv_on_mv1_retention_in_sec}s') AS
                SELECT * FROM retain_history_mv1;

              > CREATE MATERIALIZED VIEW retain_history_mv_on_mv_on_mv1 WITH (RETAIN HISTORY FOR '{mv_on_mv_on_mv1_retention_in_sec}s') AS
                SELECT * FROM retain_history_mv_on_mv1;

              > SELECT count(*) FROM retain_history_mv1;
              2
              """,
          )
      )

      mz_time1 = fetch_now_from_mz(c)
      # Wall-clock reference on the test side, used for the retention checks
      # at the end of this function.
      test_time1 = datetime.now()

      c.testdrive(
          dedent(
              f"""
              > UPDATE retain_history_table SET value = value + 10;

              > INSERT INTO retain_history_table VALUES (3, 300);

              > INSERT INTO retain_history_table VALUES (4, 400);

              > INSERT INTO retain_history_table VALUES (5, 500);

              > DELETE FROM retain_history_table WHERE key = 4;

              > UPDATE retain_history_table SET key = 4 WHERE key = 5;

              > UPDATE retain_history_table SET value = value + 1;

              > SELECT * FROM retain_history_mv1;
              1 111
              2 211
              3 301
              4 501

              ! SELECT count(*) FROM retain_history_mv1 AS OF '{mz_time0}'::TIMESTAMP;
              contains: is not valid for all inputs

              > SELECT count(*) >= 2 FROM retain_history_mv1 AS OF AT LEAST '{mz_time1}'::TIMESTAMP;
              true

              > SELECT * FROM retain_history_mv1 AS OF '{mz_time1}'::TIMESTAMP;
              1 100
              2 200

              > INSERT INTO retain_history_table VALUES (6, 600);
              """,
          )
      )

      mz_time2 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              """
              > DELETE FROM retain_history_table WHERE key IN (3, 4);
              """,
          )
      )

      mz_time3 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              f"""
              > SELECT count(*) FROM retain_history_mv1;
              3

              > SELECT * FROM retain_history_mv1 AS OF '{mz_time1}'::TIMESTAMP;
              1 100
              2 200

              > SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
              1 100
              2 200

              > SELECT * FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
              1 111
              2 211
              3 301
              4 501
              6 600

              > SELECT count(*) IN (2, 5) FROM retain_history_mv1 AS OF AT LEAST '{mz_time2}'::TIMESTAMP;
              true

              > SELECT sum(value), max(value) FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
              1724 600

              > SELECT count(*) FROM retain_history_mv1 AS OF '{mz_time3}'::TIMESTAMP;
              3

              ? EXPLAIN SELECT * FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
              Explained Query:
                ReadStorage materialize.public.retain_history_mv1

              Target cluster: quickstart

              > SELECT mv1a.key, mv1b.key
                FROM retain_history_mv1 mv1a
                LEFT OUTER JOIN retain_history_mv1 mv1b
                ON mv1a.key = mv1b.key
                AS OF '{mz_time2}'::TIMESTAMP;
              1 1
              2 2
              3 3
              4 4
              6 6

              ! SELECT t.key, mv.key
                FROM retain_history_table t
                LEFT OUTER JOIN retain_history_mv1 mv
                ON t.key = mv.key
                AS OF '{mz_time2}'::TIMESTAMP;
              contains: is not valid for all inputs

              > UPDATE retain_history_table SET key = 9 WHERE key = 1;
              """,
          )
      )

      mz_time4 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              f"""
              > SELECT count(*) FROM retain_history_mv1 WHERE key = 1 AS OF '{mz_time3}'::TIMESTAMP;
              1

              > SELECT count(*) FROM retain_history_mv1 WHERE key = 1 AS OF '{mz_time4}'::TIMESTAMP;
              0

              > SELECT 1 WHERE 1 = (SELECT count(*) FROM retain_history_mv1 WHERE key = 1) AS OF '{mz_time3}'::TIMESTAMP;
              1
              """,
          )
      )

      elapsed_in_sec = (datetime.now() - test_time1).total_seconds()
      if elapsed_in_sec <= mv_on_mv1_retention_in_sec:
          # Not enough time has passed for the short retention period to be
          # exceeded; wait one full period and measure again so that the
          # precondition below reflects the time at which the reads happen.
          time.sleep(mv_on_mv1_retention_in_sec)
          elapsed_in_sec = (datetime.now() - test_time1).total_seconds()

      assert (
          elapsed_in_sec < mv_on_mv_on_mv1_retention_in_sec
      ), "test precondition not satisfied, consider increasing 'mv_on_mv_on_mv1_retention_in_sec'"

      mz_time_in_far_future = "2044-01-11 09:24:10.459000+00:00"

      c.testdrive(
          dedent(
              f"""
              # retain period exceeded
              ! SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
              contains: is not valid for all inputs

              # retain period on wrapping mv still valid
              > SELECT * FROM retain_history_mv_on_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
              1 100
              2 200

              # retain period in future
              ! SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time_in_far_future}'::TIMESTAMP;
              timeout
              """,
          )
      )
  def run_test_with_mv_on_table_with_altered_retention(c: Composition) -> None:
      """
      Verify we can still read the most recent timestamp, then reduce the retain history and verify we can't read anymore.

      Afterwards the retention period is increased again: states that were
      already given up must stay unreadable, while timestamps taken after the
      increase must remain readable once the old (short) period has elapsed.
      """
      # Retention period the view is temporarily reduced to; the sleeps below
      # wait one second longer than this.
      reduced_retention_in_sec = 2

      c.testdrive(
          dedent(
              """
              > DROP MATERIALIZED VIEW IF EXISTS retain_history_mv;

              # CASCADE: materialized views created by run_test_with_mv_on_table
              # may still depend on this table.
              > DROP TABLE IF EXISTS retain_history_table CASCADE;

              > CREATE TABLE retain_history_table (key INT, value INT);

              > INSERT INTO retain_history_table VALUES (1, 100), (2, 200);

              > CREATE MATERIALIZED VIEW retain_history_mv WITH (RETAIN HISTORY FOR '30s') AS
                SELECT * FROM retain_history_table;
              """,
          )
      )

      mz_time1 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              """
              > INSERT INTO retain_history_table VALUES (3, 300);
              """,
          )
      )

      mz_time2 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              f"""
              > SELECT count(*) FROM retain_history_mv AS OF '{mz_time1}'::TIMESTAMP; -- mz_time1
              2

              > SELECT count(*) FROM retain_history_mv AS OF '{mz_time2}'::TIMESTAMP; -- mz_time2
              3

              > INSERT INTO retain_history_table VALUES (4, 400);

              # reduce retention period
              > ALTER MATERIALIZED VIEW retain_history_mv SET (RETAIN HISTORY FOR '{reduced_retention_in_sec}s');
              """,
          ),
      )

      mz_time3 = fetch_now_from_mz(c)

      # wait for the retention period to expire
      time.sleep(reduced_retention_in_sec + 1)

      c.testdrive(
          dedent(
              f"""
              ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time2}'::TIMESTAMP; -- mz_time2
              contains: is not valid for all inputs

              ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time3}'::TIMESTAMP; -- mz_time3
              contains: is not valid for all inputs

              > SELECT count(*) FROM retain_history_mv;
              4
              """,
          ),
          # use a timeout that is significantly lower than the original retention period
          args=["--default-timeout=1s"],
      )

      mz_time4 = fetch_now_from_mz(c)

      c.testdrive(
          dedent(
              """
              # increase the retention period again
              > ALTER MATERIALIZED VIEW retain_history_mv SET (RETAIN HISTORY FOR '30s');

              > INSERT INTO retain_history_table VALUES (5, 500);
              """,
          ),
      )

      mz_time5 = fetch_now_from_mz(c)

      # let the duration of the old retention period pass
      time.sleep(reduced_retention_in_sec + 1)

      c.testdrive(
          dedent(
              f"""
              # do not expect to regain old states
              ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time3}'::TIMESTAMP; -- mz_time3
              contains: is not valid for all inputs

              # expect the new retention period to apply
              > SELECT count(*) FROM retain_history_mv AS OF '{mz_time4}'::TIMESTAMP; -- mz_time4
              4

              > SELECT count(*) FROM retain_history_mv AS OF '{mz_time5}'::TIMESTAMP; -- mz_time5
              5
              """,
          ),
      )
  def run_test_with_mv_on_counter_source(c: Composition) -> None:
      """Check historical reads on a materialized view over a counter source.

      The retention period is set on the materialized view, not on the source.
      """
      create_objects = dedent(
          """
          > CREATE SOURCE retain_history_source1
            FROM LOAD GENERATOR COUNTER
            (TICK INTERVAL '100ms');

          > CREATE MATERIALIZED VIEW retain_history_mv2 WITH (RETAIN HISTORY FOR '10s') AS
            SELECT * FROM retain_history_source1;

          > SELECT count(*) > 0 FROM retain_history_mv2;
          true
          """,
      )
      c.testdrive(create_objects)

      _validate_count_of_counter_source(c, "retain_history_mv2")
  def run_test_with_counter_source(c: Composition) -> None:
      """Check historical reads on a counter source that itself retains history."""
      create_source = dedent(
          """
          > CREATE SOURCE retain_history_source2
            FROM LOAD GENERATOR COUNTER
            (TICK INTERVAL '100ms')
            WITH (RETAIN HISTORY FOR '10s');
          """,
      )
      c.testdrive(create_source)

      _validate_count_of_counter_source(c, "retain_history_source2")
  def _validate_count_of_counter_source(c: Composition, object_name: str) -> None:
      """Assert that `object_name` grows over time and keeps its history readable.

      The row count is read at a first timestamp, then at a second timestamp
      taken after a short sleep, and finally once more at the first timestamp.
      The re-read must match the initial read, and the count at the second
      timestamp must be larger than at the first.
      """
      sleep_duration_between_mz_time1_and_mz_time2 = 1.5

      def count_as_of(mz_timestamp):
          # Row count of the object as of the given Materialize timestamp.
          rows = c.sql_query(
              f"SELECT count(*) FROM {object_name} AS OF '{mz_timestamp}'::TIMESTAMP"
          )
          return rows[0][0]

      mz_time1 = fetch_now_from_mz(c)
      count_at_mz_time1 = count_as_of(mz_time1)

      time.sleep(sleep_duration_between_mz_time1_and_mz_time2)

      mz_time2 = fetch_now_from_mz(c)
      count_at_mz_time2 = count_as_of(mz_time2)
      count_at_mz_time1_queried_at_mz_time2 = count_as_of(mz_time1)

      assert count_at_mz_time1 == count_at_mz_time1_queried_at_mz_time2
      assert (
          count_at_mz_time2 > count_at_mz_time1
      ), f"value at time2 did not progress ({count_at_mz_time2} vs. {count_at_mz_time1}), consider increasing 'sleep_duration_between_mz_time1_and_mz_time2'"
  def run_test_with_index(c: Composition) -> None:
      """Check historical reads on a counter source served through an index.

      The source itself has no explicit retention period; the retention is set
      on the default index created on it, so the index must be built on the
      same source that is validated afterwards.
      """
      c.testdrive(
          dedent(
              """
              > CREATE SOURCE retain_history_source3
                FROM LOAD GENERATOR COUNTER
                (TICK INTERVAL '100ms');

              > CREATE DEFAULT INDEX retain_history_idx
                ON retain_history_source3
                WITH (RETAIN HISTORY FOR '10s');
              """
          )
      )

      _validate_count_of_counter_source(c, "retain_history_source3")
  def run_test_with_table(c: Composition) -> None:
      """Check historical reads on a table that retains history.

      Two timestamps are recorded in a helper table `time`: one before the
      first row is inserted and one between the first and the second insert.
      Reading the table `AS OF` those timestamps must yield 0 and 1 rows.
      """
      # Plain (non-f) string: testdrive variables are written as ${name}.
      c.testdrive(
          dedent(
              """
              > CREATE TABLE time (time_index int, t timestamp);

              > CREATE TABLE table_with_retain_history (x int) WITH (RETAIN HISTORY FOR '10s');

              > INSERT INTO time VALUES (0, now());

              # sleep justification: force time to advance
              $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"

              > INSERT INTO table_with_retain_history VALUES (0);

              > INSERT INTO time VALUES (1, now());

              # sleep justification: force time to advance
              $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"

              > INSERT INTO table_with_retain_history VALUES (1);

              > SELECT count(*) FROM table_with_retain_history;
              2

              $ set-from-sql var=time0
              SELECT t::string FROM time WHERE time_index = 0

              > SELECT count(*) FROM table_with_retain_history AS OF '${time0}'::timestamp;
              0

              $ set-from-sql var=time1
              SELECT t::string FROM time WHERE time_index = 1

              > SELECT count(*) FROM table_with_retain_history AS OF '${time1}'::timestamp;
              1
              """
          )
      )
  def run_test_gh_24479(c: Composition) -> None:
      """Read a materialized view `AS OF` a timestamp taken before it was created.

      The scenario runs twice with separate object names (suffix `seed`): once
      creating the view right away and once with a 2s sleep before creating it.
      """
      sleep_command = '$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"'

      for seed, sleep_enabled in [(0, False), (1, True)]:
          # Leaves an empty line in the script when the sleep is disabled.
          optional_sleep = sleep_command if sleep_enabled else ""

          script = dedent(
              f"""
              > CREATE TABLE time_{seed} (time_index INT, t TIMESTAMP);
              > CREATE TABLE retain_history_table_{seed} (key INT, value INT);
              > INSERT INTO time_{seed} VALUES (1, now());
              {optional_sleep}
              > CREATE MATERIALIZED VIEW retain_history_mv2_{seed} WITH (RETAIN HISTORY FOR '30s') AS
                SELECT * FROM retain_history_table_{seed};
              $ set-from-sql var=time1_{seed}
              SELECT t::STRING FROM time_{seed} WHERE time_index = 1
              > SELECT count(*) FROM retain_history_mv2_{seed} AS OF '${{time1_{seed}}}'::TIMESTAMP; -- time1_{seed} with sleep_enabled={sleep_enabled}
              0
              """
          )
          c.testdrive(script)
  def fetch_now_from_mz(c: Composition) -> str:
      """Return the first column of the first row of `SELECT now()` run against Materialize."""
      rows = c.sql_query("SELECT now()")
      first_row = rows[0]
      return first_row[0]