deploy_promote.sql 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. -- Copyright Materialize, Inc. and contributors. All rights reserved.
  2. --
  3. -- Licensed under the Apache License, Version 2.0 (the "License");
  4. -- you may not use this file except in compliance with the License.
  5. -- You may obtain a copy of the License in the LICENSE file at the
  6. -- root of this repository, or online at
  7. --
  8. -- http://www.apache.org/licenses/LICENSE-2.0
  9. --
  10. -- Unless required by applicable law or agreed to in writing, software
  11. -- distributed under the License is distributed on an "AS IS" BASIS,
  12. -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. -- See the License for the specific language governing permissions and
  14. -- limitations under the License.
  15. {% macro deploy_promote(wait=False, poll_interval=15, lag_threshold='1s', dry_run=False) %}
  16. {#
  17. Performs atomic deployment of current dbt targets to production,
  18. based on the deployment configuration specified in the dbt_project.yml file.
  19. This macro ensures all deployment targets, including schemas and clusters,
  20. are fully hydrated and deployed together as a single atomic operation.
  21. If any part of the deployment fails, the entire deployment is rolled back
  22. to maintain consistency and prevent partial updates.
  23. ## Arguments
  24. - `wait` (boolean, optional): Waits for the deployment to be fully hydrated.
  25. Defaults to false. We recommend calling `deploy_await` manually and running
  26. additional validation checks before promoting a deployment.
  27. - `poll_interval` (integer): The interval, in seconds, between each readiness
  28. check. Default: '15s'.
  29. - `lag_threshold` (string): The maximum lag threshold, which determines when
  30. all objects in the environment are considered hydrated and it''s safe to
  31. perform the cutover step. Default: '1s'.
  32. - `dry_run` (boolean, optional): When `true`, prints out the sequence of
  33. commands dbt would execute without actually promoting the deployment, for
  34. validation.
  35. ## Returns
  36. None: This macro performs deployment actions but does not return a value.
  37. #}
  38. {% set current_target_name = target.name %}
  39. {% set deployment = var('deployment') %}
  40. {% set target_config = deployment[current_target_name] %}
  41. -- Check if the target-specific configuration exists
  42. {% if not target_config %}
  43. {{ exceptions.raise_compiler_error("No deployment configuration found for target " ~ current_target_name) }}
  44. {% endif %}
  45. {{ log("Creating deployment environment for target " ~ current_target_name, info=True) }}
  46. {% set clusters = target_config.get('clusters', []) %}
  47. {% set schemas = target_config.get('schemas', []) %}
  48. -- Check that all production schemas
  49. -- and clusters already exist
  50. {% for schema in schemas %}
  51. {% set deploy_schema = schema ~ "_dbt_deploy" %}
  52. {% if not schema_exists(schema) %}
  53. {{ exceptions.raise_compiler_error("Production schema " ~ schema ~ " does not exist") }}
  54. {% endif %}
  55. {% if not schema_exists(deploy_schema) %}
  56. {{ exceptions.raise_compiler_error("Deployment schema " ~ deploy_schema ~ " does not exist") }}
  57. {% endif %}
  58. {% endfor %}
  59. {% for cluster in clusters %}
  60. {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
  61. {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
  62. {% if not cluster_exists(origin_cluster) %}
  63. {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " does not exist") }}
  64. {% endif %}
  65. {% if not cluster_exists(deploy_cluster) %}
  66. {{ exceptions.raise_compiler_error("Deployment cluster " ~ deploy_cluster ~ " does not exist") }}
  67. {% endif %}
  68. {% endfor %}
  69. {% if wait %}
  70. {{ deploy_await(poll_interval, lag_threshold) }}
  71. {% endif %}
  72. {% if not dry_run %}
  73. {% call statement('swap', fetch_result=True, auto_begin=False) -%}
  74. BEGIN;
  75. {% for schema in schemas %}
  76. {% set deploy_schema = schema ~ "_dbt_deploy" %}
  77. {{ log("Swapping schemas " ~ schema ~ " and " ~ deploy_schema, info=True) }}
  78. ALTER SCHEMA {{ adapter.quote(schema) }} SWAP WITH {{ adapter.quote(deploy_schema) }};
  79. {% endfor %}
  80. {% for cluster in clusters %}
  81. {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
  82. {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
  83. {{ log("Swapping clusters " ~ origin_cluster ~ " and " ~ deploy_cluster, info=True) }}
  84. ALTER CLUSTER {{ adapter.quote(origin_cluster) }} SWAP WITH {{ adapter.quote(deploy_cluster) }};
  85. {% endfor %}
  86. COMMIT;
  87. {%- endcall %}
  88. {{ tag_deployed_schemas(schemas) }}
  89. {% else %}
  90. {{ log("Starting dry run...", info=True) }}
  91. {% for schema in schemas %}
  92. {% set deploy_schema = schema ~ "_dbt_deploy" %}
  93. {{ log("DRY RUN: Swapping schemas " ~ schema ~ " and " ~ deploy_schema, info=True) }}
  94. {{ log("DRY RUN: ALTER SCHEMA " ~ adapter.quote(schema) ~ " SWAP WITH " ~ adapter.quote(deploy_schema), info=True) }}
  95. {% endfor %}
  96. {% for cluster in clusters %}
  97. {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
  98. {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
  99. {{ log("DRY RUN: Swapping clusters " ~ adapter.generate_final_cluster_name(cluster) ~ " and " ~ deploy_cluster, info=True) }}
  100. {{ log("DRY RUN: ALTER CLUSTER " ~ adapter.quote(origin_cluster) ~ " SWAP WITH " ~ adapter.quote(deploy_cluster), info=True) }}
  101. {% endfor %}
  102. {{ log("Dry run completed. The statements above were **not** executed against Materialize.", info=True) }}
  103. {% endif %}
  104. {% set sinks_to_alter = discover_sinks(schemas) %}
  105. {{ process_sinks(sinks_to_alter, dry_run) }}
  106. {% endmacro %}
  107. {% macro cluster_contains_sinks(cluster) %}
  108. {% set query %}
  109. SELECT count(*) > 0
  110. FROM mz_sinks
  111. JOIN mz_clusters ON mz_sinks.cluster_id = mz_clusters.id
  112. WHERE mz_clusters.name = {{ dbt.string_literal(cluster) }}
  113. {% endset %}
  114. {% set count = run_query(query) %}
  115. {% if execute %}
  116. {{ return(count.rows[0][0]) }}
  117. {% endif %}
  118. {% endmacro %}
  119. {% macro schema_contains_sinks(schema) %}
  120. {% set query %}
  121. SELECT count(*) > 0
  122. FROM mz_sinks
  123. JOIN mz_schemas ON mz_sinks.schema_id = mz_schemas.id
  124. JOIN mz_databases ON mz_schemas.database_id = mz_databases.id
  125. WHERE mz_schemas.name = {{ dbt.string_literal(schema) }}
  126. AND mz_databases.name = current_database()
  127. {% endset %}
  128. {% set count = run_query(query) %}
  129. {% if execute %}
  130. {{ return(count.rows[0][0]) }}
  131. {% endif %}
  132. {% endmacro %}
  133. {% macro get_current_database() %}
  134. {% set current_database_query = "SELECT current_database()" %}
  135. {% set results = run_query(current_database_query) %}
  136. {% if execute %}
  137. {% set current_database = results.rows[0][0] %}
  138. {{ return(current_database) }}
  139. {% else %}
  140. {{ return(None) }}
  141. {% endif %}
  142. {% endmacro %}
  143. {% macro discover_sinks(schemas) %}
  144. {% set sinks_to_alter = [] %}
  145. {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | selectattr("config.materialized", "equalto", "sink") %}
  146. {% set upstream_node = graph.nodes[node.depends_on.nodes[0]] %}
  147. {% set upstream_schema = upstream_node.schema %}
  148. {% if upstream_schema in schemas %}
  149. {% set sink_database = node.database %}
  150. {% set sink_schema = node.schema %}
  151. {% set sink_name = node.name %}
  152. {% set new_upstream_relation = adapter.quote(upstream_node.database) ~ '.' ~ adapter.quote(upstream_schema) ~ '.' ~ adapter.quote(upstream_node.name) %}
  153. {% set sink = {
  154. "database": sink_database,
  155. "schema": sink_schema,
  156. "name": sink_name,
  157. "new_upstream_relation": new_upstream_relation
  158. } %}
  159. {% do sinks_to_alter.append(sink) %}
  160. {% endif %}
  161. {% endfor %}
  162. {{ return(sinks_to_alter) }}
  163. {% endmacro %}
  164. {% macro process_sinks(sinks, dry_run) %}
  165. {% if sinks|length > 0 %}
  166. {% for sink in sinks %}
  167. {% if sink['database'] and sink['schema'] and sink['name'] and sink['new_upstream_relation'] %}
  168. {% if not dry_run %}
  169. {% call statement('alter_sink_' ~ loop.index, fetch_result=True, auto_begin=False) %}
  170. {{ log("Running ALTER SINK " ~ adapter.quote(sink['database']) ~ "." ~ adapter.quote(sink['schema']) ~ "." ~ adapter.quote(sink['name']) ~ " SET FROM " ~ sink['new_upstream_relation'], info=True) }}
  171. ALTER SINK {{ adapter.quote(sink['database']) }}.{{ adapter.quote(sink['schema']) }}.{{ adapter.quote(sink['name']) }} SET FROM {{ sink['new_upstream_relation'] }};
  172. {% endcall %}
  173. {% else %}
  174. {{ log("DRY RUN: ALTER SINK " ~ adapter.quote(sink['database']) ~ "." ~ adapter.quote(sink['schema']) ~ "." ~ adapter.quote(sink['name']) ~ " SET FROM " ~ sink['new_upstream_relation'], info=True) }}
  175. {% endif %}
  176. {% endif %}
  177. {% endfor %}
  178. {% else %}
  179. {{ log("No sinks to process.", info=True) }}
  180. {% endif %}
  181. {% endmacro %}
  182. {% macro tag_deployed_schemas(schemas) %}
  183. {% set commit_sha = adapter.get_git_commit_sha() %}
  184. {% set result = run_query("SELECT current_user, now();") %}
  185. {% if result is not none and result.rows|length > 0 %}
  186. {% set db_user = result.columns[0][0] %}
  187. {% set deploy_time = result.columns[0][1] %}
  188. {% else %}
  189. {% set db_user = "unknown" %}
  190. {% set deploy_time = "unknown" %}
  191. {% endif %}
  192. {% set schema_comment %}
  193. Deployment by {{ db_user }} on {{ deploy_time }}
  194. {%- if commit_sha is not none and commit_sha != '' %}
  195. | Commit SHA: {{ commit_sha }}
  196. {%- endif %}
  197. {% endset %}
  198. {% for schema in schemas %}
  199. {{ log("Tagging schema: " ~ schema, info=True) }}
  200. {% set comment_sql %}
  201. COMMENT ON SCHEMA {{ adapter.quote(schema) }} IS {{ dbt.string_literal(schema_comment) }}
  202. {% endset %}
  203. {% do run_query(comment_sql) %}
  204. {% endfor %}
  205. {% endmacro %}