teleport.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import os
  10. import subprocess
  11. import threading
  12. import time
  13. from textwrap import dedent
  14. import psutil
  15. from materialize import build_config, ui
  16. class TeleportProxy:
  17. @classmethod
  18. def spawn(cls, app_name: str, port: str):
  19. """Spawn a Teleport proxy for the provided app_name."""
  20. teleport_state = build_config.TeleportLocalState.read()
  21. # If there is already a Teleport proxy running, no need to restart one.
  22. running_pid = TeleportProxy.check(app_name)
  23. if running_pid:
  24. ui.say(f"Teleport proxy already running, PID: {running_pid}")
  25. return
  26. else:
  27. # If the existing PID doesn't exist, clear it from state.
  28. teleport_state.set_pid(app_name, None)
  29. teleport_state.set_address(app_name, None)
  30. teleport_state.write()
  31. # Otherwise spawn a Teleport proxy.
  32. cmd_args = ["tsh", "proxy", "app", f"{app_name}", "--port", port]
  33. child = subprocess.Popen(
  34. cmd_args,
  35. stdout=subprocess.DEVNULL,
  36. stderr=subprocess.DEVNULL,
  37. preexec_fn=os.setpgrp,
  38. )
  39. ui.say(f"starting Teleport proxy for '{app_name}'...")
  40. def wait(child, teleport_state, address):
  41. wait_start = time.time()
  42. while time.time() - wait_start < 2:
  43. child_terminated = child.poll()
  44. if child_terminated:
  45. other_tshs = [
  46. p.pid
  47. for p in psutil.process_iter(["pid", "name"])
  48. if p.name() == "tsh"
  49. ]
  50. ui.warn(
  51. dedent(
  52. f"""
  53. Teleport proxy failed to start, 'tsh' process already running!
  54. existing 'tsh' processes: {other_tshs}
  55. exit code: {child_terminated}
  56. """
  57. )
  58. )
  59. break
  60. # Timed out! Check if the process is running.
  61. child_pid_status = psutil.pid_exists(child.pid)
  62. if child_pid_status:
  63. # Record the PID, if the process started successfully.
  64. teleport_state.set_pid(app_name, child.pid)
  65. teleport_state.set_address(app_name, address)
  66. teleport_state.write()
  67. # Spawn a thread that will wait for the Teleport proxy to start, and
  68. # record it's PID, or warn that it failed to start.
  69. address = f"http://localhost:{port}"
  70. thread = threading.Thread(target=wait, args=[child, teleport_state, address])
  71. thread.start()
  72. @classmethod
  73. def check(cls, app_name: str) -> str | None:
  74. """Check if a Teleport proxy is already running for the specified app_name."""
  75. teleport_state = build_config.TeleportLocalState.read()
  76. existing_pid = teleport_state.get_pid(app_name)
  77. if existing_pid and psutil.pid_exists(int(existing_pid)):
  78. return teleport_state.get_pid(app_name)
  79. else:
  80. return None