test_dbt_clone.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License in the LICENSE file at the
  6. # root of this repository, or online at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import pytest
  16. from dbt.tests.adapter.dbt_clone.fixtures import (
  17. custom_can_clone_tables_false_macros_sql,
  18. get_schema_name_sql,
  19. infinite_macros_sql,
  20. macros_sql,
  21. )
  22. from dbt.tests.adapter.dbt_clone.test_dbt_clone import BaseClone
  23. from dbt.tests.util import run_dbt
  24. class BaseCloneOverride(BaseClone):
  25. @pytest.fixture(scope="class")
  26. def snapshots(self):
  27. pass
  28. def run_and_save_state(self, project_root, with_snapshot=False):
  29. results = run_dbt(["seed"])
  30. assert len(results) == 1
  31. results = run_dbt(["run"])
  32. assert len(results) == 2
  33. results = run_dbt(["test"])
  34. assert len(results) == 2
  35. # copy files
  36. self.copy_state(project_root)
  37. class BaseCloneNotPossible(BaseCloneOverride):
  38. @pytest.fixture(scope="class")
  39. def macros(self):
  40. return {
  41. "macros.sql": macros_sql,
  42. "my_can_clone_tables.sql": custom_can_clone_tables_false_macros_sql,
  43. "infinite_macros.sql": infinite_macros_sql,
  44. "get_schema_name.sql": get_schema_name_sql,
  45. }
  46. def test_can_clone_false(self, project, unique_schema, other_schema):
  47. project.create_test_schema(other_schema)
  48. self.run_and_save_state(project.project_root, with_snapshot=True)
  49. clone_args = [
  50. "clone",
  51. "--defer",
  52. "--state",
  53. "state",
  54. "--target",
  55. "otherschema",
  56. ]
  57. results = run_dbt(clone_args)
  58. assert len(results) == 3
  59. schema_relations = project.adapter.list_relations(
  60. database=project.database, schema=other_schema
  61. )
  62. assert all(r.type == "view" for r in schema_relations)
  63. # objects already exist, so this is a no-op
  64. results = run_dbt(clone_args)
  65. assert len(results) == 3
  66. assert all("no-op" in r.message.lower() for r in results)
  67. # recreate all objects
  68. results = run_dbt([*clone_args, "--full-refresh"])
  69. assert len(results) == 3
  70. # select only models this time
  71. results = run_dbt([*clone_args, "--resource-type", "model"])
  72. assert len(results) == 2
  73. assert all("no-op" in r.message.lower() for r in results)
  74. class TestMaterializeCloneNotPossible(BaseCloneNotPossible):
  75. @pytest.fixture(autouse=True)
  76. def clean_up(self, project):
  77. yield
  78. with project.adapter.connection_named("__test"):
  79. relation = project.adapter.Relation.create(
  80. database=project.database, schema=f"{project.test_schema}_seeds"
  81. )
  82. project.adapter.drop_schema(relation)
  83. relation = project.adapter.Relation.create(
  84. database=project.database, schema=project.test_schema
  85. )
  86. project.adapter.drop_schema(relation)
  87. pass