test_deploy.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License in the LICENSE file at the
  6. # root of this repository, or online at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import time
  16. import pytest
  17. from dbt.tests.util import run_dbt
  18. from fixtures import (
  19. test_materialized_view,
  20. test_materialized_view_deploy,
  21. test_materialized_view_index,
  22. test_sink,
  23. test_sink_deploy,
  24. test_source,
  25. test_view_index,
  26. )
  27. class TestApplyGrantsAndPrivileges:
  28. @pytest.fixture(autouse=True)
  29. def cleanup(self, project):
  30. role_exists = project.run_sql(
  31. "SELECT count(*) = 1 FROM mz_roles WHERE name = 'my_role'",
  32. fetch="one",
  33. )[0]
  34. if role_exists:
  35. project.run_sql("DROP OWNED BY my_role")
  36. project.run_sql("DROP ROLE IF EXISTS my_role")
  37. project.run_sql("DROP SCHEMA IF EXISTS blue_schema CASCADE")
  38. project.run_sql("DROP SCHEMA IF EXISTS green_schema CASCADE")
  39. project.run_sql("DROP CLUSTER IF EXISTS blue_cluster CASCADE")
  40. project.run_sql("DROP CLUSTER IF EXISTS green_cluster CASCADE")
  41. def test_apply_schema_default_privileges(self, project):
  42. project.run_sql("CREATE ROLE my_role")
  43. project.run_sql("GRANT my_role TO materialize")
  44. project.run_sql("CREATE SCHEMA blue_schema")
  45. project.run_sql("CREATE SCHEMA green_schema")
  46. project.run_sql(
  47. "ALTER DEFAULT PRIVILEGES FOR ROLE my_role IN SCHEMA blue_schema GRANT SELECT ON TABLES TO my_role"
  48. )
  49. run_dbt(
  50. [
  51. "run-operation",
  52. "internal_copy_schema_default_privs",
  53. "--args",
  54. "{from: blue_schema, to: green_schema}",
  55. ]
  56. )
  57. result = project.run_sql(
  58. """SELECT count(*) = 1
  59. FROM mz_internal.mz_show_default_privileges
  60. WHERE database = current_database()
  61. AND schema = 'green_schema'
  62. AND grantee = 'my_role'
  63. AND object_type = 'table'
  64. AND privilege_type = 'SELECT'""",
  65. fetch="one",
  66. )
  67. assert result[0]
  68. def test_apply_schema_grants(self, project):
  69. project.run_sql("CREATE ROLE my_role")
  70. project.run_sql("GRANT my_role TO materialize")
  71. project.run_sql("CREATE SCHEMA blue_schema")
  72. project.run_sql("CREATE SCHEMA green_schema")
  73. project.run_sql("GRANT CREATE ON SCHEMA green_schema TO my_role")
  74. project.run_sql("GRANT USAGE ON SCHEMA blue_schema TO my_role")
  75. run_dbt(
  76. [
  77. "run-operation",
  78. "internal_copy_schema_grants",
  79. "--args",
  80. "{from: blue_schema, to: green_schema}",
  81. ]
  82. )
  83. result = project.run_sql(
  84. """
  85. SELECT count(*) = 0
  86. FROM mz_internal.mz_show_schema_privileges
  87. WHERE grantee = 'my_role'
  88. AND name = 'green_schema'
  89. AND privilege_type = 'CREATE'""",
  90. fetch="one",
  91. )
  92. assert result[0]
  93. result = project.run_sql(
  94. """
  95. SELECT count(*) = 1
  96. FROM mz_internal.mz_show_schema_privileges
  97. WHERE grantee = 'my_role'
  98. AND name = 'green_schema'
  99. AND privilege_type = 'USAGE'""",
  100. fetch="one",
  101. )
  102. assert result[0]
  103. def test_apply_cluster_grants(self, project):
  104. project.run_sql("CREATE ROLE my_role")
  105. project.run_sql("GRANT my_role TO materialize")
  106. project.run_sql("CREATE CLUSTER blue_cluster SIZE = '1'")
  107. project.run_sql("CREATE CLUSTER green_cluster SIZE = '1'")
  108. project.run_sql("GRANT CREATE ON CLUSTER green_cluster TO my_role")
  109. project.run_sql("GRANT USAGE ON CLUSTER blue_cluster TO my_role")
  110. run_dbt(
  111. [
  112. "run-operation",
  113. "internal_copy_cluster_grants",
  114. "--args",
  115. "{from: blue_cluster, to: green_cluster}",
  116. ]
  117. )
  118. result = project.run_sql(
  119. """
  120. SELECT count(*) = 0
  121. FROM mz_internal.mz_show_cluster_privileges
  122. WHERE grantee = 'my_role'
  123. AND name = 'green_cluster'
  124. AND privilege_type = 'CREATE'""",
  125. fetch="one",
  126. )
  127. assert result[0]
  128. result = project.run_sql(
  129. """
  130. SELECT count(*) = 1
  131. FROM mz_internal.mz_show_cluster_privileges
  132. WHERE grantee = 'my_role'
  133. AND name = 'green_cluster'
  134. AND privilege_type = 'USAGE'""",
  135. fetch="one",
  136. )
  137. assert result[0]
  138. class TestCIFixture:
  139. @pytest.fixture(autouse=True)
  140. def cleanup(self, project):
  141. project.run_sql("COMMENT ON CLUSTER quickstart IS NULL")
  142. project.run_sql("COMMENT ON SCHEMA public IS NULL")
  143. def test_ci_tags_match(self, project):
  144. run_dbt(
  145. [
  146. "run-operation",
  147. "set_cluster_ci_tag",
  148. "--args",
  149. "{cluster: quickstart, ci_tag: test}",
  150. ]
  151. )
  152. run_dbt(
  153. [
  154. "run-operation",
  155. "set_schema_ci_tag",
  156. "--args",
  157. "{schema: public, ci_tag: test}",
  158. ]
  159. )
  160. run_dbt(
  161. [
  162. "run-operation",
  163. "check_cluster_ci_tag",
  164. "--args",
  165. "{cluster: quickstart, ci_tag: test}",
  166. ]
  167. )
  168. run_dbt(
  169. [
  170. "run-operation",
  171. "check_schema_ci_tag",
  172. "--args",
  173. "{schema: public, ci_tag: test}",
  174. ]
  175. )
  176. def test_ci_tags_mismatch(self, project):
  177. run_dbt(
  178. [
  179. "run-operation",
  180. "set_cluster_ci_tag",
  181. "--args",
  182. "{cluster: quickstart, ci_tag: test}",
  183. ]
  184. )
  185. run_dbt(
  186. [
  187. "run-operation",
  188. "set_schema_ci_tag",
  189. "--args",
  190. "{schema: public, ci_tag: test}",
  191. ]
  192. )
  193. run_dbt(
  194. [
  195. "run-operation",
  196. "check_cluster_ci_tag",
  197. "--args",
  198. "{cluster: quickstart, ci_tag: different}",
  199. ],
  200. expect_pass=False,
  201. )
  202. run_dbt(
  203. [
  204. "run-operation",
  205. "check_schema_ci_tag",
  206. "--args",
  207. "{schema: public, ci_tag: different}",
  208. ],
  209. expect_pass=False,
  210. )
  211. class TestPermissionValidation:
  212. """Tests for database permissions. We run against the internal
  213. macros because the materialize user is a superuser and we'd short
  214. circuit these checks if executing deploy_validate_permissions"""
  215. def test_createcluster_permissions(self, project):
  216. run_dbt(["run-operation", "internal_ensure_createcluster_permission"])
  217. def test_database_create_permissions(self, project):
  218. run_dbt(["run-operation", "internal_ensure_database_permission"])
  219. def test_schema_owner_permissions(self, project):
  220. project.run_sql("CREATE SCHEMA my_schema")
  221. run_dbt(
  222. [
  223. "run-operation",
  224. "internal_ensure_schema_ownership",
  225. "--args",
  226. "{schemas: ['my_schema']}",
  227. ]
  228. )
  229. def test_cluster_owner_permissions(self, project):
  230. run_dbt(
  231. [
  232. "run-operation",
  233. "internal_ensure_cluster_ownership",
  234. "--args",
  235. "{clusters: ['quickstart']}",
  236. ]
  237. )
  238. class TestRunWithDeploy:
  239. @pytest.fixture(scope="class")
  240. def dbt_profile_target(self):
  241. return {
  242. "type": "materialize",
  243. "threads": 1,
  244. "host": "{{ env_var('DBT_HOST', 'localhost') }}",
  245. "user": "materialize",
  246. "pass": "password",
  247. "database": "materialize",
  248. "port": "{{ env_var('DBT_PORT', 6875) }}",
  249. "cluster": "quickstart",
  250. }
  251. @pytest.fixture(scope="class")
  252. def project_config_update(self):
  253. return {
  254. "vars": {
  255. "deployment": {
  256. "default": {"clusters": ["quickstart"], "schemas": ["public"]}
  257. }
  258. }
  259. }
  260. @pytest.fixture(scope="class")
  261. def models(self):
  262. return {
  263. "test_source.sql": test_source,
  264. "test_materialized_view_index.sql": test_materialized_view_index,
  265. "test_view_index.sql": test_view_index,
  266. }
  267. @pytest.fixture(autouse=True)
  268. def cleanup(self, project):
  269. project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
  270. project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")
  271. def test_deployment_run(self, project):
  272. # the test runner overrides schemas
  273. # so we can only validate the cluster
  274. # configuration is overridden.
  275. run_dbt(["run-operation", "deploy_init"])
  276. run_dbt(["run", "--vars", "deploy: True"])
  277. sources = project.run_sql(
  278. """
  279. SELECT count(*)
  280. FROM mz_sources
  281. JOIN mz_clusters ON mz_sources.cluster_id = mz_clusters.id
  282. WHERE mz_clusters.name = 'quickstart_dbt_deploy'
  283. AND mz_sources.id LIKE 'u%'""",
  284. fetch="one",
  285. )
  286. assert int(sources[0]) == 1
  287. mat_views = project.run_sql(
  288. """
  289. SELECT count(*)
  290. FROM mz_materialized_views
  291. JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
  292. WHERE mz_clusters.name = 'quickstart_dbt_deploy'
  293. AND mz_materialized_views.id LIKE 'u%'""",
  294. fetch="one",
  295. )
  296. assert int(mat_views[0]) == 1
  297. indexes = project.run_sql(
  298. """
  299. SELECT count(*)
  300. FROM mz_indexes
  301. JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
  302. WHERE mz_clusters.name = 'quickstart_dbt_deploy'
  303. AND mz_indexes.id LIKE 'u%'""",
  304. fetch="one",
  305. )
  306. assert int(indexes[0]) == 2
  307. run_dbt(["run-operation", "deploy_cleanup"])
  308. class TestSinkFail:
  309. @pytest.fixture(scope="class")
  310. def project_config_update(self):
  311. return {
  312. "vars": {
  313. "deployment": {
  314. "default": {"clusters": ["quickstart"], "schemas": ["public"]}
  315. }
  316. }
  317. }
  318. @pytest.fixture(scope="class")
  319. def models(self):
  320. return {
  321. "test_materialized_view.sql": test_materialized_view,
  322. "test_sink.sql": test_sink,
  323. }
  324. @pytest.fixture(autouse=True)
  325. def cleanup(self, project):
  326. project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
  327. project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")
  328. def test_source_fails(self, project):
  329. run_dbt(["run"])
  330. run_dbt(["run-operation", "deploy_init"], expect_pass=False)
  331. class TestTargetDeploy:
  332. @pytest.fixture(scope="class")
  333. def project_config_update(self):
  334. return {
  335. "vars": {
  336. "deployment": {
  337. "default": {
  338. "clusters": ["prod"],
  339. "schemas": ["prod", "staging"],
  340. }
  341. },
  342. }
  343. }
  344. @pytest.fixture(autouse=True)
  345. def cleanup(self, project):
  346. project.run_sql("DROP CLUSTER IF EXISTS prod CASCADE")
  347. project.run_sql("DROP CLUSTER IF EXISTS prod_dbt_deploy CASCADE")
  348. project.run_sql("DROP SCHEMA IF EXISTS prod CASCADE")
  349. project.run_sql("DROP SCHEMA IF EXISTS prod_dbt_deploy CASCADE")
  350. project.run_sql("DROP SCHEMA IF EXISTS staging CASCADE")
  351. project.run_sql("DROP SCHEMA IF EXISTS staging_dbt_deploy CASCADE")
  352. def test_dbt_deploy(self, project):
  353. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  354. project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
  355. project.run_sql("CREATE SCHEMA prod")
  356. project.run_sql("CREATE SCHEMA prod_dbt_deploy")
  357. project.run_sql("CREATE SCHEMA staging")
  358. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  359. before_clusters = dict(
  360. project.run_sql(
  361. "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
  362. fetch="all",
  363. )
  364. )
  365. before_schemas = dict(
  366. project.run_sql(
  367. "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
  368. fetch="all",
  369. )
  370. )
  371. before_staging_schemas = dict(
  372. project.run_sql(
  373. "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
  374. fetch="all",
  375. )
  376. )
  377. run_dbt(["run-operation", "deploy_promote", "--args", "{dry_run: true}"])
  378. after_clusters = dict(
  379. project.run_sql(
  380. "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
  381. fetch="all",
  382. )
  383. )
  384. after_schemas = dict(
  385. project.run_sql(
  386. "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
  387. fetch="all",
  388. )
  389. )
  390. after_staging_schemas = dict(
  391. project.run_sql(
  392. "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
  393. fetch="all",
  394. )
  395. )
  396. assert before_clusters["prod"] != after_clusters["prod_dbt_deploy"]
  397. assert before_clusters["prod_dbt_deploy"] != after_clusters["prod"]
  398. assert before_schemas["prod"] != after_schemas["prod_dbt_deploy"]
  399. assert before_schemas["prod_dbt_deploy"] != after_schemas["prod"]
  400. assert (
  401. before_staging_schemas["staging"]
  402. != after_staging_schemas["staging_dbt_deploy"]
  403. )
  404. assert (
  405. before_staging_schemas["staging_dbt_deploy"]
  406. != after_staging_schemas["staging"]
  407. )
  408. run_dbt(["run-operation", "deploy_promote"])
  409. after_clusters = dict(
  410. project.run_sql(
  411. "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
  412. fetch="all",
  413. )
  414. )
  415. after_schemas = dict(
  416. project.run_sql(
  417. "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
  418. fetch="all",
  419. )
  420. )
  421. after_staging_schemas = dict(
  422. project.run_sql(
  423. "SELECT name, id FROM mz_schemas WHERE name IN ('staging', 'staging_dbt_deploy')",
  424. fetch="all",
  425. )
  426. )
  427. assert before_clusters["prod"] == after_clusters["prod_dbt_deploy"]
  428. assert before_clusters["prod_dbt_deploy"] == after_clusters["prod"]
  429. assert before_schemas["prod"] == after_schemas["prod_dbt_deploy"]
  430. assert before_schemas["prod_dbt_deploy"] == after_schemas["prod"]
  431. assert (
  432. before_staging_schemas["staging"]
  433. == after_staging_schemas["staging_dbt_deploy"]
  434. )
  435. assert (
  436. before_staging_schemas["staging_dbt_deploy"]
  437. == after_staging_schemas["staging"]
  438. )
  439. # Verify that both schemas are tagged correctly
  440. for schema_name in ["prod", "staging"]:
  441. tagged_schema_comment = project.run_sql(
  442. f"""
  443. SELECT c.comment
  444. FROM mz_internal.mz_comments c
  445. JOIN mz_schemas s USING (id)
  446. WHERE s.name = '{schema_name}';
  447. """,
  448. fetch="one",
  449. )
  450. assert (
  451. tagged_schema_comment is not None
  452. ), f"No comment found for schema {schema_name}"
  453. assert (
  454. "Deployment by" in tagged_schema_comment[0]
  455. ), f"Missing deployment info in {schema_name} comment"
  456. assert (
  457. "on" in tagged_schema_comment[0]
  458. ), f"Missing timestamp in {schema_name} comment"
  459. def test_dbt_deploy_with_force(self, project):
  460. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  461. project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
  462. project.run_sql("CREATE SCHEMA prod")
  463. project.run_sql("CREATE SCHEMA prod_dbt_deploy")
  464. project.run_sql("CREATE SCHEMA staging")
  465. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  466. before_clusters = dict(
  467. project.run_sql(
  468. "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
  469. fetch="all",
  470. )
  471. )
  472. before_schemas = dict(
  473. project.run_sql(
  474. "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
  475. fetch="all",
  476. )
  477. )
  478. run_dbt(["run-operation", "deploy_promote", "--args", "{wait: true}"])
  479. after_clusters = dict(
  480. project.run_sql(
  481. "SELECT name, id FROM mz_clusters WHERE name IN ('prod', 'prod_dbt_deploy')",
  482. fetch="all",
  483. )
  484. )
  485. after_schemas = dict(
  486. project.run_sql(
  487. "SELECT name, id FROM mz_schemas WHERE name IN ('prod', 'prod_dbt_deploy')",
  488. fetch="all",
  489. )
  490. )
  491. assert before_clusters["prod"] == after_clusters["prod_dbt_deploy"]
  492. assert before_clusters["prod_dbt_deploy"] == after_clusters["prod"]
  493. assert before_schemas["prod"] == after_schemas["prod_dbt_deploy"]
  494. assert before_schemas["prod"] == after_schemas["prod_dbt_deploy"]
  495. def test_dbt_deploy_missing_deployment_cluster(self, project):
  496. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  497. project.run_sql("CREATE SCHEMA prod")
  498. project.run_sql("CREATE SCHEMA prod_dbt_deploy")
  499. project.run_sql("CREATE SCHEMA staging")
  500. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  501. run_dbt(["run-operation", "deploy_promote"], expect_pass=False)
  502. def test_dbt_deploy_missing_deployment_schema(self, project):
  503. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  504. project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
  505. project.run_sql("CREATE SCHEMA prod")
  506. run_dbt(["run-operation", "deploy_promote"], expect_pass=False)
  507. def test_fails_on_unmanaged_cluster(self, project):
  508. project.run_sql("CREATE CLUSTER prod REPLICAS ()")
  509. project.run_sql("CREATE SCHEMA prod")
  510. project.run_sql("CREATE SCHEMA staging")
  511. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  512. run_dbt(["run-operation", "deploy_init"], expect_pass=False)
  513. def test_dbt_deploy_init_with_refresh_hydration_time(self, project):
  514. project.run_sql(
  515. "CREATE CLUSTER prod (SIZE = '1', SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour'))"
  516. )
  517. project.run_sql("CREATE SCHEMA prod")
  518. project.run_sql("CREATE SCHEMA staging")
  519. run_dbt(["run-operation", "deploy_init"])
  520. # Get the rehydration time from the cluster
  521. cluster_type = project.run_sql(
  522. """
  523. SELECT cs.type
  524. FROM mz_internal.mz_cluster_schedules cs
  525. JOIN mz_clusters c ON cs.cluster_id = c.id
  526. WHERE c.name = 'prod_dbt_deploy'
  527. """,
  528. fetch="one",
  529. )
  530. assert cluster_type == ("on-refresh",)
  531. def test_dbt_deploy_init_and_cleanup(self, project):
  532. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  533. project.run_sql("CREATE SCHEMA prod")
  534. project.run_sql("CREATE SCHEMA staging")
  535. run_dbt(["run-operation", "deploy_init"])
  536. (size, replication_factor) = project.run_sql(
  537. "SELECT size, replication_factor FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
  538. fetch="one",
  539. )
  540. assert size == "1"
  541. assert replication_factor == "1"
  542. result = project.run_sql(
  543. "SELECT count(*) = 1 FROM mz_schemas WHERE name = 'prod_dbt_deploy'",
  544. fetch="one",
  545. )
  546. assert bool(result[0])
  547. run_dbt(["run-operation", "deploy_cleanup"])
  548. result = project.run_sql(
  549. "SELECT count(*) = 0 FROM mz_clusters WHERE name = 'prod_dbt_deploy'",
  550. fetch="one",
  551. )
  552. assert bool(result[0])
  553. result = project.run_sql(
  554. "SELECT count(*) = 0 FROM mz_schemas WHERE name = 'prod_dbt_deploy'",
  555. fetch="one",
  556. )
  557. assert bool(result[0])
  558. def test_cluster_contains_objects(self, project):
  559. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  560. project.run_sql("CREATE SCHEMA prod")
  561. project.run_sql("CREATE SCHEMA prod_dbt_deploy")
  562. project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
  563. project.run_sql("CREATE SCHEMA staging")
  564. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  565. project.run_sql(
  566. "CREATE MATERIALIZED VIEW mv IN CLUSTER prod_dbt_deploy AS SELECT 1"
  567. )
  568. run_dbt(["run-operation", "deploy_init"], expect_pass=False)
  569. run_dbt(
  570. [
  571. "run-operation",
  572. "deploy_init",
  573. "--args",
  574. "{ignore_existing_objects: True}",
  575. ]
  576. )
  577. def test_schema_contains_objects(self, project):
  578. project.run_sql("CREATE CLUSTER prod SIZE = '1'")
  579. project.run_sql("CREATE SCHEMA prod")
  580. project.run_sql("CREATE SCHEMA prod_dbt_deploy")
  581. project.run_sql("CREATE CLUSTER prod_dbt_deploy SIZE = '1'")
  582. project.run_sql("CREATE SCHEMA staging")
  583. project.run_sql("CREATE SCHEMA staging_dbt_deploy")
  584. project.run_sql("CREATE VIEW prod_dbt_deploy.view AS SELECT 1")
  585. run_dbt(["run-operation", "deploy_init"], expect_pass=False)
  586. run_dbt(
  587. [
  588. "run-operation",
  589. "deploy_init",
  590. "--args",
  591. "{ignore_existing_objects: True}",
  592. ]
  593. )
  594. class TestLagTolerance:
  595. @pytest.fixture(scope="class")
  596. def project_config_update(self):
  597. return {
  598. "vars": {
  599. "deployment": {
  600. "default": {"clusters": ["quickstart"], "schemas": ["public"]}
  601. }
  602. }
  603. }
  604. @pytest.fixture(autouse=True)
  605. def cleanup(self, project):
  606. project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
  607. project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")
  608. def test_deploy_await_custom_lag_threshold(self, project):
  609. run_dbt(["run-operation", "deploy_init"])
  610. result = run_dbt(
  611. [
  612. "run-operation",
  613. "deploy_await",
  614. "--args",
  615. "{poll_interval: 5, lag_threshold: '5s'}",
  616. ]
  617. )
  618. assert len(result) > 0 and result[0].status == "success"
  619. def test_deploy_promote_custom_lag_threshold(self, project):
  620. run_dbt(["run-operation", "deploy_init"])
  621. result = run_dbt(
  622. [
  623. "run-operation",
  624. "deploy_promote",
  625. "--args",
  626. "{wait: true, poll_interval: 5, lag_threshold: '5s'}",
  627. ]
  628. )
  629. assert len(result) > 0 and result[0].status == "success"
  630. def test_default_lag_threshold(self, project):
  631. run_dbt(["run-operation", "deploy_init"])
  632. result = run_dbt(["run-operation", "deploy_await"])
  633. assert len(result) > 0 and result[0].status == "success"
  634. def test_large_lag_threshold(self, project):
  635. run_dbt(["run-operation", "deploy_init"])
  636. result = run_dbt(
  637. [
  638. "run-operation",
  639. "deploy_await",
  640. "--args",
  641. "{poll_interval: 5, lag_threshold: '1h'}",
  642. ]
  643. )
  644. assert len(result) > 0 and result[0].status == "success"
class TestEndToEndDeployment:
    """Full blue/green deployment cycle: build models, copy grants into the
    deployment environment, run with `deploy: True`, validate that the sink
    keeps emitting, then promote and verify the sink was repointed to the
    newly promoted materialized view.

    The statement order below is load-bearing (grants must exist before the
    snapshot queries, the sink must be validated before and after promotion),
    so the sequence is documented rather than restructured.
    """

    @pytest.fixture(scope="class")
    def dbt_profile_target(self):
        # Connection profile for the test's dbt runs; host/port come from the
        # environment so the suite works both locally and in CI.
        return {
            "type": "materialize",
            "threads": 1,
            "host": "{{ env_var('DBT_HOST', 'localhost') }}",
            "user": "materialize",
            "pass": "password",
            "database": "materialize",
            "port": "{{ env_var('DBT_PORT', 6875) }}",
            "cluster": "quickstart",
        }

    @pytest.fixture(scope="class")
    def models(self):
        # One materialized view plus a sink reading from it (see fixtures.py).
        return {
            "test_materialized_view_deploy.sql": test_materialized_view_deploy,
            "test_sink_deploy.sql": test_sink_deploy,
        }

    @pytest.fixture(autouse=True)
    def cleanup(self, project):
        # Drop everything this test creates, role first since later drops may
        # depend on it being gone.
        project.run_sql("DROP ROLE IF EXISTS test_role")
        project.run_sql("DROP CLUSTER IF EXISTS quickstart_dbt_deploy CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS public_dbt_deploy CASCADE")
        project.run_sql("DROP CLUSTER IF EXISTS sinks_cluster CASCADE")
        project.run_sql("DROP SCHEMA IF EXISTS sinks_schema CASCADE")
        project.run_sql("DROP TABLE IF EXISTS source_table")
        project.run_sql("DROP SOURCE IF EXISTS sink_validation_source")

    def test_full_deploy_process(self, project):
        """Drive one complete deploy_init -> run -> deploy_promote ->
        deploy_cleanup cycle and assert grants and sink wiring survive it."""
        # Create test role and grant it to materialize
        project.run_sql("CREATE ROLE test_role")
        project.run_sql("GRANT test_role TO materialize")
        # Prepare the source table, the sink cluster and schema
        project.run_sql("CREATE TABLE source_table (val INTEGER)")
        project.run_sql("CREATE CLUSTER sinks_cluster SIZE = '1'")
        project.run_sql("CREATE SCHEMA sinks_schema")
        project.run_sql("INSERT INTO source_table VALUES (1)")
        run_dbt(["run"])
        # The test framework generates a unique schema per run; all grant
        # checks below target that schema.
        created_schema = project.created_schemas[0]
        # Set up initial grants on the production schema and cluster
        project.run_sql(f"GRANT USAGE ON SCHEMA {created_schema} TO test_role")
        project.run_sql(f"GRANT CREATE ON SCHEMA {created_schema} TO test_role")
        project.run_sql(
            f"ALTER DEFAULT PRIVILEGES FOR ROLE test_role IN SCHEMA {created_schema} GRANT SELECT ON TABLES TO test_role"
        )
        project.run_sql("GRANT USAGE ON CLUSTER quickstart TO test_role")
        project.run_sql("GRANT CREATE ON CLUSTER quickstart TO test_role")
        # Store initial grants for later comparison
        initial_schema_grants = project.run_sql(
            f"""
            WITH schema_privilege AS (
                SELECT mz_internal.mz_aclexplode(s.privileges).*
                FROM mz_schemas s
                JOIN mz_databases d ON s.database_id = d.id
                WHERE d.name = current_database()
                AND s.name = '{created_schema}'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM schema_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        # NOTE(review): `ON grantee = grantee.id` joins the aclexplode output
        # column against the aliased mz_roles table — looks intentional but
        # worth confirming against the Materialize catalog docs.
        initial_cluster_grants = project.run_sql(
            """
            WITH cluster_privilege AS (
                SELECT mz_internal.mz_aclexplode(privileges).*
                FROM mz_clusters
                WHERE name = 'quickstart'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM cluster_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        initial_default_privs = project.run_sql(
            f"""
            SELECT privilege_type, grantee, object_type
            FROM mz_internal.mz_show_default_privileges
            WHERE database = current_database()
            AND schema = '{created_schema}'
            AND grantee = 'test_role'
            ORDER BY privilege_type, grantee, object_type
            """,
            fetch="all",
        )
        # YAML var blobs for the dbt invocations below; the second one also
        # flips the deploy flag on.
        project_config = f"{{deployment: {{default: {{clusters: ['quickstart'], schemas: ['{created_schema}']}}}}}}"
        project_config_deploy = f"{{deployment: {{default: {{clusters: ['quickstart'], schemas: ['{created_schema}']}}}}, deploy: True}}"
        # Validate the initial sink result
        project.run_sql(
            "CREATE SOURCE sink_validation_source FROM KAFKA CONNECTION kafka_connection (TOPIC 'testdrive-test-sink-1') FORMAT JSON"
        )
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=1
        )
        # Insert another row and validate the sink
        project.run_sql("INSERT INTO source_table VALUES (2)")
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=2
        )
        # Initialize the deployment environment
        run_dbt(["run-operation", "deploy_init", "--vars", project_config])
        # Verify grants were copied to deployment schema
        deploy_schema_grants = project.run_sql(
            f"""
            WITH schema_privilege AS (
                SELECT mz_internal.mz_aclexplode(s.privileges).*
                FROM mz_schemas s
                JOIN mz_databases d ON s.database_id = d.id
                WHERE d.name = current_database()
                AND s.name = '{created_schema}_dbt_deploy'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM schema_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        # Verify grants were copied to deployment cluster
        deploy_cluster_grants = project.run_sql(
            """
            WITH cluster_privilege AS (
                SELECT mz_internal.mz_aclexplode(privileges).*
                FROM mz_clusters
                WHERE name = 'quickstart_dbt_deploy'
            )
            SELECT privilege_type, grantee.name as grantee
            FROM cluster_privilege
            JOIN mz_roles grantee ON grantee = grantee.id
            WHERE grantee.name = 'test_role'
            ORDER BY privilege_type, grantee
            """,
            fetch="all",
        )
        deploy_default_privs = project.run_sql(
            f"""
            SELECT privilege_type, grantee, object_type
            FROM mz_internal.mz_show_default_privileges
            WHERE database = current_database()
            AND schema = '{created_schema}_dbt_deploy'
            AND grantee = 'test_role'
            ORDER BY privilege_type, grantee, object_type
            """,
            fetch="all",
        )
        # Assert grants match between production and deployment environments
        assert (
            initial_schema_grants == deploy_schema_grants
        ), "Schema grants do not match between production and deployment"
        assert (
            initial_cluster_grants == deploy_cluster_grants
        ), "Cluster grants do not match between production and deployment"
        assert (
            initial_default_privs == deploy_default_privs
        ), "Default privileges do not match between production and deployment"
        # Run the deploy with the deploy flag set to True and exclude the sink creation
        run_dbt(
            [
                "run",
                "--vars",
                project_config_deploy,
                "--exclude",
                "config.materialized:sink",
            ]
        )
        # Ensure the validation source has not changed
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=2
        )
        # Insert a new row and validate the new sink result after the deploy
        project.run_sql("INSERT INTO source_table VALUES (3)")
        run_with_retry(
            project, "SELECT count(*) FROM sink_validation_source", expected_count=3
        )
        # Get the IDs of the materialized views
        before_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}')",
            fetch="one",
        )[0]
        after_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}_dbt_deploy')",
            fetch="one",
        )[0]
        # The sink's create_sql embeds the id of the view it reads from, so
        # substring checks on it reveal which view the sink is attached to.
        before_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        assert (
            before_view_id in before_sink[2]
        ), "Before deployment, the sink should point to the original view ID"
        run_dbt(
            [
                "run-operation",
                "deploy_promote",
                "--args",
                "{dry_run: true}",
                "--vars",
                project_config,
            ]
        )
        after_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        assert (
            before_view_id in after_sink[2]
        ), "The sink should point to the same view ID after a dry_run"
        assert before_sink[0] == after_sink[0], "Sink name should be the same"
        # Promote and ensure the sink points to the new objects
        run_dbt(["run-operation", "deploy_promote", "--vars", project_config])
        after_sink = project.run_sql(
            "SELECT name, id, create_sql FROM mz_sinks WHERE name = 'test_sink_deploy'",
            fetch="one",
        )
        # After promotion the production schema name holds the promoted view,
        # so re-query its id under the original schema name.
        after_view_id = project.run_sql(
            f"SELECT id FROM mz_materialized_views WHERE name = 'test_materialized_view_deploy' AND schema_id = (SELECT id FROM mz_schemas WHERE name = '{created_schema}')",
            fetch="one",
        )[0]
        assert (
            after_view_id in after_sink[2]
        ), "After deployment, the sink should point to the new view ID"
        assert before_sink[0] == after_sink[0], "Sink name should be the same"
        assert (
            before_view_id != after_view_id
        ), "Sink's view ID should be different after deployment"
        run_dbt(["run-operation", "deploy_cleanup", "--vars", project_config])
  878. def run_with_retry(project, sql, expected_count, retries=5, delay=3, fetch="one"):
  879. for i in range(retries):
  880. result = project.run_sql(sql, fetch=fetch)
  881. if result[0] == expected_count:
  882. return result
  883. time.sleep(delay)
  884. raise AssertionError(
  885. f"Expected count to be {expected_count}, but got {result[0]} after {retries} retries"
  886. )