k8s_configmap.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from kubernetes.client import V1ConfigMap
  10. from kubernetes.client.exceptions import ApiException
  11. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  12. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  13. class K8sConfigMap(K8sResource):
  14. configmap = V1ConfigMap
  15. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE):
  16. super().__init__(namespace)
  17. def kind(self) -> str:
  18. return "configmap"
  19. def create(self) -> None:
  20. core_v1_api = self.api()
  21. try:
  22. assert self.configmap.metadata is not None
  23. assert self.configmap.metadata.name is not None
  24. except ApiException:
  25. pass
  26. core_v1_api.create_namespaced_config_map(
  27. body=self.configmap, namespace=self.namespace() # type: ignore
  28. )