# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ set-arg-default default-storage-size=1
  10. # Specify the behaviour of the status history tables
  11. $ set-regex match="\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d(\.\d\d\d)?" replacement="<TIMESTAMP>"
  12. > DROP CLUSTER IF EXISTS c CASCADE
  13. > CREATE CLUSTER c SIZE '1', REPLICATION FACTOR 0
  14. > CREATE SOURCE counter in cluster c FROM LOAD GENERATOR COUNTER (UP TO 100)
  15. $ set-from-sql var=load_id
  16. SELECT id FROM mz_sources WHERE name = 'counter'
  17. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 1;
  18. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  19. > ALTER CLUSTER c SET (REPLICATION FACTOR 1)
  20. $ set-from-sql var=replica_id
  21. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'c'
  22. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 3;
  23. "<TIMESTAMP> UTC" ${load_id} running <null> <null> ${replica_id}
  24. "<TIMESTAMP> UTC" ${load_id} starting <null> <null> ${replica_id}
  25. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  26. > ALTER CLUSTER c SET (REPLICATION FACTOR 0)
  27. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${load_id}' ORDER BY occurred_at DESC LIMIT 4;
  28. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"The replica running this source has been dropped\"]}" ${replica_id}
  29. "<TIMESTAMP> UTC" ${load_id} running <null> <null> ${replica_id}
  30. "<TIMESTAMP> UTC" ${load_id} starting <null> <null> ${replica_id}
  31. "<TIMESTAMP> UTC" ${load_id} paused <null> "{\"hints\":[\"There is currently no replica running this source\"]}" <null>
  32. > DROP CLUSTER c CASCADE
  33. $ kafka-create-topic topic=status-history
  34. > CREATE CONNECTION kafka_conn
  35. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  36. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
  37. URL '${testdrive.schema-registry-url}'
  38. );
  39. ## The basics: create a source and sink, pass in some data, and confirm that we see the status
  40. ## entries we expect.
  41. > CREATE CLUSTER kafka_source_cluster SIZE '${arg.default-storage-size}';
  42. > CREATE SOURCE kafka_source
  43. IN CLUSTER kafka_source_cluster
  44. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}')
  45. FORMAT TEXT
  46. > CREATE CLUSTER kafka_sink_cluster SIZE '${arg.default-storage-size}';
  47. > CREATE SINK kafka_sink
  48. IN CLUSTER kafka_sink_cluster
  49. FROM kafka_source
  50. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-${testdrive.seed}')
  51. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  52. ENVELOPE DEBEZIUM
  53. $ set-from-sql var=source_id
  54. SELECT id FROM mz_sources WHERE name = 'kafka_source'
  55. $ set-from-sql var=source_replica_id
  56. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_cluster'
  57. $ set-from-sql var=sink_replica_id
  58. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_cluster'
  59. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2;
  60. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  61. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}
  62. > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}';
  63. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  64. $ set-from-sql var=sink_id
  65. SELECT id FROM mz_sinks WHERE name = 'kafka_sink'
  66. # Verify we get a starting -- it's possible we move to running by the time this query runs.
  67. # Additionally it can happen that both 'starting' and 'running' are reported on the same millisecond
  68. # so we filter out any other statuses.
  69. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' AND status = 'starting' ORDER BY occurred_at ASC LIMIT 1;
  70. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  71. $ kafka-ingest format=bytes topic=status-history
  72. a
  73. b
  74. c
  75. d
  76. > SELECT * FROM kafka_source ORDER BY 1;
  77. a
  78. b
  79. c
  80. d
  81. $ kafka-verify-data format=avro sink=materialize.public.kafka_sink sort-messages=true
  82. {"before": null, "after": {"row":{"text": "a"}}}
  83. {"before": null, "after": {"row":{"text": "b"}}}
  84. {"before": null, "after": {"row":{"text": "c"}}}
  85. {"before": null, "after": {"row":{"text": "d"}}}
  86. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 2;
  87. "<TIMESTAMP> UTC" ${sink_id} running <null> <null> ${sink_replica_id}
  88. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  89. > SELECT * FROM mz_internal.mz_sink_statuses WHERE id = '${sink_id}';
  90. "${sink_id}" kafka_sink kafka "<TIMESTAMP> UTC" running <null> <null>
  91. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 2;
  92. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}
  93. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  94. > SELECT * FROM mz_internal.mz_source_statuses WHERE id = '${source_id}';
  95. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  96. ## Confirm that the tables report statuses for multiple sources and sinks.
  97. > CREATE CLUSTER kafka_source_2_cluster SIZE '${arg.default-storage-size}';
  98. > CREATE SOURCE kafka_source_2
  99. IN CLUSTER kafka_source_2_cluster
  100. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-status-history-${testdrive.seed}')
  101. FORMAT TEXT
  102. > CREATE CLUSTER kafka_sink_2_cluster SIZE '${arg.default-storage-size}';
  103. > CREATE SINK kafka_sink_2
  104. IN CLUSTER kafka_sink_2_cluster
  105. FROM kafka_source_2
  106. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-2-${testdrive.seed}')
  107. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  108. ENVELOPE DEBEZIUM
  109. $ set-from-sql var=source_id_2
  110. SELECT id FROM mz_sources WHERE name = 'kafka_source_2'
  111. $ set-from-sql var=sink_id_2
  112. SELECT id FROM mz_sinks WHERE name = 'kafka_sink_2'
  113. $ set-from-sql var=source_2_replica_id
  114. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_source_2_cluster'
  115. $ set-from-sql var=sink_2_replica_id
  116. SELECT r.id FROM mz_clusters c JOIN mz_cluster_replicas r ON c.id = r.cluster_id WHERE c.name = 'kafka_sink_2_cluster'
  117. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id_2}' ORDER BY occurred_at DESC LIMIT 2;
  118. "<TIMESTAMP> UTC" ${sink_id_2} starting <null> <null> ${sink_2_replica_id}
  119. "<TIMESTAMP> UTC" ${sink_id_2} running <null> <null> ${sink_2_replica_id}
  120. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id_2}' ORDER BY occurred_at DESC LIMIT 2;
  121. "<TIMESTAMP> UTC" ${source_id_2} running <null> <null> ${source_2_replica_id}
  122. "<TIMESTAMP> UTC" ${source_id_2} starting <null> <null> ${source_2_replica_id}
  123. > SELECT * FROM mz_internal.mz_sink_statuses WHERE id IN ('${sink_id}', '${sink_id_2}') ORDER BY id;
  124. "${sink_id}" kafka_sink kafka "<TIMESTAMP> UTC" running <null> <null>
  125. "${sink_id_2}" kafka_sink_2 kafka "<TIMESTAMP> UTC" running <null> <null>
  126. > SELECT * FROM mz_internal.mz_source_statuses WHERE id IN ('${source_id}', '${source_id_2}') ORDER BY id;
  127. "${source_id}" kafka_source kafka "<TIMESTAMP> UTC" running <null> <null>
  128. "${source_id_2}" kafka_source_2 kafka "<TIMESTAMP> UTC" running <null> <null>
  129. # ensure `dropped` also shows up
  130. > DROP SINK kafka_sink
  131. > SELECT * FROM mz_internal.mz_sink_status_history WHERE sink_id = '${sink_id}' ORDER BY occurred_at DESC LIMIT 3;
  132. "<TIMESTAMP> UTC" ${sink_id} dropped <null> <null> <null>
  133. "<TIMESTAMP> UTC" ${sink_id} running <null> <null> ${sink_replica_id}
  134. "<TIMESTAMP> UTC" ${sink_id} starting <null> <null> ${sink_replica_id}
  135. > DROP SOURCE kafka_source CASCADE
  136. > SELECT * FROM mz_internal.mz_source_status_history WHERE source_id = '${source_id}' ORDER BY occurred_at DESC LIMIT 3;
  137. "<TIMESTAMP> UTC" ${source_id} dropped <null> <null> <null>
  138. "<TIMESTAMP> UTC" ${source_id} running <null> <null> ${source_replica_id}
  139. "<TIMESTAMP> UTC" ${source_id} starting <null> <null> ${source_replica_id}