k8s_resource.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import os
  10. import subprocess
  11. from kubernetes.client import AppsV1Api, CoreV1Api, RbacAuthorizationV1Api
  12. from kubernetes.config import new_client_from_config # type: ignore
  13. from materialize import MZ_ROOT, mzbuild, ui
  14. from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
  15. from materialize.cloudtest.util.common import run_process_with_error_information
  16. from materialize.cloudtest.util.wait import wait
  17. from materialize.rustc_flags import Sanitizer
  18. class K8sResource:
  19. def __init__(self, namespace: str):
  20. self.selected_namespace = namespace
  21. def kubectl(
  22. self,
  23. *args: str,
  24. input: str | None = None,
  25. capture_output: bool = False,
  26. suppress_command_error_output: bool = False,
  27. ) -> None:
  28. cmd = [
  29. "kubectl",
  30. "--context",
  31. self.context(),
  32. "--namespace",
  33. self.namespace(),
  34. *args,
  35. ]
  36. if suppress_command_error_output:
  37. subprocess.run(
  38. cmd, text=True, input=input, check=True, capture_output=capture_output
  39. )
  40. else:
  41. run_process_with_error_information(
  42. cmd, input, capture_output=capture_output
  43. )
  44. def api(self) -> CoreV1Api:
  45. api_client = new_client_from_config(context=self.context())
  46. return CoreV1Api(api_client)
  47. def apps_api(self) -> AppsV1Api:
  48. api_client = new_client_from_config(context=self.context())
  49. return AppsV1Api(api_client)
  50. def rbac_api(self) -> RbacAuthorizationV1Api:
  51. api_client = new_client_from_config(context=self.context())
  52. return RbacAuthorizationV1Api(api_client)
  53. def context(self) -> str:
  54. return DEFAULT_K8S_CONTEXT_NAME
  55. def namespace(self) -> str:
  56. return self.selected_namespace
  57. def kind(self) -> str:
  58. raise NotImplementedError
  59. def create(self) -> None:
  60. raise NotImplementedError
  61. def image(
  62. self,
  63. service: str,
  64. tag: str | None = None,
  65. release_mode: bool = True,
  66. org: str | None = "materialize",
  67. ) -> str:
  68. if tag is not None:
  69. image_name = f"{service}:{tag}"
  70. if org is not None:
  71. image_name = f"{org}/{image_name}"
  72. return image_name
  73. else:
  74. coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
  75. sanitizer = Sanitizer[os.getenv("CI_SANITIZER", "none")]
  76. bazel = ui.env_is_truthy("CI_BAZEL_BUILD")
  77. bazel_remote_cache = os.getenv("CI_BAZEL_REMOTE_CACHE")
  78. bazel_lto = ui.env_is_truthy("CI_BAZEL_LTO")
  79. repo = mzbuild.Repository(
  80. MZ_ROOT,
  81. profile=(
  82. mzbuild.Profile.RELEASE if release_mode else mzbuild.Profile.DEV
  83. ),
  84. coverage=coverage,
  85. sanitizer=sanitizer,
  86. bazel=bazel,
  87. bazel_remote_cache=bazel_remote_cache,
  88. bazel_lto=bazel_lto,
  89. )
  90. deps = repo.resolve_dependencies([repo.images[service]])
  91. rimage = deps[service]
  92. return rimage.spec()
  93. def wait(
  94. self,
  95. condition: str,
  96. resource: str,
  97. ) -> None:
  98. wait(condition=condition, resource=resource, namespace=self.selected_namespace)