k8s_stateful_set.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import logging
  10. from kubernetes.client import V1StatefulSet
  11. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  12. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  13. LOGGER = logging.getLogger(__name__)
  14. class K8sStatefulSet(K8sResource):
  15. stateful_set: V1StatefulSet
  16. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE):
  17. super().__init__(namespace)
  18. self.stateful_set = self.generate_stateful_set()
  19. def generate_stateful_set(self) -> V1StatefulSet:
  20. raise NotImplementedError
  21. def kind(self) -> str:
  22. return "statefulset"
  23. def name(self) -> str:
  24. assert self.stateful_set.metadata is not None
  25. assert self.stateful_set.metadata.name is not None
  26. return self.stateful_set.metadata.name
  27. def create(self) -> None:
  28. apps_v1_api = self.apps_api()
  29. apps_v1_api.create_namespaced_stateful_set(
  30. body=self.stateful_set, namespace=self.namespace()
  31. )
  32. def replace(self) -> None:
  33. apps_v1_api = self.apps_api()
  34. name = self.name()
  35. LOGGER.info(f"Replacing stateful set {name}...")
  36. self.stateful_set = self.generate_stateful_set()
  37. apps_v1_api.replace_namespaced_stateful_set(
  38. name=name, body=self.stateful_set, namespace=self.namespace()
  39. )
  40. # Despite the name "status" this kubectl command will actually wait
  41. # until the rollout is complete.
  42. # See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
  43. self.kubectl("rollout", "status", f"statefulset/{name}")