endpoints.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import psycopg
  10. from materialize import git
  11. from materialize.mzcompose.composition import Composition
  12. from materialize.mzcompose.services.materialized import Materialized
  13. from materialize.mzcompose.services.postgres import Postgres
  14. from materialize.scalability.endpoint.endpoint import Endpoint
  15. POSTGRES_ENDPOINT_NAME = "postgres"
  16. TARGET_MATERIALIZE_LOCAL = "local"
  17. TARGET_MATERIALIZE_REMOTE = "remote"
  18. TARGET_POSTGRES = "postgres"
  19. TARGET_HEAD = "HEAD"
  20. class MaterializeRemote(Endpoint):
  21. """Connect to a remote Materialize instance using a psql URL"""
  22. def __init__(self, materialize_url: str) -> None:
  23. super().__init__(specified_target=TARGET_MATERIALIZE_REMOTE)
  24. self.materialize_url = materialize_url
  25. def url(self) -> str:
  26. return self.materialize_url
  27. def up(self) -> None:
  28. pass
  29. def __str__(self) -> str:
  30. return f"MaterializeRemote ({self.materialize_url})"
  31. class PostgresContainer(Endpoint):
  32. def __init__(self, composition: Composition) -> None:
  33. self.composition = composition
  34. self._port: int | None = None
  35. super().__init__(specified_target=TARGET_POSTGRES)
  36. def host(self) -> str:
  37. return "localhost"
  38. def port(self) -> int:
  39. assert self._port is not None
  40. return self._port
  41. def user(self) -> str:
  42. return "postgres"
  43. def password(self) -> str:
  44. return "postgres"
  45. def up(self) -> None:
  46. self.composition.down(destroy_volumes=True)
  47. with self.composition.override(Postgres()):
  48. self.composition.up("postgres")
  49. self._port = self.composition.default_port("postgres")
  50. def try_load_version(self) -> str:
  51. return POSTGRES_ENDPOINT_NAME
  52. def __str__(self) -> str:
  53. return "PostgresContainer"
  54. class MaterializeNonRemote(Endpoint):
  55. def __init__(self, specified_target: str):
  56. super().__init__(specified_target)
  57. def host(self) -> str:
  58. return "localhost"
  59. def internal_host(self) -> str:
  60. return "localhost"
  61. def user(self) -> str:
  62. return "materialize"
  63. def password(self) -> str:
  64. return "materialize"
  65. def internal_port(self) -> int:
  66. raise NotImplementedError
  67. def lift_limits(self) -> None:
  68. priv_conn = psycopg.connect(
  69. host=self.internal_host(), user="mz_system", port=self.internal_port()
  70. )
  71. priv_conn.autocommit = True
  72. priv_cursor = priv_conn.cursor()
  73. priv_cursor.execute("ALTER SYSTEM SET max_connections = 65535;")
  74. priv_cursor.execute("ALTER SYSTEM SET max_tables = 65535;")
  75. priv_cursor.execute("ALTER SYSTEM SET max_materialized_views = 65535;")
  76. class MaterializeLocal(MaterializeNonRemote):
  77. """Connect to a Materialize instance running on the local host"""
  78. def __init__(self) -> None:
  79. super().__init__(specified_target=TARGET_MATERIALIZE_LOCAL)
  80. def port(self) -> int:
  81. return 6875
  82. def internal_port(self) -> int:
  83. return 6877
  84. def up(self) -> None:
  85. self.lift_limits()
  86. def __str__(self) -> str:
  87. return f"MaterializeLocal ({self.host()})"
  88. class MaterializeContainer(MaterializeNonRemote):
  89. def __init__(
  90. self,
  91. composition: Composition,
  92. specified_target: str,
  93. resolved_target: str,
  94. use_balancerd: bool,
  95. image: str | None = None,
  96. alternative_image: str | None = None,
  97. ) -> None:
  98. self.composition = composition
  99. self.image = image
  100. self.alternative_image = (
  101. alternative_image if image != alternative_image else None
  102. )
  103. self._port: int | None = None
  104. self._resolved_target = resolved_target
  105. self.use_balancerd = use_balancerd
  106. super().__init__(specified_target)
  107. def resolved_target(self) -> str:
  108. return self._resolved_target
  109. def port(self) -> int:
  110. assert self._port is not None
  111. return self._port
  112. def internal_port(self) -> int:
  113. return self.composition.port("materialized", 6877)
  114. def up(self) -> None:
  115. self.composition.down(destroy_volumes=True)
  116. print(f"Image is {self.image} (alternative: {self.alternative_image})")
  117. if self.image is not None and self.alternative_image is not None:
  118. if not self.composition.try_pull_service_image(
  119. Materialized(
  120. image=self.image,
  121. external_metadata_store=True,
  122. metadata_store="cockroach",
  123. )
  124. ):
  125. # explicitly specified image cannot be found and alternative exists
  126. print(
  127. f"Unable to find image {self.image}, proceeding with alternative image {self.alternative_image}!"
  128. )
  129. self.image = self.alternative_image
  130. else:
  131. print(f"Found image {self.image}, proceeding with this image.")
  132. self.up_internal()
  133. self.lift_limits()
  134. def up_internal(self) -> None:
  135. with self.composition.override(
  136. Materialized(
  137. image=self.image,
  138. sanity_restart=False,
  139. external_metadata_store=True,
  140. metadata_store="cockroach",
  141. )
  142. ):
  143. self.composition.up("materialized")
  144. if self.use_balancerd:
  145. self.composition.up("balancerd")
  146. self._port = self.composition.default_port("balancerd")
  147. else:
  148. self._port = self.composition.default_port("materialized")
  149. def __str__(self) -> str:
  150. return f"MaterializeContainer ({self.image} specified as {self.specified_target()})"
  151. def endpoint_name_to_description(endpoint_name: str) -> str:
  152. if endpoint_name == POSTGRES_ENDPOINT_NAME:
  153. return endpoint_name
  154. commit_sha = endpoint_name.split(" ")[1].strip("()")
  155. # empty when mz_version() reports a Git SHA that is not available in the current repository
  156. commit_message = git.get_commit_message(commit_sha) or "unknown"
  157. return f"{endpoint_name} - {commit_message}"