-- Copyright Materialize, Inc. and contributors. All rights reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License in the LICENSE file at the
-- root of this repository, or online at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
{% macro deploy_promote(wait=False, poll_interval=15, lag_threshold='1s', dry_run=False) %}
{#
  Performs atomic deployment of current dbt targets to production,
  based on the deployment configuration specified in the dbt_project.yml file.
  This macro ensures all deployment targets, including schemas and clusters,
  are swapped together inside a single BEGIN ... COMMIT transaction, so a
  failure of any swap leaves production untouched.

  Note that schema tagging and the sink cutover (ALTER SINK ... SET FROM)
  run after the swap transaction has committed; they are not part of it.

  ## Arguments
  - `wait` (boolean, optional): Waits for the deployment to be fully hydrated
    by calling `deploy_await(poll_interval, lag_threshold)` before swapping.
    Defaults to false. We recommend calling `deploy_await` manually and running
    additional validation checks before promoting a deployment.
  - `poll_interval` (integer): The interval, in seconds, between each readiness
    check. Only used when `wait` is true. Default: 15.
  - `lag_threshold` (string): The maximum lag threshold, which determines when
    all objects in the environment are considered hydrated and it's safe to
    perform the cutover step. Only used when `wait` is true. Default: '1s'.
  - `dry_run` (boolean, optional): When `true`, prints out the sequence of
    commands dbt would execute without actually promoting the deployment, for
    validation. Defaults to false.

  ## Raises
  A compiler error when the active target has no entry in the `deployment`
  variable, or when any production or deployment schema/cluster is missing.

  ## Returns
  None: This macro performs deployment actions but does not return a value.
#}

{% set current_target_name = target.name %}
{% set deployment = var('deployment') %}
{% set target_config = deployment[current_target_name] %}

-- Check if the target-specific configuration exists
{% if not target_config %}
    {{ exceptions.raise_compiler_error("No deployment configuration found for target " ~ current_target_name) }}
{% endif %}

{{ log("Creating deployment environment for target " ~ current_target_name, info=True) }}

{% set clusters = target_config.get('clusters', []) %}
{% set schemas = target_config.get('schemas', []) %}

-- Check that all production schemas
-- and clusters already exist
{% for schema in schemas %}
    {% set deploy_schema = schema ~ "_dbt_deploy" %}
    {% if not schema_exists(schema) %}
        {{ exceptions.raise_compiler_error("Production schema " ~ schema ~ " does not exist") }}
    {% endif %}
    {% if not schema_exists(deploy_schema) %}
        {{ exceptions.raise_compiler_error("Deployment schema " ~ deploy_schema ~ " does not exist") }}
    {% endif %}
{% endfor %}

{% for cluster in clusters %}
    {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
    {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
    {% if not cluster_exists(origin_cluster) %}
        {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " does not exist") }}
    {% endif %}
    {% if not cluster_exists(deploy_cluster) %}
        {{ exceptions.raise_compiler_error("Deployment cluster " ~ deploy_cluster ~ " does not exist") }}
    {% endif %}
{% endfor %}

{% if wait %}
    {{ deploy_await(poll_interval, lag_threshold) }}
{% endif %}

{% if not dry_run %}
    {# All swaps are sent as one statement wrapped in BEGIN/COMMIT so they apply together. #}
    {% call statement('swap', fetch_result=True, auto_begin=False) -%}
    BEGIN;

    {% for schema in schemas %}
        {% set deploy_schema = schema ~ "_dbt_deploy" %}
        {{ log("Swapping schemas " ~ schema ~ " and " ~ deploy_schema, info=True) }}
        ALTER SCHEMA {{ adapter.quote(schema) }} SWAP WITH {{ adapter.quote(deploy_schema) }};
    {% endfor %}

    {% for cluster in clusters %}
        {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
        {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
        {{ log("Swapping clusters " ~ origin_cluster ~ " and " ~ deploy_cluster, info=True) }}
        ALTER CLUSTER {{ adapter.quote(origin_cluster) }} SWAP WITH {{ adapter.quote(deploy_cluster) }};
    {% endfor %}

    COMMIT;
    {%- endcall %}

    {{ tag_deployed_schemas(schemas) }}
{% else %}
    {{ log("Starting dry run...", info=True) }}

    {% for schema in schemas %}
        {% set deploy_schema = schema ~ "_dbt_deploy" %}
        {{ log("DRY RUN: Swapping schemas " ~ schema ~ " and " ~ deploy_schema, info=True) }}
        {{ log("DRY RUN: ALTER SCHEMA " ~ adapter.quote(schema) ~ " SWAP WITH " ~ adapter.quote(deploy_schema), info=True) }}
    {% endfor %}

    {% for cluster in clusters %}
        {% set deploy_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=True) %}
        {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
        {# Report the same origin cluster name that the real swap and the previewed statement use. #}
        {{ log("DRY RUN: Swapping clusters " ~ origin_cluster ~ " and " ~ deploy_cluster, info=True) }}
        {{ log("DRY RUN: ALTER CLUSTER " ~ adapter.quote(origin_cluster) ~ " SWAP WITH " ~ adapter.quote(deploy_cluster), info=True) }}
    {% endfor %}

    {{ log("Dry run completed. The statements above were **not** executed against Materialize.", info=True) }}
{% endif %}

{# Sinks reading from the swapped schemas are repointed (or previewed, in a dry run) last. #}
{% set sinks_to_alter = discover_sinks(schemas) %}
{{ process_sinks(sinks_to_alter, dry_run) }}
{% endmacro %}
{% macro cluster_contains_sinks(cluster) %}
{#
  Reports whether at least one sink is attached to the named cluster.

  ## Arguments
  - `cluster` (string): Cluster name, matched against `mz_clusters.name`.

  ## Returns
  The single boolean produced by `count(*) > 0` over `mz_sinks` joined to
  `mz_clusters`. Nothing is returned when `execute` is false.
#}
    {% set sink_check_sql %}
        SELECT count(*) > 0
        FROM mz_sinks
        JOIN mz_clusters ON mz_sinks.cluster_id = mz_clusters.id
        WHERE mz_clusters.name = {{ dbt.string_literal(cluster) }}
    {% endset %}

    {% set sink_check = run_query(sink_check_sql) %}
    {# The result rows are only read when dbt is actually executing. #}
    {% if execute %}
        {% do return(sink_check.rows[0][0]) %}
    {% endif %}
{% endmacro %}
{% macro schema_contains_sinks(schema) %}
{#
  Reports whether at least one sink lives in the named schema of the
  current database.

  ## Arguments
  - `schema` (string): Schema name, matched against `mz_schemas.name`; the
    lookup is restricted to the database returned by `current_database()`.

  ## Returns
  The single boolean produced by `count(*) > 0` over `mz_sinks` joined to
  `mz_schemas` and `mz_databases`. Nothing is returned when `execute` is false.
#}
    {% set sink_check_sql %}
        SELECT count(*) > 0
        FROM mz_sinks
        JOIN mz_schemas ON mz_sinks.schema_id = mz_schemas.id
        JOIN mz_databases ON mz_schemas.database_id = mz_databases.id
        WHERE mz_schemas.name = {{ dbt.string_literal(schema) }}
            AND mz_databases.name = current_database()
    {% endset %}

    {% set sink_check = run_query(sink_check_sql) %}
    {# The result rows are only read when dbt is actually executing. #}
    {% if execute %}
        {% do return(sink_check.rows[0][0]) %}
    {% endif %}
{% endmacro %}
{% macro get_current_database() %}
{#
  Looks up the name of the database the current connection is using.

  ## Returns
  The value of `SELECT current_database()`, or None when `execute` is false.
#}
    {% set lookup = run_query("SELECT current_database()") %}

    {# Outside of execution there is no result to read, so hand back None. #}
    {% if not execute %}
        {% do return(None) %}
    {% endif %}

    {% do return(lookup.rows[0][0]) %}
{% endmacro %}
{% macro discover_sinks(schemas) %}
{#
  Collects the sink models whose upstream relation lives in one of the given
  schemas, so that they can be repointed after a deployment swap.

  Only the first entry of each sink's `depends_on.nodes` is considered as its
  upstream relation. Sinks with no dependencies, or whose first dependency is
  not present in `graph.nodes` (for example, presumably, a source), are
  skipped with a log message instead of failing the whole macro.

  ## Arguments
  - `schemas` (list of strings): Schema names to match the upstream
    relation's schema against.

  ## Returns
  A list of dictionaries with the keys `database`, `schema`, `name` (all
  describing the sink) and `new_upstream_relation` (the quoted, fully
  qualified name of the upstream relation).
#}
    {% set sinks_to_alter = [] %}

    {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | selectattr("config.materialized", "equalto", "sink") %}
        {# `first` yields an undefined (falsy) value on an empty list, unlike indexing followed by attribute access. #}
        {% set upstream_id = node.depends_on.nodes | first %}
        {% set upstream_node = graph.nodes.get(upstream_id) if upstream_id else none %}

        {% if not upstream_node %}
            {{ log("Skipping sink " ~ node.name ~ ": its upstream relation is not a node in the dbt graph", info=True) }}
        {% elif upstream_node.schema in schemas %}
            {% set upstream_schema = upstream_node.schema %}
            {% set new_upstream_relation = adapter.quote(upstream_node.database) ~ '.' ~ adapter.quote(upstream_schema) ~ '.' ~ adapter.quote(upstream_node.name) %}
            {% set sink = {
                "database": node.database,
                "schema": node.schema,
                "name": node.name,
                "new_upstream_relation": new_upstream_relation
            } %}
            {% do sinks_to_alter.append(sink) %}
        {% endif %}
    {% endfor %}

    {{ return(sinks_to_alter) }}
{% endmacro %}
{% macro process_sinks(sinks, dry_run) %}
{#
  Repoints each given sink at its new upstream relation with
  ALTER SINK ... SET FROM, or only logs the statement in a dry run.

  ## Arguments
  - `sinks` (list of dictionaries): Entries with the keys `database`,
    `schema`, `name` and `new_upstream_relation`. Entries where any of these
    is empty are silently ignored.
  - `dry_run` (boolean): When true, the statements are logged with a
    "DRY RUN:" prefix and not executed.

  ## Returns
  None: This macro only logs and executes statements.
#}
    {% if sinks | length == 0 %}
        {{ log("No sinks to process.", info=True) }}
    {% else %}
        {% for sink in sinks %}
            {% if sink['database'] and sink['schema'] and sink['name'] and sink['new_upstream_relation'] %}
                {# Build the statement text once; it is shared by the log lines and the executed SQL. #}
                {% set sink_ref = adapter.quote(sink['database']) ~ "." ~ adapter.quote(sink['schema']) ~ "." ~ adapter.quote(sink['name']) %}
                {% set alter_sql = "ALTER SINK " ~ sink_ref ~ " SET FROM " ~ sink['new_upstream_relation'] %}

                {% if dry_run %}
                    {{ log("DRY RUN: " ~ alter_sql, info=True) }}
                {% else %}
                    {# Each sink is altered in its own statement, named after its position in the list. #}
                    {% call statement('alter_sink_' ~ loop.index, fetch_result=True, auto_begin=False) %}
                        {{ log("Running " ~ alter_sql, info=True) }}
                        {{ alter_sql }};
                    {% endcall %}
                {% endif %}
            {% endif %}
        {% endfor %}
    {% endif %}
{% endmacro %}
{% macro tag_deployed_schemas(schemas) %}
{#
  Attaches a comment to each given schema recording who ran the deployment,
  when, and (if available) the git commit SHA it was run from.

  ## Arguments
  - `schemas` (list of strings): Names of the schemas to comment on.

  ## Returns
  None: This macro only issues COMMENT ON SCHEMA statements.
#}
    {% set commit_sha = adapter.get_git_commit_sha() %}
    {% set result = run_query("SELECT current_user, now();") %}

    {# The query yields one row with two columns: (current_user, now()).
       Read both values from that row; indexing the first column at position 1
       would ask for a second row, which does not exist. #}
    {% if result is not none and result.rows|length > 0 %}
        {% set db_user = result.rows[0][0] %}
        {% set deploy_time = result.rows[0][1] %}
    {% else %}
        {% set db_user = "unknown" %}
        {% set deploy_time = "unknown" %}
    {% endif %}

    {# The commit SHA is appended only when the adapter could determine one. #}
    {% set schema_comment %}
        Deployment by {{ db_user }} on {{ deploy_time }}
        {%- if commit_sha is not none and commit_sha != '' %}
            | Commit SHA: {{ commit_sha }}
        {%- endif %}
    {% endset %}

    {% for schema in schemas %}
        {{ log("Tagging schema: " ~ schema, info=True) }}
        {% set comment_sql %}
            COMMENT ON SCHEMA {{ adapter.quote(schema) }} IS {{ dbt.string_literal(schema_comment) }}
        {% endset %}
        {% do run_query(comment_sql) %}
    {% endfor %}
{% endmacro %}
|