123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License in the LICENSE file at the
- # root of this repository, or online at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import pytest
- from dbt.tests.util import run_dbt
- class TestClusterOps:
- @pytest.fixture(autouse=True)
- def cleanup(self, project):
- project.run_sql("DROP CLUSTER IF EXISTS test_cluster CASCADE")
- project.run_sql("DROP CLUSTER IF EXISTS test_cluster_dbt_deploy CASCADE")
- project.run_sql("DROP CLUSTER IF EXISTS test_manual_schedule CASCADE")
- project.run_sql("DROP CLUSTER IF EXISTS test_on_refresh_schedule CASCADE")
- def test_create_and_drop_cluster(self, project):
- # Test creating a cluster
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- # Verify cluster creation
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
- fetch="one",
- )
- assert result is not None, "Cluster was not created successfully"
- # Verify cluster properties
- properties = get_cluster_properties(project, "test_cluster_dbt_deploy")
- assert properties[1] == "1"
- assert properties[2] == "1"
- assert properties[5] == "manual"
- assert properties[6] is None
- # Test dropping the cluster
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_cluster_dbt_deploy"}',
- ]
- )
- # Verify cluster dropping
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
- fetch="one",
- )
- assert result is None, "Cluster was not dropped successfully"
- def test_create_existing_cluster(self, project):
- # Create the cluster for the first time
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- # Try creating the same cluster again
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- # Verify the cluster exists
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
- fetch="one",
- )
- assert (
- result is not None
- ), "Cluster should exist after attempting to create it again"
- # Verify cluster properties
- properties = get_cluster_properties(project, "test_cluster_dbt_deploy")
- assert properties[1] == "1"
- assert properties[2] == "1"
- assert properties[5] == "manual"
- assert properties[6] is None
- # Cleanup
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_cluster_dbt_deploy"}',
- ]
- )
- def test_drop_nonexistent_cluster(self, project):
- # Attempt to drop a nonexistent cluster
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "nonexistent_cluster"}',
- ]
- )
- # Verify no cluster exists
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'nonexistent_cluster'",
- fetch="one",
- )
- assert result is None, "Nonexistent cluster should not exist"
- def test_create_cluster_with_objects(self, project):
- # Create a cluster and add an object to it
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- project.run_sql("CREATE MATERIALIZED VIEW test_view AS SELECT 1")
- # Attempt to create the same cluster without ignoring existing objects
- with pytest.raises(Exception):
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": false, "force_deploy_suffix": true}',
- ],
- expect_pass=False,
- )
- # Cleanup
- project.run_sql("DROP MATERIALIZED VIEW IF EXISTS test_view")
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_cluster_dbt_deploy"}',
- ]
- )
- def test_create_cluster_with_manual_schedule(self, project):
- # Test creating a cluster with manual schedule type
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_manual_schedule", "size": "1", "replication_factor": 2, "schedule_type": "manual", "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- # Verify cluster creation
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_manual_schedule_dbt_deploy'",
- fetch="one",
- )
- assert (
- result is not None
- ), "Cluster with manual schedule was not created successfully"
- # Verify cluster properties
- properties = get_cluster_properties(project, "test_manual_schedule_dbt_deploy")
- assert properties[1] == "1"
- assert properties[2] == "2"
- assert properties[5] == "manual"
- assert properties[6] is None
- # Cleanup
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_manual_schedule_dbt_deploy"}',
- ]
- )
- def test_create_cluster_with_on_refresh_schedule(self, project):
- # Test creating a cluster with on-refresh schedule type
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_on_refresh_schedule", "size": "1", "schedule_type": "on-refresh", "refresh_hydration_time_estimate": "10m", "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ]
- )
- # Verify cluster creation
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_on_refresh_schedule_dbt_deploy'",
- fetch="one",
- )
- assert (
- result is not None
- ), "Cluster with on-refresh schedule was not created successfully"
- # Verify cluster properties
- properties = get_cluster_properties(
- project, "test_on_refresh_schedule_dbt_deploy"
- )
- assert properties[1] == "1"
- assert properties[5] == "on-refresh"
- assert str(properties[6]) == "0:10:00"
- # Cleanup
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_on_refresh_schedule_dbt_deploy"}',
- ]
- )
- def test_create_cluster_without_force_deploy_suffix(self, project):
- # Test creating a cluster without force deploy suffix
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": false}',
- ]
- )
- # Verify cluster creation
- result = project.run_sql(
- "SELECT name FROM mz_clusters WHERE name = 'test_cluster'",
- fetch="one",
- )
- assert (
- result is not None
- ), "Cluster without force deploy suffix was not created successfully"
- # Verify cluster properties
- properties = get_cluster_properties(project, "test_cluster")
- assert properties[1] == "1"
- assert properties[2] == "1"
- assert properties[5] == "manual"
- assert properties[6] is None
- # Cleanup
- run_dbt(
- [
- "run-operation",
- "drop_cluster",
- "--args",
- '{"cluster_name": "test_cluster"}',
- ]
- )
- def test_create_cluster_without_size_and_name(self, project):
- # Test creating a cluster without providing a size parameter
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ],
- expect_pass=False,
- )
- # Test creating a cluster without providing a name parameter
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ],
- expect_pass=False,
- )
- # Test invalid cluster size
- run_dbt(
- [
- "run-operation",
- "create_cluster",
- "--args",
- '{"cluster_name": "test_cluster", "size": "invalid_size", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
- ],
- expect_pass=False,
- )
- def get_cluster_properties(project, cluster_name):
- query = f"""
- SELECT
- c.managed,
- c.size,
- c.replication_factor,
- c.id AS cluster_id,
- c.name AS cluster_name,
- cs.type AS schedule_type,
- cs.refresh_hydration_time_estimate
- FROM mz_clusters c
- LEFT JOIN mz_internal.mz_cluster_schedules cs ON cs.cluster_id = c.id
- WHERE c.name = '{cluster_name}'
- """
- return project.run_sql(query, fetch="one")
|