123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Tests `LOAD GENERATOR KEY VALUE`
- # Replica size used for the test cluster unless the caller overrides the argument.
- $ set-arg-default default-replica-size=1
- # Connect as mz_system to set the storage statistics intervals and turn on the
- # KEY VALUE load generator flag (presumably required by the CREATE SOURCE
- # statements in this file).
- $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
- ALTER SYSTEM SET storage_statistics_collection_interval = 1000
- ALTER SYSTEM SET storage_statistics_interval = 2000
- ALTER SYSTEM SET enable_load_generator_key_value = true
- # Cluster that hosts every source created in this file; its replication factor
- # is later cycled to 0 and back to 1 to force rehydration.
- > CREATE CLUSTER lg_cluster SIZE '${arg.default-replica-size}'
- # Error if trying to create with subsources: every subsource clause
- # (FOR ALL TABLES, FOR TABLES, FOR SCHEMAS) must be rejected, with an error
- # message that mentions the offending clause.
- ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
- KEYS 8,
- PARTITIONS 1,
- SNAPSHOT ROUNDS 1,
- VALUE SIZE 1,
- SEED 42,
- BATCH SIZE 4
- ) FOR ALL TABLES;
- contains:FOR ALL TABLES
- # Same source definition, but with an explicit table list.
- ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
- KEYS 8,
- PARTITIONS 1,
- SNAPSHOT ROUNDS 1,
- VALUE SIZE 1,
- SEED 42,
- BATCH SIZE 4
- ) FOR TABLES ("foo");
- contains:FOR TABLES
- # Same source definition, but with a schema list.
- ! CREATE SOURCE g FROM LOAD GENERATOR KEY VALUE(
- KEYS 8,
- PARTITIONS 1,
- SNAPSHOT ROUNDS 1,
- VALUE SIZE 1,
- SEED 42,
- BATCH SIZE 4
- ) FOR SCHEMAS ("foo");
- contains:FOR SCHEMAS
- # A loadgen that only snapshots (no TICK INTERVAL is configured, unlike the
- # up_with_update source created later in this file).
- > CREATE SOURCE up_no_update
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2
- )
- ENVELOPE UPSERT
- # Same generator settings, but with TRANSACTIONAL SNAPSHOT set to false and the
- # key column exposed under the name `whatever`.
- > CREATE SOURCE up_quick
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- TRANSACTIONAL SNAPSHOT false,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2
- )
- INCLUDE KEY AS whatever
- ENVELOPE UPSERT
- # Ensure data is spread as expected: 16 keys over 4 partitions means 4 rows in
- # each partition, and the largest key is 15.
- > SELECT partition, count(*) FROM up_no_update GROUP BY partition
- 0 4
- 1 4
- 2 4
- 3 4
- > SELECT MAX(key) FROM up_no_update;
- 15
- # The same must hold for up_quick, whose key column is named `whatever`.
- > SELECT partition, count(*) FROM up_quick GROUP BY partition
- 0 4
- 1 4
- 2 4
- 3 4
- > SELECT MAX(whatever) FROM up_quick;
- 15
- # 48 values produced (3 snapshot rounds with 16 keys).
- # For the `TRANSACTIONAL SNAPSHOT = false` source, we produce 48 updates from the 3 rounds.
- # We also expect 6 quick round offsets (based on the batch size).
- #
- # NOTE: For these statistics queries, we take the MAX, because we will have
- # statistics per replica that is (or was) running the source.
- # Result columns, in order: name, offset_known, offset_committed,
- # snapshot_records_known, snapshot_records_staged, messages_received,
- # records_indexed.
- > SELECT
- s.name,
- MAX(u.offset_known),
- MAX(u.offset_committed),
- MAX(u.snapshot_records_known),
- MAX(u.snapshot_records_staged),
- MAX(u.messages_received),
- MAX(u.records_indexed)
- FROM mz_sources s
- JOIN mz_internal.mz_source_statistics u ON s.id = u.id
- WHERE s.name IN ('up_no_update', 'up_quick')
- GROUP BY s.name
- up_no_update 3 3 48 48 48 16
- up_quick 6 6 0 0 48 16
- # Remember the base64-encoded value stored for key 14 in each source so that it
- # can be compared against the value seen after the cluster is restarted.
- $ set-from-sql var=pre-rehydration
- SELECT
- encode(value, 'base64')
- FROM up_no_update
- WHERE
- key = 14
- $ set-from-sql var=pre-rehydration-quick
- SELECT
- encode(value, 'base64')
- FROM up_quick
- WHERE
- whatever = 14
- # Remove every replica and then bring one back, so the sources have to
- # rehydrate.
- > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
- > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
- # Ensure that we rehydrate and keep the same value as before: the value for
- # key 14 must equal the one captured before the replicas were cycled.
- > SELECT
- encode(value, 'base64') = '${pre-rehydration}'
- FROM up_no_update
- WHERE
- key = 14
- true
- # Same check for up_quick, using its renamed key column.
- > SELECT
- encode(value, 'base64') = '${pre-rehydration-quick}'
- FROM up_quick
- WHERE
- whatever = 14
- true
- # The statistics are expected to match the values asserted before the restart.
- > SELECT
- s.name,
- MAX(u.offset_known),
- MAX(u.offset_committed),
- MAX(u.snapshot_records_known),
- MAX(u.snapshot_records_staged),
- MAX(u.messages_received),
- MAX(u.records_indexed)
- FROM mz_sources s
- JOIN mz_internal.mz_source_statistics u ON s.id = u.id
- WHERE s.name IN ('up_no_update', 'up_quick')
- GROUP BY s.name
- up_no_update 3 3 48 48 48 16
- up_quick 6 6 0 0 48 16
- # Neither source is referenced again below.
- > DROP SOURCE up_no_update
- > DROP SOURCE up_quick
- # Create a source with 1s updates after snapshotting (TICK INTERVAL '1s'); the
- # other generator options match the up_no_update source defined earlier.
- > CREATE SOURCE up_with_update
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2,
- TICK INTERVAL '1s'
- )
- ENVELOPE UPSERT
- # Ensure data is partitioned correctly: 4 rows in each of the 4 partitions.
- > SELECT partition, count(*) FROM up_with_update GROUP BY partition
- 0 4
- 1 4
- 2 4
- 3 4
- # The following check is disabled because it doesn't work reliably under high load in CI.
- # It asserts higher offsets than before, as we produce more values.
- # > SELECT
- # s.name,
- # MAX(u.offset_known) > 3,
- # MAX(u.offset_committed) = MAX(u.offset_known),
- # MAX(u.snapshot_records_known),
- # MAX(u.snapshot_records_staged),
- # MAX(u.messages_received) > 48,
- # MAX(u.records_indexed)
- # FROM mz_sources s
- # JOIN mz_internal.mz_source_statistics u ON s.id = u.id
- # WHERE s.name IN ('up_with_update')
- # GROUP BY s.name
- # up_with_update true true 48 48 true 16
- # Also, despite the same seed, values should differ from those of the
- # snapshot-only source (the pre-rehydration variable was captured from
- # up_no_update).
- > SELECT
- encode(value, 'base64') != '${pre-rehydration}'
- FROM up_with_update
- WHERE
- key = 14
- true
- # Cycle the replication factor again to restart the source.
- > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 0);
- > ALTER CLUSTER lg_cluster SET (REPLICATION FACTOR 1);
- # Capture the value currently stored for key 14; the check below expects it to
- # be replaced by a later update.
- $ set-from-sql var=pre-rehydration-with-update
- SELECT
- encode(value, 'base64')
- FROM up_with_update
- WHERE
- key = 14
- # After restarting, we should also still see new updates override values.
- > SELECT
- encode(value, 'base64') != '${pre-rehydration-with-update}'
- FROM up_with_update
- WHERE
- key = 14
- true
- # Test NONE-envelope
- # The generator options match up_quick; only the envelope differs.
- > CREATE SOURCE kv_none
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- TRANSACTIONAL SNAPSHOT false,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2
- )
- ENVELOPE NONE
- # Expect 12 rows per partition (versus 4 under ENVELOPE UPSERT) -- presumably
- # all 48 produced values (3 rounds x 16 keys) are kept instead of one per key.
- > SELECT partition, count(*) FROM kv_none GROUP BY partition
- 0 12
- 1 12
- 2 12
- 3 12
- # Test INCLUDE OFFSET
- # With the default column name, the offset is read through the quoted
- # identifier "offset".
- > CREATE SOURCE kv_offset
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- TRANSACTIONAL SNAPSHOT false,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2
- )
- INCLUDE OFFSET
- ENVELOPE NONE
- # The highest offset in every partition is expected to be 5.
- > SELECT partition, MAX("offset") FROM kv_offset GROUP BY partition
- 0 5
- 1 5
- 2 5
- 3 5
- # Same generator options, but with the OFFSET and KEY metadata columns renamed
- # via AS; the expected results are unchanged.
- > CREATE SOURCE kv_offset2
- IN CLUSTER lg_cluster
- FROM LOAD GENERATOR KEY VALUE (
- KEYS 16,
- PARTITIONS 4,
- SNAPSHOT ROUNDS 3,
- TRANSACTIONAL SNAPSHOT false,
- SEED 123,
- VALUE SIZE 10,
- BATCH SIZE 2
- )
- INCLUDE
- OFFSET AS something_else,
- KEY AS whatever
- ENVELOPE NONE
- > SELECT partition, MAX(something_else) FROM kv_offset2 GROUP BY partition
- 0 5
- 1 5
- 2 5
- 3 5
- > SELECT MAX(whatever) FROM kv_offset2;
- 15
|