123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- # Copyright 2020 Josh Wills. All rights reserved.
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License in the LICENSE file at the
- # root of this repository, or online at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from dataclasses import dataclass
- from typing import Optional
- import dbt_common.exceptions
- import psycopg2
- from dbt_common.semver import versions_compatible
- from dbt.adapters.events.logging import AdapterLogger
- from dbt.adapters.postgres import PostgresConnectionManager, PostgresCredentials
- from .__version__ import version as __version__
- # If you bump this version, bump it in README.md too.
- SUPPORTED_MATERIALIZE_VERSIONS = ">=0.68.0"
- logger = AdapterLogger("Materialize")
- # Override the psycopg2 connect function in order to inject Materialize-specific
- # session parameter defaults.
- #
- # This approach is a bit hacky, but some of these session parameters *must* be
- # set as part of connection initiation, so we can't simply run `SET` commands
- # after the session is established.
- def connect(**kwargs):
- options = [
- # Ensure that dbt's catalog queries get routed to the
- # `mz_catalog_server` cluster, even if the server or role's default is
- # different.
- "--auto_route_catalog_queries=on",
- # dbt prints notices to stdout, which is very distracting because dbt
- # can establish many new connections during `dbt run`.
- "--welcome_message=off",
- # Disable warnings about the session's default database or cluster not
- # existing, as these get quite spammy, especially with multiple threads.
- #
- # Details: it's common for the default cluster for the role dbt is
- # connecting as (often `quickstart`) to be absent. For many dbt
- # deployments, clusters are explicitly specified on a model-by-model
- # basis, and there in fact is no natural "default" cluster. So warning
- # repeatedly that the default cluster doesn't exist isn't helpful, since
- # each DDL statement will specify a different, valid cluster. If a DDL
- # statement ever specifies an invalid cluster, dbt will still produce an
- # error about the invalid cluster, even with this setting enabled.
- "--current_object_missing_warnings=off",
- *(kwargs.get("options") or []),
- ]
- kwargs["options"] = " ".join(options)
- return _connect(**kwargs)
- _connect = psycopg2.connect
- psycopg2.connect = connect
- @dataclass
- class MaterializeCredentials(PostgresCredentials):
- # NOTE(morsapaes) The cluster parameter defined in `profiles.yml` is not
- # picked up in the connection string, but is picked up wherever we fall
- # back to `target.cluster`. When no cluster is specified, either in
- # `profiles.yml` or as a configuration, we should default to the default
- # cluster configured for the connected dbt user (or, the active cluster for
- # the connection). This is strictly better than hardcoding `quickstart` as
- # the `target.cluster`, which might not exist and leads to all sorts of
- # annoying errors. This will still fail if the defalt cluster for the
- # connected user is invalid or set to `mz_catalog_server` (which cannot be
- # modified).
- cluster: Optional[str] = None
- application_name: Optional[str] = f"dbt-materialize v{__version__}"
- @property
- def type(self):
- return "materialize"
- def _connection_keys(self):
- return (
- "host",
- "port",
- "user",
- "database",
- "schema",
- "cluster",
- "sslmode",
- "keepalives_idle",
- "connect_timeout",
- "search_path",
- "retries",
- "application_name",
- )
- class MaterializeConnectionManager(PostgresConnectionManager):
- TYPE = "materialize"
- @classmethod
- def open(cls, connection):
- connection = super().open(connection)
- # Prevents psycopg connection from automatically opening transactions.
- # More info: https://www.psycopg.org/docs/usage.html#transactions-control
- connection.handle.autocommit = True
- mz_version = connection.handle.info.parameter_status(
- "mz_version"
- ) # e.g. v0.79.0-dev (937dfde5e)
- mz_version = mz_version.split()[0] # e.g. v0.79.0-dev
- mz_version = mz_version[1:] # e.g. 0.79.0-dev
- if not versions_compatible(mz_version, SUPPORTED_MATERIALIZE_VERSIONS):
- raise dbt_common.exceptions.DbtRuntimeError(
- f"Detected unsupported Materialize version {mz_version}\n"
- f" Supported versions: {SUPPORTED_MATERIALIZE_VERSIONS}"
- )
- return connection
- def cancel(self, connection):
- # The PostgreSQL implementation calls `pg_terminate_backend` from a new
- # connection to terminate `connection`. At the time of writing,
- # Materialize doesn't support `pg_terminate_backend`, so we implement
- # cancellation by calling `close` on the connection.
- #
- # NOTE(benesch): I'm not entirely sure why the PostgreSQL implementation
- # uses `pg_terminate_backend`. I suspect that disconnecting the network
- # connection by calling `connection.handle.close()` is not immediately
- # noticed by the PostgreSQL server, and so the queries running on that
- # connection may continue executing to completion. Materialize, however,
- # will quickly notice if the network socket disconnects and cancel any
- # queries that were initiated by that connection.
- connection_name = connection.name
- try:
- logger.debug("Closing connection '{}' to force cancellation")
- connection.handle.close()
- except psycopg2.InterfaceError as exc:
- # if the connection is already closed, not much to cancel!
- if "already closed" in str(exc):
- logger.debug(f"Connection {connection_name} was already closed")
- return
- # probably bad, re-raise it
- raise
- # Disable transactions. Materialize transactions do not support arbitrary
- # queries in transactions and therefore many of dbt's internal macros
- # produce invalid transactions.
- #
- # Disabling transactions has precedent in dbt-snowflake and dbt-biquery.
- # See, for example:
- # https://github.com/dbt-labs/dbt-snowflake/blob/ffbb05391/dbt/adapters/snowflake/connections.py#L359-L374
- def add_begin_query(self, *args, **kwargs):
- pass
- def add_commit_query(self, *args, **kwargs):
- pass
- def begin(self):
- pass
- def commit(self):
- pass
- def clear_transaction(self):
- pass
|