# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test reporting of controller frontiers through `mz_internal.mz_frontiers` and
# `mz_cluster_replica_frontiers`.
#
# These tests rely on testdrive's retry feature, as they query introspection
# sources whose data might not be immediately available.

  14. > DROP CLUSTER IF EXISTS test
  15. > DROP CLUSTER IF EXISTS test_source
  16. > CREATE CLUSTER test REPLICAS (
  17. r1 (SIZE '1'),
  18. r2 (SIZE '1')
  19. )
  20. > CREATE CLUSTER test_source REPLICAS (
  21. s1 (SIZE '1')
  22. )
  23. > SET cluster = test;
  24. > CREATE TABLE t1 (a int)
  25. > INSERT INTO t1 VALUES (1)
  26. # Test that frontiers of materialized views are reported.
  27. > CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM t1
  28. > SELECT
  29. mvs.name,
  30. replicas.name
  31. FROM mz_cluster_replica_frontiers frontiers
  32. JOIN mz_materialized_views mvs
  33. ON frontiers.object_id = mvs.id
  34. LEFT JOIN mz_cluster_replicas replicas
  35. ON frontiers.replica_id = replicas.id
  36. WHERE
  37. frontiers.object_id LIKE 'u%' AND
  38. frontiers.write_frontier > 0
  39. mv1 r1
  40. mv1 r2
  41. > SELECT
  42. mvs.name
  43. FROM mz_internal.mz_frontiers frontiers
  44. JOIN mz_materialized_views mvs
  45. ON frontiers.object_id = mvs.id
  46. WHERE
  47. frontiers.object_id LIKE 'u%' AND
  48. frontiers.read_frontier > 0 AND
  49. frontiers.write_frontier > 0
  50. mv1
  51. # Test that frontiers of indexes are reported.
  52. > CREATE INDEX idx1 ON t1 (a)
  53. > SELECT
  54. indexes.name,
  55. replicas.name
  56. FROM mz_cluster_replica_frontiers frontiers
  57. JOIN mz_indexes indexes
  58. ON frontiers.object_id = indexes.id
  59. LEFT JOIN mz_cluster_replicas replicas
  60. ON frontiers.replica_id = replicas.id
  61. WHERE
  62. frontiers.object_id LIKE 'u%' AND
  63. frontiers.write_frontier > 0
  64. idx1 r1
  65. idx1 r2
  66. > SELECT
  67. indexes.name
  68. FROM mz_internal.mz_frontiers frontiers
  69. JOIN mz_indexes indexes
  70. ON frontiers.object_id = indexes.id
  71. WHERE
  72. frontiers.object_id LIKE 'u%' AND
  73. frontiers.read_frontier > 0 AND
  74. frontiers.write_frontier > 0
  75. idx1
  76. # Test that frontiers of continual tasks are reported.
  77. > CREATE CONTINUAL TASK ct1 (a int) ON INPUT t1 AS (
  78. INSERT INTO ct1 SELECT * FROM t1
  79. )
  80. > SELECT
  81. cts.name,
  82. replicas.name
  83. FROM mz_cluster_replica_frontiers frontiers
  84. JOIN mz_internal.mz_continual_tasks cts
  85. ON frontiers.object_id = cts.id
  86. LEFT JOIN mz_cluster_replicas replicas
  87. ON frontiers.replica_id = replicas.id
  88. WHERE
  89. frontiers.object_id LIKE 'u%' AND
  90. frontiers.write_frontier > 0
  91. ct1 r1
  92. ct1 r2
  93. > SELECT
  94. cts.name
  95. FROM mz_internal.mz_frontiers frontiers
  96. JOIN mz_internal.mz_continual_tasks cts
  97. ON frontiers.object_id = cts.id
  98. WHERE
  99. frontiers.object_id LIKE 'u%' AND
  100. frontiers.read_frontier > 0 AND
  101. frontiers.write_frontier > 0
  102. ct1
  103. # Test that frontiers of sources are reported.
  104. > CREATE SOURCE source1
  105. IN CLUSTER test_source
  106. FROM LOAD GENERATOR COUNTER (UP TO 100)
  107. > SELECT
  108. sources.name,
  109. replicas.name
  110. FROM mz_cluster_replica_frontiers frontiers
  111. JOIN mz_sources sources
  112. ON frontiers.object_id = sources.id
  113. LEFT JOIN mz_cluster_replicas replicas
  114. ON frontiers.replica_id = replicas.id
  115. WHERE
  116. frontiers.object_id LIKE 'u%' AND
  117. frontiers.write_frontier > 0
  118. source1 s1
  119. > SELECT
  120. sources.name
  121. FROM mz_internal.mz_frontiers frontiers
  122. JOIN mz_sources sources
  123. ON frontiers.object_id = sources.id
  124. WHERE
  125. frontiers.object_id LIKE 'u%' AND
  126. frontiers.read_frontier > 0 AND
  127. frontiers.write_frontier > 0
  128. source1
  129. source1_progress
  130. # Test that frontiers of sinks are reported.
  131. > CREATE CONNECTION kafka_conn
  132. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  133. > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  134. URL '${testdrive.schema-registry-url}'
  135. )
  136. > CREATE SINK sink1
  137. IN CLUSTER test_source
  138. FROM t1
  139. INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}')
  140. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  141. ENVELOPE DEBEZIUM
  142. > SELECT
  143. sinks.name,
  144. replicas.name
  145. FROM mz_cluster_replica_frontiers frontiers
  146. JOIN mz_sinks sinks
  147. ON frontiers.object_id = sinks.id
  148. LEFT JOIN mz_cluster_replicas replicas
  149. ON frontiers.replica_id = replicas.id
  150. WHERE
  151. frontiers.object_id LIKE 'u%' AND
  152. frontiers.write_frontier > 0
  153. sink1 s1
  154. > SELECT
  155. sinks.name
  156. FROM mz_internal.mz_frontiers frontiers
  157. JOIN mz_sinks sinks
  158. ON frontiers.object_id = sinks.id
  159. WHERE
  160. frontiers.object_id LIKE 'u%' AND
  161. (frontiers.read_frontier > 0 OR frontiers.read_frontier IS NULL) AND
  162. frontiers.write_frontier > 0
  163. sink1
  164. # Test that the frontiers of introspection sources are reported.
  165. > SELECT
  166. replicas.name
  167. FROM mz_cluster_replica_frontiers frontiers
  168. JOIN mz_indexes indexes
  169. ON frontiers.object_id = indexes.id
  170. JOIN mz_clusters clusters
  171. ON indexes.cluster_id = clusters.id
  172. LEFT JOIN mz_cluster_replicas replicas
  173. ON frontiers.replica_id = replicas.id
  174. WHERE
  175. indexes.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx' AND
  176. frontiers.write_frontier > 0 AND
  177. clusters.name = 'test'
  178. r1
  179. r2
  180. > SELECT
  181. count(*)
  182. FROM mz_internal.mz_frontiers frontiers
  183. JOIN mz_indexes indexes
  184. ON frontiers.object_id = indexes.id
  185. JOIN mz_clusters clusters
  186. ON indexes.cluster_id = clusters.id
  187. WHERE
  188. indexes.name LIKE 'mz_active_peeks_per_worker_u%_primary_idx' AND
  189. frontiers.read_frontier > 0 AND
  190. frontiers.write_frontier > 0 AND
  191. clusters.name = 'test'
  192. 1
  193. # Test that the frontiers of tables are reported.
  194. > SELECT
  195. tables.name
  196. FROM mz_internal.mz_frontiers frontiers
  197. JOIN mz_tables tables
  198. ON frontiers.object_id = tables.id
  199. WHERE
  200. frontiers.object_id LIKE 'u%' AND
  201. frontiers.read_frontier > 0 AND
  202. frontiers.write_frontier > 0
  203. t1
  204. # Test that the frontiers of storage-managed collections are reported.
  205. > SELECT
  206. sources.name
  207. FROM mz_internal.mz_frontiers frontiers
  208. JOIN mz_sources sources
  209. ON frontiers.object_id = sources.id
  210. WHERE
  211. sources.name = 'mz_frontiers' AND
  212. frontiers.read_frontier > 0 AND
  213. frontiers.write_frontier > 0
  214. mz_frontiers
  215. # Test that frontiers are added when replicas are created.
  216. > SELECT
  217. objects.name,
  218. replicas.name
  219. FROM mz_cluster_replica_frontiers frontiers
  220. JOIN mz_objects objects
  221. ON frontiers.object_id = objects.id
  222. JOIN mz_cluster_replicas replicas
  223. ON frontiers.replica_id = replicas.id
  224. JOIN mz_clusters clusters
  225. ON replicas.cluster_id = clusters.id
  226. WHERE
  227. objects.id LIKE 'u%' AND
  228. frontiers.write_frontier > 0 AND
  229. clusters.name = 'test'
  230. ct1 r1
  231. ct1 r2
  232. idx1 r1
  233. idx1 r2
  234. mv1 r1
  235. mv1 r2
  236. > CREATE CLUSTER REPLICA test.r3 SIZE '1'
  237. > SELECT
  238. objects.name,
  239. replicas.name
  240. FROM mz_cluster_replica_frontiers frontiers
  241. JOIN mz_objects objects
  242. ON frontiers.object_id = objects.id
  243. JOIN mz_cluster_replicas replicas
  244. ON frontiers.replica_id = replicas.id
  245. JOIN mz_clusters clusters
  246. ON replicas.cluster_id = clusters.id
  247. WHERE
  248. objects.id LIKE 'u%' AND
  249. frontiers.write_frontier > 0 AND
  250. clusters.name = 'test'
  251. ct1 r1
  252. ct1 r2
  253. ct1 r3
  254. idx1 r1
  255. idx1 r2
  256. idx1 r3
  257. mv1 r1
  258. mv1 r2
  259. mv1 r3
  260. # Test that frontiers are removed when replicas are dropped.
  261. > DROP CLUSTER REPLICA test.r1
  262. > SELECT
  263. objects.name,
  264. replicas.name
  265. FROM mz_cluster_replica_frontiers frontiers
  266. JOIN mz_objects objects
  267. ON frontiers.object_id = objects.id
  268. JOIN mz_cluster_replicas replicas
  269. ON frontiers.replica_id = replicas.id
  270. JOIN mz_clusters clusters
  271. ON replicas.cluster_id = clusters.id
  272. WHERE
  273. objects.id LIKE 'u%' AND
  274. frontiers.write_frontier > 0 AND
  275. clusters.name = 'test'
  276. ct1 r2
  277. ct1 r3
  278. idx1 r2
  279. idx1 r3
  280. mv1 r2
  281. mv1 r3
  282. # Test that empty frontiers show up as NULL.
  283. > CREATE MATERIALIZED VIEW mv2 AS SELECT 1
  284. > SELECT
  285. replicas.name,
  286. frontiers.write_frontier
  287. FROM mz_cluster_replica_frontiers frontiers
  288. JOIN mz_materialized_views mvs
  289. ON frontiers.object_id = mvs.id
  290. JOIN mz_cluster_replicas replicas
  291. ON frontiers.replica_id = replicas.id
  292. WHERE
  293. mvs.name = 'mv2'
  294. r2 <null>
  295. r3 <null>
  296. > SELECT
  297. frontiers.read_frontier,
  298. frontiers.write_frontier
  299. FROM mz_internal.mz_frontiers frontiers
  300. JOIN mz_materialized_views mvs
  301. ON frontiers.object_id = mvs.id
  302. WHERE
  303. mvs.name = 'mv2'
  304. 0 <null>
  305. # Test that frontiers are removed when objects are dropped.
  306. > DROP MATERIALIZED VIEW mv1
  307. > DROP MATERIALIZED VIEW mv2
  308. > DROP INDEX idx1
  309. > DROP CONTINUAL TASK ct1
  310. > DROP SOURCE source1 CASCADE
  311. > DROP SINK sink1
  312. > DROP TABLE t1
  313. > SELECT *
  314. FROM mz_cluster_replica_frontiers frontiers
  315. WHERE object_id LIKE 'u%'
  316. > SELECT *
  317. FROM mz_internal.mz_frontiers frontiers
  318. WHERE object_id LIKE 'u%'
  319. # Test that frontiers are correctly initialized for collections on clusters
  320. # with zero replicas.
  321. > CREATE CLUSTER empty SIZE '1', REPLICATION FACTOR 0
  322. > CREATE TABLE t2 (a int)
  323. > CREATE INDEX idx2 IN CLUSTER empty ON t2 (a)
  324. > SELECT read_frontier > 0, read_frontier = write_frontier
  325. FROM mz_internal.mz_frontiers
  326. JOIN mz_indexes ON (id = object_id)
  327. WHERE name = 'idx2'
  328. true true