# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Launch a benchmark for a particular commit on cloud infrastructure, using bin/scratch."""
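
# Illustrative usage (the benchmark module and its arguments are placeholders,
# not names from this repository):
#
#   bin/cloudbench start --profile basic --trials 3 --revs HEAD,<other-rev> <bench_module> [args...]
#   bin/cloudbench check <bench_id>
#
# `start` launches one scratch cluster per (trial, rev) pair; `check` polls S3
# until every cluster has uploaded results and merges them into one CSV on
# stdout.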

import argparse
import base64
import csv
import datetime
import itertools
import os
import shlex
import sys
import time
from typing import NamedTuple, cast

import boto3

from materialize import MZ_ROOT, git, scratch, spawn, util
from materialize.cli.scratch import check_required_vars
from materialize.scratch import print_instances


# This is duplicated from the one in cli/scratch.
# TODO - factor it out.
def main() -> None:
    os.chdir(MZ_ROOT)
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="subcommand", required=True)
    for cmd_name, configure, run in [
        ("start", configure_start, start),
        ("check", configure_check, check),
        # ("mine", mine.configure_parser, mine.run),
        # ("destroy", destroy.configure_parser, destroy.run),
    ]:
        s = subparsers.add_parser(cmd_name)
        configure(s)
        s.set_defaults(run=run)
    args = parser.parse_args()
    args.run(args)


def configure_start(parser: argparse.ArgumentParser) -> None:
    parser.add_argument(
        "--profile",
        choices=["basic", "confluent"],
        type=str,
        required=True,
        help="Predefined set of machines to use in the cluster. 'basic' is only the Materialize instance; 'confluent' also includes a machine running Kafka, the Schema Registry, etc.",
    )
    parser.add_argument(
        "--trials",
        "-n",
        type=int,
        default=1,
        help="The number of trials to run per git rev",
    )
    parser.add_argument(
        "--revs",
        type=str,
        default="HEAD",
        help="Comma-separated list of git revs to benchmark",
    )
    parser.add_argument(
        "bench_script",
        type=str,
        nargs=argparse.REMAINDER,
        help="Benchmark script (and optional arguments)",
    )
    parser.add_argument(
        "--append_metadata",
        help="Whether to append extra metadata to each CSV row before uploading to S3",
        action="store_true",
    )
    parser.add_argument("--s3_root", type=str, default=DEFAULT_BUCKET)
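

# Each cluster uploads either `<bench_id>/<cluster_id>.csv` (success) or
# `<bench_id>/<cluster_id>-FAILURE.log` (failure) to S3; these two result
# types mirror that outcome when `check` collects them.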
class BenchSuccessResult(NamedTuple):
    stdout: str


class BenchFailureLogs(NamedTuple):
    log: str


def configure_check(parser: argparse.ArgumentParser) -> None:
    parser.add_argument("--s3_root", type=str, default=DEFAULT_BUCKET)
    parser.add_argument("bench_id", type=str, nargs=1)


DEFAULT_BUCKET = "mz-cloudbench"
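

# Fetch an S3 object's body as a UTF-8 string, or return None if the key does
# not exist yet. `check` uses this to poll for results as instances finish.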
def try_get_object(key: str, bucket: str) -> str | None:
    client = boto3.client("s3")
    try:
        result = client.get_object(Bucket=bucket, Key=key)
        return result["Body"].read().decode("utf-8")
    except client.exceptions.NoSuchKey:
        return None
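

# Poll S3 until every cluster listed in the benchmark's MANIFEST has uploaded
# either a results CSV or a failure log, then merge the CSVs onto stdout.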
def check(ns: argparse.Namespace) -> None:
    check_required_vars()

    bench_id = ns.bench_id[0]
    manifest = (
        boto3.client("s3")
        .get_object(Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST")["Body"]
        .read()
        .decode("utf-8")
        .strip()
    )
    insts = manifest.split("\n")
    if not insts:
        raise RuntimeError(f"No instances found for bench ID {bench_id}")
    results: list[BenchSuccessResult | BenchFailureLogs | None] = [None for _ in insts]
    not_done = list(range(len(results)))
    while not_done:
        for i in not_done:
            maybe_result = try_get_object(f"{bench_id}/{insts[i]}.csv", ns.s3_root)
            if maybe_result is None:
                maybe_out = try_get_object(
                    f"{bench_id}/{insts[i]}-FAILURE.log", ns.s3_root
                )
                if maybe_out is None:
                    continue
                results[i] = BenchFailureLogs(maybe_out)
            else:
                results[i] = BenchSuccessResult(stdout=maybe_result)
        not_done = [i for i in not_done if not results[i]]
        if not_done:
            print("Benchmark not done; waiting 60 seconds", file=sys.stderr)
            time.sleep(60)

    for r in results:
        assert isinstance(r, BenchSuccessResult) or isinstance(r, BenchFailureLogs)
    done_results = cast(list[BenchFailureLogs | BenchSuccessResult], results)
    failed = [
        (i, r) for i, r in enumerate(done_results) if isinstance(r, BenchFailureLogs)
    ]
    if failed:
        for i, f in failed:
            print(
                f"Run of instance {insts[i]} failed, log:\n{f.log}",
                file=sys.stderr,
            )
        raise RuntimeError(f"{len(failed)} runs FAILED!")

    good_results = cast(list[BenchSuccessResult], done_results)
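
    # Accessing `fieldnames` below consumes each reader's header row; the
    # per-row generators inside `csv_results` are only consumed in the final
    # loop, after the headers have been validated.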
    readers = [
        csv.DictReader(f"{line}\n" for line in r.stdout.split("\n"))
        for r in good_results
    ]
    csv_results = ((d.values() for d in r) for r in readers)
    for r in readers:
        assert isinstance(r.fieldnames, list)
        for fn in r.fieldnames:
            assert isinstance(fn, str)
    headers = set(tuple(cast(list[str], r.fieldnames)) for r in readers)
    if len(headers) > 1:
        raise RuntimeError("Mismatched headers")
    w = csv.writer(sys.stdout)
    w.writerow(
        cast(list[str], readers[0].fieldnames) + ["InstanceIndex", "Rev", "Trial"]
    )
    for inst, r in zip(insts, csv_results):
        components = inst.split("-")
        for i, entry in enumerate(r):
            w.writerow(itertools.chain(entry, (components[0], components[1], i)))
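

# Package the `misc/python` tree into an sdist, build a launch script that runs
# the benchmark on each instance and uploads its results to S3, then launch one
# scratch cluster per (trial, rev) pair.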
def start(ns: argparse.Namespace) -> None:
    check_required_vars()

    revs = ns.revs.split(",")
    clusters = list(
        itertools.product(range(ns.trials), (git.rev_parse(rev) for rev in revs))
    )

    bench_script = ns.bench_script
    script_name = bench_script[0]
    script_args = " ".join(shlex.quote(arg) for arg in bench_script[1:])

    # zip up the `misc` repository, for shipment to the remote machine
    os.chdir("misc/python")
    spawn.runv(["python3", "./setup.py", "sdist"])
    with open("./dist/materialize-0.0.0.tar.gz", "rb") as f:
        pkg_data = f.read()
    os.chdir(MZ_ROOT)
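
    # With --append_metadata, `munge_result` extends the CSV header and appends
    # a timestamp plus the MZ_CB_* values to every data row before upload.
    # Illustrative shape of the appended columns (values are made up):
    #
    #   ...,Timestamp,BenchId,ClusterId,GitRef,S3Root
    #   ...,1650000000,abcd1234,0-<rev>,<rev>,mz-cloudbench
    #
    # Without the flag, the CSV is uploaded unchanged.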
    if ns.append_metadata:
        munge_result = 'awk \'{ if (NR == 1) { print $0 ",Timestamp,BenchId,ClusterId,GitRef,S3Root" } else { print $0 ",\'$(date +%s)",$MZ_CB_BENCH_ID,$MZ_CB_CLUSTER_ID,$MZ_CB_GIT_REV,$MZ_CB_S3_ROOT"\'"}}\''
    else:
        munge_result = "cat"
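
    # The launch script runs on the remote instance: it unpacks the Python
    # package, installs it into a venv, runs the benchmark module, uploads
    # either results.csv or the scratch log to S3, and shuts the machine down.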
    mz_launch_script = f"""echo {shlex.quote(base64.b64encode(pkg_data).decode('utf-8'))} | base64 -d > mz.tar.gz
python3 -m venv /tmp/mzenv >&2
. /tmp/mzenv/bin/activate >&2
python3 -m pip install --upgrade pip >&2
pip3 install ./mz.tar.gz[dev] >&2
MZ_ROOT=/home/ubuntu/materialize python3 -m {script_name} {script_args}
result=$?
echo $result > ~/bench_exit_code
if [ $result -eq 0 ]; then
{munge_result} < ~/materialize/results.csv | aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID.csv >&2
else
aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID-FAILURE.log < ~/mzscratch.log >&2
fi
sudo shutdown -h now # save some money
"""

    if ns.profile == "basic":
        descs = [
            scratch.MachineDesc(
                name="materialized",
                launch_script=mz_launch_script,
                instance_type="r5a.4xlarge",
                ami="ami-0aeb7c931a5a61206",
                tags={},
                size_gb=64,
            ),
        ]
    elif ns.profile == "confluent":
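        # Single-node Confluent Platform 7.0 install (ZooKeeper, Kafka, and the
        # Schema Registry), started via systemd on a separate machine.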
        confluent_launch_script = """
curl https://packages.confluent.io/deb/7.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.confluent.io/deb/7.0 stable main"
sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
sudo apt-get update
sudo apt-get install -y openjdk-8-jre-headless confluent-kafka confluent-schema-registry
sudo systemctl start confluent-zookeeper
sudo systemctl start confluent-kafka
sudo systemctl start confluent-schema-registry
"""
        descs = [
            scratch.MachineDesc(
                name="materialized",
                launch_script=mz_launch_script,
                instance_type="r5a.4xlarge",
                ami="ami-0aeb7c931a5a61206",
                tags={},
                size_gb=64,
            ),
            scratch.MachineDesc(
                name="confluent",
                launch_script=confluent_launch_script,
                instance_type="r5a.4xlarge",
                ami="ami-0aeb7c931a5a61206",
                tags={},
                size_gb=1000,
                checkout=False,
            ),
        ]
    else:
        raise RuntimeError(f"Profile {ns.profile} is not implemented yet")

    bench_id = util.nonce(8)
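
    # The MANIFEST lists one "<trial>-<rev>" cluster ID per line; `check` polls
    # S3 for a result object from each of them.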
    manifest_bytes = "".join(f"{i}-{rev}\n" for i, rev in clusters).encode()
    boto3.client("s3").put_object(
        Body=manifest_bytes, Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST"
    )
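
    # Launch one cluster per (trial, rev) pair. The MZ_CB_* environment
    # variables are consumed by the launch script when uploading results, and
    # `delete_after` schedules each cluster for deletion after one day.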
    # TODO - Do these in parallel
    launched = []
    for i, rev in clusters:
        launched += scratch.launch_cluster(
            descs=descs,
            nonce=f"{bench_id}-{i}-{rev}",
            extra_tags={
                "bench_id": bench_id,
                "bench_rev": rev,
                "bench_i": str(i),
                "LaunchedBy": scratch.whoami(),
            },
            extra_env={
                "MZ_CB_BENCH_ID": bench_id,
                "MZ_CB_CLUSTER_ID": f"{i}-{rev}",
                "MZ_CB_GIT_REV": rev,
                "MZ_CB_S3_ROOT": ns.s3_root,
            },
            delete_after=datetime.datetime.utcnow() + datetime.timedelta(days=1),
            git_rev=rev,
        )

    print("Launched instances:")
    print_instances(launched, format="table")  # todo
    print(
        f"""Launched cloud bench with ID {bench_id}.
To wait for results, run: bin/cloudbench check {bench_id}"""
    )


if __name__ == "__main__":
    main()