# connections.py
  1. # Copyright 2020 Josh Wills. All rights reserved.
  2. # Copyright Materialize, Inc. and contributors. All rights reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License in the LICENSE file at the
  7. # root of this repository, or online at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
# Standard-library imports.
from dataclasses import dataclass
from typing import Optional

# Third-party imports.
import dbt_common.exceptions
import psycopg2
from dbt_common.semver import versions_compatible
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.postgres import PostgresConnectionManager, PostgresCredentials

# Local imports.
from .__version__ import version as __version__

# The semver range of Materialize server versions this adapter supports.
# If you bump this version, bump it in README.md too.
SUPPORTED_MATERIALIZE_VERSIONS = ">=0.68.0"

# Adapter-scoped logger; messages are tagged with the "Materialize" name.
logger = AdapterLogger("Materialize")
  27. # Override the psycopg2 connect function in order to inject Materialize-specific
  28. # session parameter defaults.
  29. #
  30. # This approach is a bit hacky, but some of these session parameters *must* be
  31. # set as part of connection initiation, so we can't simply run `SET` commands
  32. # after the session is established.
  33. def connect(**kwargs):
  34. options = [
  35. # Ensure that dbt's catalog queries get routed to the
  36. # `mz_catalog_server` cluster, even if the server or role's default is
  37. # different.
  38. "--auto_route_catalog_queries=on",
  39. # dbt prints notices to stdout, which is very distracting because dbt
  40. # can establish many new connections during `dbt run`.
  41. "--welcome_message=off",
  42. # Disable warnings about the session's default database or cluster not
  43. # existing, as these get quite spammy, especially with multiple threads.
  44. #
  45. # Details: it's common for the default cluster for the role dbt is
  46. # connecting as (often `quickstart`) to be absent. For many dbt
  47. # deployments, clusters are explicitly specified on a model-by-model
  48. # basis, and there in fact is no natural "default" cluster. So warning
  49. # repeatedly that the default cluster doesn't exist isn't helpful, since
  50. # each DDL statement will specify a different, valid cluster. If a DDL
  51. # statement ever specifies an invalid cluster, dbt will still produce an
  52. # error about the invalid cluster, even with this setting enabled.
  53. "--current_object_missing_warnings=off",
  54. *(kwargs.get("options") or []),
  55. ]
  56. kwargs["options"] = " ".join(options)
  57. return _connect(**kwargs)
  58. _connect = psycopg2.connect
  59. psycopg2.connect = connect
  60. @dataclass
  61. class MaterializeCredentials(PostgresCredentials):
  62. # NOTE(morsapaes) The cluster parameter defined in `profiles.yml` is not
  63. # picked up in the connection string, but is picked up wherever we fall
  64. # back to `target.cluster`. When no cluster is specified, either in
  65. # `profiles.yml` or as a configuration, we should default to the default
  66. # cluster configured for the connected dbt user (or, the active cluster for
  67. # the connection). This is strictly better than hardcoding `quickstart` as
  68. # the `target.cluster`, which might not exist and leads to all sorts of
  69. # annoying errors. This will still fail if the defalt cluster for the
  70. # connected user is invalid or set to `mz_catalog_server` (which cannot be
  71. # modified).
  72. cluster: Optional[str] = None
  73. application_name: Optional[str] = f"dbt-materialize v{__version__}"
  74. @property
  75. def type(self):
  76. return "materialize"
  77. def _connection_keys(self):
  78. return (
  79. "host",
  80. "port",
  81. "user",
  82. "database",
  83. "schema",
  84. "cluster",
  85. "sslmode",
  86. "keepalives_idle",
  87. "connect_timeout",
  88. "search_path",
  89. "retries",
  90. "application_name",
  91. )
  92. class MaterializeConnectionManager(PostgresConnectionManager):
  93. TYPE = "materialize"
  94. @classmethod
  95. def open(cls, connection):
  96. connection = super().open(connection)
  97. # Prevents psycopg connection from automatically opening transactions.
  98. # More info: https://www.psycopg.org/docs/usage.html#transactions-control
  99. connection.handle.autocommit = True
  100. mz_version = connection.handle.info.parameter_status(
  101. "mz_version"
  102. ) # e.g. v0.79.0-dev (937dfde5e)
  103. mz_version = mz_version.split()[0] # e.g. v0.79.0-dev
  104. mz_version = mz_version[1:] # e.g. 0.79.0-dev
  105. if not versions_compatible(mz_version, SUPPORTED_MATERIALIZE_VERSIONS):
  106. raise dbt_common.exceptions.DbtRuntimeError(
  107. f"Detected unsupported Materialize version {mz_version}\n"
  108. f" Supported versions: {SUPPORTED_MATERIALIZE_VERSIONS}"
  109. )
  110. return connection
  111. def cancel(self, connection):
  112. # The PostgreSQL implementation calls `pg_terminate_backend` from a new
  113. # connection to terminate `connection`. At the time of writing,
  114. # Materialize doesn't support `pg_terminate_backend`, so we implement
  115. # cancellation by calling `close` on the connection.
  116. #
  117. # NOTE(benesch): I'm not entirely sure why the PostgreSQL implementation
  118. # uses `pg_terminate_backend`. I suspect that disconnecting the network
  119. # connection by calling `connection.handle.close()` is not immediately
  120. # noticed by the PostgreSQL server, and so the queries running on that
  121. # connection may continue executing to completion. Materialize, however,
  122. # will quickly notice if the network socket disconnects and cancel any
  123. # queries that were initiated by that connection.
  124. connection_name = connection.name
  125. try:
  126. logger.debug("Closing connection '{}' to force cancellation")
  127. connection.handle.close()
  128. except psycopg2.InterfaceError as exc:
  129. # if the connection is already closed, not much to cancel!
  130. if "already closed" in str(exc):
  131. logger.debug(f"Connection {connection_name} was already closed")
  132. return
  133. # probably bad, re-raise it
  134. raise
  135. # Disable transactions. Materialize transactions do not support arbitrary
  136. # queries in transactions and therefore many of dbt's internal macros
  137. # produce invalid transactions.
  138. #
  139. # Disabling transactions has precedent in dbt-snowflake and dbt-biquery.
  140. # See, for example:
  141. # https://github.com/dbt-labs/dbt-snowflake/blob/ffbb05391/dbt/adapters/snowflake/connections.py#L359-L374
  142. def add_begin_query(self, *args, **kwargs):
  143. pass
  144. def add_commit_query(self, *args, **kwargs):
  145. pass
  146. def begin(self):
  147. pass
  148. def commit(self):
  149. pass
  150. def clear_transaction(self):
  151. pass