123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- """Test the retain history feature."""
- import time
- from datetime import datetime
- from textwrap import dedent
- from materialize.mzcompose.composition import Composition
- from materialize.mzcompose.services.materialized import Materialized
- from materialize.mzcompose.services.postgres import CockroachOrPostgresMetadata
- from materialize.mzcompose.services.testdrive import Testdrive
# Services used by this composition.
#
# Testdrive is created with no_reset=True. The tests below rely on objects
# created by earlier testdrive invocations still being present in later ones
# (e.g. retain_history_mv1 is created in one invocation and queried in the
# following ones). Presumably no_reset is what skips the state reset between
# invocations; confirm against the Testdrive service definition.
SERVICES = [
    # Metadata store; per its name, CockroachDB or Postgres. It is presumably
    # what Materialized(external_metadata_store=True) below connects to; confirm.
    CockroachOrPostgresMetadata(),
    Materialized(propagate_crashes=True, external_metadata_store=True),
    # 5s default timeout; individual invocations may pass their own
    # --default-timeout argument instead.
    Testdrive(no_reset=True, default_timeout="5s"),
]
def workflow_default(c: Composition) -> None:
    """Bring up the services and run every retain-history scenario in order.

    All scenarios share one environment, and later ones may drop or reuse
    objects created by earlier ones, so the order below is significant.
    """
    setup(c)

    scenarios = (
        run_test_with_mv_on_table,
        run_test_with_mv_on_table_with_altered_retention,
        run_test_with_mv_on_counter_source,
        run_test_with_counter_source,
        # TODO: database-issues#7310 needs to be fixed
        # run_test_gh_24479,
        run_test_with_index,
        run_test_consistency,
    )
    for scenario in scenarios:
        scenario(c)
- def setup(c: Composition) -> None:
- c.up("materialized", {"name": "testdrive", "persistent": True})
def run_test_consistency(c: Composition) -> None:
    """Check that the catalog stays consistent across retain-history settings.

    Intended to cover the three kinds of retain history (disabled, default,
    specified). An index's RETAIN HISTORY is set to a short duration, then to
    a long one, and finally reset. Testdrive runs the script with
    ``--consistency-checks=statement``.
    """
    script = dedent(
        """
        > CREATE TABLE testdrive_consistency_table (i INT);
        > CREATE INDEX testdrive_consistency_table_idx ON testdrive_consistency_table(i);
        > ALTER INDEX testdrive_consistency_table_idx SET (RETAIN HISTORY = FOR '1m')
        > ALTER INDEX testdrive_consistency_table_idx SET (RETAIN HISTORY = FOR '1000 hours')
        > ALTER INDEX testdrive_consistency_table_idx RESET (RETAIN HISTORY)
        """,
    )
    consistency_args = ["--consistency-checks=statement"]
    c.testdrive(script, args=consistency_args)
def run_test_with_mv_on_table(c: Composition) -> None:
    """Test RETAIN HISTORY on a chain of materialized views over a table.

    The views form a chain over ``retain_history_table``:

    * ``retain_history_mv1`` (10s retention) selects from the table.
    * ``retain_history_mv_on_mv1`` selects from ``retain_history_mv1``.
    * ``retain_history_mv_on_mv_on_mv1`` selects from
      ``retain_history_mv_on_mv1``.

    The last two use the retention periods defined at the top of this function.
    The table is modified in several steps, with Materialize's ``now()``
    captured in between. ``AS OF`` reads at those timestamps are then checked,
    either for the historical contents or for the "is not valid for all
    inputs" error.

    Raises:
        AssertionError: if so much wall-clock time has passed that the retention
            period of ``retain_history_mv_on_mv_on_mv1`` may already have
            elapsed, which would invalidate the final checks.
    """
    # Retention of retain_history_mv_on_mv1. The final checks expect this to
    # have elapsed since mz_time1.
    mv_on_mv1_retention_in_sec = 1
    # Retention of retain_history_mv_on_mv_on_mv1. The final checks expect this
    # NOT to have elapsed since mz_time1.
    mv_on_mv_on_mv1_retention_in_sec = 60

    # Taken before any of the objects below exist, so reads AS OF it must fail.
    mz_time0 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            f"""
            > CREATE TABLE retain_history_table (key INT, value INT);

            > INSERT INTO retain_history_table VALUES (1, 100), (2, 200);

            > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '10s') AS
              SELECT * FROM retain_history_table;

            > CREATE MATERIALIZED VIEW retain_history_mv_on_mv1 WITH (RETAIN HISTORY FOR '{mv_on_mv1_retention_in_sec}s') AS
              SELECT * FROM retain_history_mv1;

            > CREATE MATERIALIZED VIEW retain_history_mv_on_mv_on_mv1 WITH (RETAIN HISTORY FOR '{mv_on_mv_on_mv1_retention_in_sec}s') AS
              SELECT * FROM retain_history_mv_on_mv1;

            > SELECT count(*) FROM retain_history_mv1;
            2
            """,
        )
    )

    # Table contents at this point: (1, 100), (2, 200).
    mz_time1 = fetch_now_from_mz(c)
    # Wall-clock reference taken right after mz_time1. It is used below to
    # estimate how much time has passed since then.
    test_time1 = datetime.now()
    c.testdrive(
        dedent(
            f"""
            > UPDATE retain_history_table SET value = value + 10;

            > INSERT INTO retain_history_table VALUES (3, 300);

            > INSERT INTO retain_history_table VALUES (4, 400);

            > INSERT INTO retain_history_table VALUES (5, 500);

            > DELETE FROM retain_history_table WHERE key = 4;

            > UPDATE retain_history_table SET key = 4 WHERE key = 5;

            > UPDATE retain_history_table SET value = value + 1;

            > SELECT * FROM retain_history_mv1;
            1 111
            2 211
            3 301
            4 501

            ! SELECT count(*) FROM retain_history_mv1 AS OF '{mz_time0}'::TIMESTAMP;
            contains: is not valid for all inputs

            > SELECT count(*) >= 2 FROM retain_history_mv1 AS OF AT LEAST '{mz_time1}'::TIMESTAMP;
            true

            > SELECT * FROM retain_history_mv1 AS OF '{mz_time1}'::TIMESTAMP;
            1 100
            2 200

            > INSERT INTO retain_history_table VALUES (6, 600);
            """,
        )
    )

    # Table contents at this point: keys 1, 2, 3, 4 and 6.
    mz_time2 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            """
            > DELETE FROM retain_history_table WHERE key IN (3, 4);
            """,
        )
    )

    # Table contents at this point: keys 1, 2 and 6.
    mz_time3 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            f"""
            > SELECT count(*) FROM retain_history_mv1;
            3

            > SELECT * FROM retain_history_mv1 AS OF '{mz_time1}'::TIMESTAMP;
            1 100
            2 200

            > SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
            1 100
            2 200

            > SELECT * FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
            1 111
            2 211
            3 301
            4 501
            6 600

            > SELECT count(*) IN (2, 5) FROM retain_history_mv1 AS OF AT LEAST '{mz_time2}'::TIMESTAMP;
            true

            > SELECT sum(value), max(value) FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
            1724 600

            > SELECT count(*) FROM retain_history_mv1 AS OF '{mz_time3}'::TIMESTAMP;
            3

            ? EXPLAIN SELECT * FROM retain_history_mv1 AS OF '{mz_time2}'::TIMESTAMP;
            Explained Query:
              ReadStorage materialize.public.retain_history_mv1

            Target cluster: quickstart

            > SELECT mv1a.key, mv1b.key
              FROM retain_history_mv1 mv1a
              LEFT OUTER JOIN retain_history_mv1 mv1b
              ON mv1a.key = mv1b.key
              AS OF '{mz_time2}'::TIMESTAMP;
            1 1
            2 2
            3 3
            4 4
            6 6

            ! SELECT t.key, mv.key
              FROM retain_history_table t
              LEFT OUTER JOIN retain_history_mv1 mv
              ON t.key = mv.key
              AS OF '{mz_time2}'::TIMESTAMP;
            contains: is not valid for all inputs

            > UPDATE retain_history_table SET key = 9 WHERE key = 1;
            """,
        )
    )

    # Key 1 has been renamed to 9 at this point.
    mz_time4 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            f"""
            > SELECT count(*) FROM retain_history_mv1 WHERE key = 1 AS OF '{mz_time3}'::TIMESTAMP;
            1

            > SELECT count(*) FROM retain_history_mv1 WHERE key = 1 AS OF '{mz_time4}'::TIMESTAMP;
            0

            > SELECT 1 WHERE 1 = (SELECT count(*) FROM retain_history_mv1 WHERE key = 1) AS OF '{mz_time3}'::TIMESTAMP;
            1
            """,
        )
    )

    # Make sure the retention period of retain_history_mv_on_mv1 has passed
    # since test_time1. mz_time1 was taken before test_time1, so it has passed
    # for mz_time1 as well. The sleep is derived from the retention constant,
    # with 1s of slack, so that changing the constant keeps this check valid.
    elapsed_in_sec = (datetime.now() - test_time1).total_seconds()
    if elapsed_in_sec <= mv_on_mv1_retention_in_sec:
        time.sleep(mv_on_mv1_retention_in_sec - elapsed_in_sec + 1)

    # Re-measure after sleeping. The outer view's retention must not have
    # elapsed yet, or the "still valid" check below would be meaningless.
    elapsed_in_sec = (datetime.now() - test_time1).total_seconds()
    assert (
        elapsed_in_sec < mv_on_mv_on_mv1_retention_in_sec
    ), "test precondition not satisfied, consider increasing 'mv_on_mv_on_mv1_retention_in_sec'"

    mz_time_in_far_future = "2044-01-11 09:24:10.459000+00:00"
    c.testdrive(
        dedent(
            f"""
            # retain period exceeded
            ! SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
            contains: is not valid for all inputs

            # retain period on wrapping mv still valid
            > SELECT * FROM retain_history_mv_on_mv_on_mv1 AS OF '{mz_time1}'::TIMESTAMP;
            1 100
            2 200

            # retain period in future
            ! SELECT * FROM retain_history_mv_on_mv1 AS OF '{mz_time_in_far_future}'::TIMESTAMP;
            timeout
            """,
        )
    )
def run_test_with_mv_on_table_with_altered_retention(c: Composition) -> None:
    """Test changing a materialized view's RETAIN HISTORY with ALTER.

    The test proceeds in three phases:

    1. While the original retention is in effect, reads AS OF recent
       timestamps succeed.
    2. After the retention is reduced and the reduced period has elapsed,
       those timestamps can no longer be read.
    3. Increasing the retention again must not bring back history that is
       already gone, but it does apply to timestamps taken after the increase.
    """
    original_retention_in_sec = 30
    reduced_retention_in_sec = 2

    # CASCADE is required: earlier tests in this workflow (e.g.
    # run_test_with_mv_on_table) create materialized views on
    # retain_history_table in the same non-reset environment. Materialize's
    # default DROP behavior (RESTRICT) refuses to drop a table that still has
    # dependents.
    c.testdrive(
        dedent(
            f"""
            > DROP MATERIALIZED VIEW IF EXISTS retain_history_mv;

            > DROP TABLE IF EXISTS retain_history_table CASCADE;

            > CREATE TABLE retain_history_table (key INT, value INT);

            > INSERT INTO retain_history_table VALUES (1, 100), (2, 200);

            > CREATE MATERIALIZED VIEW retain_history_mv WITH (RETAIN HISTORY FOR '{original_retention_in_sec}s') AS
              SELECT * FROM retain_history_table;
            """,
        )
    )

    # Table contents at this point: keys 1 and 2.
    mz_time1 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            """
            > INSERT INTO retain_history_table VALUES (3, 300);
            """,
        )
    )

    # Table contents at this point: keys 1, 2 and 3.
    mz_time2 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            f"""
            > SELECT count(*) FROM retain_history_mv AS OF '{mz_time1}'::TIMESTAMP; -- mz_time1
            2

            > SELECT count(*) FROM retain_history_mv AS OF '{mz_time2}'::TIMESTAMP; -- mz_time2
            3

            > INSERT INTO retain_history_table VALUES (4, 400);

            # reduce retention period
            > ALTER MATERIALIZED VIEW retain_history_mv SET (RETAIN HISTORY FOR '{reduced_retention_in_sec}s');
            """,
        ),
    )

    mz_time3 = fetch_now_from_mz(c)
    # wait for the reduced retention period to expire (with 1s of slack)
    time.sleep(reduced_retention_in_sec + 1)
    c.testdrive(
        dedent(
            f"""
            ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time2}'::TIMESTAMP; -- mz_time2
            contains: is not valid for all inputs

            ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time3}'::TIMESTAMP; -- mz_time3
            contains: is not valid for all inputs

            > SELECT count(*) FROM retain_history_mv;
            4
            """,
        ),
        # use a timeout that is significantly lower than the original retention period
        args=["--default-timeout=1s"],
    )

    mz_time4 = fetch_now_from_mz(c)
    c.testdrive(
        dedent(
            f"""
            # increase the retention period again
            > ALTER MATERIALIZED VIEW retain_history_mv SET (RETAIN HISTORY FOR '{original_retention_in_sec}s');

            > INSERT INTO retain_history_table VALUES (5, 500);
            """,
        ),
    )

    mz_time5 = fetch_now_from_mz(c)
    # let the duration of the old (reduced) retention period pass
    time.sleep(reduced_retention_in_sec + 1)
    c.testdrive(
        dedent(
            f"""
            # do not expect to regain old states
            ! SELECT count(*) FROM retain_history_mv AS OF '{mz_time3}'::TIMESTAMP; -- mz_time3
            contains: is not valid for all inputs

            # expect the new retention period to apply
            > SELECT count(*) FROM retain_history_mv AS OF '{mz_time4}'::TIMESTAMP; -- mz_time4
            4

            > SELECT count(*) FROM retain_history_mv AS OF '{mz_time5}'::TIMESTAMP; -- mz_time5
            5
            """,
        ),
    )
def run_test_with_mv_on_counter_source(c: Composition) -> None:
    """Test RETAIN HISTORY on a materialized view over a COUNTER load generator.

    The view is created and checked to be non-empty. Its history is then
    validated by ``_validate_count_of_counter_source``.
    """
    view_name = "retain_history_mv2"
    creation_script = dedent(
        """
        > CREATE SOURCE retain_history_source1
          FROM LOAD GENERATOR COUNTER
          (TICK INTERVAL '100ms');

        > CREATE MATERIALIZED VIEW retain_history_mv2 WITH (RETAIN HISTORY FOR '10s') AS
          SELECT * FROM retain_history_source1;

        > SELECT count(*) > 0 FROM retain_history_mv2;
        true
        """,
    )
    c.testdrive(creation_script)
    _validate_count_of_counter_source(c, view_name)
def run_test_with_counter_source(c: Composition) -> None:
    """Test RETAIN HISTORY set directly on a COUNTER load-generator source.

    The source's history is validated by ``_validate_count_of_counter_source``.
    """
    source_name = "retain_history_source2"
    creation_script = dedent(
        """
        > CREATE SOURCE retain_history_source2
          FROM LOAD GENERATOR COUNTER
          (TICK INTERVAL '100ms')
          WITH (RETAIN HISTORY FOR '10s');
        """,
    )
    c.testdrive(creation_script)
    _validate_count_of_counter_source(c, source_name)
def _validate_count_of_counter_source(c: Composition, object_name: str) -> None:
    """Assert that ``AS OF`` reads of a growing object are stable and advance.

    The steps are:

    1. Read ``count(*)`` of ``object_name`` AS OF a first Materialize
       timestamp.
    2. Sleep.
    3. Read AS OF a second timestamp.
    4. Read AS OF the first timestamp again.

    The repeated read must equal the first one, and the second count must
    exceed the first. ``object_name`` is expected to be backed by a COUNTER
    load generator, as at every call site in this file.
    """
    sleep_duration_between_mz_time1_and_mz_time2 = 1.5

    def count_as_of(mz_time):
        # Single-row, single-column result: the count at the given timestamp.
        query = f"SELECT count(*) FROM {object_name} AS OF '{mz_time}'::TIMESTAMP"
        return c.sql_query(query)[0][0]

    mz_time1 = fetch_now_from_mz(c)
    count_at_mz_time1 = count_as_of(mz_time1)

    time.sleep(sleep_duration_between_mz_time1_and_mz_time2)

    mz_time2 = fetch_now_from_mz(c)
    count_at_mz_time2 = count_as_of(mz_time2)
    # Re-read the older timestamp after time has moved on.
    count_at_mz_time1_queried_at_mz_time2 = count_as_of(mz_time1)

    assert count_at_mz_time1 == count_at_mz_time1_queried_at_mz_time2
    assert (
        count_at_mz_time2 > count_at_mz_time1
    ), f"value at time2 did not progress ({count_at_mz_time2} vs. {count_at_mz_time1}), consider increasing 'sleep_duration_between_mz_time1_and_mz_time2'"
def run_test_with_index(c: Composition) -> None:
    """Test RETAIN HISTORY on a default index over a COUNTER load-generator source.

    The index carrying the RETAIN HISTORY option must be built on
    ``retain_history_source3``, the same source this test creates and whose
    history is then validated. The old code built it on
    ``retain_history_source2`` instead.
    """
    c.testdrive(
        dedent(
            """
            > CREATE SOURCE retain_history_source3
              FROM LOAD GENERATOR COUNTER
              (TICK INTERVAL '100ms');

            > CREATE DEFAULT INDEX retain_history_idx
              ON retain_history_source3
              WITH (RETAIN HISTORY FOR '10s');
            """
        )
    )

    _validate_count_of_counter_source(c, "retain_history_source3")
def run_test_with_table(c: Composition) -> None:
    """Test RETAIN HISTORY set directly on a table.

    Before each insert into ``table_with_retain_history``, ``now()`` is
    recorded into the helper table ``time``. ``AS OF`` reads at those recorded
    times must see exactly the rows inserted before them: zero at ``time0`` and
    one at ``time1``.

    The script is a plain (non-f) string, so testdrive variables are written
    with single braces (``${time0}``). Each variable is defined by its own
    ``set-from-sql`` command before use.
    """
    c.testdrive(
        dedent(
            """
            > CREATE TABLE time (time_index int, t timestamp);

            > CREATE TABLE table_with_retain_history (x int) WITH (RETAIN HISTORY FOR '10s');

            > INSERT INTO time VALUES (0, now());

            # sleep justification: force time to advance
            $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"

            > INSERT INTO table_with_retain_history VALUES (0);

            > INSERT INTO time VALUES (1, now());

            # sleep justification: force time to advance
            $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"

            > INSERT INTO table_with_retain_history VALUES (1);

            > SELECT count(*) FROM table_with_retain_history;
            2

            $ set-from-sql var=time0
            SELECT t::string FROM time WHERE time_index = 0

            > SELECT count(*) FROM table_with_retain_history AS OF '${time0}'::timestamp;
            0

            $ set-from-sql var=time1
            SELECT t::string FROM time WHERE time_index = 1

            > SELECT count(*) FROM table_with_retain_history AS OF '${time1}'::timestamp;
            1
            """
        )
    )
def run_test_gh_24479(c: Composition) -> None:
    """Reproducer for GitHub issue 24479, run once without and once with a sleep.

    Each run records ``now()`` into ``time_<seed>`` and, when
    ``sleep_enabled``, sleeps 2s. It then creates a materialized view with 30s
    retention over an empty table and expects 0 rows when reading the view
    AS OF the recorded time.
    """
    sleep_command = '$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="2s"'
    runs = ((0, False), (1, True))
    for seed, sleep_enabled in runs:
        # An empty line when disabled; dedent normalizes it away.
        maybe_sleep = sleep_command if sleep_enabled else ""
        script = dedent(
            f"""
            > CREATE TABLE time_{seed} (time_index INT, t TIMESTAMP);
            > CREATE TABLE retain_history_table_{seed} (key INT, value INT);
            > INSERT INTO time_{seed} VALUES (1, now());
            {maybe_sleep}
            > CREATE MATERIALIZED VIEW retain_history_mv2_{seed} WITH (RETAIN HISTORY FOR '30s') AS
              SELECT * FROM retain_history_table_{seed};
            $ set-from-sql var=time1_{seed}
            SELECT t::STRING FROM time_{seed} WHERE time_index = 1
            > SELECT count(*) FROM retain_history_mv2_{seed} AS OF '${{time1_{seed}}}'::TIMESTAMP; -- time1_{seed} with sleep_enabled={sleep_enabled}
            0
            """
        )
        c.testdrive(script)
def fetch_now_from_mz(c: Composition) -> str:
    """Return the first column of the first row of ``SELECT now()``.

    The value is used verbatim inside ``AS OF '...'::TIMESTAMP`` clauses.
    """
    result = c.sql_query("SELECT now()")
    first_row = result[0]
    return first_row[0]
|