endpoint.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from typing import Any
  10. import psycopg
  11. class Endpoint:
  12. _version: str | None = None
  13. def __init__(self, specified_target: str):
  14. self._specified_target = specified_target
  15. def sql_connection(
  16. self, quiet: bool = False
  17. ) -> psycopg.connection.Connection[tuple[Any, ...]]:
  18. if not quiet:
  19. print(f"Connecting to URL: {self.url()}")
  20. conn = psycopg.connect(self.url())
  21. conn.autocommit = True
  22. return conn
  23. def url(self) -> str:
  24. return (
  25. f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
  26. )
  27. def specified_target(self) -> str:
  28. return self._specified_target
  29. def resolved_target(self) -> str:
  30. return self.specified_target()
  31. def host(self) -> str:
  32. raise NotImplementedError
  33. def user(self) -> str:
  34. raise NotImplementedError
  35. def password(self) -> str:
  36. raise NotImplementedError
  37. def port(self) -> int:
  38. raise NotImplementedError
  39. def up(self) -> None:
  40. raise NotImplementedError
  41. def sql(self, sql: str) -> None:
  42. conn = self.sql_connection()
  43. cursor = conn.cursor()
  44. cursor.execute(sql.encode("utf8"))
  45. def try_load_version(self) -> str:
  46. """
  47. Tries to load the version from the database or returns 'unknown' otherwise.
  48. This first invocation requires the endpoint to be up; subsequent invocations will use the cached information.
  49. """
  50. if self._version is not None:
  51. return self._version
  52. try:
  53. cursor = self.sql_connection().cursor()
  54. cursor.execute(b"SELECT mz_version()")
  55. row = cursor.fetchone()
  56. assert row is not None
  57. self._version = str(row[0])
  58. return self._version
  59. except:
  60. return "unknown"