k8s_service.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. import logging
  10. from typing import Any
  11. import pg8000
  12. import requests
  13. import sqlparse
  14. from kubernetes.client import V1Service
  15. from pg8000 import Connection, Cursor
  16. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  17. LOGGER = logging.getLogger(__name__)
  18. class K8sService(K8sResource):
  19. service: V1Service
  20. def kind(self) -> str:
  21. return "service"
  22. def create(self) -> None:
  23. core_v1_api = self.api()
  24. core_v1_api.create_namespaced_service(
  25. body=self.service, namespace=self.namespace()
  26. )
  27. def node_port(self, name: str | None = None) -> int:
  28. assert self.service and self.service.metadata and self.service.metadata.name
  29. service = self.api().read_namespaced_service(
  30. self.service.metadata.name, self.namespace()
  31. )
  32. assert service is not None
  33. spec = service.spec
  34. assert spec is not None
  35. ports = spec.ports
  36. assert ports is not None and len(ports) > 0
  37. port = next(p for p in ports if name is None or p.name == name)
  38. node_port = port.node_port
  39. assert node_port is not None
  40. return node_port
  41. def sql_conn(
  42. self,
  43. port: str | None = None,
  44. user: str = "materialize",
  45. ) -> Connection:
  46. """Get a connection to run SQL queries against the service"""
  47. return pg8000.connect(
  48. host="localhost",
  49. port=self.node_port(name=port),
  50. user=user,
  51. )
  52. def sql_cursor(
  53. self,
  54. port: str | None = None,
  55. user: str = "materialize",
  56. autocommit: bool = True,
  57. ) -> Cursor:
  58. """Get a cursor to run SQL queries against the service"""
  59. conn = self.sql_conn(port=port, user=user)
  60. conn.autocommit = autocommit
  61. return conn.cursor()
  62. def sql(
  63. self,
  64. sql: str,
  65. port: str | None = None,
  66. user: str = "materialize",
  67. ) -> None:
  68. """Run a batch of SQL statements against the service."""
  69. with self.sql_cursor(port=port, user=user) as cursor:
  70. for statement in sqlparse.split(sql):
  71. LOGGER.info(f"> {statement}")
  72. cursor.execute(statement)
  73. def sql_query(
  74. self,
  75. sql: str,
  76. port: str | None = None,
  77. user: str = "materialize",
  78. ) -> Any:
  79. """Execute a SQL query against the service and return results."""
  80. with self.sql_cursor(port=port, user=user) as cursor:
  81. LOGGER.info(f"> {sql}")
  82. cursor.execute(sql)
  83. return cursor.fetchall()
  84. def http_get(self, path: str) -> Any:
  85. url = f"http://localhost:{self.node_port('internalhttp')}/{path}"
  86. response = requests.get(url)
  87. response.raise_for_status()
  88. return response.text