statistics-deletion.td 6.6 KB


# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
  11. ALTER SYSTEM SET storage_statistics_collection_interval = 1000
  12. ALTER SYSTEM SET storage_statistics_interval = 1000
  13. > CREATE CLUSTER cluster1 REPLICAS (r1 (SIZE '2'))
  14. > CREATE CONNECTION kafka_conn
  15. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  16. > CREATE TABLE t (a text, b text)
  17. > CREATE MATERIALIZED VIEW simple_view AS SELECT * from t;
  18. > INSERT INTO t VALUES ('key1', 'value')
  19. # Setup various sinks and sources
  20. > CREATE CLUSTER sink_size_cluster SIZE '2';
  21. > CREATE SINK sink_size
  22. IN CLUSTER sink_size_cluster
  23. FROM simple_view
  24. INTO KAFKA CONNECTION kafka_conn (TOPIC 'topic-${testdrive.seed}')
  25. KEY (a)
  26. FORMAT JSON
  27. ENVELOPE DEBEZIUM
  28. > CREATE SINK sink_cluster
  29. IN CLUSTER cluster1
  30. FROM simple_view
  31. INTO KAFKA CONNECTION kafka_conn (TOPIC 'topic-${testdrive.seed}')
  32. KEY (a)
  33. FORMAT JSON
  34. ENVELOPE DEBEZIUM
  35. $ kafka-create-topic topic=upsert partitions=1
  36. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  37. one:two
  38. > CREATE CLUSTER upsert_size_cluster SIZE '2';
  39. > CREATE SOURCE upsert_size
  40. IN CLUSTER upsert_size_cluster
  41. FROM KAFKA CONNECTION kafka_conn (TOPIC
  42. 'testdrive-upsert-${testdrive.seed}'
  43. )
  44. > CREATE TABLE upsert_size_tbl FROM SOURCE upsert_size (REFERENCE "testdrive-upsert-${testdrive.seed}")
  45. KEY FORMAT BYTES
  46. VALUE FORMAT BYTES
  47. ENVELOPE UPSERT
  48. > CREATE SOURCE upsert_cluster
  49. IN CLUSTER cluster1
  50. FROM KAFKA CONNECTION kafka_conn (TOPIC
  51. 'testdrive-upsert-${testdrive.seed}'
  52. )
  53. > CREATE TABLE upsert_cluster_tbl FROM SOURCE upsert_cluster (REFERENCE "testdrive-upsert-${testdrive.seed}")
  54. KEY FORMAT BYTES
  55. VALUE FORMAT BYTES
  56. ENVELOPE UPSERT
  57. > SELECT * FROM upsert_size_tbl
  58. one two
  59. > SELECT * FROM upsert_cluster_tbl
  60. one two
  61. > CREATE SCHEMA subsources_size;
  62. > CREATE SCHEMA subsources_cluster;
  63. > CREATE CLUSTER subsource_size_s_cluster SIZE '2';
  64. > CREATE SOURCE subsources_size.s IN CLUSTER subsource_size_s_cluster FROM LOAD GENERATOR AUCTION (UP TO 100);
  65. > CREATE SOURCE subsources_cluster.s IN CLUSTER cluster1 FROM LOAD GENERATOR AUCTION (UP TO 100);
  66. > CREATE TABLE subsources_size.accounts FROM SOURCE subsources_size.s (REFERENCE accounts);
  67. > CREATE TABLE subsources_size.auctions FROM SOURCE subsources_size.s (REFERENCE auctions);
  68. > CREATE TABLE subsources_size.bids FROM SOURCE subsources_size.s (REFERENCE bids);
  69. > CREATE TABLE subsources_size.organizations FROM SOURCE subsources_size.s (REFERENCE organizations);
  70. > CREATE TABLE subsources_size.users FROM SOURCE subsources_size.s (REFERENCE users);
  71. > CREATE TABLE subsources_cluster.accounts FROM SOURCE subsources_cluster.s (REFERENCE accounts);
  72. > CREATE TABLE subsources_cluster.auctions FROM SOURCE subsources_cluster.s (REFERENCE auctions);
  73. > CREATE TABLE subsources_cluster.bids FROM SOURCE subsources_cluster.s (REFERENCE bids);
  74. > CREATE TABLE subsources_cluster.organizations FROM SOURCE subsources_cluster.s (REFERENCE organizations);
  75. > CREATE TABLE subsources_cluster.users FROM SOURCE subsources_cluster.s (REFERENCE users);
  76. # The `CREATE TABLE ... FROM SOURCE` commands caused a recreation of the
  77. # respective source dataflows, during which we might have lost the statistics
  78. # about committed updates from the snapshot. Ingest some more data to ensure we
  79. # see some `updates_committed`.
  80. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  81. two:three
  82. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  83. FROM mz_sinks s
  84. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  85. WHERE s.name IN ('sink_size', 'sink_cluster')
  86. GROUP BY s.name
  87. sink_size 1 1 true true
  88. sink_cluster 1 1 true true
  89. > SELECT s.name,
  90. SUM(u.updates_committed) > 0
  91. FROM mz_sources s
  92. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  93. WHERE s.name IN ('upsert_size', 'upsert_cluster', 's', 'bids')
  94. GROUP BY s.id, s.name
  95. upsert_size true
  96. upsert_cluster true
  97. s false
  98. s false
  99. # We have to obtain these before we delete the sink.
  100. $ set-from-sql var=sink-size-id
  101. SELECT s.id
  102. FROM mz_sinks s
  103. WHERE s.name IN ('sink_size')
  104. $ set-from-sql var=sink-cluster-id
  105. SELECT s.id
  106. FROM mz_sinks s
  107. WHERE s.name IN ('sink_cluster')
  108. $ set-from-sql var=upsert-size-id
  109. SELECT s.id
  110. FROM mz_sources s
  111. WHERE s.name IN ('upsert_size')
  112. $ set-from-sql var=upsert-cluster-id
  113. SELECT s.id
  114. FROM mz_sources s
  115. WHERE s.name IN ('upsert_cluster')
  116. # We also need to check that subsources and top-level
  117. # sources are cleared out.
  118. $ set-from-sql var=subsources-size-top-level-id
  119. SELECT s.id
  120. FROM mz_sources s
  121. JOIN mz_schemas sch
  122. ON sch.id = s.schema_id
  123. WHERE s.name IN ('s') AND sch.name = 'subsources_size'
  124. $ set-from-sql var=subsources-size-bids-id
  125. SELECT t.id
  126. FROM mz_tables t
  127. JOIN mz_schemas sch
  128. ON sch.id = t.schema_id
  129. WHERE t.name IN ('bids') AND sch.name = 'subsources_size'
  130. $ set-from-sql var=subsources-cluster-top-level-id
  131. SELECT s.id
  132. FROM mz_sources s
  133. JOIN mz_schemas sch
  134. ON sch.id = s.schema_id
  135. WHERE s.name IN ('s') AND sch.name = 'subsources_cluster'
  136. $ set-from-sql var=subsources-cluster-bids-id
  137. SELECT t.id
  138. FROM mz_tables t
  139. JOIN mz_schemas sch
  140. ON sch.id = t.schema_id
  141. WHERE t.name IN ('bids') AND sch.name = 'subsources_cluster'
  142. > DROP SINK sink_size
  143. > DROP SINK sink_cluster
  144. > DROP SOURCE upsert_size CASCADE
  145. > DROP SOURCE upsert_cluster CASCADE
  146. > DROP SOURCE subsources_size.s CASCADE;
  147. > DROP SOURCE subsources_cluster.s CASCADE;
  148. > SELECT COUNT(*)
  149. FROM mz_internal.mz_sink_statistics_raw u
  150. WHERE u.id = '${sink-size-id}'
  151. 0
  152. > SELECT COUNT(*)
  153. FROM mz_internal.mz_sink_statistics_raw u
  154. WHERE u.id = '${sink-cluster-id}'
  155. 0
  156. > SELECT COUNT(*)
  157. FROM mz_internal.mz_sink_statistics_raw u
  158. WHERE u.id = '${upsert-size-id}'
  159. 0
  160. > SELECT COUNT(*)
  161. FROM mz_internal.mz_sink_statistics_raw u
  162. WHERE u.id = '${upsert-cluster-id}'
  163. 0
  164. > SELECT COUNT(*)
  165. FROM mz_internal.mz_sink_statistics_raw u
  166. WHERE u.id = '${subsources-size-top-level-id}'
  167. 0
  168. > SELECT COUNT(*)
  169. FROM mz_internal.mz_sink_statistics_raw u
  170. WHERE u.id = '${subsources-size-bids-id}'
  171. 0
  172. > SELECT COUNT(*)
  173. FROM mz_internal.mz_sink_statistics_raw u
  174. WHERE u.id = '${subsources-cluster-top-level-id}'
  175. 0
  176. > SELECT COUNT(*)
  177. FROM mz_internal.mz_sink_statistics_raw u
  178. WHERE u.id = '${subsources-cluster-bids-id}'
  179. 0