# status-history.td

  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # Specify the behaviour of the status history tables
  11. $ set-regex match="\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d(\.\d\d\d)?" replacement="<TIMESTAMP>"
  12. > DROP CLUSTER IF EXISTS c CASCADE
  13. > CREATE CLUSTER c SIZE '1', REPLICATION FACTOR 0
  14. > CREATE SOURCE counter in cluster c FROM LOAD GENERATOR COUNTER (UP TO 100)
  15. $ set-from-sql var=load_id
  16. SELECT id FROM mz_sources WHERE name = 'counter'
  17. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 1;
  18. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  19. > ALTER CLUSTER c SET (REPLICATION FACTOR 1)
  20. $ set-from-sql var=replica_id
  21. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'c'
  22. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 3;
  23. "<TIMESTAMP> UTC" ${load_id} running <null> <null> ${replica_id}
  24. "<TIMESTAMP> UTC" ${load_id} starting <null> <null> ${replica_id}
  25. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  26. > ALTER CLUSTER c SET (REPLICATION FACTOR 0)
  27. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 4;
  28. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"The replica running this source has been dropped\"]}" ${replica_id}
  29. "<TIMESTAMP> UTC" ${load_id} running <null> <null> ${replica_id}
  30. "<TIMESTAMP> UTC" ${load_id} starting <null> <null> ${replica_id}
  31. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  32. > DROP CLUSTER c CASCADE
  33. $ kafka-create-topic topic=status-history
  34. > CREATE CONNECTION kafka_conn
  35. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  36. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  37. URL '${testdrive.schema-registry-url}'
  38. );
  39. ## The basics: create a source and sink, pass in some data, and confirm that we see the status
  40. ## entries we expect.
  41. > CREATE CLUSTER kafka_source_cluster SIZE '${arg.default-storage-size}';
  42. > CREATE SOURCE kafka_source
  43. IN CLUSTER kafka_source_cluster
  44. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}')
  45. > CREATE TABLE kafka_source_tbl FROM SOURCE kafka_source (REFERENCE "testdrive-status-history-${testdrive.seed}")
  46. FORMAT TEXT
  47. > CREATE CLUSTER kafka_sink_cluster SIZE '${arg.default-storage-size}';
  48. > CREATE SINK kafka_sink
  49. IN CLUSTER kafka_sink_cluster
  50. FROM kafka_source_tbl
  51. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${testdrive.seed}')
  52. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  53. ENVELOPE DEBEZIUM
  54. $ set-from-sql var=source_id
  55. SELECT id FROM mz_sources WHERE name = 'kafka_source'
  56. $ set-from-sql var=source_replica_id
  57. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_cluster'
  58. $ set-from-sql var=sink_replica_id
  59. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_cluster'
  60. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2;
  61. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  62. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}
  63. > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}';
  64. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  65. $ set-from-sql var=sink_id
  66. SELECT id FROM mz_sinks WHERE name = 'kafka_sink'
  67. # Verify we get a starting -- it's possible we move to running by the time this query runs.
  68. # Additionally it can happen that both 'starting' and 'running' are reported on the same millisecond
  69. # so we filter out any other statuses.
  70. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' AND status = 'starting' ORDER BY occurred_at ASC LIMIT 1;
  71. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  72. $ kafka-ingest format=bytes topic=status-history
  73. a
  74. b
  75. c
  76. d
  77. > SELECT * FROM kafka_source_tbl ORDER BY 1;
  78. a
  79. b
  80. c
  81. d
  82. $ kafka-verify-data format=avro sink=materialize.public.kafka_sink sort-messages=true
  83. {"before": null, "after": {"row":{"text": "a"}}}
  84. {"before": null, "after": {"row":{"text": "b"}}}
  85. {"before": null, "after": {"row":{"text": "c"}}}
  86. {"before": null, "after": {"row":{"text": "d"}}}
  87. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 2;
  88. "<TIMESTAMP> UTC" ${sink_id} running <null> <null> ${sink_replica_id}
  89. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  90. > SELECT * FROM mz_internal.mz_sink_statuses WHERE id = '${sink_id}';
  91. "${sink_id}" kafka_sink kafka "<TIMESTAMP> UTC" running <null> <null>
  92. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2;
  93. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}
  94. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  95. > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}';
  96. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  97. ## Confirm that the tables report statuses for multiple sources and sinks.
  98. > CREATE CLUSTER kafka_source_2_cluster SIZE '${arg.default-storage-size}';
  99. > CREATE SOURCE kafka_source_2
  100. IN CLUSTER kafka_source_2_cluster
  101. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}')
  102. > CREATE TABLE kafka_source_2_tbl FROM SOURCE kafka_source_2 (REFERENCE "testdrive-status-history-${testdrive.seed}")
  103. FORMAT TEXT
  104. > CREATE CLUSTER kafka_sink_2_cluster SIZE '${arg.default-storage-size}';
  105. > CREATE SINK kafka_sink_2
  106. IN CLUSTER kafka_sink_2_cluster
  107. FROM kafka_source_2_tbl
  108. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-2-${testdrive.seed}')
  109. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  110. ENVELOPE DEBEZIUM
  111. $ set-from-sql var=source_id_2
  112. SELECT id FROM mz_sources WHERE name = 'kafka_source_2'
  113. $ set-from-sql var=sink_id_2
  114. SELECT id FROM mz_sinks WHERE name = 'kafka_sink_2'
  115. $ set-from-sql var=source_2_replica_id
  116. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_2_cluster'
  117. $ set-from-sql var=sink_2_replica_id
  118. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_2_cluster'
  119. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id_2}' ORDER BY occurred_at DESC LIMIT 2;
  120. "<TIMESTAMP> UTC" ${sink_id_2} starting <null> <null> ${sink_2_replica_id}
  121. "<TIMESTAMP> UTC" ${sink_id_2} running <null> <null> ${sink_2_replica_id}
  122. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id_2}' ORDER BY occurred_at DESC LIMIT 2;
  123. "<TIMESTAMP> UTC" ${source_id_2} running <null> <null> ${source_2_replica_id}
  124. "<TIMESTAMP> UTC" ${source_id_2} starting <null> <null> ${source_2_replica_id}
  125. > SELECT * FROM mz_internal.mz_sink_statuses WHERE id IN ('${sink_id}', '${sink_id_2}') ORDER BY id;
  126. "${sink_id}" kafka_sink kafka "<TIMESTAMP> UTC" running <null> <null>
  127. "${sink_id_2}" kafka_sink_2 kafka "<TIMESTAMP> UTC" running <null> <null>
  128. > SELECT * FROM mz_internal.mz_source_statuses WHERE id IN ('${source_id}', '${source_id_2}') ORDER BY id;
  129. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  130. "${source_id_2}" kafka_source_2 kafka "<TIMESTAMP> UTC" running <null> <null>
  131. # ensure `dropped` also shows up
  132. > DROP SINK kafka_sink
  133. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 3;
  134. "<TIMESTAMP> UTC" ${sink_id} dropped <null> <null> <null>
  135. "<TIMESTAMP> UTC" ${sink_id} running <null> <null> ${sink_replica_id}
  136. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  137. > DROP SOURCE kafka_source CASCADE
  138. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 3;
  139. "<TIMESTAMP> UTC" ${source_id} dropped <null> <null> <null>
  140. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  141. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}