controller-frontiers.td 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
# Test reporting of controller frontiers through `mz_internal.mz_frontiers` and
# `mz_cluster_replica_frontiers`.
#
# These tests rely on testdrive's retry feature, as they query introspection
# sources whose data might not be immediately available.
  14. > DROP CLUSTER IF EXISTS test
  15. > DROP CLUSTER IF EXISTS test_source
  16. > CREATE CLUSTER test REPLICAS (
  17. r1 (SIZE '1'),
  18. r2 (SIZE '1')
  19. )
  20. > CREATE CLUSTER test_source REPLICAS (
  21. s1 (SIZE '1')
  22. )
  23. > SET cluster = test;
  24. > CREATE TABLE t1 (a int)
  25. > INSERT INTO t1 VALUES (1)
  26. # Test that frontiers of materialized views are reported.
  27. > CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t1
  28. > SELECT
  29. mvs.name,
  30. replicas.name
  31. FROM mz_cluster_replica_frontiers frontiers
  32. JOIN mz_materialized_views mvs
  33. ON frontiers.object_id = mvs.id
  34. LEFT JOIN mz_cluster_replicas replicas
  35. ON frontiers.replica_id = replicas.id
  36. WHERE
  37. frontiers.object_id LIKE 'u%' AND
  38. frontiers.write_frontier > 0
  39. mv1 r1
  40. mv1 r2
  41. > SELECT
  42. mvs.name
  43. FROM mz_internal.mz_frontiers frontiers
  44. JOIN mz_materialized_views mvs
  45. ON frontiers.object_id = mvs.id
  46. WHERE
  47. frontiers.object_id LIKE 'u%' AND
  48. frontiers.read_frontier > 0 AND
  49. frontiers.write_frontier > 0
  50. mv1
  51. # Test that frontiers of indexes are reported.
  52. > CREATE INDEX idx1 ON t1 (a)
  53. > SELECT
  54. indexes.name,
  55. replicas.name
  56. FROM mz_cluster_replica_frontiers frontiers
  57. JOIN mz_indexes indexes
  58. ON frontiers.object_id = indexes.id
  59. LEFT JOIN mz_cluster_replicas replicas
  60. ON frontiers.replica_id = replicas.id
  61. WHERE
  62. frontiers.object_id LIKE 'u%' AND
  63. frontiers.write_frontier > 0
  64. idx1 r1
  65. idx1 r2
  66. > SELECT
  67. indexes.name
  68. FROM mz_internal.mz_frontiers frontiers
  69. JOIN mz_indexes indexes
  70. ON frontiers.object_id = indexes.id
  71. WHERE
  72. frontiers.object_id LIKE 'u%' AND
  73. frontiers.read_frontier > 0 AND
  74. frontiers.write_frontier > 0
  75. idx1
  76. # Test that frontiers of sources are reported.
  77. > CREATE SOURCE source1
  78. IN CLUSTER test_source
  79. FROM LOAD GENERATOR COUNTER (UP TO 100)
  80. > SELECT
  81. sources.name,
  82. replicas.name
  83. FROM mz_cluster_replica_frontiers frontiers
  84. JOIN mz_sources sources
  85. ON frontiers.object_id = sources.id
  86. LEFT JOIN mz_cluster_replicas replicas
  87. ON frontiers.replica_id = replicas.id
  88. WHERE
  89. frontiers.object_id LIKE 'u%' AND
  90. frontiers.write_frontier > 0
  91. source1 s1
  92. > SELECT
  93. sources.name
  94. FROM mz_internal.mz_frontiers frontiers
  95. JOIN mz_sources sources
  96. ON frontiers.object_id = sources.id
  97. WHERE
  98. frontiers.object_id LIKE 'u%' AND
  99. frontiers.read_frontier > 0 AND
  100. frontiers.write_frontier > 0
  101. source1
  102. source1_progress
  103. # Test that frontiers of sinks are reported.
  104. > CREATE CONNECTION kafka_conn
  105. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  106. > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  107. URL '${testdrive.schema-registry-url}'
  108. )
  109. > CREATE SINK sink1
  110. IN CLUSTER test_source
  111. FROM t1
  112. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  113. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  114. ENVELOPE DEBEZIUM
  115. > SELECT
  116. sinks.name,
  117. replicas.name
  118. FROM mz_cluster_replica_frontiers frontiers
  119. JOIN mz_sinks sinks
  120. ON frontiers.object_id = sinks.id
  121. LEFT JOIN mz_cluster_replicas replicas
  122. ON frontiers.replica_id = replicas.id
  123. WHERE
  124. frontiers.object_id LIKE 'u%' AND
  125. frontiers.write_frontier > 0
  126. sink1 s1
  127. > SELECT
  128. sinks.name
  129. FROM mz_internal.mz_frontiers frontiers
  130. JOIN mz_sinks sinks
  131. ON frontiers.object_id = sinks.id
  132. WHERE
  133. frontiers.object_id LIKE 'u%' AND
  134. (frontiers.read_frontier > 0 OR frontiers.read_frontier IS NULL) AND
  135. frontiers.write_frontier > 0
  136. sink1
  137. # Test that the frontiers of introspection sources are reported.
  138. > SELECT
  139. replicas.name
  140. FROM mz_cluster_replica_frontiers frontiers
  141. JOIN mz_indexes indexes
  142. ON frontiers.object_id = indexes.id
  143. JOIN mz_clusters clusters
  144. ON indexes.cluster_id = clusters.id
  145. LEFT JOIN mz_cluster_replicas replicas
  146. ON frontiers.replica_id = replicas.id
  147. WHERE
  148. indexes.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx' AND
  149. frontiers.write_frontier > 0 AND
  150. clusters.name = 'test'
  151. r1
  152. r2
  153. > SELECT
  154. count(*)
  155. FROM mz_internal.mz_frontiers frontiers
  156. JOIN mz_indexes indexes
  157. ON frontiers.object_id = indexes.id
  158. JOIN mz_clusters clusters
  159. ON indexes.cluster_id = clusters.id
  160. WHERE
  161. indexes.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx' AND
  162. frontiers.read_frontier > 0 AND
  163. frontiers.write_frontier > 0 AND
  164. clusters.name = 'test'
  165. 1
  166. # Test that the frontiers of tables are reported.
  167. > SELECT
  168. tables.name
  169. FROM mz_internal.mz_frontiers frontiers
  170. JOIN mz_tables tables
  171. ON frontiers.object_id = tables.id
  172. WHERE
  173. frontiers.object_id LIKE 'u%' AND
  174. frontiers.read_frontier > 0 AND
  175. frontiers.write_frontier > 0
  176. t1
  177. # Test that the frontiers of storage-managed collections are reported.
  178. > SELECT
  179. sources.name
  180. FROM mz_internal.mz_frontiers frontiers
  181. JOIN mz_sources sources
  182. ON frontiers.object_id = sources.id
  183. WHERE
  184. sources.name = 'mz_frontiers' AND
  185. frontiers.read_frontier > 0 AND
  186. frontiers.write_frontier > 0
  187. mz_frontiers
  188. # Test that frontiers are added when replicas are created.
  189. > SELECT
  190. objects.name,
  191. replicas.name
  192. FROM mz_cluster_replica_frontiers frontiers
  193. JOIN mz_objects objects
  194. ON frontiers.object_id = objects.id
  195. JOIN mz_cluster_replicas replicas
  196. ON frontiers.replica_id = replicas.id
  197. JOIN mz_clusters clusters
  198. ON replicas.cluster_id = clusters.id
  199. WHERE
  200. objects.id LIKE 'u%' AND
  201. frontiers.write_frontier > 0 AND
  202. clusters.name = 'test'
  203. idx1 r1
  204. idx1 r2
  205. mv1 r1
  206. mv1 r2
  207. > CREATE CLUSTER REPLICA test.r3 SIZE '1'
  208. > SELECT
  209. objects.name,
  210. replicas.name
  211. FROM mz_cluster_replica_frontiers frontiers
  212. JOIN mz_objects objects
  213. ON frontiers.object_id = objects.id
  214. JOIN mz_cluster_replicas replicas
  215. ON frontiers.replica_id = replicas.id
  216. JOIN mz_clusters clusters
  217. ON replicas.cluster_id = clusters.id
  218. WHERE
  219. objects.id LIKE 'u%' AND
  220. frontiers.write_frontier > 0 AND
  221. clusters.name = 'test'
  222. idx1 r1
  223. idx1 r2
  224. idx1 r3
  225. mv1 r1
  226. mv1 r2
  227. mv1 r3
  228. # Test that frontiers are removed when replicas are dropped.
  229. > DROP CLUSTER REPLICA test.r1
  230. > SELECT
  231. objects.name,
  232. replicas.name
  233. FROM mz_cluster_replica_frontiers frontiers
  234. JOIN mz_objects objects
  235. ON frontiers.object_id = objects.id
  236. JOIN mz_cluster_replicas replicas
  237. ON frontiers.replica_id = replicas.id
  238. JOIN mz_clusters clusters
  239. ON replicas.cluster_id = clusters.id
  240. WHERE
  241. objects.id LIKE 'u%' AND
  242. frontiers.write_frontier > 0 AND
  243. clusters.name = 'test'
  244. idx1 r2
  245. idx1 r3
  246. mv1 r2
  247. mv1 r3
  248. # Test that empty frontiers show up as NULL.
  249. > CREATE MATERIALIZED VIEW mv2 AS SELECT 1
  250. > SELECT
  251. replicas.name,
  252. frontiers.write_frontier
  253. FROM mz_cluster_replica_frontiers frontiers
  254. JOIN mz_materialized_views mvs
  255. ON frontiers.object_id = mvs.id
  256. JOIN mz_cluster_replicas replicas
  257. ON frontiers.replica_id = replicas.id
  258. WHERE
  259. mvs.name = 'mv2'
  260. r2 <null>
  261. r3 <null>
  262. > SELECT
  263. frontiers.read_frontier,
  264. frontiers.write_frontier
  265. FROM mz_internal.mz_frontiers frontiers
  266. JOIN mz_materialized_views mvs
  267. ON frontiers.object_id = mvs.id
  268. WHERE
  269. mvs.name = 'mv2'
  270. 0 <null>
  271. # Test that frontiers are removed when objects are dropped.
  272. > DROP MATERIALIZED VIEW mv1
  273. > DROP MATERIALIZED VIEW mv2
  274. > DROP INDEX idx1
  275. > DROP SOURCE source1 CASCADE
  276. > DROP SINK sink1
  277. > DROP TABLE t1
  278. > SELECT *
  279. FROM mz_cluster_replica_frontiers frontiers
  280. WHERE object_id LIKE 'u%'
  281. > SELECT *
  282. FROM mz_internal.mz_frontiers frontiers
  283. WHERE object_id LIKE 'u%'
  284. # Test that frontiers are correctly initialized on for collections on clusters
  285. # with zero replicas.
  286. > CREATE CLUSTER empty SIZE '1', REPLICATION FACTOR 0
  287. > CREATE TABLE t2 (a int)
  288. > CREATE INDEX idx2 IN CLUSTER empty ON t2 (a)
  289. > SELECT read_frontier > 0, read_frontier = write_frontier
  290. FROM mz_internal.mz_frontiers
  291. JOIN mz_indexes ON (id = object_id)
  292. WHERE name = 'idx2'
  293. true true