# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
ALTER SYSTEM SET storage_statistics_interval = 2000
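
# Note: the two flags above are assumed to control how often storage statistics are
# collected and reported (values in milliseconds); shortening them keeps the checks
# below from waiting long for fresh statistics.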

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=upsert partitions=1

# The length of `moosemoose` must be longer than `moose` to ensure a tombstone doesn't
# happen to have the same size.
$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}
{"key": "mammalmore"} {"f1":"moosemoose", "f2": 2}

$ kafka-create-topic topic=metrics-test partitions=1

$ kafka-ingest topic=metrics-test format=bytes
jack,jill
goofus,gallant

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE SOURCE metrics_test_source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-metrics-test-${testdrive.seed}')

> CREATE TABLE metrics_test_source_tbl (a, b) FROM SOURCE metrics_test_source (REFERENCE "testdrive-metrics-test-${testdrive.seed}")
  FORMAT CSV WITH 2 COLUMNS
  INCLUDE OFFSET

> SELECT * FROM metrics_test_source_tbl
jack jill 0
goofus gallant 1

# The `CREATE TABLE ... FROM SOURCE` command caused a recreation of the source
# dataflow, during which we might have lost the statistics about committed
# updates from the snapshot. Ingest some more data to ensure we see some
# `updates_committed`.
$ kafka-ingest topic=metrics-test format=bytes
calvin,hobbes

> SELECT
    s.name,
    bool_and(u.snapshot_committed),
    SUM(u.messages_received) >= 4,
    SUM(u.updates_staged) > 0,
    SUM(u.updates_committed) > 0,
    SUM(u.bytes_received) > 0,
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('metrics_test_source')
  GROUP BY s.name
  ORDER BY s.name
metrics_test_source true true true true true true

> DROP SOURCE metrics_test_source CASCADE

> CREATE SOURCE upsert
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-upsert-${testdrive.seed}'
  )

> CREATE TABLE upsert_tbl FROM SOURCE upsert (REFERENCE "testdrive-upsert-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE OFFSET
  ENVELOPE UPSERT
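
# ENVELOPE UPSERT keeps only the latest value for each Kafka key, so the select below
# should return exactly the three live keys described above.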
# Adding a select here so that the ingests after this
# trigger lookups from the upsert state
> SELECT key, f1, f2 FROM upsert_tbl
key f1 f2
------------------------
fish fish 1000
birdmore geese 56
mammalmore moosemoose 2

# Also test a source with multiple sub-sources.
# NOTE: We give this source a unique name because we want to query for it with
# a `SUBSCRIBE ... AS OF AT LEAST 0` below and want to avoid receiving results
# for sources created by previous tests.
> CREATE SOURCE auction_house_in_source_statistics_td
  IN CLUSTER ${arg.single-replica-cluster}
  FROM LOAD GENERATOR AUCTION;

> CREATE TABLE accounts FROM SOURCE auction_house_in_source_statistics_td (REFERENCE accounts);
> CREATE TABLE auctions FROM SOURCE auction_house_in_source_statistics_td (REFERENCE auctions);
> CREATE TABLE bids FROM SOURCE auction_house_in_source_statistics_td (REFERENCE bids);
> CREATE TABLE organizations FROM SOURCE auction_house_in_source_statistics_td (REFERENCE organizations);
> CREATE TABLE users FROM SOURCE auction_house_in_source_statistics_td (REFERENCE users);

# Note that only the base source has `messages_received`, while the sub-sources have
# `updates_committed`. For auction sources, committed updates usually add up to the
# received messages, but we don't test this (right now) because of jitter in when
# metrics are produced for each sub-source.
> SELECT
    s.name,
    bool_and(u.snapshot_committed),
    SUM(u.messages_received) > 0,
    SUM(u.updates_staged) > 0,
    SUM(u.updates_committed) > 0,
    SUM(u.bytes_received) > 0,
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('accounts', 'auction_house_in_source_statistics_td', 'auctions', 'bids', 'organizations', 'users')
  GROUP BY s.name
  ORDER BY s.name
auction_house_in_source_statistics_td true true false false true true

# Using a `SUBSCRIBE`, assert that we exercise the codepath that ensures a 0-value is
# reported as the first value in `mz_source_statistics`.
# Sinks and subsources use the same codepath.
$ set-regex match=\d{13} replacement=<TIMESTAMP>

> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE (
    SELECT u.messages_received
    FROM mz_sources s
    JOIN mz_internal.mz_source_statistics_with_history u ON s.id = u.id
    WHERE s.name = 'auction_house_in_source_statistics_td'
  )
  AS OF AT LEAST 0

> FETCH 1 c;
<TIMESTAMP> 1 0
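
# The first update (diff 1) reports `messages_received = 0`, i.e. the forced initial
# zero value described above.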

> COMMIT

> DROP SOURCE auction_house_in_source_statistics_td CASCADE

# Test upsert statistics.
# Note that we always count 9 messages received (18 with source tables), but can see as
# few as 3 updates. This is because there are only 3 active keys, and all the messages
# could be received within 1 second. There could also be up to 11 updates, as updates
# cause both inserts and deletes (5 initial inserts, 2 deletes, and 2 updates). In total,
# 3 records should be present in the upsert state.
# We can't control this, so we have to accept the range.
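# Worked bound: 5 initial inserts + 2 tombstone deletes + 2 value updates x (1 retraction
# + 1 insertion) = 11 updates at most; if everything consolidates within one timestamp,
# only the 3 final keys are emitted, hence the lower bound of 3.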
> SELECT
    s.name,
    SUM(u.messages_received) >= 18,
    SUM(u.bytes_received) > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true true

# Upsert stats are on the UPSERT sub source/table, not the main source.
> SELECT
    t.name,
    bool_and(u.snapshot_committed),
    SUM(u.updates_staged) BETWEEN 3 AND 11,
    SUM(u.updates_committed) BETWEEN 3 AND 11,
    SUM(u.bytes_indexed) > 0,
    SUM(u.records_indexed),
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
  GROUP BY t.name
  ORDER BY t.name
upsert_tbl true true true true 3 true
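
# `records_indexed` is 3 because the upsert state currently holds the three live keys,
# and `bytes_indexed` is the (non-zero) size of that state.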

# While we can't control how batching works above, we can ensure that this new, later
# update causes 1 more message to be received, which is 1 update, a delete.
# We use `set-from-sql` to assert this. We will also use it to validate that the
# `bytes_indexed` value goes down because of the delete.
$ set-from-sql var=updates-committed
SELECT
  (SUM(u.updates_committed) + 1)::text
FROM mz_tables t
JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
WHERE t.name IN ('upsert_tbl')

$ set-from-sql var=state-bytes
SELECT
  (SUM(u.bytes_indexed))::text
FROM mz_tables t
JOIN mz_internal.mz_source_statistics u ON t.id = u.id
WHERE t.name IN ('upsert_tbl')
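
# `updates-committed` now holds the previously committed update count plus one, and
# `state-bytes` holds the current upsert state size in bytes. The tombstone ingested
# below should add exactly one committed update (the delete) and shrink the state to
# below `state-bytes`.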

$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "mammalmore"}

> SELECT s.name,
    SUM(u.messages_received) >= 20,
    SUM(u.bytes_received) > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true true

> SELECT t.name,
    bool_and(u.snapshot_committed),
    SUM(u.updates_staged),
    SUM(u.updates_committed),
    SUM(u.bytes_indexed) < ${state-bytes},
    SUM(u.records_indexed)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
  GROUP BY t.name
  ORDER BY t.name
upsert_tbl true "${updates-committed}" "${updates-committed}" true 2

# check pre-aggregated view
> SELECT s.name,
    u.messages_received >= 20,
    u.bytes_received > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  WHERE s.name IN ('upsert')
upsert true true

> SELECT t.name,
    u.snapshot_committed,
    u.updates_staged,
    u.updates_committed,
    u.bytes_indexed < ${state-bytes},
    u.records_indexed,
    u.rehydration_latency IS NOT NULL
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
upsert_tbl true "${updates-committed}" "${updates-committed}" true 2 true

> DROP SOURCE upsert CASCADE

# should be empty because the source was dropped
> SELECT count(*) FROM mz_internal.mz_source_statistics;
0