123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- -- Copyright Materialize, Inc. and contributors. All rights reserved.
- --
- -- Licensed under the Apache License, Version 2.0 (the "License");
- -- you may not use this file except in compliance with the License.
- -- You may obtain a copy of the License in the LICENSE file at the
- -- root of this repository, or online at
- --
- -- http://www.apache.org/licenses/LICENSE-2.0
- --
- -- Unless required by applicable law or agreed to in writing, software
- -- distributed under the License is distributed on an "AS IS" BASIS,
- -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- -- See the License for the specific language governing permissions and
- -- limitations under the License.
- {% macro deploy_init(ignore_existing_objects=False) %}
- {% set current_target_name = target.name %}
- {% set deployment = var('deployment') %}
- {% set target_config = deployment[current_target_name] %}
- -- Check if the target-specific configuration exists
- {% if not target_config %}
- {{ exceptions.raise_compiler_error("No deployment configuration found for target " ~ current_target_name) }}
- {% endif %}
- {{ log("Creating deployment environment for target " ~ current_target_name, info=True) }}
- {% set clusters = target_config.get('clusters', []) %}
- {% set schemas = target_config.get('schemas', []) %}
- -- Check that all production schemas
- -- and clusters already exist
- {% for schema in schemas %}
- {% if not schema_exists(schema) %}
- {{ exceptions.raise_compiler_error("Production schema " ~ schema ~ " does not exist") }}
- {% endif %}
- {% if schema_contains_sinks(schema) %}
- {{ exceptions.raise_compiler_error("""
- Production schema " ~ schema ~ " contains sinks.
- Blue/green deployments require sinks to be in a dedicated schema.
- """) }}
- {% endif %}
- {% endfor %}
- {% for cluster in clusters %}
- {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
- {% if not cluster_exists(origin_cluster) %}
- {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " does not exist") }}
- {% endif %}
- {% if cluster_contains_sinks(origin_cluster) %}
- {{ exceptions.raise_compiler_error("""
- Production cluster " ~ origin_cluster ~ " contains sinks.
- Blue/green deployments require sinks to be in a dedicated cluster.
- """) }}
- {% endif %}
- {% endfor %}
- {{ deploy_validate_permissions(clusters, schemas) }}
- {% for schema in schemas %}
- {% set deploy_schema = schema ~ "_dbt_deploy" %}
- {% if schema_exists(deploy_schema) %}
- {{ log("Deployment schema " ~ deploy_schema ~ " already exists", info=True)}}
- {% set schema_empty %}
- SELECT count(*)
- FROM mz_objects
- JOIN mz_schemas ON mz_objects.schema_id = mz_schemas.id
- JOIN mz_databases ON mz_databases.id = mz_schemas.database_id
- WHERE mz_schemas.name = {{ dbt.string_literal(deploy_schema) }}
- AND mz_objects.id LIKE 'u%'
- AND mz_databases.name = current_database()
- {% endset %}
- {% set schema_object_count = run_query(schema_empty) %}
- {% if execute %}
- {% if schema_object_count and schema_object_count.columns[0] and schema_object_count.rows[0][0] > 0 %}
- {% if check_schema_ci_tag(deploy_schema) %}
- {{ log("Schema " ~ deploy_schema ~ " was already created for this pull request", info=True) }}
- {% elif ignore_existing_objects %}
- {{ log("[Warning] Deployment schema " ~ deploy_schema ~ " is not empty", info=True) }}
- {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }}
- {% else %}
- {{ exceptions.raise_compiler_error("""
- Deployment schema """ ~ deploy_schema ~ """ already exists and is not empty.
- This is potentially dangerous as you may end up deploying objects to production you
- do not intend.
- If you are certain the objects in this schema are supposed to exist, you can ignore this
- error by setting ignore_existing_objects to True.
- dbt run-operation create_deployment_environment --args '{ignore_existing_objects: True}'
- """) }}
- {% endif %}
- {% endif %}
- {% endif %}
- {% else %}
- {{ log("Creating deployment schema " ~ deploy_schema, info=True)}}
- {% set create_schema %}
- CREATE SCHEMA {{ deploy_schema }};
- {% endset %}
- {{ run_query(create_schema) }}
- {{ set_schema_ci_tag() }}
- {{ internal_copy_schema_default_privs(schema, deploy_schema) }}
- {{ internal_copy_schema_grants(schema, deploy_schema) }}
- {% endif %}
- {% endfor %}
- {% for cluster in clusters %}
- {% set origin_cluster = adapter.generate_final_cluster_name(cluster, force_deploy_suffix=False) %}
- {% set cluster_configuration %}
- SELECT
- c.managed,
- c.size,
- c.replication_factor,
- c.id AS cluster_id,
- c.name AS cluster_name,
- cs.type AS schedule_type,
- cs.refresh_hydration_time_estimate
- FROM mz_clusters c
- LEFT JOIN mz_internal.mz_cluster_schedules cs ON cs.cluster_id = c.id
- WHERE c.name = {{ dbt.string_literal(origin_cluster) }}
- {% endset %}
- {% set cluster_config_results = run_query(cluster_configuration) %}
- {% if execute %}
- {% set results = cluster_config_results.rows[0] %}
- {% set managed = results[0] %}
- {% set size = results[1] %}
- {% set replication_factor = results[2] %}
- {% set schedule_type = results[5] %}
- {% set refresh_hydration_time_estimate = results[6] %}
- {% if not managed %}
- {{ exceptions.raise_compiler_error("Production cluster " ~ origin_cluster ~ " is not managed") }}
- {% endif %}
- {% set deploy_cluster = create_cluster(
- cluster_name=cluster,
- size=size,
- replication_factor=replication_factor,
- schedule_type=schedule_type,
- refresh_hydration_time_estimate=refresh_hydration_time_estimate,
- ignore_existing_objects=ignore_existing_objects,
- force_deploy_suffix=True
- ) %}
- {{ internal_copy_cluster_grants(cluster, deploy_cluster) }}
- {% endif %}
- {% endfor %}
- {% endmacro %}
- {% macro internal_copy_schema_default_privs(from, to) %}
- {% set find_default_privs %}
- SELECT
- 'ALTER DEFAULT PRIVILEGES ' ||
- CASE
- WHEN object_owner = 'PUBLIC' THEN 'FOR ALL ROLES '
- ELSE 'FOR ROLE ' || quote_ident(object_owner) || ' '
- END ||
- 'IN SCHEMA {{ to }} ' ||
- 'GRANT ' || privilege_type || ' ' ||
- 'ON ' || object_type || 's ' ||
- CASE
- WHEN grantee = 'PUBLIC' THEN 'TO PUBLIC'
- ELSE 'TO ' || quote_ident(grantee)
- END
- FROM mz_internal.mz_show_default_privileges
- WHERE database = current_database() AND schema = {{ dbt.string_literal(from) }}
- AND object_owner <> 'none' AND grantee <> 'none'
- {% endset %}
- {% set alter_defaults = run_query(find_default_privs) %}
- {% if execute %}
- {% for alter in alter_defaults.rows %}
- {{ run_query(alter[0]) }}
- {% endfor %}
- {% endif %}
- {% endmacro %}
- {% macro internal_copy_schema_grants(from, to) %}
- {% set find_revokes %}
- WITH schema_privilege AS (
- SELECT mz_internal.mz_aclexplode(s.privileges).*
- FROM mz_schemas s
- JOIN mz_databases d ON s.database_id = d.id
- WHERE d.name = current_database()
- AND s.name = {{ dbt.string_literal(to) }}
- )
- SELECT 'REVOKE ' || s.privilege_type || ' ' ||
- 'ON SCHEMA ' || quote_ident({{ dbt.string_literal(to) }}) || ' ' ||
- 'FROM ' || quote_ident(grantee.name)
- FROM schema_privilege AS s
- JOIN mz_roles AS grantee ON s.grantee = grantee.id
- WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support', current_role)
- {% endset %}
- {% set revokes = run_query(find_revokes) %}
- {% if execute %}
- {% for revoke in revokes.rows %}
- {{ run_query(revoke[0]) }}
- {% endfor %}
- {% endif %}
- {% set find_grants %}
- WITH schema_privilege AS (
- SELECT mz_internal.mz_aclexplode(s.privileges).*
- FROM mz_schemas s
- JOIN mz_databases d ON s.database_id = d.id
- WHERE d.name = current_database()
- AND s.name = {{ dbt.string_literal(from) }}
- )
- SELECT 'GRANT ' || s.privilege_type || ' ' ||
- 'ON SCHEMA ' || quote_ident({{ dbt.string_literal(to) }}) || ' ' ||
- 'TO ' || quote_ident(grantee.name)
- FROM schema_privilege AS s
- JOIN mz_roles AS grantee ON s.grantee = grantee.id
- WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support', current_role)
- {% endset %}
- {% set grants = run_query(find_grants) %}
- {% if execute %}
- {% for grant in grants.rows %}
- {{ run_query(grant[0]) }}
- {% endfor %}
- {% endif %}
- {% endmacro %}
- {% macro internal_copy_cluster_grants(from, to) %}
- {% set find_revokes %}
- WITH cluster_privilege AS (
- SELECT mz_internal.mz_aclexplode(privileges).*
- FROM mz_clusters
- WHERE name = {{ dbt.string_literal(to) }}
- )
- SELECT 'REVOKE ' || c.privilege_type || ' ' ||
- 'ON CLUSTER ' || quote_ident({{ dbt.string_literal(to) }}) || ' ' ||
- 'FROM ' || quote_ident(grantee.name)
- FROM cluster_privilege AS c
- JOIN mz_roles AS grantee ON c.grantee = grantee.id
- WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
- AND grantee.name <> current_role
- {% endset %}
- {% set revokes = run_query(find_revokes) %}
- {% if execute %}
- {% for revoke in revokes.rows %}
- {{ run_query(revoke[0]) }}
- {% endfor %}
- {% endif %}
- {% set find_grants %}
- WITH cluster_privilege AS (
- SELECT mz_internal.mz_aclexplode(privileges).*
- FROM mz_clusters
- WHERE name = {{ dbt.string_literal(from) }}
- )
- SELECT 'GRANT ' || c.privilege_type || ' ' ||
- 'ON CLUSTER ' || quote_ident({{ dbt.string_literal(to) }}) || ' ' ||
- 'TO ' || quote_ident(grantee.name)
- FROM cluster_privilege AS c
- JOIN mz_roles AS grantee ON c.grantee = grantee.id
- WHERE grantee.name NOT IN ('none', 'mz_system', 'mz_support')
- {% endset %}
- {% set grants = run_query(find_grants) %}
- {% if execute %}
- {% for grant in grants.rows %}
- {{ run_query(grant[0]) }}
- {% endfor %}
- {% endif %}
- {% endmacro %}
|