k8s_cluster_role.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from kubernetes.client import V1ClusterRole
  10. from kubernetes.client.exceptions import ApiException
  11. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  12. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  13. class K8sClusterRole(K8sResource):
  14. role: V1ClusterRole
  15. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE):
  16. super().__init__(namespace)
  17. def kind(self) -> str:
  18. return "clusterrole"
  19. def create(self) -> None:
  20. rbac_api = self.rbac_api()
  21. # kubectl delete all -all does not clean up role bindings
  22. try:
  23. assert self.role.metadata is not None
  24. assert self.role.metadata.name is not None
  25. rbac_api.delete_cluster_role(name=self.role.metadata.name)
  26. except ApiException:
  27. pass
  28. rbac_api.create_cluster_role(
  29. body=self.role,
  30. )