ui.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Utilities for interacting with humans."""
  10. import asyncio
  11. import datetime
  12. import os
  13. import shlex
  14. import sys
  15. import time
  16. from collections.abc import AsyncGenerator, Callable, Generator, Iterable
  17. from contextlib import contextmanager
  18. from typing import Any
  19. from colored import attr, fg
  20. class Verbosity:
  21. """How noisy logs should be"""
  22. quiet: bool = False
  23. @classmethod
  24. def init_from_env(cls, explicit: bool | None) -> None:
  25. """Set to quiet based on MZ_QUIET being set to almost any value
  26. The only values that this gets set to false for are the empty string, 0, or no
  27. """
  28. cls.quiet = env_is_truthy("MZ_QUIET")
  29. if explicit is not None:
  30. cls.quiet = explicit
  31. def speaker(prefix: str) -> Callable[..., None]:
  32. """Create a function that will log with a prefix to stderr.
  33. Obeys `Verbosity.quiet`. Note that you must include any necessary
  34. spacing after the prefix.
  35. Example::
  36. >>> say = speaker("mz> ")
  37. >>> say("hello") # doctest: +SKIP
  38. mz> hello
  39. """
  40. def say(msg: str) -> None:
  41. if not Verbosity.quiet:
  42. print(f"{prefix}{msg}", file=sys.stderr)
  43. return say
  44. header = speaker("==> ")
  45. section = speaker("--- ")
  46. say = speaker("")
  47. def warn(message: str) -> None:
  48. """Emits a warning message to stderr."""
  49. print(f"{fg('yellow')}warning:{attr('reset')} {message}")
  50. def confirm(question: str) -> bool:
  51. """Render a question, returning True if the user says y or yes"""
  52. response = input(f"{question} [y/N]")
  53. return response.lower() in ("y", "yes")
  54. def progress(msg: str = "", prefix: str | None = None, *, finish: bool = False) -> None:
  55. """Print a progress message to stderr, using the same prefix format as speaker"""
  56. if prefix is not None:
  57. msg = f"{prefix}> {msg}"
  58. end = "" if not finish else "\n"
  59. print(msg, file=sys.stderr, flush=True, end=end)
  60. def timeout_loop(timeout: int, tick: float = 1.0) -> Generator[float, None, None]:
  61. """Loop until timeout, optionally sleeping until tick
  62. Always iterates at least once
  63. Args:
  64. timeout: maximum. number of seconds to wait
  65. tick: how long to ensure passes between loop iterations. Default: 1
  66. """
  67. end = time.monotonic() + timeout
  68. while True:
  69. before = time.monotonic()
  70. yield end - before
  71. after = time.monotonic()
  72. if after >= end:
  73. return
  74. if after - before < tick:
  75. if tick > 0:
  76. time.sleep(tick - (after - before))
  77. async def async_timeout_loop(
  78. timeout: int, tick: float = 1.0
  79. ) -> AsyncGenerator[float, None]:
  80. """Loop until timeout, asynchronously sleeping until tick
  81. Always iterates at least once
  82. Args:
  83. timeout: maximum. number of seconds to wait
  84. tick: how long to ensure passes between loop iterations. Default: 1
  85. """
  86. end = time.monotonic() + timeout
  87. while True:
  88. before = time.monotonic()
  89. yield end - before
  90. after = time.monotonic()
  91. if after >= end:
  92. return
  93. if after - before < tick:
  94. if tick > 0:
  95. await asyncio.sleep(tick - (after - before))
  96. def log_in_automation(msg: str) -> None:
  97. """Log to a file, if we're running in automation"""
  98. if env_is_truthy("MZ_IN_AUTOMATION"):
  99. with open("/tmp/mzcompose.log", "a") as fh:
  100. now = datetime.datetime.now().isoformat()
  101. print(f"[{now}] {msg}", file=fh)
  102. def shell_quote(args: Iterable[Any]) -> str:
  103. """Return shell-escaped string of all the parameters
  104. ::
  105. >>> shell_quote(["one", "two three"])
  106. "one 'two three'"
  107. """
  108. return " ".join(shlex.quote(str(arg)) for arg in args)
  109. def env_is_truthy(env_var: str, default: str = "0") -> bool:
  110. """Return true if `env_var` is set and is not one of: 0, '', no, false"""
  111. env = os.getenv(env_var, default)
  112. if env is not None:
  113. return env not in ("", "0", "no", "false")
  114. return False
  115. class UIError(Exception):
  116. """An error intended for display to humans.
  117. Use this exception type when the error is something the user can be expected
  118. to handle. If the error indicates a truly unexpected condition (i.e., a
  119. programming error), use a different exception type that will produce a
  120. backtrace instead.
  121. Attributes:
  122. hint: An optional hint to display alongside the error message.
  123. """
  124. def __init__(self, message: str, hint: str | None = None):
  125. super().__init__(message)
  126. self.hint = hint
  127. def set_hint(self, hint: str) -> None:
  128. """Attaches a hint to the error.
  129. This method will overwrite the existing hint, if any.
  130. """
  131. self.hint = hint
  132. class CommandFailureCausedUIError(UIError):
  133. """
  134. An UIError that is caused by executing a command.
  135. """
  136. def __init__(
  137. self,
  138. message: str,
  139. cmd: list[str],
  140. stdout: str | None = None,
  141. stderr: str | None = None,
  142. hint: str | None = None,
  143. ):
  144. super().__init__(message, hint)
  145. self.cmd = cmd
  146. self.stdout = stdout
  147. self.stderr = stderr
  148. @contextmanager
  149. def error_handler(prog: str) -> Any:
  150. """Catches and pretty-prints any raised `UIError`s.
  151. Args:
  152. prog: The name of the program with which to prefix the error message.
  153. """
  154. try:
  155. yield
  156. except UIError as e:
  157. print(f"{prog}: {fg('red')}error:{attr('reset')} {e}", file=sys.stderr)
  158. if e.hint:
  159. print(f"{attr('bold')}hint:{attr('reset')} {e.hint}")
  160. sys.exit(1)
  161. except KeyboardInterrupt:
  162. sys.exit(1)