test_ci.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License in the LICENSE file at the
  6. # root of this repository, or online at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import pytest
  16. from dbt.tests.util import run_dbt
  17. class TestClusterOps:
  18. @pytest.fixture(autouse=True)
  19. def cleanup(self, project):
  20. project.run_sql("DROP CLUSTER IF EXISTS test_cluster CASCADE")
  21. project.run_sql("DROP CLUSTER IF EXISTS test_cluster_dbt_deploy CASCADE")
  22. project.run_sql("DROP CLUSTER IF EXISTS test_manual_schedule CASCADE")
  23. project.run_sql("DROP CLUSTER IF EXISTS test_on_refresh_schedule CASCADE")
  24. def test_create_and_drop_cluster(self, project):
  25. # Test creating a cluster
  26. run_dbt(
  27. [
  28. "run-operation",
  29. "create_cluster",
  30. "--args",
  31. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  32. ]
  33. )
  34. # Verify cluster creation
  35. result = project.run_sql(
  36. "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
  37. fetch="one",
  38. )
  39. assert result is not None, "Cluster was not created successfully"
  40. # Verify cluster properties
  41. properties = get_cluster_properties(project, "test_cluster_dbt_deploy")
  42. assert properties[1] == "1"
  43. assert properties[2] == "1"
  44. assert properties[5] == "manual"
  45. assert properties[6] is None
  46. # Test dropping the cluster
  47. run_dbt(
  48. [
  49. "run-operation",
  50. "drop_cluster",
  51. "--args",
  52. '{"cluster_name": "test_cluster_dbt_deploy"}',
  53. ]
  54. )
  55. # Verify cluster dropping
  56. result = project.run_sql(
  57. "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
  58. fetch="one",
  59. )
  60. assert result is None, "Cluster was not dropped successfully"
  61. def test_create_existing_cluster(self, project):
  62. # Create the cluster for the first time
  63. run_dbt(
  64. [
  65. "run-operation",
  66. "create_cluster",
  67. "--args",
  68. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  69. ]
  70. )
  71. # Try creating the same cluster again
  72. run_dbt(
  73. [
  74. "run-operation",
  75. "create_cluster",
  76. "--args",
  77. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  78. ]
  79. )
  80. # Verify the cluster exists
  81. result = project.run_sql(
  82. "SELECT name FROM mz_clusters WHERE name = 'test_cluster_dbt_deploy'",
  83. fetch="one",
  84. )
  85. assert (
  86. result is not None
  87. ), "Cluster should exist after attempting to create it again"
  88. # Verify cluster properties
  89. properties = get_cluster_properties(project, "test_cluster_dbt_deploy")
  90. assert properties[1] == "1"
  91. assert properties[2] == "1"
  92. assert properties[5] == "manual"
  93. assert properties[6] is None
  94. # Cleanup
  95. run_dbt(
  96. [
  97. "run-operation",
  98. "drop_cluster",
  99. "--args",
  100. '{"cluster_name": "test_cluster_dbt_deploy"}',
  101. ]
  102. )
  103. def test_drop_nonexistent_cluster(self, project):
  104. # Attempt to drop a nonexistent cluster
  105. run_dbt(
  106. [
  107. "run-operation",
  108. "drop_cluster",
  109. "--args",
  110. '{"cluster_name": "nonexistent_cluster"}',
  111. ]
  112. )
  113. # Verify no cluster exists
  114. result = project.run_sql(
  115. "SELECT name FROM mz_clusters WHERE name = 'nonexistent_cluster'",
  116. fetch="one",
  117. )
  118. assert result is None, "Nonexistent cluster should not exist"
  119. def test_create_cluster_with_objects(self, project):
  120. # Create a cluster and add an object to it
  121. run_dbt(
  122. [
  123. "run-operation",
  124. "create_cluster",
  125. "--args",
  126. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  127. ]
  128. )
  129. project.run_sql("CREATE MATERIALIZED VIEW test_view AS SELECT 1")
  130. # Attempt to create the same cluster without ignoring existing objects
  131. with pytest.raises(Exception):
  132. run_dbt(
  133. [
  134. "run-operation",
  135. "create_cluster",
  136. "--args",
  137. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": false, "force_deploy_suffix": true}',
  138. ],
  139. expect_pass=False,
  140. )
  141. # Cleanup
  142. project.run_sql("DROP MATERIALIZED VIEW IF EXISTS test_view")
  143. run_dbt(
  144. [
  145. "run-operation",
  146. "drop_cluster",
  147. "--args",
  148. '{"cluster_name": "test_cluster_dbt_deploy"}',
  149. ]
  150. )
  151. def test_create_cluster_with_manual_schedule(self, project):
  152. # Test creating a cluster with manual schedule type
  153. run_dbt(
  154. [
  155. "run-operation",
  156. "create_cluster",
  157. "--args",
  158. '{"cluster_name": "test_manual_schedule", "size": "1", "replication_factor": 2, "schedule_type": "manual", "ignore_existing_objects": true, "force_deploy_suffix": true}',
  159. ]
  160. )
  161. # Verify cluster creation
  162. result = project.run_sql(
  163. "SELECT name FROM mz_clusters WHERE name = 'test_manual_schedule_dbt_deploy'",
  164. fetch="one",
  165. )
  166. assert (
  167. result is not None
  168. ), "Cluster with manual schedule was not created successfully"
  169. # Verify cluster properties
  170. properties = get_cluster_properties(project, "test_manual_schedule_dbt_deploy")
  171. assert properties[1] == "1"
  172. assert properties[2] == "2"
  173. assert properties[5] == "manual"
  174. assert properties[6] is None
  175. # Cleanup
  176. run_dbt(
  177. [
  178. "run-operation",
  179. "drop_cluster",
  180. "--args",
  181. '{"cluster_name": "test_manual_schedule_dbt_deploy"}',
  182. ]
  183. )
  184. def test_create_cluster_with_on_refresh_schedule(self, project):
  185. # Test creating a cluster with on-refresh schedule type
  186. run_dbt(
  187. [
  188. "run-operation",
  189. "create_cluster",
  190. "--args",
  191. '{"cluster_name": "test_on_refresh_schedule", "size": "1", "schedule_type": "on-refresh", "refresh_hydration_time_estimate": "10m", "ignore_existing_objects": true, "force_deploy_suffix": true}',
  192. ]
  193. )
  194. # Verify cluster creation
  195. result = project.run_sql(
  196. "SELECT name FROM mz_clusters WHERE name = 'test_on_refresh_schedule_dbt_deploy'",
  197. fetch="one",
  198. )
  199. assert (
  200. result is not None
  201. ), "Cluster with on-refresh schedule was not created successfully"
  202. # Verify cluster properties
  203. properties = get_cluster_properties(
  204. project, "test_on_refresh_schedule_dbt_deploy"
  205. )
  206. assert properties[1] == "1"
  207. assert properties[5] == "on-refresh"
  208. assert str(properties[6]) == "0:10:00"
  209. # Cleanup
  210. run_dbt(
  211. [
  212. "run-operation",
  213. "drop_cluster",
  214. "--args",
  215. '{"cluster_name": "test_on_refresh_schedule_dbt_deploy"}',
  216. ]
  217. )
  218. def test_create_cluster_without_force_deploy_suffix(self, project):
  219. # Test creating a cluster without force deploy suffix
  220. run_dbt(
  221. [
  222. "run-operation",
  223. "create_cluster",
  224. "--args",
  225. '{"cluster_name": "test_cluster", "size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": false}',
  226. ]
  227. )
  228. # Verify cluster creation
  229. result = project.run_sql(
  230. "SELECT name FROM mz_clusters WHERE name = 'test_cluster'",
  231. fetch="one",
  232. )
  233. assert (
  234. result is not None
  235. ), "Cluster without force deploy suffix was not created successfully"
  236. # Verify cluster properties
  237. properties = get_cluster_properties(project, "test_cluster")
  238. assert properties[1] == "1"
  239. assert properties[2] == "1"
  240. assert properties[5] == "manual"
  241. assert properties[6] is None
  242. # Cleanup
  243. run_dbt(
  244. [
  245. "run-operation",
  246. "drop_cluster",
  247. "--args",
  248. '{"cluster_name": "test_cluster"}',
  249. ]
  250. )
  251. def test_create_cluster_without_size_and_name(self, project):
  252. # Test creating a cluster without providing a size parameter
  253. run_dbt(
  254. [
  255. "run-operation",
  256. "create_cluster",
  257. "--args",
  258. '{"cluster_name": "test_cluster", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  259. ],
  260. expect_pass=False,
  261. )
  262. # Test creating a cluster without providing a name parameter
  263. run_dbt(
  264. [
  265. "run-operation",
  266. "create_cluster",
  267. "--args",
  268. '{"size": "1", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  269. ],
  270. expect_pass=False,
  271. )
  272. # Test invalid cluster size
  273. run_dbt(
  274. [
  275. "run-operation",
  276. "create_cluster",
  277. "--args",
  278. '{"cluster_name": "test_cluster", "size": "invalid_size", "replication_factor": 1, "ignore_existing_objects": true, "force_deploy_suffix": true}',
  279. ],
  280. expect_pass=False,
  281. )
  282. def get_cluster_properties(project, cluster_name):
  283. query = f"""
  284. SELECT
  285. c.managed,
  286. c.size,
  287. c.replication_factor,
  288. c.id AS cluster_id,
  289. c.name AS cluster_name,
  290. cs.type AS schedule_type,
  291. cs.refresh_hydration_time_estimate
  292. FROM mz_clusters c
  293. LEFT JOIN mz_internal.mz_cluster_schedules cs ON cs.cluster_id = c.id
  294. WHERE c.name = '{cluster_name}'
  295. """
  296. return project.run_sql(query, fetch="one")