123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- # Test reporting of controller frontiers through `mz_internal.mz_frontiers` and
- # `mz_cluster_replica_frontiers`.
- #
- # These tests rely on testdrive's retry feature, as they query introspection
- # sources whose data might not be immediately available.
- # Setup. Remove clusters that a previous run may have left behind, then
- # create the two clusters used by this file: `test` with replicas r1 and r2,
- # and `test_source` with the single replica s1.
- > DROP CLUSTER IF EXISTS test
- > DROP CLUSTER IF EXISTS test_source
- > CREATE CLUSTER test REPLICAS (r1 (SIZE '1'), r2 (SIZE '1'))
- > CREATE CLUSTER test_source REPLICAS (s1 (SIZE '1'))
- # Make `test` the session's active cluster.
- > SET cluster = test
- # Base table with a single row; the objects created later in this file
- # read from it.
- > CREATE TABLE t1 (a int)
- > INSERT INTO t1 (a) VALUES (1)
- # Materialized views. After creating mv1, a positive write frontier is
- # expected for it on both replicas (r1, r2), and the global relation is
- # expected to report positive read and write frontiers.
- # The `LIKE 'u%'` filter keeps only object IDs starting with "u"
- # (presumably user objects -- confirm against the catalog docs).
- > CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t1
- > SELECT mv.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_materialized_views AS mv ON mv.id = f.object_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.write_frontier > 0
- mv1 r1
- mv1 r2
- > SELECT mv.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_materialized_views AS mv ON mv.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- mv1
- # Indexes. After creating idx1 on t1, a positive write frontier is expected
- # for it on both replicas (r1, r2), plus positive read and write frontiers
- # in the global relation.
- > CREATE INDEX idx1 ON t1 (a)
- > SELECT i.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_indexes AS i ON i.id = f.object_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.write_frontier > 0
- idx1 r1
- idx1 r2
- > SELECT i.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_indexes AS i ON i.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- idx1
- # Continual tasks. ct1 copies rows from t1 into itself; a positive write
- # frontier is expected for it on both replicas (r1, r2), plus positive read
- # and write frontiers in the global relation.
- > CREATE CONTINUAL TASK ct1 (a int) ON INPUT t1 AS (INSERT INTO ct1 SELECT * FROM t1)
- > SELECT ct.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_internal.mz_continual_tasks AS ct ON ct.id = f.object_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.write_frontier > 0
- ct1 r1
- ct1 r2
- > SELECT ct.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_internal.mz_continual_tasks AS ct ON ct.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- ct1
- # Sources. source1 is a COUNTER load generator placed on `test_source`, so
- # the per-replica relation is expected to list it only for replica s1.
- # The global relation is expected to list both source1 and its
- # source1_progress collection.
- > CREATE SOURCE source1 IN CLUSTER test_source FROM LOAD GENERATOR COUNTER (UP TO 100)
- > SELECT s.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_sources AS s ON s.id = f.object_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.write_frontier > 0
- source1 s1
- > SELECT s.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_sources AS s ON s.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- source1
- source1_progress
- # Sinks. sink1 exports t1 to Kafka from the `test_source` cluster, so the
- # per-replica relation is expected to list it only for replica s1.
- > CREATE CONNECTION kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
- > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}')
- > CREATE SINK sink1
-   IN CLUSTER test_source
-   FROM t1
-   INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
-   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
-   ENVELOPE DEBEZIUM
- > SELECT k.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_sinks AS k ON k.id = f.object_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.write_frontier > 0
- sink1 s1
- # Unlike the other object kinds, a NULL read frontier is accepted here in
- # addition to a positive one.
- > SELECT k.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_sinks AS k ON k.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND (f.read_frontier IS NULL OR f.read_frontier > 0)
-     AND f.write_frontier > 0
- sink1
- # Introspection sources. The index is located by its name pattern
- # (mz_active_peeks_per_worker_u..._primary_idx) on cluster `test`. A positive
- # write frontier is expected on both replicas (r1, r2), and exactly one
- # matching row with positive frontiers is expected in the global relation.
- > SELECT r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_indexes AS i ON i.id = f.object_id
-   INNER JOIN mz_clusters AS c ON c.id = i.cluster_id
-   LEFT JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE i.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx'
-     AND f.write_frontier > 0
-     AND c.name = 'test'
- r1
- r2
- > SELECT count(*)
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_indexes AS i ON i.id = f.object_id
-   INNER JOIN mz_clusters AS c ON c.id = i.cluster_id
-   WHERE i.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
-     AND c.name = 'test'
- 1
- # Tables. t1 is expected in the global relation with positive read and
- # write frontiers.
- > SELECT t.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_tables AS t ON t.id = f.object_id
-   WHERE f.object_id LIKE 'u%'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- t1
- # Storage-managed collections. The `mz_frontiers` collection is looked up
- # by name in mz_sources and is expected to report positive read and write
- # frontiers for itself.
- > SELECT s.name
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_sources AS s ON s.id = f.object_id
-   WHERE s.name = 'mz_frontiers'
-     AND f.read_frontier > 0
-     AND f.write_frontier > 0
- mz_frontiers
- # Replica creation adds frontiers. First record the baseline for cluster
- # `test`: ct1, idx1 and mv1 are each expected on r1 and r2.
- > SELECT o.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_objects AS o ON o.id = f.object_id
-   INNER JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   INNER JOIN mz_clusters AS c ON c.id = r.cluster_id
-   WHERE o.id LIKE 'u%'
-     AND f.write_frontier > 0
-     AND c.name = 'test'
- ct1 r1
- ct1 r2
- idx1 r1
- idx1 r2
- mv1 r1
- mv1 r2
- # After adding r3, the same three objects are expected on it as well.
- > CREATE CLUSTER REPLICA test.r3 SIZE '1'
- > SELECT o.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_objects AS o ON o.id = f.object_id
-   INNER JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   INNER JOIN mz_clusters AS c ON c.id = r.cluster_id
-   WHERE o.id LIKE 'u%'
-     AND f.write_frontier > 0
-     AND c.name = 'test'
- ct1 r1
- ct1 r2
- ct1 r3
- idx1 r1
- idx1 r2
- idx1 r3
- mv1 r1
- mv1 r2
- mv1 r3
- # Replica removal drops frontiers. Once r1 is dropped, no row for r1 is
- # expected any more; ct1, idx1 and mv1 remain on r2 and r3.
- > DROP CLUSTER REPLICA test.r1
- > SELECT o.name, r.name
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_objects AS o ON o.id = f.object_id
-   INNER JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   INNER JOIN mz_clusters AS c ON c.id = r.cluster_id
-   WHERE o.id LIKE 'u%'
-     AND f.write_frontier > 0
-     AND c.name = 'test'
- ct1 r2
- ct1 r3
- idx1 r2
- idx1 r3
- mv1 r2
- mv1 r3
- # Empty frontiers are reported as NULL. mv2 selects a constant; its write
- # frontier is expected to be NULL on each remaining replica (r2, r3), and
- # the global relation is expected to show read frontier 0 with a NULL
- # write frontier.
- > CREATE MATERIALIZED VIEW mv2 AS SELECT 1
- > SELECT r.name, f.write_frontier
-   FROM mz_cluster_replica_frontiers AS f
-   INNER JOIN mz_materialized_views AS mv ON mv.id = f.object_id
-   INNER JOIN mz_cluster_replicas AS r ON r.id = f.replica_id
-   WHERE mv.name = 'mv2'
- r2 <null>
- r3 <null>
- > SELECT f.read_frontier, f.write_frontier
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_materialized_views AS mv ON mv.id = f.object_id
-   WHERE mv.name = 'mv2'
- 0 <null>
- # Dropping objects removes their frontiers. Everything created above is
- # dropped, with t1 last, after the objects that read from it. Both
- # relations are then expected to return no rows for IDs starting with "u".
- > DROP MATERIALIZED VIEW mv1
- > DROP MATERIALIZED VIEW mv2
- > DROP INDEX idx1
- > DROP CONTINUAL TASK ct1
- > DROP SOURCE source1 CASCADE
- > DROP SINK sink1
- > DROP TABLE t1
- > SELECT *
-   FROM mz_cluster_replica_frontiers
-   WHERE object_id LIKE 'u%'
- > SELECT *
-   FROM mz_internal.mz_frontiers
-   WHERE object_id LIKE 'u%'
- # Frontier initialization on a cluster without replicas. idx2 lives on
- # `empty` (REPLICATION FACTOR 0); its read frontier is expected to be
- # positive and equal to its write frontier.
- > CREATE CLUSTER empty SIZE '1', REPLICATION FACTOR 0
- > CREATE TABLE t2 (a int)
- > CREATE INDEX idx2 IN CLUSTER empty ON t2 (a)
- > SELECT f.read_frontier > 0, f.read_frontier = f.write_frontier
-   FROM mz_internal.mz_frontiers AS f
-   INNER JOIN mz_indexes AS i ON i.id = f.object_id
-   WHERE i.name = 'idx2'
- true true
|