wait.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import logging
  10. import subprocess
  11. from materialize import ui
  12. from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
  13. from materialize.cloudtest.util.print_pods import print_pods
  14. from materialize.ui import UIError
# Module-level logger named after this module; used by wait() to record
# failed kubectl invocations.
LOGGER = logging.getLogger(__name__)
  16. def wait(
  17. condition: str,
  18. resource: str,
  19. timeout_secs: int = 300,
  20. context: str = DEFAULT_K8S_CONTEXT_NAME,
  21. *,
  22. label: str | None = None,
  23. namespace: str | None = None,
  24. server: str | None = None,
  25. ) -> None:
  26. cmd = [
  27. "kubectl",
  28. "wait",
  29. "--for",
  30. condition,
  31. resource,
  32. "--timeout",
  33. f"{timeout_secs}s",
  34. "--context",
  35. context,
  36. ]
  37. if label is not None:
  38. cmd.extend(["--selector", label])
  39. if namespace is not None:
  40. cmd.extend(["--namespace", namespace])
  41. if server is not None:
  42. cmd.extend(["--server", server])
  43. ui.progress(f'waiting for {" ".join(cmd)} ... ')
  44. error = None
  45. for remaining in ui.timeout_loop(timeout_secs, tick=0.1):
  46. try:
  47. output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode(
  48. "ascii"
  49. )
  50. # output is:
  51. # - an empty string when a 'delete' condition is satisfied
  52. # - 'condition met' for all other conditions
  53. if len(output) == 0 or "condition met" in output:
  54. ui.progress("success!", finish=True)
  55. return
  56. except subprocess.CalledProcessError as e:
  57. # use a less verbose output than log_subprocess_error here
  58. LOGGER.info(f"{e} {e.output.decode('ascii')}")
  59. error = e
  60. ui.progress(finish=True)
  61. print_pods()
  62. raise UIError(f"kubectl wait never returned 'condition met': {error}")