123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- #
- # ci_upload_heap_profiles.py - Upload memory heap profiles during an mzcompose run
- import argparse
- import json
- import re
- import subprocess
- import sys
- import time
- from threading import Thread
- CLUSTERD_COMMAND_RE = re.compile(
- r"--internal-http-listen-addr=(?P<socket>[^ ]*).*--log-prefix=(?P<logprefix>[^ ]*)"
- )
- def main() -> int:
- parser = argparse.ArgumentParser(
- prog="ci-upload-heap-profiles",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- description="ci-upload-heap-profiles uploads memory heap profiles during an mzcompose run",
- )
- parser.add_argument("composition", type=str)
- parser.add_argument("--upload", action=argparse.BooleanOptionalAction, default=True)
- args = parser.parse_args()
- mzcompose = ["bin/mzcompose", "--find", args.composition, "--mz-quiet"]
- time_str = time.strftime("%Y-%m-%d_%H_%M_%S")
- threads = []
- def run(service: str, backend: list[str], suffix: str = ""):
- heap_profile = subprocess.run(
- mzcompose + ["exec", service, "curl", "--silent"] + backend,
- text=False,
- capture_output=True,
- ).stdout
- if not heap_profile:
- return
- filename = f"prof-{service}{suffix}-{time_str}.pb.gz"
- with open(filename, "wb") as f:
- f.write(heap_profile)
- if args.upload:
- subprocess.run(
- [
- "buildkite-agent",
- "artifact",
- "upload",
- "--log-level",
- "error",
- filename,
- ],
- capture_output=True,
- )
- services = json.loads(
- subprocess.run(
- mzcompose + ["ps", "--format", "json"], text=True, capture_output=True
- ).stdout
- )
- for service in services:
- if service["Image"].startswith("materialize/clusterd:"):
- threads.append(
- Thread(
- target=run,
- args=(service["Service"], ["http://127.0.0.1:6878/heap"]),
- )
- )
- elif service["Image"].startswith("materialize/materialized:"):
- threads.append(
- Thread(
- target=run,
- args=(service["Service"], ["http://127.0.0.1:6878/prof/heap"]),
- )
- )
- for line in subprocess.run(
- mzcompose + ["exec", service["Service"], "ps", "aux"],
- text=True,
- capture_output=True,
- ).stdout.splitlines():
- if match := CLUSTERD_COMMAND_RE.search(line):
- threads.append(
- Thread(
- target=run,
- args=(
- service["Service"],
- [
- "--unix-socket",
- match.group("socket"),
- "http:/prof/heap",
- ],
- f"-{match.group('logprefix')}",
- ),
- )
- )
- for thread in threads:
- thread.start()
- for thread in threads:
- thread.join()
- return 0
- if __name__ == "__main__":
- sys.exit(main())
|