- # Copyright 2020 Josh Wills. All rights reserved.
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License in the LICENSE file at the
- # root of this repository, or online at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import subprocess
- import time
- from collections import namedtuple
- from dataclasses import dataclass
- from typing import Any, Dict, List, Optional, Set
- import dbt_common.exceptions
- from dbt_common.contracts.constraints import (
- ColumnLevelConstraint,
- ConstraintType,
- )
- from dbt_common.dataclass_schema import ValidationError, dbtClassMixin
- from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
- from dbt.adapters.base.meta import available
- from dbt.adapters.capability import (
- Capability,
- CapabilityDict,
- CapabilitySupport,
- Support,
- )
- from dbt.adapters.materialize.connections import MaterializeConnectionManager
- from dbt.adapters.materialize.exceptions import (
- RefreshIntervalConfigError,
- RefreshIntervalConfigNotDictError,
- )
- from dbt.adapters.materialize.relation import MaterializeRelation
- from dbt.adapters.postgres.column import PostgresColumn
- from dbt.adapters.postgres.impl import PostgresAdapter, SQLAdapter
- from dbt.adapters.sql.impl import LIST_RELATIONS_MACRO_NAME
- # types in ./misc/dbt-materialize need to import generic types from typing
- @dataclass
- class MaterializeIndexConfig(dbtClassMixin):
- columns: Optional[List[str]] = None
- default: Optional[bool] = False
- name: Optional[str] = None
- cluster: Optional[str] = None
- @classmethod
- def parse(cls, raw_index) -> Optional["MaterializeIndexConfig"]:
- if raw_index is None:
- return None
- try:
- cls.validate(raw_index)
- return cls.from_dict(raw_index)
- except ValidationError as exc:
- msg = dbt_common.exceptions.validator_error_message(exc)
- raise dbt_common.exceptions.CompilationError(
- f"Could not parse index config: {msg}"
- )
- except TypeError:
- raise dbt_common.exceptions.CompilationError(
- "Invalid index config:\n"
- f" Got: {raw_index}\n"
- ' Expected a dictionary with at minimum a "columns" key'
- )
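- # Illustrative usage (assumed values): a minimal index config is a dict
- # with at least a "columns" key, mirroring the dataclass fields above.
- #
- # >>> MaterializeIndexConfig.parse({"columns": ["col_a"], "cluster": "quickstart"})
- # MaterializeIndexConfig(columns=['col_a'], default=False, name=None, cluster='quickstart')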
- # NOTE(morsapaes): Materialize allows configuring a refresh interval for the
- # materialized view materialization. If no config option is specified, the
- # default is REFRESH ON COMMIT. We add an explicit attribute for the special
- # case of parametrizing the configuration option, e.g. in macros.
- @dataclass
- class MaterializeRefreshIntervalConfig(dbtClassMixin):
- at: Optional[str] = None
- at_creation: Optional[bool] = False
- every: Optional[str] = None
- aligned_to: Optional[str] = None
- on_commit: Optional[bool] = False
- @classmethod
- def parse(
- cls, raw_refresh_interval
- ) -> Optional["MaterializeRefreshIntervalConfig"]:
- if raw_refresh_interval is None:
- return None
- try:
- cls.validate(raw_refresh_interval)
- return cls.from_dict(raw_refresh_interval)
- except ValidationError as exc:
- raise RefreshIntervalConfigError(exc)
- except TypeError:
- raise RefreshIntervalConfigNotDictError(raw_refresh_interval)
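- # Illustrative usage (assumed values): the refresh interval config arrives
- # as a dict keyed by the dataclass fields above, e.g. from a model config.
- #
- # >>> MaterializeRefreshIntervalConfig.parse({"every": "1 hour"})
- # MaterializeRefreshIntervalConfig(at=None, at_creation=False, every='1 hour', aligned_to=None, on_commit=False)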
- @dataclass
- class MaterializeConfig(AdapterConfig):
- cluster: Optional[str] = None
- refresh_interval: Optional[MaterializeRefreshIntervalConfig] = None
- retain_history: Optional[str] = None
- class MaterializeAdapter(PostgresAdapter, SQLAdapter):
- ConnectionManager = MaterializeConnectionManager
- Relation = MaterializeRelation
- AdapterSpecificConfigs = MaterializeConfig
- # NOTE(morsapaes): Materialize supports enforcing the NOT NULL constraint
- # for materialized views (via the ASSERT NOT NULL clause) and tables. As a
- # reminder, tables are modeled as static materialized views (see database-issues#1623).
- CONSTRAINT_SUPPORT = {
- ConstraintType.check: ConstraintSupport.NOT_SUPPORTED,
- ConstraintType.not_null: ConstraintSupport.ENFORCED,
- ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
- ConstraintType.primary_key: ConstraintSupport.NOT_SUPPORTED,
- ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
- }
- # NOTE(morsapaes): Materialize supports the functionality required to enable
- # metadata source freshness checks, but the value of this feature in a data
- # warehouse built for real-time is limited, so we do not implement it.
- _capabilities = CapabilityDict(
- {
- Capability.SchemaMetadataByRelations: CapabilitySupport(
- support=Support.NotImplemented
- ),
- Capability.TableLastModifiedMetadata: CapabilitySupport(
- support=Support.NotImplemented
- ),
- }
- )
- @classmethod
- def is_cancelable(cls) -> bool:
- return True
- def _link_cached_relations(self, manifest):
- # NOTE(benesch): this *should* reimplement the parent class's method [0]
- # for Materialize, but presently none of our tests care if we just
- # disable this method instead.
- #
- # [0]: https://github.com/dbt-labs/dbt-core/blob/13b18654f03d92eab3f5a9113e526a2a844f145d/plugins/postgres/dbt/adapters/postgres/impl.py#L126-L133
- pass
- def _link_cached_database_relations(self, schemas: Set[str]):
- """
- :param schemas: The set of schemas that should have links added.
- """
- database = self.config.credentials.database
- _Relation = namedtuple("_Relation", "database schema identifier")
- links = [
- (
- _Relation(database, dep_schema, dep_identifier),
- _Relation(database, ref_schema, ref_identifier),
- )
- for dep_schema, dep_identifier, ref_schema, ref_identifier in self.execute_macro(
- "materialize__get_relations"
- )
- # don't record in cache if this relation isn't in a relevant schema
- if ref_schema in schemas
- ]
- for dependent, referenced in links:
- self.cache.add_link(
- referenced=self.Relation.create(**referenced._asdict()),
- dependent=self.Relation.create(**dependent._asdict()),
- )
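- # For illustration (assumed row shape): materialize__get_relations yields
- # rows like ('public', 'my_view', 'public', 'my_source'), i.e. the
- # dependent relation followed by the relation it references, and each row
- # becomes one cache link.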
- def verify_database(self, database):
- # Materialize allows cross-database references, so any database name is
- # acceptable and no verification is needed.
- pass
- def _get_catalog_schemas(self, manifest):
- # Materialize allows cross-database references, so skip PostgresAdapter's
- # single-database restriction and use the base implementation directly.
- schemas = super(SQLAdapter, self)._get_catalog_schemas(manifest)
- return schemas
- def parse_index(self, raw_index: Any) -> Optional[MaterializeIndexConfig]:
- return MaterializeIndexConfig.parse(raw_index)
- @available
- def parse_refresh_interval(
- self, raw_refresh_interval: Any
- ) -> Optional[MaterializeRefreshIntervalConfig]:
- return MaterializeRefreshIntervalConfig.parse(raw_refresh_interval)
- def list_relations_without_caching(
- self, schema_relation: MaterializeRelation
- ) -> List[MaterializeRelation]:
- kwargs = {"schema_relation": schema_relation}
- results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
- relations = []
- quote_policy = {"database": True, "schema": True, "identifier": True}
- columns = ["database", "schema", "name", "type"]
- for _database, _schema, _identifier, _type in results.select(columns):
- try:
- _type = self.Relation.get_relation_type(_type.lower())
- except ValueError:
- # Fall back to "view" for relation types dbt does not recognize.
- _type = self.Relation.get_relation_type.View
- relations.append(
- self.Relation.create(
- database=_database,
- schema=_schema,
- identifier=_identifier,
- quote_policy=quote_policy,
- type=_type,
- )
- )
- return relations
- # NOTE(morsapaes): Materialize doesn't inline not_null constraints
- # in the DDL in the same way as other adapters, so we override the
- # default constraint rendering functions.
- @classmethod
- def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> str:
- if constraint.type == ConstraintType.not_null:
- return "assert not null"
- else:
- return ""
- @available
- @classmethod
- def render_raw_columns_constraints(
- cls, raw_columns: Dict[str, Dict[str, Any]]
- ) -> List:
- rendered_column_constraints = []
- for v in raw_columns.values():
- if v.get("constraints"):
- col_name = cls.quote(v["name"]) if v.get("quote") else v["name"]
- rendered_column_constraint = [col_name]
- for con in v.get("constraints", None):
- constraint = cls._parse_column_constraint(con)
- c = cls.process_parsed_constraint(
- constraint, cls.render_column_constraint
- )
- if c is not None:
- rendered_column_constraint.insert(0, c)
- rendered_column_constraints.append(" ".join(rendered_column_constraint))
- return rendered_column_constraints
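- # Illustrative input/output (assumed column spec): a raw column dict like
- # {"a": {"name": "a", "constraints": [{"type": "not_null"}]}}
- # renders to ["assert not null a"], which the materialization macros can
- # embed in an ASSERT NOT NULL option for the materialized view.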
- @available
- @classmethod
- def sleep(cls, seconds):
- time.sleep(seconds)
- @available
- @classmethod
- def get_git_commit_sha(cls):
- try:
- result = subprocess.run(
- ["git", "rev-parse", "HEAD"],
- capture_output=True,
- check=True,
- text=True,
- )
- return result.stdout.strip()
- except (subprocess.CalledProcessError, FileNotFoundError):
- # Not a git repository, or git is not installed.
- return None
- def get_column_schema_from_query(self, sql: str) -> List[PostgresColumn]:
- # The idea is that this function returns the names and types of the
- # columns returned by `sql` without actually executing the statement.
- #
- # The easiest way to do this would be to create a prepared statement
- # using `sql` and then inspect the result type of the prepared
- # statement. Unfortunately the Python DB-API and the underlying psycopg2
- # driver do not support preparing statements.
- #
- # So instead we create a temporary view based on the SQL statement and
- # inspect the types of the columns in the view using `mz_columns`.
- #
- # Note that the upstream PostgreSQL adapter takes a different approach.
- # It executes `SELECT * FROM (<query>) WHERE FALSE LIMIT 0`, which in
- # PostgreSQL is optimized to a no-op that returns no rows, and then
- # inspects the description of the returned cursor. That doesn't work
- # well in Materialize, because `SELECT ... LIMIT 0` requires a cluster
- # (even though it returns no rows), and we're not guaranteed that the
- # connection has a valid cluster.
- # Determine the name to use for the view. Unfortunately Materialize
- # commingles data about temporary views from all sessions in the system
- # catalog, so we need to manually include the connection ID in the name
- # to be able to identify the catalog entries for the temporary views
- # created by this specific connection.
- connection_id = self.connections.get_thread_connection().handle.info.backend_pid
- view_name = f"__dbt_sbq{connection_id}"
- # NOTE(morsapaes): because we need to consider the existence of a
- # sql_header configuration (see dbt-core #7714) but don't support
- # running the header SQL in the same transaction as the model SQL
- # statement, we split the input based on the string appended to the
- # header in materialize__get_empty_subquery_sql.
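- # For illustration (assumed shape), the input looks like:
- # "<header statements>--#dbt_sbq_parse_header#--<model query>"
- # so sql_header carries any sql_header statements and sql_view_def the
- # query that defines the temporary view.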
- sql_header, sql_view_def = sql.split("--#dbt_sbq_parse_header#--")
- if sql_header:
- self.connections.execute(sql_header)
- self.connections.execute(f"create temporary view {view_name} as {sql_view_def}")
- # Fetch the names and types of each column in the view. Schema ID 0
- # indicates the temporary schema.
- _, table = self.connections.execute(
- f"""select c.name, c.type_oid
- from mz_columns c
- join mz_relations r on c.id = r.id
- where r.schema_id = '0' and r.name = '{view_name}'""",
- fetch=True,
- )
- columns = [
- self.Column.create(
- column_name, self.connections.data_type_code_to_name(column_type_code)
- )
- # https://peps.python.org/pep-0249/#description
- for column_name, column_type_code in table.rows
- ]
- # Drop the temporary view, so that we can reuse its name if this
- # function is called again on the same thread. It's okay if we leak this
- # view (e.g., if an error above causes us to return early), as temporary
- # views are automatically dropped by Materialize when the connection
- # terminates.
- self.connections.execute(f"drop view {view_name}")
- return columns
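- # Illustrative call (assumed input; the marker below is normally appended
- # by materialize__get_empty_subquery_sql):
- # adapter.get_column_schema_from_query(
- # "--#dbt_sbq_parse_header#--select 1 as a, 'b' as b"
- # )
- # creates the temporary view from the select, reads back columns a and b
- # with their type OIDs from mz_columns, and returns them as Column objects.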
- @available
- def generate_final_cluster_name(
- self, cluster_name: str, force_deploy_suffix: bool = False
- ) -> Optional[str]:
- cluster_name = self.execute_macro(
- "generate_cluster_name",
- kwargs={"custom_cluster_name": cluster_name},
- )
- if (
- self.connections.profile.cli_vars.get("deploy", False)
- or force_deploy_suffix
- ):
- cluster_name = self.execute_macro(
- "generate_deploy_cluster_name",
- kwargs={"custom_cluster_name": cluster_name},
- )
- return cluster_name
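- # Illustrative behavior (assumed macro semantics): generate_cluster_name
- # resolves the configured cluster name, and when running with
- # --vars 'deploy: True' (or with force_deploy_suffix=True) the
- # generate_deploy_cluster_name macro appends a deployment suffix so the
- # model targets the deployment cluster rather than the production one.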