ci_upload_heap_profiles.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # ci_upload_heap_profiles.py - Upload memory heap profiles during an mzcompose run
  11. import argparse
  12. import json
  13. import re
  14. import subprocess
  15. import sys
  16. import time
  17. from threading import Thread
  18. CLUSTERD_COMMAND_RE = re.compile(
  19. r"--internal-http-listen-addr=(?P<socket>[^ ]*).*--log-prefix=(?P<logprefix>[^ ]*)"
  20. )
  21. def main() -> int:
  22. parser = argparse.ArgumentParser(
  23. prog="ci-upload-heap-profiles",
  24. formatter_class=argparse.RawDescriptionHelpFormatter,
  25. description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run",
  26. )
  27. parser.add_argument("composition", type=str)
  28. parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True)
  29. args = parser.parse_args()
  30. mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"]
  31. time_str = time.strftime("%Y-%m-%d_%H_%M_%S")
  32. threads = []
  33. def run(service: str, backend: list[str], suffix: str = ""):
  34. heap_profile = subprocess.run(
  35. mzcompose + ["exec", service, "curl", "--silent"] + backend,
  36. text=False,
  37. capture_output=True,
  38. ).stdout
  39. if not heap_profile:
  40. return
  41. filename = f"prof-{service}{suffix}-{time_str}.pb.gz"
  42. with open(filename, "wb") as f:
  43. f.write(heap_profile)
  44. if args.upload:
  45. subprocess.run(
  46. [
  47. "buildkite-agent",
  48. "artifact",
  49. "upload",
  50. "--log-level",
  51. "error",
  52. filename,
  53. ],
  54. capture_output=True,
  55. )
  56. services = json.loads(
  57. subprocess.run(
  58. mzcompose + ["ps", "--format", "json"], text=True, capture_output=True
  59. ).stdout
  60. )
  61. for service in services:
  62. if service["Image"].startswith("materialize/clusterd:"):
  63. threads.append(
  64. Thread(
  65. target=run,
  66. args=(service["Service"], ["http://127.0.0.1:6878/heap"]),
  67. )
  68. )
  69. elif service["Image"].startswith("materialize/materialized:"):
  70. threads.append(
  71. Thread(
  72. target=run,
  73. args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]),
  74. )
  75. )
  76. for line in subprocess.run(
  77. mzcompose + ["exec", service["Service"], "ps", "aux"],
  78. text=True,
  79. capture_output=True,
  80. ).stdout.splitlines():
  81. if match := CLUSTERD_COMMAND_RE.search(line):
  82. threads.append(
  83. Thread(
  84. target=run,
  85. args=(
  86. service["Service"],
  87. [
  88. "--unix-socket",
  89. match.group("socket"),
  90. "http:/prof/heap",
  91. ],
  92. f"-{match.group('logprefix')}",
  93. ),
  94. )
  95. )
  96. for thread in threads:
  97. thread.start()
  98. for thread in threads:
  99. thread.join()
  100. return 0
  101. if __name__ == "__main__":
  102. sys.exit(main())