# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart
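
# Lower the storage statistics collection and reporting intervals so the
# checks below observe fresh statistics quickly, rather than waiting on the
# much longer system defaults (see the notes about scrape intervals further down).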
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
ALTER SYSTEM SET storage_statistics_interval = 2000

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=upsert partitions=1

# The length of `moosemoose` must be longer than `moose` to ensure a tombstone doesn't
# happen to have the same size.
$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "fish"} {"f1": "fish", "f2": 1000}
{"key": "bird1"} {"f1":"goose", "f2": 1}
{"key": "birdmore"} {"f1":"geese", "f2": 2}
{"key": "mammal1"} {"f1": "moose", "f2": 1}
{"key": "bird1"}
{"key": "birdmore"} {"f1":"geese", "f2": 56}
{"key": "mammalmore"} {"f1": "moose", "f2": 42}
{"key": "mammal1"}
{"key": "mammalmore"} {"f1":"moosemoose", "f2": 2}
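
# After this ingest the upsert state should hold 3 live keys -- fish, birdmore,
# and mammalmore -- since bird1 and mammal1 were removed by the tombstones above.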

$ kafka-create-topic topic=metrics-test partitions=1

$ kafka-ingest topic=metrics-test format=bytes
jack,jill
goofus,gallant

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE SOURCE metrics_test_source (a, b)
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-metrics-test-${testdrive.seed}')
  FORMAT CSV WITH 2 COLUMNS
  INCLUDE OFFSET

> CREATE SOURCE upsert
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC
    'testdrive-upsert-${testdrive.seed}'
  )
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE OFFSET
  ENVELOPE UPSERT
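
# `ENVELOPE UPSERT` interprets the Kafka key as an upsert key: a later record with
# the same key replaces the earlier value, and a record with a key but no value (a
# tombstone) deletes it. That upsert state is what drives the `records_indexed` and
# `bytes_indexed` statistics exercised below.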

# Adding a select here so that the ingests after this
# trigger lookups from the upsert state.
> SELECT key, f1, f2 FROM upsert
key         f1          f2
---------------------------
fish        fish        1000
birdmore    geese       56
mammalmore  moosemoose  2

# statistics are only populated every minute by default
$ set-sql-timeout duration=2minutes

# Also test a source with multiple sub-sources.
# NOTE: We give this source a unique name because we want to query for it with
# a `SUBSCRIBE ... AS OF AT LEAST 0` below and want to avoid receiving results
# for sources created by previous tests.
> CREATE SOURCE auction_house_in_source_statistics_td
  IN CLUSTER ${arg.single-replica-cluster}
  FROM LOAD GENERATOR AUCTION;

> CREATE TABLE accounts FROM SOURCE auction_house_in_source_statistics_td (REFERENCE accounts);
> CREATE TABLE auctions FROM SOURCE auction_house_in_source_statistics_td (REFERENCE auctions);
> CREATE TABLE bids FROM SOURCE auction_house_in_source_statistics_td (REFERENCE bids);
> CREATE TABLE organizations FROM SOURCE auction_house_in_source_statistics_td (REFERENCE organizations);
> CREATE TABLE users FROM SOURCE auction_house_in_source_statistics_td (REFERENCE users);

# NOTE: These queries are slow to succeed because the default metrics scraping
# interval is 30 seconds.
> SELECT s.name,
  bool_and(u.snapshot_committed),
  SUM(u.messages_received), SUM(u.updates_staged), SUM(u.updates_committed), SUM(u.bytes_received) > 0, bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('metrics_test_source')
  GROUP BY s.name
  ORDER BY s.name
metrics_test_source true 2 2 2 true true
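
# The counts above reflect the two CSV records ingested into `metrics-test`
# (`jack,jill` and `goofus,gallant`): 2 messages received, 2 updates staged,
# and 2 updates committed.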

> DROP SOURCE metrics_test_source CASCADE

# Note that only the base source has `messages_received`, but the sub-sources have `updates_committed`.
# For auction sources, committed will usually add up to received, but we don't test this (right now)
# because of jitter in when metrics are produced for each sub-source.
> SELECT s.name,
  bool_and(u.snapshot_committed),
  SUM(u.messages_received) > 0, SUM(u.updates_staged) > 0, SUM(u.updates_committed) > 0, SUM(u.bytes_received) > 0, bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('accounts', 'auction_house_in_source_statistics_td', 'auctions', 'bids', 'organizations', 'users')
  GROUP BY s.name
  ORDER BY s.name
auction_house_in_source_statistics_td true true false false true true

# Use a `SUBSCRIBE` to assert that the codepath ensuring a 0-value is reported as the
# first value in `mz_source_statistics` is exercised.
# Sinks and subsources use the same codepath.
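# The first row fetched below should therefore report `messages_received = 0`;
# the SUBSCRIBE output is `<timestamp> <diff> <messages_received>`.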

$ set-regex match=\d{13} replacement=<TIMESTAMP>

> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE (
    SELECT u.messages_received
    FROM mz_sources s
    JOIN mz_internal.mz_source_statistics_with_history u ON s.id = u.id
    WHERE s.name = 'auction_house_in_source_statistics_td'
  )
  AS OF AT LEAST 0

> FETCH 1 c;
<TIMESTAMP> 1 0

> COMMIT

> DROP SOURCE auction_house_in_source_statistics_td CASCADE

# Test upsert.
# Note that we always count 9 messages received, but can see as few as 3 updates:
# there are 3 active keys, and all the messages could be received within 1 second,
# in which case the intermediate updates consolidate away.
# There could also be up to 11 updates, as upsert updates produce inserts and deletes:
# 5 initial inserts + 2 deletes + 2 updates (each an insert plus a delete) = 11.
# In total, 3 records should be present in upsert state.
# We can't control the batching, so we have to accept the range.
> SELECT
  s.name,
  bool_and(u.snapshot_committed),
  SUM(u.messages_received),
  SUM(u.updates_staged) BETWEEN 3 AND 11,
  SUM(u.updates_committed) BETWEEN 3 AND 11,
  SUM(u.bytes_received) > 0,
  SUM(u.bytes_indexed) > 0,
  SUM(u.records_indexed),
  bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true 9 true true true true 3 true
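
# The 9 messages are the nine records ingested into the `upsert` topic above, and
# `records_indexed` is 3 because only fish, birdmore, and mammalmore remain live.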

# While we can't control how batching works above, we can ensure that this new, later
# update causes 1 more message to be received, which is 1 update (a delete).
# We use `set-from-sql` to assert this. We will also use this to validate that the
# `bytes_indexed` value goes down because of the delete.
$ set-from-sql var=updates-committed
SELECT
  (SUM(u.updates_committed) + 1)::text
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')

$ set-from-sql var=state-bytes
SELECT
  (SUM(u.bytes_indexed))::text
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  WHERE s.name IN ('upsert')

$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "mammalmore"}
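
# This tombstone removes `mammalmore` from the upsert state, so below we expect
# `records_indexed` to drop to 2 and `bytes_indexed` to fall below the value
# captured in `${state-bytes}`.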

> SELECT s.name,
  bool_and(u.snapshot_committed),
  SUM(u.messages_received),
  SUM(u.updates_staged),
  SUM(u.updates_committed),
  SUM(u.bytes_received) > 0,
  SUM(u.bytes_indexed) < ${state-bytes},
  SUM(u.records_indexed)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true 10 "${updates-committed}" "${updates-committed}" true true 2

# Check the pre-aggregated `mz_internal.mz_source_statistics` view.
> SELECT s.name,
  u.snapshot_committed,
  u.messages_received,
  u.updates_staged,
  u.updates_committed,
  u.bytes_received > 0,
  u.bytes_indexed < ${state-bytes},
  u.records_indexed,
  u.rehydration_latency IS NOT NULL
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  WHERE s.name IN ('upsert')
upsert true 10 "${updates-committed}" "${updates-committed}" true true 2 true

> DROP SOURCE upsert CASCADE

# should be empty because the source was dropped
> SELECT count(*) FROM mz_internal.mz_source_statistics;
0