# Copyright Materialize, Inc. and contributors. All rights reserved. # # Use of this software is governed by the Business Source License # included in the LICENSE file at the root of this repository. # # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. $ set-arg-default default-storage-size=1 # Specify the behaviour of the status history tables $ set-regex match="\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d(\.\d\d\d)?" replacement="" > DROP CLUSTER IF EXISTS c CASCADE > CREATE CLUSTER c SIZE '1', REPLICATION FACTOR 0 > CREATE SOURCE counter in cluster c FROM LOAD GENERATOR COUNTER (UP TO 100) $ set-from-sql var=load_id SELECT id FROM mz_sources WHERE name = 'counter' > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 1; " UTC" ${load_id} paused "{\"hints\":[\"There is currently no replica running this source\"]}" > ALTER CLUSTER c SET (REPLICATION FACTOR 1) $ set-from-sql var=replica_id SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'c' > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 3; " UTC" ${load_id} running ${replica_id} " UTC" ${load_id} starting ${replica_id} " UTC" ${load_id} paused "{\"hints\":[\"There is currently no replica running this source\"]}" > ALTER CLUSTER c SET (REPLICATION FACTOR 0) > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 4; " UTC" ${load_id} paused "{\"hints\":[\"The replica running this source has been dropped\"]}" ${replica_id} " UTC" ${load_id} running ${replica_id} " UTC" ${load_id} starting ${replica_id} " UTC" ${load_id} paused "{\"hints\":[\"There is currently no replica running this source\"]}" > DROP CLUSTER c CASCADE $ kafka-create-topic topic=status-history > CREATE CONNECTION kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL '${testdrive.schema-registry-url}' ); ## The basics: create a source and sink, pass in some data, and confirm that we see the status ## entries we expect. > CREATE CLUSTER kafka_source_cluster SIZE '${arg.default-storage-size}'; > CREATE SOURCE kafka_source IN CLUSTER kafka_source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}') FORMAT TEXT > CREATE CLUSTER kafka_sink_cluster SIZE '${arg.default-storage-size}'; > CREATE SINK kafka_sink IN CLUSTER kafka_sink_cluster FROM kafka_source INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM $ set-from-sql var=source_id SELECT id FROM mz_sources WHERE name = 'kafka_source' $ set-from-sql var=source_replica_id SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_cluster' $ set-from-sql var=sink_replica_id SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_cluster' > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2; " UTC" ${source_id} running ${source_replica_id} " UTC" ${source_id} starting ${source_replica_id} > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}'; "${source_id}" kafka_source kafka " UTC" running $ set-from-sql var=sink_id SELECT id FROM mz_sinks WHERE name = 'kafka_sink' # Verify we get a starting -- it's possible we move to running by the time this query runs. # Additionally it can happen that both 'starting' and 'running' are reported on the same millisecond # so we filter out any other statuses. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' AND status = 'starting' ORDER BY occurred_at ASC LIMIT 1; " UTC" ${sink_id} starting ${sink_replica_id} $ kafka-ingest format=bytes topic=status-history a b c d > SELECT * FROM kafka_source ORDER BY 1; a b c d $ kafka-verify-data format=avro sink=materialize.public.kafka_sink sort-messages=true {"before": null, "after": {"row":{"text": "a"}}} {"before": null, "after": {"row":{"text": "b"}}} {"before": null, "after": {"row":{"text": "c"}}} {"before": null, "after": {"row":{"text": "d"}}} > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 2; " UTC" ${sink_id} running ${sink_replica_id} " UTC" ${sink_id} starting ${sink_replica_id} > SELECT * FROM mz_internal.mz_sink_statuses WHERE id = '${sink_id}'; "${sink_id}" kafka_sink kafka " UTC" running > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2; " UTC" ${source_id} starting ${source_replica_id} " UTC" ${source_id} running ${source_replica_id} > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}'; "${source_id}" kafka_source kafka " UTC" running ## Confirm that the tables report statuses for multiple sources and sinks. > CREATE CLUSTER kafka_source_2_cluster SIZE '${arg.default-storage-size}'; > CREATE SOURCE kafka_source_2 IN CLUSTER kafka_source_2_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}') FORMAT TEXT > CREATE CLUSTER kafka_sink_2_cluster SIZE '${arg.default-storage-size}'; > CREATE SINK kafka_sink_2 IN CLUSTER kafka_sink_2_cluster FROM kafka_source_2 INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-2-${testdrive.seed}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM $ set-from-sql var=source_id_2 SELECT id FROM mz_sources WHERE name = 'kafka_source_2' $ set-from-sql var=sink_id_2 SELECT id FROM mz_sinks WHERE name = 'kafka_sink_2' $ set-from-sql var=source_2_replica_id SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_2_cluster' $ set-from-sql var=sink_2_replica_id SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_2_cluster' > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id_2}' ORDER BY occurred_at DESC LIMIT 2; " UTC" ${sink_id_2} starting ${sink_2_replica_id} " UTC" ${sink_id_2} running ${sink_2_replica_id} > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id_2}' ORDER BY occurred_at DESC LIMIT 2; " UTC" ${source_id_2} running ${source_2_replica_id} " UTC" ${source_id_2} starting ${source_2_replica_id} > SELECT * FROM mz_internal.mz_sink_statuses WHERE id IN ('${sink_id}', '${sink_id_2}') ORDER BY id; "${sink_id}" kafka_sink kafka " UTC" running "${sink_id_2}" kafka_sink_2 kafka " UTC" running > SELECT * FROM mz_internal.mz_source_statuses WHERE id IN ('${source_id}', '${source_id_2}') ORDER BY id; "${source_id}" kafka_source kafka " UTC" running "${source_id_2}" kafka_source_2 kafka " UTC" running # ensure `dropped` also shows up > DROP SINK kafka_sink > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 3; " UTC" ${sink_id} dropped " UTC" ${sink_id} running ${sink_replica_id} " UTC" ${sink_id} starting ${sink_replica_id} > DROP SOURCE kafka_source CASCADE > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 3; " UTC" ${source_id} dropped " UTC" ${source_id} running ${source_replica_id} " UTC" ${source_id} starting ${source_replica_id}