k8s_role_binding.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from kubernetes.client import V1RoleBinding
  10. from kubernetes.client.exceptions import ApiException
  11. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  12. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  13. class K8sRoleBinding(K8sResource):
  14. role_binding: V1RoleBinding
  15. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE):
  16. super().__init__(namespace)
  17. def kind(self) -> str:
  18. return "rolebinding"
  19. def create(self) -> None:
  20. rbac_api = self.rbac_api()
  21. # kubectl delete all -all does not clean up role bindings
  22. try:
  23. assert self.role_binding.metadata is not None
  24. assert self.role_binding.metadata.name is not None
  25. rbac_api.delete_namespaced_role_binding(
  26. name=self.role_binding.metadata.name, namespace=self.namespace()
  27. )
  28. except ApiException:
  29. pass
  30. rbac_api.create_namespaced_role_binding(
  31. body=self.role_binding,
  32. namespace=self.namespace(),
  33. )