create_cluster.sql 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. -- Copyright Materialize, Inc. and contributors. All rights reserved.
  2. --
  3. -- Licensed under the Apache License, Version 2.0 (the "License");
  4. -- you may not use this file except in compliance with the License.
  5. -- You may obtain a copy of the License in the LICENSE file at the
  6. -- root of this repository, or online at
  7. --
  8. -- http://www.apache.org/licenses/LICENSE-2.0
  9. --
  10. -- Unless required by applicable law or agreed to in writing, software
  11. -- distributed under the License is distributed on an "AS IS" BASIS,
  12. -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. -- See the License for the specific language governing permissions and
  14. -- limitations under the License.
  {#
    Creates a deployment cluster with the requested properties, or, when a
    cluster with the resolved name already exists, checks that it is safe to
    deploy into.

    ## Arguments
    - cluster_name (str): The name of the cluster. This parameter is required.
    - size (str): The size of the cluster. This parameter is required.
    - replication_factor (int, optional): The replication factor for the cluster.
      Applied only when schedule_type is 'manual' or left unset.
    - schedule_type (str, optional): The type of schedule for the cluster.
      Accepts 'manual' or 'on-refresh'.
    - refresh_hydration_time_estimate (str, optional): The estimated hydration
      time for the cluster. Applied only when schedule_type is 'on-refresh'.
    - ignore_existing_objects (bool, optional): When true, a non-empty existing
      cluster produces a warning instead of a compiler error. Defaults to false.
    - force_deploy_suffix (bool, optional): Forwarded to
      adapter.generate_final_cluster_name together with cluster_name to resolve
      the final cluster name. Defaults to false.

    ## Incompatibilities
    The following combinations do not abort the run; the offending option is
    left out of the generated DDL and a warning is logged:
    - replication_factor together with schedule_type 'on-refresh'.
    - refresh_hydration_time_estimate without schedule_type 'on-refresh'.
    - a schedule_type other than 'manual' or 'on-refresh' (only SIZE is applied).

    ## Behavior when the cluster already exists
    User objects (ids starting with 'u') among the indexes, materialized views,
    sources and sinks on the cluster are counted. If any are found:
    - when check_cluster_ci_tag(<cluster>) is truthy, an informational message
      is logged;
    - otherwise, when ignore_existing_objects is true, warnings are logged;
    - otherwise a compiler error is raised.

    ## Returns
    The resolved cluster name produced by adapter.generate_final_cluster_name.

    ## Raises
    A compiler error when cluster_name or size is missing, or when the existing
    cluster is not empty and none of the exemptions above apply.
  #}
  {% macro create_cluster(
      cluster_name,
      size,
      replication_factor=none,
      schedule_type=none,
      refresh_hydration_time_estimate=none,
      ignore_existing_objects=false,
      force_deploy_suffix=false
  ) %}

    {# Input validation: both positional arguments must be non-empty. #}
    {% if not cluster_name %}
      {{ exceptions.raise_compiler_error("cluster_name must be provided") }}
    {% endif %}

    {% if not size %}
      {{ exceptions.raise_compiler_error("size must be provided") }}
    {% endif %}

    {% set deploy_cluster = adapter.generate_final_cluster_name(cluster_name, force_deploy_suffix) %}

    {% if cluster_exists(deploy_cluster) %}
      {{ log("Deployment cluster " ~ deploy_cluster ~ " already exists", info=True) }}

      {# Count the user-created objects (id prefix 'u') that live on the cluster. #}
      {% set cluster_empty %}
        WITH dataflows AS (
            SELECT mz_indexes.id
            FROM mz_indexes
            INNER JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_materialized_views.id
            FROM mz_materialized_views
            INNER JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_sources.id
            FROM mz_sources
            INNER JOIN mz_clusters ON mz_clusters.id = mz_sources.cluster_id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}

            UNION ALL

            SELECT mz_sinks.id
            FROM mz_sinks
            INNER JOIN mz_clusters ON mz_clusters.id = mz_sinks.cluster_id
            WHERE mz_clusters.name = {{ dbt.string_literal(deploy_cluster) }}
        )

        SELECT count(*)
        FROM dataflows
        WHERE id LIKE 'u%'
      {% endset %}

      {% set cluster_object_count = run_query(cluster_empty) %}

      {# Only inspect the query result when dbt is actually executing. #}
      {% if execute %}
        {% if cluster_object_count and cluster_object_count.columns[0] and cluster_object_count.rows[0][0] > 0 %}
          {% if check_cluster_ci_tag(deploy_cluster) %}
            {{ log("Cluster " ~ deploy_cluster ~ " was already created for this pull request", info=True) }}
          {% elif ignore_existing_objects %}
            {{ log("[Warning] Deployment cluster " ~ deploy_cluster ~ " is not empty", info=True) }}
            {{ log("[Warning] Confirm the objects it contains are expected before deployment", info=True) }}
          {% else %}
            {{ exceptions.raise_compiler_error("
              Deployment cluster " ~ deploy_cluster ~ " already exists and is not empty.
              This is potentially dangerous as you may end up deploying objects to production you
              do not intend.
              If you are certain the objects in this cluster are supposed to exist, you can ignore this
              error by setting ignore_existing_objects to True.
              dbt run-operation create_cluster --args '{ignore_existing_objects: True}'
            ") }}
          {% endif %}
        {% endif %}
      {% endif %}

    {% else %}
      {{ log("Creating deployment cluster " ~ deploy_cluster, info=True)}}

      {#
        Surface options that the DDL below will leave out, so that a typo or an
        incompatible combination does not go unnoticed. These are warnings, not
        errors, so existing invocations keep working unchanged.
      #}
      {% if schedule_type is not none and schedule_type not in ['manual', 'on-refresh'] %}
        {{ log("[Warning] schedule_type '" ~ schedule_type ~ "' is not recognized (expected 'manual' or 'on-refresh'); only SIZE will be applied to cluster " ~ deploy_cluster, info=True) }}
      {% endif %}
      {% if schedule_type == 'on-refresh' and replication_factor is not none %}
        {{ log("[Warning] replication_factor is ignored when schedule_type is 'on-refresh'", info=True) }}
      {% endif %}
      {% if schedule_type != 'on-refresh' and refresh_hydration_time_estimate is not none %}
        {{ log("[Warning] refresh_hydration_time_estimate is ignored unless schedule_type is 'on-refresh'", info=True) }}
      {% endif %}

      {#
        NOTE(review): deploy_cluster is interpolated as an unquoted identifier
        here, while the existence check compares it as a string literal.
        Presumably generate_final_cluster_name returns a safe, lower-case
        identifier; confirm.
      #}
      {% set create_cluster_ddl %}
        CREATE CLUSTER {{ deploy_cluster }} (
            SIZE = {{ dbt.string_literal(size) }}
            {% if replication_factor is not none and ( schedule_type == 'manual' or schedule_type is none ) %}
            , REPLICATION FACTOR = {{ replication_factor }}
            {% elif schedule_type == 'on-refresh' %}
              {% if refresh_hydration_time_estimate is not none %}
            , SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = {{ dbt.string_literal(refresh_hydration_time_estimate) }})
              {% else %}
            , SCHEDULE = ON REFRESH
              {% endif %}
            {% endif %}
        )
      {% endset %}

      {{ run_query(create_cluster_ddl) }}
      {# Tag the newly created cluster via the project's CI-tag helper. #}
      {{ set_cluster_ci_tag(deploy_cluster) }}
    {% endif %}

    {{ return(deploy_cluster) }}
  {% endmacro %}