cloudbench.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """Launch benchmark for a particular commit on cloud infrastructure, using bin/scratch"""
  10. import argparse
  11. import base64
  12. import csv
  13. import datetime
  14. import itertools
  15. import os
  16. import shlex
  17. import sys
  18. import time
  19. from typing import NamedTuple, cast
  20. import boto3
  21. from materialize import MZ_ROOT, git, scratch, spawn, util
  22. from materialize.cli.scratch import check_required_vars
  23. from materialize.scratch import print_instances
  24. # This is duplicated with the one in cli/scratch.
  25. # TODO - factor it out.
  26. def main() -> None:
  27. os.chdir(MZ_ROOT)
  28. parser = argparse.ArgumentParser()
  29. subparsers = parser.add_subparsers(dest="subcommand", required=True)
  30. for cmd_name, configure, run in [
  31. ("start", configure_start, start),
  32. ("check", configure_check, check),
  33. # ("mine", mine.configure_parser, mine.run),
  34. # ("destroy", destroy.configure_parser, destroy.run),
  35. ]:
  36. s = subparsers.add_parser(cmd_name)
  37. configure(s)
  38. s.set_defaults(run=run)
  39. args = parser.parse_args()
  40. args.run(args)
  41. def configure_start(parser: argparse.ArgumentParser) -> None:
  42. parser.add_argument(
  43. "--profile",
  44. choices=["basic", "confluent"],
  45. type=str,
  46. required=True,
  47. help="Predefined set of machines to use in the cluster. 'basic' is only the Materialize instance; 'confluent' also includes a machine running the Kafka, Schema Registry, etc.",
  48. )
  49. parser.add_argument(
  50. "--trials",
  51. "-n",
  52. type=int,
  53. default=1,
  54. help="The number of trials to run per git rev",
  55. )
  56. parser.add_argument(
  57. "--revs",
  58. type=str,
  59. default="HEAD",
  60. help="Comma-separated list of git revs to benchmark",
  61. )
  62. parser.add_argument(
  63. "bench_script",
  64. type=str,
  65. nargs=argparse.REMAINDER,
  66. help="Benchmark script (and optional arguments)",
  67. )
  68. parser.add_argument(
  69. "--append_metadata",
  70. help="whether to append extra metadata to each CSV row before uploading to S3",
  71. action="store_true",
  72. )
  73. parser.add_argument("--s3_root", type=str, default=DEFAULT_BUCKET)
class BenchSuccessResult(NamedTuple):
    """A benchmark run that completed successfully."""

    # Raw CSV text the instance uploaded to S3 as "<cluster>.csv".
    stdout: str
class BenchFailureLogs(NamedTuple):
    """A benchmark run that failed."""

    # Contents of the "<cluster>-FAILURE.log" object the instance uploaded.
    log: str
  78. def configure_check(parser: argparse.ArgumentParser) -> None:
  79. parser.add_argument("--s3_root", type=str, default=DEFAULT_BUCKET)
  80. parser.add_argument("bench_id", type=str, nargs=1)
  81. DEFAULT_BUCKET = "mz-cloudbench"
  82. def try_get_object(key: str, bucket: str) -> str | None:
  83. client = boto3.client("s3")
  84. try:
  85. result = client.get_object(Bucket=bucket, Key=key)
  86. return result["Body"].read().decode("utf-8")
  87. except client.exceptions.NoSuchKey:
  88. return None
def check(ns: argparse.Namespace) -> None:
    """Wait for every instance of a benchmark run to report, then merge the
    per-instance CSV results into one table on stdout.

    Polls S3 every 60 seconds until each cluster listed in the MANIFEST has
    uploaded either "<cluster>.csv" (success) or "<cluster>-FAILURE.log"
    (failure). If any run failed, its log is dumped to stderr and a
    RuntimeError is raised; otherwise the CSVs are concatenated with three
    extra columns appended to each row.
    """
    check_required_vars()
    # bench_id was declared with nargs=1, so it arrives as a one-element list.
    bench_id = ns.bench_id[0]
    # MANIFEST (written by `start`) lists one cluster ID per line.
    manifest = (
        boto3.client("s3")
        .get_object(Bucket=ns.s3_root, Key=f"{bench_id}/MANIFEST")["Body"]
        .read()
        .decode("utf-8")
        .strip()
    )
    insts = manifest.split("\n")
    if not insts:
        raise RuntimeError(f"No instances found for bench ID {bench_id}")
    # results[i] stays None until instance i has reported success or failure.
    results: list[BenchSuccessResult | BenchFailureLogs | None] = [None for _ in insts]
    not_done = list(range(len(results)))
    while not_done:
        for i in not_done:
            # Success path: the instance uploaded its CSV.
            maybe_result = try_get_object(f"{bench_id}/{insts[i]}.csv", ns.s3_root)
            if maybe_result is None:
                # Failure path: look for the failure log instead.
                maybe_out = try_get_object(
                    f"{bench_id}/{insts[i]}-FAILURE.log", ns.s3_root
                )
                if maybe_out is None:
                    # Neither object exists yet; the run is still in flight.
                    continue
                results[i] = BenchFailureLogs(maybe_out)
            else:
                results[i] = BenchSuccessResult(stdout=maybe_result)
        # Keep only the indices that still have no result (None is falsy;
        # a populated NamedTuple is truthy).
        not_done = [i for i in not_done if not results[i]]
        if not_done:
            print("Benchmark not done; waiting 60 seconds", file=sys.stderr)
            time.sleep(60)
    for r in results:
        # Runtime narrowing for the casts below: the loop above guarantees
        # every slot was filled in.
        assert isinstance(r, BenchSuccessResult) or isinstance(r, BenchFailureLogs)
    done_results = cast(list[BenchFailureLogs | BenchSuccessResult], results)
    failed = [
        (i, r) for i, r in enumerate(done_results) if isinstance(r, BenchFailureLogs)
    ]
    if failed:
        for i, f in failed:
            print(
                f"Run of instance {insts[i]} failed, log:\n{f.log}",
                file=sys.stderr,
            )
        raise RuntimeError(f"{len(failed)} runs FAILED!")
    good_results = cast(list[BenchSuccessResult], done_results)
    # DictReader consumes an iterable of lines; re-append the "\n" that
    # split("\n") stripped off.
    readers = [
        csv.DictReader(f"{line}\n" for line in r.stdout.split("\n"))
        for r in good_results
    ]
    # Lazy: each inner generator is only consumed in the output loop below,
    # after fieldnames have been read from every reader.
    csv_results = ((d.values() for d in r) for r in readers)
    for r in readers:
        assert isinstance(r.fieldnames, list)
        for fn in r.fieldnames:
            assert isinstance(fn, str)
    # All per-instance CSVs must share one header row.
    headers = set(tuple(cast(list[str], r.fieldnames)) for r in readers)
    if len(headers) > 1:
        raise RuntimeError("Mismatched headers")
    w = csv.writer(sys.stdout)
    w.writerow(
        cast(list[str], readers[0].fieldnames) + ["InstanceIndex", "Rev", "Trial"]
    )
    # Cluster IDs have the form "<i>-<rev>" (see the MANIFEST written by
    # `start`), so components[0] is the trial index and components[1] the rev.
    for inst, r in zip(insts, csv_results):
        components = inst.split("-")
        for i, entry in enumerate(r):
            w.writerow(itertools.chain(entry, (components[0], components[1], i)))
  154. def start(ns: argparse.Namespace) -> None:
  155. check_required_vars()
  156. revs = ns.revs.split(",")
  157. clusters = list(
  158. itertools.product(range(ns.trials), (git.rev_parse(rev) for rev in revs))
  159. )
  160. bench_script = ns.bench_script
  161. script_name = bench_script[0]
  162. script_args = " ".join(shlex.quote(arg) for arg in bench_script[1:])
  163. # zip up the `misc` repository, for shipment to the remote machine
  164. os.chdir("misc/python")
  165. spawn.runv(["python3", "./setup.py", "sdist"])
  166. with open("./dist/materialize-0.0.0.tar.gz", "rb") as f:
  167. pkg_data = f.read()
  168. os.chdir(os.environ["MZ_ROOT"])
  169. if ns.append_metadata:
  170. munge_result = 'awk \'{ if (NR == 1) { print $0 ",Timestamp,BenchId,ClusterId,GitRef,S3Root" } else { print $0 ",\'$(date +%s)",$MZ_CB_BENCH_ID,$MZ_CB_CLUSTER_ID,$MZ_CB_GIT_REV,$MZ_CB_S3_ROOT"\'"}}\''
  171. else:
  172. munge_result = "cat"
  173. mz_launch_script = f"""echo {shlex.quote(base64.b64encode(pkg_data).decode('utf-8'))} | base64 -d > mz.tar.gz
  174. python3 -m venv /tmp/mzenv >&2
  175. . /tmp/mzenv/bin/activate >&2
  176. python3 -m pip install --upgrade pip >&2
  177. pip3 install ./mz.tar.gz[dev] >&2
  178. MZ_ROOT=/home/ubuntu/materialize python3 -m {script_name} {script_args}
  179. result=$?
  180. echo $result > ~/bench_exit_code
  181. if [ $result -eq 0 ]; then
  182. {munge_result} < ~/materialize/results.csv | aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID.csv >&2
  183. else
  184. aws s3 cp - s3://{ns.s3_root}/$MZ_CB_BENCH_ID/$MZ_CB_CLUSTER_ID-FAILURE.log < ~/mzscratch.log >&2
  185. fi
  186. sudo shutdown -h now # save some money
  187. """
  188. if ns.profile == "basic":
  189. descs = [
  190. scratch.MachineDesc(
  191. name="materialized",
  192. launch_script=mz_launch_script,
  193. instance_type="r5a.4xlarge",
  194. ami="ami-0aeb7c931a5a61206",
  195. tags={},
  196. size_gb=64,
  197. ),
  198. ]
  199. elif ns.profile == "confluent":
  200. confluent_launch_script = """
  201. curl https://packages.confluent.io/deb/7.0/archive.key | sudo apt-key add -
  202. sudo add-apt-repository "deb https://packages.confluent.io/deb/7.0 stable main"
  203. sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
  204. sudo apt-get update
  205. sudo apt-get install -y openjdk-8-jre-headless confluent-kafka confluent-schema-registry
  206. sudo systemctl start confluent-zookeeper
  207. sudo systemctl start confluent-kafka
  208. sudo systemctl start confluent-schema-registry
  209. """
  210. descs = [
  211. scratch.MachineDesc(
  212. name="materialized",
  213. launch_script=mz_launch_script,
  214. instance_type="r5a.4xlarge",
  215. ami="ami-0aeb7c931a5a61206",
  216. tags={},
  217. size_gb=64,
  218. ),
  219. scratch.MachineDesc(
  220. name="confluent",
  221. launch_script=confluent_launch_script,
  222. instance_type="r5a.4xlarge",
  223. ami="ami-0aeb7c931a5a61206",
  224. tags={},
  225. size_gb=1000,
  226. checkout=False,
  227. ),
  228. ]
  229. else:
  230. raise RuntimeError(f"Profile {ns.profile} is not implemented yet")
  231. bench_id = util.nonce(8)
  232. manifest_bytes = "".join(f"{i}-{rev}\n" for i, rev in clusters).encode()
  233. boto3.client("s3").put_object(
  234. Body=manifest_bytes, Bucket="mz-cloudbench", Key=f"{bench_id}/MANIFEST"
  235. )
  236. # TODO - Do these in parallel
  237. launched = []
  238. for i, rev in clusters:
  239. launched += scratch.launch_cluster(
  240. descs=descs,
  241. nonce=f"{bench_id}-{i}-{rev}",
  242. extra_tags={
  243. "bench_id": bench_id,
  244. "bench_rev": rev,
  245. "bench_i": str(i),
  246. "LaunchedBy": scratch.whoami(),
  247. },
  248. extra_env={
  249. "MZ_CB_BENCH_ID": bench_id,
  250. "MZ_CB_CLUSTER_ID": f"{i}-{rev}",
  251. "MZ_CB_GIT_REV": rev,
  252. "MZ_CB_S3_ROOT": ns.s3_root,
  253. },
  254. delete_after=datetime.datetime.utcnow() + datetime.timedelta(days=1),
  255. git_rev=rev,
  256. )
  257. print("Launched instances:")
  258. print_instances(launched, format="table") # todo
  259. print(
  260. f"""Launched cloud bench with ID {bench_id}.
  261. To wait for results, run: bin/cloudbench check {bench_id}"""
  262. )
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()