materialization-lag.td 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Test the contents of `mz_materialization_lag`.
  10. #
  11. # These tests rely on testdrive's retry feature, as they query introspection
  12. # relations whose data might not be immediately available.
  13. #
  14. # Note that when running under cloudtest, it's quite common to see lags of over
  15. # 1 second even when all replicas are healthy. So we need to build some
  16. # tolerances into these checks that verify whether induced lag is present. A
  17. # threshold value of 2 seconds seems to work fine.
  18. $ set-sql-timeout duration=60s
  19. $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  20. $ postgres-execute connection=mz_system
  21. ALTER SYSTEM SET max_clusters = 15
  22. > CREATE CLUSTER source SIZE '1'
  23. > CREATE CLUSTER compute SIZE '1'
  24. > CREATE CLUSTER sink SIZE '1'
  25. # Set up a bunch of sources and materializations that depend on each other.
  26. # Put them on different clusters so we can spin some down to create lag.
  27. > CREATE SOURCE src
  28. IN CLUSTER source
  29. FROM LOAD GENERATOR counter
  30. > CREATE TABLE tbl (a int)
  31. > CREATE VIEW src_plus_tbl
  32. AS SELECT counter + a AS a FROM src, tbl
  33. > CREATE INDEX idx
  34. IN CLUSTER compute
  35. ON src_plus_tbl (a)
  36. > CREATE MATERIALIZED VIEW mv
  37. IN CLUSTER compute
  38. AS SELECT * FROM src_plus_tbl
  39. > CREATE CONNECTION kafka_conn
  40. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  41. > CREATE CONNECTION csr_conn
  42. TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}')
  43. > CREATE SINK snk
  44. IN CLUSTER sink
  45. FROM mv
  46. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  47. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  48. ENVELOPE DEBEZIUM
  49. # Make sure things are hydrated.
  50. > INSERT INTO tbl VALUES (1)
  51. > SELECT count(*) > 0 FROM mv
  52. true
  53. # When all clusters are running, there should be no/little visible lag.
  54. > SELECT name
  55. FROM mz_internal.mz_materialization_lag
  56. JOIN mz_objects ON (id = object_id)
  57. WHERE
  58. local_lag > INTERVAL '5s' OR
  59. global_lag > INTERVAL '5s'
  60. # When the source is down, there should be no visible lag either, as lag is
  61. # relative to the source frontiers.
  62. > ALTER CLUSTER source SET (REPLICATION FACTOR 0)
  63. > SELECT name
  64. FROM mz_internal.mz_materialization_lag
  65. JOIN mz_objects ON (id = object_id)
  66. WHERE
  67. local_lag > INTERVAL '5s' OR
  68. global_lag > INTERVAL '5s'
  69. > ALTER CLUSTER source SET (REPLICATION FACTOR 1)
  70. # Bring down the compute cluster and observe resulting lag.
  71. > ALTER CLUSTER compute SET (REPLICATION FACTOR 0)
  72. > SELECT name
  73. FROM mz_internal.mz_materialization_lag
  74. JOIN mz_objects o ON (id = object_id)
  75. WHERE local_lag > INTERVAL '5s'
  76. idx
  77. > SELECT name
  78. FROM mz_internal.mz_materialization_lag
  79. JOIN mz_objects ON (id = object_id)
  80. WHERE global_lag > INTERVAL '5s'
  81. idx
  82. mv
  83. snk
  84. # Bringing up the compute cluster again should remove the lag.
  85. > ALTER CLUSTER compute SET (REPLICATION FACTOR 1)
  86. > SELECT name
  87. FROM mz_internal.mz_materialization_lag
  88. JOIN mz_objects ON (id = object_id)
  89. WHERE
  90. local_lag > INTERVAL '5s' OR
  91. global_lag > INTERVAL '5s'
  92. # Bring down the sink cluster and observe resulting lag.
  93. > ALTER CLUSTER sink SET (REPLICATION FACTOR 0)
  94. > SELECT name
  95. FROM mz_internal.mz_materialization_lag
  96. JOIN mz_objects ON (id = object_id)
  97. WHERE local_lag > INTERVAL '5s'
  98. snk
  99. > SELECT name
  100. FROM mz_internal.mz_materialization_lag
  101. JOIN mz_objects ON (id = object_id)
  102. WHERE global_lag > INTERVAL '5s'
  103. snk
  104. # Bringing up the sink cluster again should remove the lag.
  105. > ALTER CLUSTER sink SET (REPLICATION FACTOR 1)
  106. > SELECT name
  107. FROM mz_internal.mz_materialization_lag
  108. JOIN mz_objects ON (id = object_id)
  109. WHERE
  110. local_lag > INTERVAL '5s' OR
  111. global_lag > INTERVAL '5s'
  112. # If a source has an empty frontier we can't compute a lag value anymore, so
  113. # the lag of dependant collections shows up as NULL instead.
  114. > ALTER CLUSTER compute SET (REPLICATION FACTOR 0)
  115. > CREATE MATERIALIZED VIEW mv_empty
  116. IN CLUSTER source
  117. AS SELECT 1;
  118. > CREATE MATERIALIZED VIEW mv_behind_empty
  119. IN CLUSTER compute
  120. AS SELECT * FROM mv_empty;
  121. > SELECT local_lag, global_lag
  122. FROM mz_internal.mz_materialization_lag
  123. JOIN mz_objects ON (id = object_id)
  124. WHERE name = 'mv_behind_empty'
  125. <null> <null>
  126. # Once the dependant collection manages to catch up, the lag should become 0.
  127. > ALTER CLUSTER compute SET (REPLICATION FACTOR 1)
  128. > SELECT local_lag, global_lag
  129. FROM mz_internal.mz_materialization_lag
  130. JOIN mz_objects ON (id = object_id)
  131. WHERE name = 'mv_behind_empty'
  132. 00:00:00 00:00:00
  133. # Test that when there are multiple inputs to a materialization the right one
  134. # is reported as the "slowest".
  135. > ALTER CLUSTER source SET (REPLICATION FACTOR 0)
  136. > SELECT o.name, d.name
  137. FROM mz_internal.mz_materialization_lag
  138. JOIN mz_objects o ON (o.id = object_id)
  139. JOIN mz_objects d ON (d.id = slowest_local_input_id)
  140. WHERE o.name = 'idx'
  141. idx src
  142. > SELECT o.name, d.name
  143. FROM mz_internal.mz_materialization_lag
  144. JOIN mz_objects o ON (o.id = object_id)
  145. JOIN mz_objects d ON (d.id = slowest_global_input_id)
  146. WHERE o.name in ('idx', 'mv', 'snk')
  147. idx src
  148. mv src
  149. snk src
  150. $ postgres-execute connection=mz_system
  151. ALTER SYSTEM RESET max_clusters