# statistics-maintenance.td
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
  11. ALTER SYSTEM SET storage_statistics_collection_interval = 1000
  12. ALTER SYSTEM SET storage_statistics_interval = 2000
  13. > CREATE CLUSTER cluster1 (SIZE '2')
  14. > CREATE CONNECTION kafka_conn
  15. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  16. > CREATE TABLE t (a text, b text)
  17. > CREATE MATERIALIZED VIEW simple_view AS SELECT * from t;
  18. > INSERT INTO t VALUES ('key1', 'value')
  19. # Setup various sinks and sources
  20. > CREATE SINK sink1
  21. IN CLUSTER cluster1
  22. FROM simple_view
  23. INTO KAFKA CONNECTION kafka_conn (TOPIC 'topic-${testdrive.seed}')
  24. KEY (a)
  25. FORMAT JSON
  26. ENVELOPE DEBEZIUM
  27. $ kafka-create-topic topic=upsert partitions=1
  28. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  29. one:two
  30. > CREATE SOURCE upsert1
  31. IN CLUSTER cluster1
  32. FROM KAFKA CONNECTION kafka_conn (TOPIC
  33. 'testdrive-upsert-${testdrive.seed}'
  34. )
  35. KEY FORMAT BYTES
  36. VALUE FORMAT BYTES
  37. ENVELOPE UPSERT
  38. # Ensure we produce statistics
  39. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  40. FROM mz_sinks s
  41. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  42. WHERE s.name IN ('sink1')
  43. GROUP BY s.name
  44. sink1 1 1 true true
  45. > SELECT s.name,
  46. SUM(u.updates_committed) > 0,
  47. SUM(u.messages_received)
  48. FROM mz_sources s
  49. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  50. WHERE s.name IN ('upsert1')
  51. GROUP BY s.id, s.name
  52. upsert1 true 1
  53. # Shut down the cluster
  54. > ALTER CLUSTER cluster1 SET (REPLICATION FACTOR = 0)
  55. # Statistics should remain the same
  56. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  57. FROM mz_sinks s
  58. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  59. WHERE s.name IN ('sink1')
  60. GROUP BY s.name
  61. sink1 1 1 true true
  62. > SELECT s.name,
  63. SUM(u.updates_committed) > 0,
  64. SUM(u.messages_received)
  65. FROM mz_sources s
  66. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  67. WHERE s.name IN ('upsert1')
  68. GROUP BY s.id, s.name
  69. upsert1 true 1
  70. # Ingest some more data, and ensure counters are maintained
  71. > ALTER CLUSTER cluster1 SET (REPLICATION FACTOR = 1)
  72. > INSERT INTO t VALUES ('key1', 'value')
  73. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  74. two:three
  75. # Statistics should remain the same
  76. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  77. FROM mz_sinks s
  78. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  79. WHERE s.name IN ('sink1')
  80. GROUP BY s.name
  81. sink1 2 2 true true
  82. > SELECT s.name,
  83. SUM(u.updates_committed) > 0,
  84. SUM(u.messages_received)
  85. FROM mz_sources s
  86. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  87. WHERE s.name IN ('upsert1')
  88. GROUP BY s.id, s.name
  89. upsert1 true 2