  -- Copyright Materialize, Inc. and contributors. All rights reserved.
  --
  -- Licensed under the Apache License, Version 2.0 (the "License");
  -- you may not use this file except in compliance with the License.
  -- You may obtain a copy of the License in the LICENSE file at the
  -- root of this repository, or online at
  --
  -- http://www.apache.org/licenses/LICENSE-2.0
  --
  -- Unless required by applicable law or agreed to in writing, software
  -- distributed under the License is distributed on an "AS IS" BASIS,
  -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  -- See the License for the specific language governing permissions and
  -- limitations under the License.
  {% macro is_cluster_ready(cluster=target.cluster|default(none), lag_threshold='1s') %}
      {#
        Checks whether the objects on a cluster are hydrated and caught up,
        logging the outcome along the way.

        Arguments:
          cluster: name of the cluster to inspect. Defaults to target.cluster,
            or none when the target does not define one.
          lag_threshold: upper bound, rendered as a SQL string literal, that an
            object's local_lag in mz_internal.mz_materialization_lag may reach
            while the object still counts as ready. Defaults to '1s'.

        Returns (only while execute is true; otherwise no queries are run and
        nothing is returned explicitly):
          false - the cluster does not exist, has no replicas, or every replica
                  still has at least one pending object;
          true  - at least one replica has no pending objects.

        Raises a compiler error when no cluster name is available.
      #}
      {% if cluster is none %}
          {# target.name identifies the active target so the error points at the profile entry to fix. #}
          {{ exceptions.raise_compiler_error("No cluster specified and no default cluster found in target profile. " ~ target.name) }}
      {% endif %}

      {{ log("Checking pending objects for cluster " ~ cluster, info=True) }}

      {# Render the quoted cluster name once; it is reused by every filter below. #}
      {% set cluster_literal = dbt.string_literal(cluster) %}

      {# Number of replicas of the named cluster; returns no row when the cluster is unknown. #}
      {% set cluster_replicas_sql %}
          WITH total_replicas AS (
              SELECT cluster_id, count(*) AS replicas
              FROM mz_cluster_replicas
              GROUP BY cluster_id
          )
          SELECT COALESCE(total_replicas.replicas, 0) AS replicas
          FROM mz_clusters
          LEFT JOIN total_replicas ON mz_clusters.id = total_replicas.cluster_id
          WHERE mz_clusters.name = {{ cluster_literal }}
      {% endset %}

      {#
        Pending objects of the cluster, reported only when no replica is ready:
          dataflows         - one row per (replica, object) for every index,
                              materialized view, source and sink on the cluster;
          ready_dataflows   - the subset that is hydrated and whose local_lag is
                              within lag_threshold (or has no lag row at all);
          pending_dataflows - everything in dataflows that is not ready;
          ready_replicas    - replicas of the cluster without any pending dataflow.
      #}
      {% set check_pending_objects_sql %}
          WITH dataflows AS (
              SELECT
                  mz_cluster_replicas.id AS replica_id,
                  mz_indexes.id AS object_id,
                  mz_indexes.name,
                  'index' AS type
              FROM mz_indexes
              JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
              JOIN mz_cluster_replicas ON mz_clusters.id = mz_cluster_replicas.cluster_id
              WHERE mz_clusters.name = {{ cluster_literal }}

              UNION ALL

              SELECT
                  mz_cluster_replicas.id AS replica_id,
                  mz_materialized_views.id AS object_id,
                  mz_materialized_views.name,
                  'materialized-view' AS type
              FROM mz_materialized_views
              JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
              JOIN mz_cluster_replicas ON mz_clusters.id = mz_cluster_replicas.cluster_id
              WHERE mz_clusters.name = {{ cluster_literal }}

              UNION ALL

              SELECT
                  mz_cluster_replicas.id AS replica_id,
                  mz_sources.id AS object_id,
                  mz_sources.name,
                  'source' AS type
              FROM mz_sources
              JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id
              JOIN mz_cluster_replicas ON mz_clusters.id = mz_cluster_replicas.cluster_id
              WHERE mz_clusters.name = {{ cluster_literal }}

              UNION ALL

              SELECT
                  mz_cluster_replicas.id AS replica_id,
                  mz_sinks.id AS object_id,
                  mz_sinks.name,
                  'sink' AS type
              FROM mz_sinks
              JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id
              JOIN mz_cluster_replicas ON mz_clusters.id = mz_cluster_replicas.cluster_id
              WHERE mz_clusters.name = {{ cluster_literal }}
          ),

          ready_dataflows AS (
              SELECT replica_id, object_id, name, type
              FROM dataflows
              JOIN mz_internal.mz_hydration_statuses AS h USING (object_id, replica_id)
              LEFT JOIN mz_internal.mz_materialization_lag AS l USING (object_id)
              WHERE h.hydrated AND (l.local_lag <= {{ dbt.string_literal(lag_threshold) }} OR l.local_lag IS NULL)
          ),

          pending_dataflows AS (
              SELECT replica_id, object_id, name, type
              FROM dataflows d
              EXCEPT
              SELECT replica_id, object_id, name, type
              FROM ready_dataflows r
          ),

          ready_replicas AS (
              SELECT mz_cluster_replicas.id AS replica_id
              FROM mz_cluster_replicas
              JOIN mz_clusters ON mz_clusters.id = mz_cluster_replicas.cluster_id
              WHERE mz_clusters.name = {{ cluster_literal }}
              EXCEPT
              SELECT replica_id
              FROM pending_dataflows
          )

          SELECT object_id, name, type
          FROM pending_dataflows
          WHERE NOT EXISTS (
              SELECT * FROM ready_replicas
          )
      {% endset %}

      {# Queries are only issued while dbt is executing, never while it is parsing. #}
      {% if execute %}
          {% set replica_result = run_query(cluster_replicas_sql) %}

          {# No row at all means there is no cluster with that name. #}
          {% if not (replica_result and replica_result.rows|length > 0) %}
              {{ log("Cluster " ~ cluster ~ " does not exist", info=True) }}
              {{ return(false) }}
          {% endif %}

          {% set replica_count = replica_result.rows[0][0] %}
          {% if replica_count is none or replica_count == 0 %}
              {{ log("Cluster " ~ cluster ~ " has no running replicas", info=True) }}
              {{ return(false) }}
          {% endif %}

          {% set pending_result = run_query(check_pending_objects_sql) %}

          {# Any returned row is an object that is pending on every replica. #}
          {% if pending_result and pending_result.column_names and pending_result.rows|length > 0 %}
              {{ log("Some objects still hydrating in cluster " ~ cluster ~ ":", info=True) }}
              {# Row layout: [0] object_id, [1] name, [2] type. #}
              {% for pending_row in pending_result.rows %}
                  {{ log("- [" ~ pending_row[2] ~ "(" ~ pending_row[0] ~ ")]: " ~ pending_row[1], info=True) }}
              {% endfor %}
              {{ return(false) }}
          {% endif %}

          {# No pending objects were reported for the specified cluster. #}
          {{ log("All objects hydrated in cluster " ~ cluster ~ ". The cluster is ready.", info=True) }}
          {{ return(true) }}
      {% endif %}
  {% endmacro %}