statistics-maintenance.td 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
  11. ALTER SYSTEM SET storage_statistics_collection_interval = 1000
  12. ALTER SYSTEM SET storage_statistics_interval = 2000
  13. > CREATE CLUSTER cluster1 (SIZE '2')
  14. > CREATE CONNECTION kafka_conn
  15. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  16. > CREATE TABLE t (a text, b text)
  17. > CREATE MATERIALIZED VIEW simple_view AS SELECT * from t;
  18. > INSERT INTO t VALUES ('key1', 'value')
  19. # Setup various sinks and sources
  20. > CREATE SINK sink1
  21. IN CLUSTER cluster1
  22. FROM simple_view
  23. INTO KAFKA CONNECTION kafka_conn (TOPIC 'topic-${testdrive.seed}')
  24. KEY (a)
  25. FORMAT JSON
  26. ENVELOPE DEBEZIUM
  27. $ kafka-create-topic topic=upsert partitions=1
  28. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  29. one:two
  30. > CREATE SOURCE upsert1
  31. IN CLUSTER cluster1
  32. FROM KAFKA CONNECTION kafka_conn (TOPIC
  33. 'testdrive-upsert-${testdrive.seed}'
  34. )
  35. > CREATE TABLE upsert1_tbl FROM SOURCE upsert1 (REFERENCE "testdrive-upsert-${testdrive.seed}")
  36. KEY FORMAT BYTES
  37. VALUE FORMAT BYTES
  38. ENVELOPE UPSERT
  39. > SELECT * FROM upsert1_tbl
  40. one two
  41. # The `CREATE TABLE ... FROM SOURCE` commands caused a recreation of the
  42. # respective source dataflows, during which we might have lost the statistics
  43. # about committed updates from the snapshot. Ingest some more data to ensure we
  44. # see some `updates_committed`.
  45. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  46. two:three
  47. # Ensure we produce statistics
  48. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  49. FROM mz_sinks s
  50. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  51. WHERE s.name IN ('sink1')
  52. GROUP BY s.name
  53. sink1 1 1 true true
  54. > SELECT s.name,
  55. SUM(u.updates_committed) > 0,
  56. SUM(u.messages_received) >= 2
  57. FROM mz_sources s
  58. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  59. WHERE s.name IN ('upsert1')
  60. GROUP BY s.id, s.name
  61. upsert1 true true
  62. # Shut down the cluster
  63. > ALTER CLUSTER cluster1 SET (REPLICATION FACTOR = 0)
  64. # Statistics should remain the same
  65. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  66. FROM mz_sinks s
  67. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  68. WHERE s.name IN ('sink1')
  69. GROUP BY s.name
  70. sink1 1 1 true true
  71. > SELECT s.name,
  72. SUM(u.updates_committed) > 0,
  73. SUM(u.messages_received) >= 2
  74. FROM mz_sources s
  75. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  76. WHERE s.name IN ('upsert1')
  77. GROUP BY s.id, s.name
  78. upsert1 true true
  79. # Ingest some more data, and ensure counters are maintained
  80. > ALTER CLUSTER cluster1 SET (REPLICATION FACTOR = 1)
  81. > INSERT INTO t VALUES ('key1', 'value')
  82. $ kafka-ingest format=bytes topic=upsert key-format=bytes key-terminator=:
  83. three:four
  84. # Statistics should remain the same
  85. > SELECT s.name, SUM(u.messages_staged), SUM(u.messages_committed), SUM(u.bytes_staged) > 0, SUM(bytes_staged) = SUM(bytes_committed)
  86. FROM mz_sinks s
  87. JOIN mz_internal.mz_sink_statistics_raw u ON s.id = u.id
  88. WHERE s.name IN ('sink1')
  89. GROUP BY s.name
  90. sink1 2 2 true true
  91. > SELECT s.name,
  92. SUM(u.updates_committed) > 0,
  93. SUM(u.messages_received) >= 4
  94. FROM mz_sources s
  95. JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  96. WHERE s.name IN ('upsert1')
  97. GROUP BY s.id, s.name
  98. upsert1 true true