# Copyright 2020 Josh Wills. All rights reserved.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the LICENSE file at the
# root of this repository, or online at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
import time
from collections import namedtuple
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set

import dbt_common.exceptions
from dbt_common.contracts.constraints import (
    ColumnLevelConstraint,
    ConstraintType,
)
from dbt_common.dataclass_schema import ValidationError, dbtClassMixin

from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.meta import available
from dbt.adapters.capability import (
    Capability,
    CapabilityDict,
    CapabilitySupport,
    Support,
)
from dbt.adapters.materialize.connections import MaterializeConnectionManager
from dbt.adapters.materialize.exceptions import (
    RefreshIntervalConfigError,
    RefreshIntervalConfigNotDictError,
)
from dbt.adapters.materialize.relation import MaterializeRelation
from dbt.adapters.postgres.column import PostgresColumn
from dbt.adapters.postgres.impl import PostgresAdapter, SQLAdapter
from dbt.adapters.sql.impl import LIST_RELATIONS_MACRO_NAME


# types in ./misc/dbt-materialize need to import generic types from typing
@dataclass
class MaterializeIndexConfig(dbtClassMixin):
    columns: Optional[List[str]] = None
    default: Optional[bool] = False
    name: Optional[str] = None
    cluster: Optional[str] = None

    @classmethod
    def parse(cls, raw_index) -> Optional["MaterializeIndexConfig"]:
        if raw_index is None:
            return None
        try:
            cls.validate(raw_index)
            return cls.from_dict(raw_index)
        except ValidationError as exc:
            msg = dbt_common.exceptions.validator_error_message(exc)
            raise dbt_common.exceptions.CompilationError(
                f"Could not parse index config: {msg}"
            )
        except TypeError:
            raise dbt_common.exceptions.CompilationError(
                "Invalid index config:\n"
                f"  Got: {raw_index}\n"
                '  Expected a dictionary with at minimum a "columns" key'
            )


# NOTE(morsapaes): Materialize allows configuring a refresh interval for the
# materialized view materialization. If no config option is specified, the
# default is REFRESH ON COMMIT. We add an explicit attribute for the special
# case of parametrizing the configuration option, e.g. in macros.
@dataclass
class MaterializeRefreshIntervalConfig(dbtClassMixin):
    at: Optional[str] = None
    at_creation: Optional[bool] = False
    every: Optional[str] = None
    aligned_to: Optional[str] = None
    on_commit: Optional[bool] = False

    @classmethod
    def parse(
        cls, raw_refresh_interval
    ) -> Optional["MaterializeRefreshIntervalConfig"]:
        if raw_refresh_interval is None:
            return None
        try:
            cls.validate(raw_refresh_interval)
            return cls.from_dict(raw_refresh_interval)
        except ValidationError as exc:
            raise RefreshIntervalConfigError(exc)
        except TypeError:
            raise RefreshIntervalConfigNotDictError(raw_refresh_interval)
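

# For illustration only (hypothetical values): a model config like
#
#     {{ config(materialized="materialized_view",
#               refresh_interval={"every": "1 hour"}) }}
#
# passes that dict to MaterializeRefreshIntervalConfig.parse, yielding a
# config with every="1 hour"; a non-dict value raises
# RefreshIntervalConfigNotDictError, and a dict that fails validation raises
# RefreshIntervalConfigError.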


@dataclass
class MaterializeConfig(AdapterConfig):
    cluster: Optional[str] = None
    refresh_interval: Optional[MaterializeRefreshIntervalConfig] = None
    retain_history: Optional[str] = None


class MaterializeAdapter(PostgresAdapter, SQLAdapter):
    ConnectionManager = MaterializeConnectionManager
    Relation = MaterializeRelation
    AdapterSpecificConfigs = MaterializeConfig

    # NOTE(morsapaes): Materialize supports enforcing the NOT NULL constraint
    # for materialized views (via the ASSERT NOT NULL clause) and tables. As a
    # reminder, tables are modeled as static materialized views (see
    # database-issues#1623).
    CONSTRAINT_SUPPORT = {
        ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
        ConstraintType.not_null: ConstraintSupport.ENFORCED,
        ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
        ConstraintType.primary_key: ConstraintSupport.NOT_SUPPORTED,
        ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
    }

    # NOTE(morsapaes): Materialize supports the functionality required to
    # enable metadata source freshness checks, but the value of this feature
    # in a data warehouse built for real-time is limited, so we do not
    # implement it.
    _capabilities = CapabilityDict(
        {
            Capability.SchemaMetadataByRelations: CapabilitySupport(
                support=Support.NotImplemented
            ),
            Capability.TableLastModifiedMetadata: CapabilitySupport(
                support=Support.NotImplemented
            ),
        }
    )

    @classmethod
    def is_cancelable(cls) -> bool:
        return True

    def _link_cached_relations(self, manifest):
        # NOTE(benesch): this *should* reimplement the parent class's method
        # [0] for Materialize, but presently none of our tests care if we just
        # disable this method instead.
        #
        # [0]: https://github.com/dbt-labs/dbt-core/blob/13b18654f03d92eab3f5a9113e526a2a844f145d/plugins/postgres/dbt/adapters/postgres/impl.py#L126-L133
        pass

    def _link_cached_database_relations(self, schemas: Set[str]):
        """
        :param schemas: The set of schemas that should have links added.
        """
        database = self.config.credentials.database
        _Relation = namedtuple("_Relation", "database schema identifier")

        links = [
            (
                _Relation(database, dep_schema, dep_identifier),
                _Relation(database, ref_schema, ref_identifier),
            )
            for dep_schema, dep_identifier, ref_schema, ref_identifier in self.execute_macro(
                "materialize__get_relations"
            )
            # don't record in cache if this relation isn't in a relevant schema
            if ref_schema in schemas
        ]

        for dependent, referenced in links:
            self.cache.add_link(
                referenced=self.Relation.create(**referenced._asdict()),
                dependent=self.Relation.create(**dependent._asdict()),
            )

    def verify_database(self, database):
        pass

    def _get_catalog_schemas(self, manifest):
        # Materialize allows cross-database references, so bypass the
        # PostgresAdapter implementation of this method, which restricts the
        # catalog to a single database, and use the base implementation
        # instead.
        schemas = super(SQLAdapter, self)._get_catalog_schemas(manifest)
        return schemas

    def parse_index(self, raw_index: Any) -> Optional[MaterializeIndexConfig]:
        return MaterializeIndexConfig.parse(raw_index)

    @available
    def parse_refresh_interval(
        self, raw_refresh_interval: Any
    ) -> Optional[MaterializeRefreshIntervalConfig]:
        return MaterializeRefreshIntervalConfig.parse(raw_refresh_interval)

    def list_relations_without_caching(
        self, schema_relation: MaterializeRelation
    ) -> List[MaterializeRelation]:
        kwargs = {"schema_relation": schema_relation}
        results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)

        relations = []
        quote_policy = {"database": True, "schema": True, "identifier": True}
        columns = ["database", "schema", "name", "type"]
        for _database, _schema, _identifier, _type in results.select(columns):
            try:
                _type = self.Relation.get_relation_type(_type.lower())
            except ValueError:
                # Fall back to treating unrecognized relation types as views.
                _type = self.Relation.get_relation_type.View
            relations.append(
                self.Relation.create(
                    database=_database,
                    schema=_schema,
                    identifier=_identifier,
                    quote_policy=quote_policy,
                    type=_type,
                )
            )
        return relations

    # NOTE(morsapaes): Materialize doesn't inline not_null constraints
    # in the DDL in the same way as other adapters, so we override the
    # default constraint rendering functions.
    @classmethod
    def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> str:
        if constraint.type == ConstraintType.not_null:
            return "assert not null"
        else:
            return ""

    @available
    @classmethod
    def render_raw_columns_constraints(
        cls, raw_columns: Dict[str, Dict[str, Any]]
    ) -> List:
        rendered_column_constraints = []
        for v in raw_columns.values():
            if v.get("constraints"):
                col_name = cls.quote(v["name"]) if v.get("quote") else v["name"]
                rendered_column_constraint = [col_name]
                for con in v.get("constraints"):
                    constraint = cls._parse_column_constraint(con)
                    c = cls.process_parsed_constraint(
                        constraint, cls.render_column_constraint
                    )
                    if c is not None:
                        # Prepend the rendered constraint so that it precedes
                        # the column name in the output.
                        rendered_column_constraint.insert(0, c)
                rendered_column_constraints.append(
                    " ".join(rendered_column_constraint)
                )
        return rendered_column_constraints
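
    # For illustration only (hypothetical input): given
    #
    #     {"id": {"name": "id", "constraints": [{"type": "not_null"}]}}
    #
    # the method above renders ["assert not null id"], matching Materialize's
    # `ASSERT NOT NULL <column>` option rather than the inline
    # `<column> NOT NULL` style used by other adapters.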

    @available
    @classmethod
    def sleep(cls, seconds):
        time.sleep(seconds)

    @available
    @classmethod
    def get_git_commit_sha(cls):
        try:
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                capture_output=True,
                check=True,
                text=True,
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            return None

    def get_column_schema_from_query(self, sql: str) -> List[PostgresColumn]:
        # The idea is that this function returns the names and types of the
        # columns returned by `sql` without actually executing the statement.
        #
        # The easiest way to do this would be to create a prepared statement
        # using `sql` and then inspect the result type of the prepared
        # statement. Unfortunately the Python DB-API and the underlying
        # psycopg2 driver do not support preparing statements.
        #
        # So instead we create a temporary view based on the SQL statement and
        # inspect the types of the columns in the view using `mz_columns`.
        #
        # Note that the upstream PostgreSQL adapter takes a different approach.
        # It executes `SELECT * FROM (<query>) WHERE FALSE LIMIT 0`, which in
        # PostgreSQL is optimized to a no-op that returns no rows, and then
        # inspects the description of the returned cursor. That doesn't work
        # well in Materialize, because `SELECT ... LIMIT 0` requires a cluster
        # (even though it returns no rows), and we're not guaranteed that the
        # connection has a valid cluster.

        # Determine the name to use for the view. Unfortunately Materialize
        # comingles data about temporary views from all sessions in the system
        # catalog, so we need to manually include the connection ID in the name
        # to be able to identify the catalog entries for the temporary views
        # created by this specific connection.
        connection_id = self.connections.get_thread_connection().handle.info.backend_pid
        view_name = f"__dbt_sbq{connection_id}"

        # NOTE(morsapaes): because we need to consider the existence of a
        # sql_header configuration (see dbt-core #7714) but don't support
        # running the header SQL in the same transaction as the model SQL
        # statement, we split the input based on the string appended to the
        # header in materialize__get_empty_subquery_sql.
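        #
        # For illustration only (hypothetical input): for
        #
        #     "set cluster = c;--#dbt_sbq_parse_header#--select 1 as a"
        #
        # the split below yields sql_header = "set cluster = c;" and
        # sql_view_def = "select 1 as a"; when no header is configured,
        # sql_header is the empty string and its execute call is skipped.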
        sql_header, sql_view_def = sql.split("--#dbt_sbq_parse_header#--")
        if sql_header:
            self.connections.execute(sql_header)
        self.connections.execute(
            f"create temporary view {view_name} as {sql_view_def}"
        )

        # Fetch the names and types of each column in the view. Schema ID 0
        # indicates the temporary schema.
        _, table = self.connections.execute(
            f"""select c.name, c.type_oid
                from mz_columns c
                join mz_relations r on c.id = r.id
                where r.schema_id = '0' and r.name = '{view_name}'""",
            fetch=True,
        )
        columns = [
            self.Column.create(
                column_name, self.connections.data_type_code_to_name(column_type_code)
            )
            # https://peps.python.org/pep-0249/#description
            for column_name, column_type_code in table.rows
        ]

        # Drop the temporary view, so that we can reuse its name if this
        # function is called again on the same thread. It's okay if we leak this
        # view (e.g., if an error above causes us to return early), as temporary
        # views are automatically dropped by Materialize when the connection
        # terminates.
        self.connections.execute(f"drop view {view_name}")

        return columns

    @available
    def generate_final_cluster_name(
        self, cluster_name: str, force_deploy_suffix: bool = False
    ) -> Optional[str]:
        cluster_name = self.execute_macro(
            "generate_cluster_name",
            kwargs={"custom_cluster_name": cluster_name},
        )
        if (
            self.connections.profile.cli_vars.get("deploy", False)
            or force_deploy_suffix
        ):
            cluster_name = self.execute_macro(
                "generate_deploy_cluster_name",
                kwargs={"custom_cluster_name": cluster_name},
            )
        return cluster_name
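
    # For illustration only (the exact output depends on the project's
    # generate_cluster_name and generate_deploy_cluster_name macros): a call
    # like
    #
    #     adapter.generate_final_cluster_name("quickstart")
    #
    # returns the name produced by generate_cluster_name, and when dbt runs
    # with --vars '{deploy: true}' (or with force_deploy_suffix=True) the
    # result is additionally passed through generate_deploy_cluster_name,
    # which appends the deployment suffix used for blue/green deployments.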