materialization-lag.td 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Test the contents of `mz_materialization_lag`.
  10. #
  11. # These tests rely on testdrive's retry feature, as they query introspection
  12. # relations whose data might not be immediately available.
  13. #
  14. # Note that when running under cloudtest, it's quite common to see lags of over
  15. # 1 second even when all replicas are healthy. So we need to build some
  16. # tolerances into these checks that verify whether induced lag is present. A
  17. # threshold value of 2 seconds seems to work fine.
  18. $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  19. $ postgres-execute connection=mz_system
  20. ALTER SYSTEM SET max_clusters = 15
  21. > CREATE CLUSTER source SIZE '1'
  22. > CREATE CLUSTER compute SIZE '1'
  23. > CREATE CLUSTER sink SIZE '1'
  24. # Set up a bunch of sources and materializations that depend on each other.
  25. # Put them on different clusters so we can spin some down to create lag.
  26. > CREATE SOURCE src
  27. IN CLUSTER source
  28. FROM LOAD GENERATOR counter
  29. > CREATE TABLE tbl (a int)
  30. > CREATE VIEW src_plus_tbl
  31. AS SELECT counter + a AS a FROM src, tbl
  32. > CREATE INDEX idx
  33. IN CLUSTER compute
  34. ON src_plus_tbl (a)
  35. > CREATE MATERIALIZED VIEW mv
  36. IN CLUSTER compute
  37. AS SELECT * FROM src_plus_tbl
  38. > CREATE CONNECTION kafka_conn
  39. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  40. > CREATE CONNECTION csr_conn
  41. TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}')
  42. > CREATE SINK snk
  43. IN CLUSTER sink
  44. FROM mv
  45. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  46. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  47. ENVELOPE DEBEZIUM
  48. # Make sure things are hydrated.
  49. > INSERT INTO tbl VALUES (1)
  50. > SELECT count(*) > 0 FROM mv
  51. true
  52. # When all clusters are running, there should be no/little visible lag.
  53. > SELECT name
  54. FROM mz_internal.mz_materialization_lag
  55. JOIN mz_objects ON (id = object_id)
  56. WHERE
  57. local_lag > INTERVAL '5s' OR
  58. global_lag > INTERVAL '5s'
  59. # When the source is down, there should be no visible lag either, as lag is
  60. # relative to the source frontiers.
  61. > ALTER CLUSTER source SET (REPLICATION FACTOR 0)
  62. > SELECT name
  63. FROM mz_internal.mz_materialization_lag
  64. JOIN mz_objects ON (id = object_id)
  65. WHERE
  66. local_lag > INTERVAL '5s' OR
  67. global_lag > INTERVAL '5s'
  68. > ALTER CLUSTER source SET (REPLICATION FACTOR 1)
  69. # Bring down the compute cluster and observe resulting lag.
  70. > ALTER CLUSTER compute SET (REPLICATION FACTOR 0)
  71. > SELECT name
  72. FROM mz_internal.mz_materialization_lag
  73. JOIN mz_objects o ON (id = object_id)
  74. WHERE local_lag > INTERVAL '5s'
  75. idx
  76. > SELECT name
  77. FROM mz_internal.mz_materialization_lag
  78. JOIN mz_objects ON (id = object_id)
  79. WHERE global_lag > INTERVAL '5s'
  80. idx
  81. mv
  82. snk
  83. # Bringing up the compute cluster again should remove the lag.
  84. > ALTER CLUSTER compute SET (REPLICATION FACTOR 1)
  85. > SELECT name
  86. FROM mz_internal.mz_materialization_lag
  87. JOIN mz_objects ON (id = object_id)
  88. WHERE
  89. local_lag > INTERVAL '5s' OR
  90. global_lag > INTERVAL '5s'
  91. # Bring down the sink cluster and observe resulting lag.
  92. > ALTER CLUSTER sink SET (REPLICATION FACTOR 0)
  93. > SELECT name
  94. FROM mz_internal.mz_materialization_lag
  95. JOIN mz_objects ON (id = object_id)
  96. WHERE local_lag > INTERVAL '5s'
  97. snk
  98. > SELECT name
  99. FROM mz_internal.mz_materialization_lag
  100. JOIN mz_objects ON (id = object_id)
  101. WHERE global_lag > INTERVAL '5s'
  102. snk
  103. # Bringing up the sink cluster again should remove the lag.
  104. > ALTER CLUSTER sink SET (REPLICATION FACTOR 1)
  105. > SELECT name
  106. FROM mz_internal.mz_materialization_lag
  107. JOIN mz_objects ON (id = object_id)
  108. WHERE
  109. local_lag > INTERVAL '5s' OR
  110. global_lag > INTERVAL '5s'
  111. # If a source has an empty frontier we can't compute a lag value anymore, so
  112. # the lag of dependant collections shows up as NULL instead.
  113. > ALTER CLUSTER compute SET (REPLICATION FACTOR 0)
  114. > CREATE MATERIALIZED VIEW mv_empty
  115. IN CLUSTER source
  116. AS SELECT 1;
  117. > CREATE MATERIALIZED VIEW mv_behind_empty
  118. IN CLUSTER compute
  119. AS SELECT * FROM mv_empty;
  120. > SELECT local_lag, global_lag
  121. FROM mz_internal.mz_materialization_lag
  122. JOIN mz_objects ON (id = object_id)
  123. WHERE name = 'mv_behind_empty'
  124. <null> <null>
  125. # Once the dependant collection manages to catch up, the lag should become 0.
  126. > ALTER CLUSTER compute SET (REPLICATION FACTOR 1)
  127. > SELECT local_lag, global_lag
  128. FROM mz_internal.mz_materialization_lag
  129. JOIN mz_objects ON (id = object_id)
  130. WHERE name = 'mv_behind_empty'
  131. 00:00:00 00:00:00
  132. # Test that when there are multiple inputs to a materialization the right one
  133. # is reported as the "slowest".
  134. > ALTER CLUSTER source SET (REPLICATION FACTOR 0)
  135. > SELECT o.name, d.name
  136. FROM mz_internal.mz_materialization_lag
  137. JOIN mz_objects o ON (o.id = object_id)
  138. JOIN mz_objects d ON (d.id = slowest_local_input_id)
  139. WHERE o.name = 'idx'
  140. idx src
  141. > SELECT o.name, d.name
  142. FROM mz_internal.mz_materialization_lag
  143. JOIN mz_objects o ON (o.id = object_id)
  144. JOIN mz_objects d ON (d.id = slowest_global_input_id)
  145. WHERE o.name in ('idx', 'mv', 'snk')
  146. idx src
  147. mv src
  148. snk src
  149. $ postgres-execute connection=mz_system
  150. ALTER SYSTEM RESET max_clusters