12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023 |
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the LICENSE file at the
# root of this repository, or online at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- import time
- import pytest
- from dbt.tests.util import run_dbt
- from fixtures import (
- test_materialized_view,
- test_materialized_view_deploy,
- test_materialized_view_index,
- test_sink,
- test_sink_deploy,
- test_source,
- test_view_index,
- )
class TestApplyGrantsAndPrivileges:
    """Tests for the internal macros that copy grants and default
    privileges from a "blue" schema/cluster to its "green" counterpart
    during a blue/green deployment."""

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        # A role that still owns objects cannot be dropped, so drop
        # everything owned by my_role first (only if the role exists).
        role_exists = project.run_sql(
            "SELECT count(*) = 1 FROM mz_roles WHERE name = 'my_role'",
            fetch="one",
        )[0]
        if role_exists:
            project.run_sql("DROP OWNED BY my_role")
        project.run_sql("DROP ROLE IF EXISTS my_role")
        project.run_sql("DROP SCHEMA IF EXISTS blue_schema CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS green_schema CASCADE")
        project.run_sql("DROP CLUSTER IF EXISTS blue_cluster CASCADE")
        project.run_sql("DROP CLUSTER IF EXISTS green_cluster CASCADE")

    def test_apply_schema_default_privileges(self, project):
        """Default privileges set on blue_schema are copied to green_schema."""
        project.run_sql("CREATE ROLE my_role")
        project.run_sql("GRANT my_role TO materialize")
        project.run_sql("CREATE SCHEMA blue_schema")
        project.run_sql("CREATE SCHEMA green_schema")
        project.run_sql(
            "ALTER DEFAULT PRIVILEGES FOR ROLE my_role IN SCHEMA blue_schema GRANT SELECT ON TABLES TO my_role"
        )
        run_dbt(
            [
                "run-operation",
                "internal_copy_schema_default_privs",
                "--args",
                "{from: blue_schema, to: green_schema}",
            ]
        )
        # The SELECT-on-tables default privilege must now exist on green_schema.
        result = project.run_sql(
            """SELECT count(*) = 1
            FROM mz_internal.mz_show_default_privileges
            WHERE database = current_database()
            AND schema = 'green_schema'
            AND grantee = 'my_role'
            AND object_type = 'table'
            AND privilege_type = 'SELECT'""",
            fetch="one",
        )
        assert result[0]

    def test_apply_schema_grants(self, project):
        """Grants on green_schema are replaced by blue_schema's grants:
        the pre-existing CREATE grant is revoked, USAGE is copied over."""
        project.run_sql("CREATE ROLE my_role")
        project.run_sql("GRANT my_role TO materialize")
        project.run_sql("CREATE SCHEMA blue_schema")
        project.run_sql("CREATE SCHEMA green_schema")
        project.run_sql("GRANT CREATE ON SCHEMA green_schema TO my_role")
        project.run_sql("GRANT USAGE ON SCHEMA blue_schema TO my_role")
        run_dbt(
            [
                "run-operation",
                "internal_copy_schema_grants",
                "--args",
                "{from: blue_schema, to: green_schema}",
            ]
        )
        # CREATE was not granted on blue_schema, so it must be gone.
        result = project.run_sql(
            """
            SELECT count(*) = 0
            FROM mz_internal.mz_show_schema_privileges
            WHERE grantee = 'my_role'
            AND name = 'green_schema'
            AND privilege_type = 'CREATE'""",
            fetch="one",
        )
        assert result[0]
        # USAGE was granted on blue_schema, so it must have been copied.
        result = project.run_sql(
            """
            SELECT count(*) = 1
            FROM mz_internal.mz_show_schema_privileges
            WHERE grantee = 'my_role'
            AND name = 'green_schema'
            AND privilege_type = 'USAGE'""",
            fetch="one",
        )
        assert result[0]

    def test_apply_cluster_grants(self, project):
        """Grants on green_cluster are replaced by blue_cluster's grants,
        mirroring the schema-grant behavior above."""
        project.run_sql("CREATE ROLE my_role")
        project.run_sql("GRANT my_role TO materialize")
        project.run_sql("CREATE CLUSTER blue_cluster SIZE = '1'")
        project.run_sql("CREATE CLUSTER green_cluster SIZE = '1'")
        project.run_sql("GRANT CREATE ON CLUSTER green_cluster TO my_role")
        project.run_sql("GRANT USAGE ON CLUSTER blue_cluster TO my_role")
        run_dbt(
            [
                "run-operation",
                "internal_copy_cluster_grants",
                "--args",
                "{from: blue_cluster, to: green_cluster}",
            ]
        )
        # CREATE was not granted on blue_cluster, so it must be gone.
        result = project.run_sql(
            """
            SELECT count(*) = 0
            FROM mz_internal.mz_show_cluster_privileges
            WHERE grantee = 'my_role'
            AND name = 'green_cluster'
            AND privilege_type = 'CREATE'""",
            fetch="one",
        )
        assert result[0]
        # USAGE was granted on blue_cluster, so it must have been copied.
        result = project.run_sql(
            """
            SELECT count(*) = 1
            FROM mz_internal.mz_show_cluster_privileges
            WHERE grantee = 'my_role'
            AND name = 'green_cluster'
            AND privilege_type = 'USAGE'""",
            fetch="one",
        )
        assert result[0]
class TestCIFixture:
    """Tests for the set/check CI-tag macros, which tag clusters and
    schemas (via comments) so concurrent CI runs can detect collisions."""

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        # CI tags are stored as comments; reset them so no state leaks
        # between tests.
        project.run_sql("COMMENT ON CLUSTER quickstart IS NULL")
        project.run_sql("COMMENT ON SCHEMA public IS NULL")

    def test_ci_tags_match(self, project):
        """Setting a tag and then checking the same tag succeeds."""
        run_dbt(
            [
                "run-operation",
                "set_cluster_ci_tag",
                "--args",
                "{cluster: quickstart, ci_tag: test}",
            ]
        )
        run_dbt(
            [
                "run-operation",
                "set_schema_ci_tag",
                "--args",
                "{schema: public, ci_tag: test}",
            ]
        )
        run_dbt(
            [
                "run-operation",
                "check_cluster_ci_tag",
                "--args",
                "{cluster: quickstart, ci_tag: test}",
            ]
        )
        run_dbt(
            [
                "run-operation",
                "check_schema_ci_tag",
                "--args",
                "{schema: public, ci_tag: test}",
            ]
        )

    def test_ci_tags_mismatch(self, project):
        """Checking against a different tag than the one set must fail."""
        run_dbt(
            [
                "run-operation",
                "set_cluster_ci_tag",
                "--args",
                "{cluster: quickstart, ci_tag: test}",
            ]
        )
        run_dbt(
            [
                "run-operation",
                "set_schema_ci_tag",
                "--args",
                "{schema: public, ci_tag: test}",
            ]
        )
        run_dbt(
            [
                "run-operation",
                "check_cluster_ci_tag",
                "--args",
                "{cluster: quickstart, ci_tag: different}",
            ],
            expect_pass=False,
        )
        run_dbt(
            [
                "run-operation",
                "check_schema_ci_tag",
                "--args",
                "{schema: public, ci_tag: different}",
            ],
            expect_pass=False,
        )
class TestPermissionValidation:
    """Permission checks exercised through the internal macros.

    The test user (``materialize``) is a superuser, so routing through
    ``deploy_validate_permissions`` would short-circuit these checks;
    calling the internal macros directly keeps them meaningful.
    """

    def test_createcluster_permissions(self, project):
        """The CREATECLUSTER privilege check passes for a superuser."""
        operation = "internal_ensure_createcluster_permission"
        run_dbt(["run-operation", operation])

    def test_database_create_permissions(self, project):
        """The database CREATE privilege check passes for a superuser."""
        operation = "internal_ensure_database_permission"
        run_dbt(["run-operation", operation])

    def test_schema_owner_permissions(self, project):
        """Schema-ownership check passes for a schema we just created."""
        project.run_sql("CREATE SCHEMA my_schema")
        command = ["run-operation", "internal_ensure_schema_ownership"]
        command += ["--args", "{schemas: ['my_schema']}"]
        run_dbt(command)

    def test_cluster_owner_permissions(self, project):
        """Cluster-ownership check passes for the default cluster."""
        command = ["run-operation", "internal_ensure_cluster_ownership"]
        command += ["--args", "{clusters: ['quickstart']}"]
        run_dbt(command)
class TestRunWithDeploy:
    """Running dbt with the ``deploy`` var must build all objects into
    the ``quickstart_dbt_deploy`` cluster instead of the configured one."""

    @pytest.fixture(scope="class")
    def dbt_profile_target(self):
        return {
            "type": "materialize",
            "threads": 1,
            "host": "{{ env_var('DBT_HOST', 'localhost') }}",
            "user": "materialize",
            "pass": "password",
            "database": "materialize",
            "port": "{{ env_var('DBT_PORT', 6875) }}",
            "cluster": "quickstart",
        }

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "vars": {
                "deployment": {
                    "default": {"clusters": ["quickstart"], "schemas": ["public"]}
                }
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_source.sql": test_source,
            "test_materialized_view_index.sql": test_materialized_view_index,
            "test_view_index.sql": test_view_index,
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")

    def test_deployment_run(self, project):
        # the test runner overrides schemas
        # so we can only validate the cluster
        # configuration is overridden.
        run_dbt(["run-operation", "deploy_init"])
        run_dbt(["run", "--vars", "deploy: True"])
        # Exactly one user-defined source should land on the deploy cluster.
        sources = project.run_sql(
            """
            SELECT count(*)
            FROM mz_sources
            JOIN mz_clusters ON mz_sources.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = 'quickstart_dbt_deploy'
            AND mz_sources.id LIKE 'u%'""",
            fetch="one",
        )
        assert int(sources[0]) == 1
        # Exactly one user-defined materialized view, likewise.
        mat_views = project.run_sql(
            """
            SELECT count(*)
            FROM mz_materialized_views
            JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = 'quickstart_dbt_deploy'
            AND mz_materialized_views.id LIKE 'u%'""",
            fetch="one",
        )
        assert int(mat_views[0]) == 1
        # Two indexes: one per indexed model in the fixture set.
        indexes = project.run_sql(
            """
            SELECT count(*)
            FROM mz_indexes
            JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = 'quickstart_dbt_deploy'
            AND mz_indexes.id LIKE 'u%'""",
            fetch="one",
        )
        assert int(indexes[0]) == 2
        run_dbt(["run-operation", "deploy_cleanup"])
class TestSinkFail:
    """deploy_init must refuse to run when the deployment environment
    would contain a sink (sinks cannot be blue/green deployed)."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "vars": {
                "deployment": {
                    "default": {"clusters": ["quickstart"], "schemas": ["public"]}
                }
            }
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_materialized_view.sql": test_materialized_view,
            "test_sink.sql": test_sink,
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")

    def test_source_fails(self, project):
        # First build the models (including the sink), then expect
        # deploy_init to fail because a sink exists in the environment.
        run_dbt(["run"])
        run_dbt(["run-operation", "deploy_init"], expect_pass=False)
class TestTargetDeploy:
    """Tests for deploy_init / deploy_promote / deploy_cleanup against
    explicitly managed blue/green clusters (``prod``) and schemas
    (``prod``, ``staging``)."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "vars": {
                "deployment": {
                    "default": {
                        "clusters": ["prod"],
                        "schemas": ["prod", "staging"],
                    }
                },
            }
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        # Drop both sides of every blue/green pair so each test starts clean.
        project.run_sql("DROP CLUSTER IF EXISTS prod CASCADE")
        project.run_sql("DROP CLUSTER IF EXISTS prod_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS prod CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS prod_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS staging CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS staging_dbt_deploy CASCADE")

    def test_dbt_deploy(self, project):
        """A dry-run promote must not swap environments; a real promote
        must swap cluster/schema identities (ids trade places) and tag
        the promoted schemas with a deployment comment."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA prod_dbt_deploy")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        # Capture ids before the dry run; a swap exchanges names over
        # stable ids, so ids are how we observe (non-)promotion.
        before_clusters = dict(
            project.run_sql(
                "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        before_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        before_staging_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
                fetch="all",
            )
        )
        run_dbt(["run-operation", "deploy_promote", "--args", "{dry_run: true}"])
        after_clusters = dict(
            project.run_sql(
                "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        after_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        after_staging_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
                fetch="all",
            )
        )
        # Dry run: nothing may have swapped.
        assert before_clusters["prod"] != after_clusters["prod_dbt_deploy"]
        assert before_clusters["prod_dbt_deploy"] != after_clusters["prod"]
        assert before_schemas["prod"] != after_schemas["prod_dbt_deploy"]
        assert before_schemas["prod_dbt_deploy"] != after_schemas["prod"]
        assert (
            before_staging_schemas["staging"]
            != after_staging_schemas["staging_dbt_deploy"]
        )
        assert (
            before_staging_schemas["staging_dbt_deploy"]
            != after_staging_schemas["staging"]
        )
        run_dbt(["run-operation", "deploy_promote"])
        after_clusters = dict(
            project.run_sql(
                "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        after_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        after_staging_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
                fetch="all",
            )
        )
        # Real promote: each name now carries the other side's former id.
        assert before_clusters["prod"] == after_clusters["prod_dbt_deploy"]
        assert before_clusters["prod_dbt_deploy"] == after_clusters["prod"]
        assert before_schemas["prod"] == after_schemas["prod_dbt_deploy"]
        assert before_schemas["prod_dbt_deploy"] == after_schemas["prod"]
        assert (
            before_staging_schemas["staging"]
            == after_staging_schemas["staging_dbt_deploy"]
        )
        assert (
            before_staging_schemas["staging_dbt_deploy"]
            == after_staging_schemas["staging"]
        )
        # Verify that both schemas are tagged correctly
        for schema_name in ["prod", "staging"]:
            tagged_schema_comment = project.run_sql(
                f"""
                SELECT c.comment
                FROM mz_internal.mz_comments c
                JOIN mz_schemas s USING (id)
                WHERE s.name = '{schema_name}';
                """,
                fetch="one",
            )
            assert (
                tagged_schema_comment is not None
            ), f"No comment found for schema {schema_name}"
            assert (
                "Deployment by" in tagged_schema_comment[0]
            ), f"Missing deployment info in {schema_name} comment"
            assert (
                "on" in tagged_schema_comment[0]
            ), f"Missing timestamp in {schema_name} comment"

    def test_dbt_deploy_with_force(self, project):
        """Promote with ``wait: true`` swaps both cluster and schema ids."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA prod_dbt_deploy")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        before_clusters = dict(
            project.run_sql(
                "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        before_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        run_dbt(["run-operation", "deploy_promote", "--args", "{wait: true}"])
        after_clusters = dict(
            project.run_sql(
                "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        after_schemas = dict(
            project.run_sql(
                "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
                fetch="all",
            )
        )
        assert before_clusters["prod"] == after_clusters["prod_dbt_deploy"]
        assert before_clusters["prod_dbt_deploy"] == after_clusters["prod"]
        assert before_schemas["prod"] == after_schemas["prod_dbt_deploy"]
        # Fixed: this previously duplicated the assertion above instead of
        # checking the reverse direction of the schema swap.
        assert before_schemas["prod_dbt_deploy"] == after_schemas["prod"]

    def test_dbt_deploy_missing_deployment_cluster(self, project):
        """Promote fails when the deployment cluster does not exist."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA prod_dbt_deploy")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        run_dbt(["run-operation", "deploy_promote"], expect_pass=False)

    def test_dbt_deploy_missing_deployment_schema(self, project):
        """Promote fails when a deployment schema does not exist."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        run_dbt(["run-operation", "deploy_promote"], expect_pass=False)

    def test_fails_on_unmanaged_cluster(self, project):
        """deploy_init refuses to clone an unmanaged (replica-less) cluster."""
        project.run_sql("CREATE CLUSTER prod REPLICAS ()")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        run_dbt(["run-operation", "deploy_init"], expect_pass=False)

    def test_dbt_deploy_init_with_refresh_hydration_time(self, project):
        """deploy_init preserves an ON REFRESH schedule on the clone."""
        project.run_sql(
            "CREATE CLUSTER prod (SIZE = '1', SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour'))"
        )
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA staging")
        run_dbt(["run-operation", "deploy_init"])
        # Get the rehydration time from the cluster
        cluster_type = project.run_sql(
            """
            SELECT cs.type
            FROM mz_internal.mz_cluster_schedules cs
            JOIN mz_clusters c ON cs.cluster_id = c.id
            WHERE c.name = 'prod_dbt_deploy'
            """,
            fetch="one",
        )
        assert cluster_type == ("on-refresh",)

    def test_dbt_deploy_init_and_cleanup(self, project):
        """deploy_init clones size/replication and the schema;
        deploy_cleanup removes both again."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA staging")
        run_dbt(["run-operation", "deploy_init"])
        (size, replication_factor) = project.run_sql(
            "SELECT size, replication_factor FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
            fetch="one",
        )
        assert size == "1"
        assert replication_factor == "1"
        result = project.run_sql(
            "SELECT count(*) = 1 FROM mz_schemas WHERE name = 'prod_dbt_deploy'",
            fetch="one",
        )
        assert bool(result[0])
        run_dbt(["run-operation", "deploy_cleanup"])
        result = project.run_sql(
            "SELECT count(*) = 0 FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
            fetch="one",
        )
        assert bool(result[0])
        result = project.run_sql(
            "SELECT count(*) = 0 FROM mz_schemas WHERE name = 'prod_dbt_deploy'",
            fetch="one",
        )
        assert bool(result[0])

    def test_cluster_contains_objects(self, project):
        """deploy_init fails when the deployment cluster already holds
        objects, unless ignore_existing_objects is set."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA prod_dbt_deploy")
        project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        project.run_sql(
            "CREATE MATERIALIZED VIEW mv IN CLUSTER prod_dbt_deploy AS SELECT 1"
        )
        run_dbt(["run-operation", "deploy_init"], expect_pass=False)
        run_dbt(
            [
                "run-operation",
                "deploy_init",
                "--args",
                "{ignore_existing_objects: True}",
            ]
        )

    def test_schema_contains_objects(self, project):
        """deploy_init fails when the deployment schema already holds
        objects, unless ignore_existing_objects is set."""
        project.run_sql("CREATE CLUSTER prod SIZE = '1'")
        project.run_sql("CREATE SCHEMA prod")
        project.run_sql("CREATE SCHEMA prod_dbt_deploy")
        project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
        project.run_sql("CREATE SCHEMA staging")
        project.run_sql("CREATE SCHEMA staging_dbt_deploy")
        project.run_sql("CREATE VIEW prod_dbt_deploy.view AS SELECT 1")
        run_dbt(["run-operation", "deploy_init"], expect_pass=False)
        run_dbt(
            [
                "run-operation",
                "deploy_init",
                "--args",
                "{ignore_existing_objects: True}",
            ]
        )
class TestLagTolerance:
    """Tests for the lag-threshold options of deploy_await and
    deploy_promote (how far the deploy environment may trail before
    promotion is allowed)."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {
            "vars": {
                "deployment": {
                    "default": {"clusters": ["quickstart"], "schemas": ["public"]}
                }
            }
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")

    def test_deploy_await_custom_lag_threshold(self, project):
        """deploy_await accepts a custom poll interval and lag threshold."""
        run_dbt(["run-operation", "deploy_init"])
        result = run_dbt(
            [
                "run-operation",
                "deploy_await",
                "--args",
                "{poll_interval: 5, lag_threshold: '5s'}",
            ]
        )
        assert len(result) > 0 and result[0].status == "success"

    def test_deploy_promote_custom_lag_threshold(self, project):
        """deploy_promote accepts wait + custom polling/lag settings."""
        run_dbt(["run-operation", "deploy_init"])
        result = run_dbt(
            [
                "run-operation",
                "deploy_promote",
                "--args",
                "{wait: true, poll_interval: 5, lag_threshold: '5s'}",
            ]
        )
        assert len(result) > 0 and result[0].status == "success"

    def test_default_lag_threshold(self, project):
        """deploy_await succeeds with its built-in default threshold."""
        run_dbt(["run-operation", "deploy_init"])
        result = run_dbt(["run-operation", "deploy_await"])
        assert len(result) > 0 and result[0].status == "success"

    def test_large_lag_threshold(self, project):
        """A very generous (1h) threshold also succeeds immediately."""
        run_dbt(["run-operation", "deploy_init"])
        result = run_dbt(
            [
                "run-operation",
                "deploy_await",
                "--args",
                "{poll_interval: 5, lag_threshold: '1h'}",
            ]
        )
        assert len(result) > 0 and result[0].status == "success"
class TestEndToEndDeployment:
    """Full blue/green lifecycle: deploy_init copies grants and default
    privileges, the deploy build leaves the running sink untouched,
    deploy_promote repoints the sink at the promoted view, and
    deploy_cleanup tears the deployment environment down."""

    @pytest.fixture(scope="class")
    def dbt_profile_target(self):
        # Connect as the superuser so grant management is unrestricted.
        return {
            "type": "materialize",
            "threads": 1,
            "host": "{{ env_var('DBT_HOST', 'localhost') }}",
            "user": "materialize",
            "pass": "password",
            "database": "materialize",
            "port": "{{ env_var('DBT_PORT', 6875) }}",
            "cluster": "quickstart",
        }

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "test_materialized_view_deploy.sql": test_materialized_view_deploy,
            "test_sink_deploy.sql": test_sink_deploy,
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        # Drop everything this test creates, in dependency-safe order.
        project.run_sql("DROP ROLE IF EXISTS test_role")
        project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")
        project.run_sql("DROP CLUSTER IF EXISTS sinks_cluster CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS sinks_schema CASCADE")
        project.run_sql("DROP TABLE IF EXISTS source_table")
        project.run_sql("DROP SOURCE IF EXISTS sink_validation_source")

    def test_full_deploy_process(self, project):
        """Exercise init -> build -> promote -> cleanup end to end."""
        # Create test role and grant it to materialize
        project.run_sql("CREATE ROLE test_role")
        project.run_sql("GRANT test_role TO materialize")
        # Prepare the source table, the sink cluster and schema
        project.run_sql("CREATE TABLE source_table (val INTEGER)")
        project.run_sql("CREATE CLUSTER sinks_cluster SIZE = '1'")
        project.run_sql("CREATE SCHEMA sinks_schema")
        project.run_sql("INSERT INTO source_table VALUES (1)")
        run_dbt(["run"])
        created_schema = project.created_schemas[0]
        # Set up initial grants on the production schema and cluster
        project.run_sql(f"GRANT USAGE ON SCHEMA {created_schema} TO test_role")
        project.run_sql(f"GRANT CREATE ON SCHEMA {created_schema} TO test_role")
        project.run_sql(
            f"ALTER DEFAULT PRIVILEGES FOR ROLE test_role IN SCHEMA {created_schema} GRANT SELECT ON TABLES TO test_role"
        )
        project.run_sql("GRANT USAGE ON CLUSTER quickstart TO test_role")
        project.run_sql("GRANT CREATE ON CLUSTER quickstart TO test_role")
        # Store initial grants for later comparison
        # NOTE(review): the condition "JOIN mz_roles grantee ON grantee =
        # grantee.id" looks self-referential; presumably the left-hand
        # "grantee" resolves to the CTE's column — confirm against
        # Materialize's name-resolution rules.
        initial_schema_grants = project.run_sql(
            f"""
            WITH schema_privilege AS (
                SELECT mz_internal.mz_aclexplode(s.privileges).*
                FROM mz_schemas s
                JOIN mz_databases d ON s.database_id = d.id
                WHERE d.name = current_database()
                AND s.name = '{created_schema}'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM schema_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        initial_cluster_grants = project.run_sql(
            """
            WITH cluster_privilege AS (
                SELECT mz_internal.mz_aclexplode(privileges).*
                FROM mz_clusters
                WHERE name = 'quickstart'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM cluster_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        initial_default_privs = project.run_sql(
            f"""
            SELECT privilege_type, grantee, object_type
            FROM mz_internal.mz_show_default_privileges
            WHERE database = current_database()
            AND schema = '{created_schema}'
            AND grantee = 'test_role'
            ORDER BY privilege_type, grantee, object_type
            """,
            fetch="all",
        )
        # Var strings passed to dbt; the second adds deploy: True.
        project_config = f"{{deployment: {{default: {{clusters: ['quickstart'], schemas: ['{created_schema}']}}}}}}"
        project_config_deploy = f"{{deployment: {{default: {{clusters: ['quickstart'], schemas: ['{created_schema}']}}}}, deploy: True}}"
        # Validate the initial sink result
        project.run_sql(
            "CREATE SOURCE sink_validation_source FROM KAFKA CONNECTION kafka_connection (TOPIC 'testdrive-test-sink-1') FORMAT JSON"
        )
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=1
        )
        # Insert another row and validate the sink
        project.run_sql("INSERT INTO source_table VALUES (2)")
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=2
        )
        # Initialize the deployment environment
        run_dbt(["run-operation", "deploy_init", "--vars", project_config])
        # Verify grants were copied to deployment schema
        deploy_schema_grants = project.run_sql(
            f"""
            WITH schema_privilege AS (
                SELECT mz_internal.mz_aclexplode(s.privileges).*
                FROM mz_schemas s
                JOIN mz_databases d ON s.database_id = d.id
                WHERE d.name = current_database()
                AND s.name = '{created_schema}_dbt_deploy'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM schema_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        # Verify grants were copied to deployment cluster
        deploy_cluster_grants = project.run_sql(
            """
            WITH cluster_privilege AS (
                SELECT mz_internal.mz_aclexplode(privileges).*
                FROM mz_clusters
                WHERE name = 'quickstart_dbt_deploy'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM cluster_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        deploy_default_privs = project.run_sql(
            f"""
            SELECT privilege_type, grantee, object_type
            FROM mz_internal.mz_show_default_privileges
            WHERE database = current_database()
            AND schema = '{created_schema}_dbt_deploy'
            AND grantee = 'test_role'
            ORDER BY privilege_type, grantee, object_type
            """,
            fetch="all",
        )
        # Assert grants match between production and deployment environments
        assert (
            initial_schema_grants == deploy_schema_grants
        ), "Schema grants do not match between production and deployment"
        assert (
            initial_cluster_grants == deploy_cluster_grants
        ), "Cluster grants do not match between production and deployment"
        assert (
            initial_default_privs == deploy_default_privs
        ), "Default privileges do not match between production and deployment"
        # Run the deploy with the deploy flag set to True and exclude the sink creation
        run_dbt(
            [
                "run",
                "--vars",
                project_config_deploy,
                "--exclude",
                "config.materialized:sink",
            ]
        )
        # Ensure the validation source has not changed
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=2
        )
        # Insert a new row and validate the new sink result after the deploy
        project.run_sql("INSERT INTO source_table VALUES (3)")
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=3
        )
        # Get the IDs of the materialized views
        before_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}')",
            fetch="one",
        )[0]
        after_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}_dbt_deploy')",
            fetch="one",
        )[0]
        before_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        assert (
            before_view_id in before_sink[2]
        ), "Before deployment, the sink should point to the original view ID"
        # A dry-run promote must leave the sink untouched.
        run_dbt(
            [
                "run-operation",
                "deploy_promote",
                "--args",
                "{dry_run: true}",
                "--vars",
                project_config,
            ]
        )
        after_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        assert (
            before_view_id in after_sink[2]
        ), "The sink should point to the same view ID after a dry_run"
        assert before_sink[0] == after_sink[0], "Sink name should be the same"
        # Promote and ensure the sink points to the new objects
        run_dbt(["run-operation", "deploy_promote", "--vars", project_config])
        after_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        after_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}')",
            fetch="one",
        )[0]
        assert (
            after_view_id in after_sink[2]
        ), "After deployment, the sink should point to the new view ID"
        assert before_sink[0] == after_sink[0], "Sink name should be the same"
        assert (
            before_view_id != after_view_id
        ), "Sink's view ID should be different after deployment"
        run_dbt(["run-operation", "deploy_cleanup", "--vars", project_config])
def run_with_retry(project, sql, expected_count, retries=5, delay=3, fetch="one"):
    """Poll ``sql`` until its first column equals ``expected_count``.

    Runs the query up to ``retries`` times, sleeping ``delay`` seconds
    between attempts. Returns the matching row on success.

    Raises:
        ValueError: if ``retries`` is less than 1.
        AssertionError: if the expected count never appears.
    """
    if retries < 1:
        # Previously this fell through to an unbound `result` (NameError).
        raise ValueError("retries must be at least 1")
    result = None
    for attempt in range(retries):
        result = project.run_sql(sql, fetch=fetch)
        if result[0] == expected_count:
            return result
        # Don't waste a full `delay` after the final failed attempt.
        if attempt < retries - 1:
            time.sleep(delay)
    raise AssertionError(
        f"Expected count to be {expected_count}, but got {result[0]} after {retries} retries"
    )
|