# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart
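
# Shorten the storage statistics collection and reporting intervals (the values
# are presumably in milliseconds) so the statistics queries below observe fresh
# values without long waits.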
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
ALTER SYSTEM SET storage_statistics_interval = 2000

$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "key", "type": "string"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "test",
    "fields" : [
        {"name":"f1", "type":"string"},
        {"name":"f2", "type":"long"}
    ]
  }

$ kafka-create-topic topic=upsert partitions=1

# The length of `moosemoose` must be longer than `moose` to ensure a tombstone
# doesn't happen to have the same size.
$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
  32. {"key": "fish"} {"f1": "fish", "f2": 1000}
  33. {"key": "bird1"} {"f1":"goose", "f2": 1}
  34. {"key": "birdmore"} {"f1":"geese", "f2": 2}
  35. {"key": "mammal1"} {"f1": "moose", "f2": 1}
  36. {"key": "bird1"}
  37. {"key": "birdmore"} {"f1":"geese", "f2": 56}
  38. {"key": "mammalmore"} {"f1": "moose", "f2": 42}
  39. {"key": "mammal1"}
  40. {"key": "mammalmore"} {"f1":"moosemoose", "f2": 2}

$ kafka-create-topic topic=metrics-test partitions=1

$ kafka-ingest topic=metrics-test format=bytes
jack,jill
goofus,gallant

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

> CREATE SOURCE metrics_test_source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-metrics-test-${testdrive.seed}')

> CREATE TABLE metrics_test_source_tbl (a, b) FROM SOURCE metrics_test_source (REFERENCE "testdrive-metrics-test-${testdrive.seed}")
  FORMAT CSV WITH 2 COLUMNS
  INCLUDE OFFSET
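
# `INCLUDE OFFSET` appends the Kafka offset as an extra column, which is why the
# expected rows below end in 0 and 1.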
> SELECT * FROM metrics_test_source_tbl
jack jill 0
goofus gallant 1

# The `CREATE TABLE ... FROM SOURCE` command caused a recreation of the source
# dataflow, during which we might have lost the statistics about committed
# updates from the snapshot. Ingest some more data to ensure we see some
# `updates_committed`.
$ kafka-ingest topic=metrics-test format=bytes
calvin,hobbes

> SELECT
    s.name,
    bool_and(u.snapshot_committed),
    SUM(u.messages_received) >= 4,
    SUM(u.updates_staged) > 0,
    SUM(u.updates_committed) > 0,
    SUM(u.bytes_received) > 0,
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('metrics_test_source')
  GROUP BY s.name
  ORDER BY s.name
metrics_test_source true true true true true true

> DROP SOURCE metrics_test_source CASCADE

> CREATE SOURCE upsert
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC
  'testdrive-upsert-${testdrive.seed}'
  )

> CREATE TABLE upsert_tbl FROM SOURCE upsert (REFERENCE "testdrive-upsert-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE OFFSET
  ENVELOPE UPSERT
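
# `ENVELOPE UPSERT` treats the Kafka key as the record identity and a null value
# as a delete (tombstone), so the upsert-state statistics asserted further down
# (`records_indexed`, `bytes_indexed`) track the set of live keys.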

# Adding a select here so that the ingests after this
# trigger lookups from the upsert state.
> SELECT key, f1, f2 FROM upsert_tbl
key f1 f2
------------------------
fish fish 1000
birdmore geese 56
mammalmore moosemoose 2

# Also test a source with multiple sub-sources.
# NOTE: We give this source a unique name because we want to query for it with
# a `SUBSCRIBE ... AS OF AT LEAST 0` below and want to avoid receiving results
# for sources created by previous tests.
> CREATE SOURCE auction_house_in_source_statistics_td
  IN CLUSTER ${arg.single-replica-cluster}
  FROM LOAD GENERATOR AUCTION;

> CREATE TABLE accounts FROM SOURCE auction_house_in_source_statistics_td (REFERENCE accounts);
> CREATE TABLE auctions FROM SOURCE auction_house_in_source_statistics_td (REFERENCE auctions);
> CREATE TABLE bids FROM SOURCE auction_house_in_source_statistics_td (REFERENCE bids);
> CREATE TABLE organizations FROM SOURCE auction_house_in_source_statistics_td (REFERENCE organizations);
> CREATE TABLE users FROM SOURCE auction_house_in_source_statistics_td (REFERENCE users);

# Note that only the base source has `messages_received`, while the sub-sources
# have `updates_committed`. For auction sources, committed will usually add up
# to received, but we don't test this (right now) because of jitter in when
# metrics are produced for each sub-source.
> SELECT
    s.name,
    bool_and(u.snapshot_committed),
    SUM(u.messages_received) > 0,
    SUM(u.updates_staged) > 0,
    SUM(u.updates_committed) > 0,
    SUM(u.bytes_received) > 0,
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('accounts', 'auction_house_in_source_statistics_td', 'auctions', 'bids', 'organizations', 'users')
  GROUP BY s.name
  ORDER BY s.name
auction_house_in_source_statistics_td true true false false true true

# Use a `SUBSCRIBE` to assert that we exercise the codepath that ensures a 0
# value is the first value reported in `mz_source_statistics`.
# Sinks and subsources use the same codepath.
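# The `set-regex` below masks the 13-digit millisecond timestamps in the
# `SUBSCRIBE` output so the expected row can be written deterministically.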
$ set-regex match=\d{13} replacement=<TIMESTAMP>

> BEGIN

> DECLARE c CURSOR FOR SUBSCRIBE (
    SELECT u.messages_received
    FROM mz_sources s
    JOIN mz_internal.mz_source_statistics_with_history u ON s.id = u.id
    WHERE s.name = 'auction_house_in_source_statistics_td'
  )
  AS OF AT LEAST 0
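
# The row fetched below is `<timestamp> <diff> <messages_received>`; the
# trailing 0 shows that the first value reported for the source is the zero
# initialization.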
> FETCH 1 c;
<TIMESTAMP> 1 0

> COMMIT

> DROP SOURCE auction_house_in_source_statistics_td CASCADE

# Test upsert.
# Note that we always count 9 messages received (18 with source tables), but we
# can see as few as 3 updates. This is because there are only 3 active keys, and
# all of the messages could be received within 1 second.
# There could also be up to 11 updates, as updates cause inserts and deletes
# (5 initial inserts, 2 deletes, and 2 updates). In total 3 records should be
# present in the upsert state.
# We can't control this, so we have to accept the range.
> SELECT
    s.name,
    SUM(u.messages_received) >= 18,
    SUM(u.bytes_received) > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true true

# Upsert stats are on the UPSERT subsource/table, not the main source.
> SELECT
    t.name,
    bool_and(u.snapshot_committed),
    SUM(u.updates_staged) BETWEEN 3 AND 11,
    SUM(u.updates_committed) BETWEEN 3 AND 11,
    SUM(u.bytes_indexed) > 0,
    SUM(u.records_indexed),
    bool_and(u.rehydration_latency IS NOT NULL)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
  GROUP BY t.name
  ORDER BY t.name
upsert_tbl true true true true 3 true
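
# The `records_indexed` value of 3 matches the three live keys (fish, birdmore,
# mammalmore) left in the upsert state after the initial ingest.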

# While we can't control how batching works above, we can ensure that this new,
# later update causes 1 more message to be received, which is 1 update, a delete.
# We use `set-from-sql` to assert this. We will also use this to validate that
# the `bytes_indexed` value goes down because of the delete.
$ set-from-sql var=updates-committed
SELECT
  (SUM(u.updates_committed) + 1)::text
FROM mz_tables t
JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
WHERE t.name IN ('upsert_tbl')

$ set-from-sql var=state-bytes
SELECT
  (SUM(u.bytes_indexed))::text
FROM mz_tables t
JOIN mz_internal.mz_source_statistics u ON t.id = u.id
WHERE t.name IN ('upsert_tbl')

$ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} schema=${schema}
{"key": "mammalmore"}

> SELECT s.name,
    SUM(u.messages_received) >= 20,
    SUM(u.bytes_received) > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('upsert')
  GROUP BY s.name
  ORDER BY s.name
upsert true true
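
# `records_indexed` should drop to 2 (fish and birdmore) and `bytes_indexed`
# should fall below the previously captured `state-bytes` value because of the
# mammalmore tombstone.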
> SELECT t.name,
    bool_and(u.snapshot_committed),
    SUM(u.updates_staged),
    SUM(u.updates_committed),
    SUM(u.bytes_indexed) < ${state-bytes},
    SUM(u.records_indexed)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
  GROUP BY t.name
  ORDER BY t.name
upsert_tbl true "${updates-committed}" "${updates-committed}" true 2

# Check the pre-aggregated view.
> SELECT s.name,
    u.messages_received >= 20,
    u.bytes_received > 0
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics u ON s.id = u.id
  WHERE s.name IN ('upsert')
upsert true true

> SELECT t.name,
    u.snapshot_committed,
    u.updates_staged,
    u.updates_committed,
    u.bytes_indexed < ${state-bytes},
    u.records_indexed,
    u.rehydration_latency IS NOT NULL
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics u ON t.id = u.id
  WHERE t.name IN ('upsert_tbl')
upsert_tbl true "${updates-committed}" "${updates-committed}" true 2 true

> DROP SOURCE upsert CASCADE

# Should be empty because the source was dropped.
> SELECT count(*) FROM mz_internal.mz_source_statistics;
0