avro_ingest.py

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Ingest Avro records and report per-trial CPU, memory, and wall-time usage."""

import argparse
import json
import os
import time
from typing import IO, NamedTuple, cast

import docker
import psutil
import psycopg
import requests
from docker.models.containers import Container
from psycopg.errors import ProgrammingError

from materialize import MZ_ROOT, mzbuild, ui
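

# Poll the Confluent schema registry until it answers; kgen and the CSR
# connection created in main() both need it to be up before data can flow.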
def wait_for_confluent(host: str) -> None:
    url = f"http://{host}:8081/subjects"
    while True:
        try:
            print(f"Checking if schema registry at {url} is accessible...")
            r = requests.get(url)
            if r.status_code == 200:
                print("Schema registry is ready")
                return
        except requests.exceptions.ConnectionError as e:
            print(e)
            time.sleep(5)
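

# Locate the `materialized` server process inside the container: Docker
# reports only the container's init pid, and the server is one of its
# descendants.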
def mz_proc(container: Container) -> psutil.Process:
    container.reload()
    pid = container.attrs["State"]["Pid"]  # type: ignore
    docker_init = psutil.Process(pid)
    for child in docker_init.children(recursive=True):
        if child.name() == "materialized":
            return child
    raise RuntimeError("Couldn't find materialized pid")
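

# Cumulative counters captured at the previous measurement, so each trial can
# be reported as a delta rather than a running total.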
class PrevStats(NamedTuple):
    wall_time: float
    user_cpu: float
    system_cpu: float
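

# Emit one CSV row: resident/virtual memory of the materialized process, plus
# the user CPU, system CPU, and wall time consumed since the previous snapshot.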
def print_stats(container: Container, prev: PrevStats, file: IO) -> PrevStats:
    proc = mz_proc(container)
    memory = proc.memory_info()
    cpu = proc.cpu_times()
    new_prev = PrevStats(time.time(), cpu.user, cpu.system)
    print(
        f"{memory.rss},{memory.vms},{new_prev.user_cpu - prev.user_cpu},{new_prev.system_cpu - prev.system_cpu},{new_prev.wall_time - prev.wall_time}",
        file=file,
        flush=True,
    )
    return new_prev
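

# Example invocation (a sketch; assumes the Confluent Platform is reachable at
# the host named `confluent` and that Docker can run the mzbuild images):
#
#     python avro_ingest.py --confluent-host confluent -n 3 -r 1000000
#
# One row of resource statistics per trial is written to results.csv under
# MZ_ROOT (the script chdirs there before running).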
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--confluent-host",
        default="confluent",
        help="The hostname of a machine running the Confluent Platform",
    )
    parser.add_argument(
        "-n",
        "--trials",
        default=1,
        type=int,
        help="Number of measurements to take",
    )
    parser.add_argument(
        "-r",
        "--records",
        default=1000000,
        type=int,
        help="Number of Avro records to generate",
    )
    args = parser.parse_args()
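
    # Resolve the mzbuild images this benchmark needs (honoring the CI
    # build-configuration environment variables) and wait for the Confluent
    # services to come up before generating any data.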
    os.chdir(MZ_ROOT)
    coverage = ui.env_is_truthy("CI_COVERAGE_ENABLED")
    bazel = ui.env_is_truthy("CI_BAZEL_BUILD")
    bazel_remote_cache = os.getenv("CI_BAZEL_REMOTE_CACHE")
    bazel_lto = ui.env_is_truthy("CI_BAZEL_LTO")
    repo = mzbuild.Repository(
        MZ_ROOT,
        coverage=coverage,
        bazel=bazel,
        bazel_remote_cache=bazel_remote_cache,
        bazel_lto=bazel_lto,
    )
    wait_for_confluent(args.confluent_host)
    images = ["kgen", "materialized"]
    deps = repo.resolve_dependencies([repo.images[name] for name in images])
    deps.acquire()
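
    # Start Materialize detached on the host network so it can reach the
    # Confluent services, and so this script can reach it on localhost:6875.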
    docker_client = docker.from_env()
    # NOTE: We have to override the type below because if `detach=True` it
    # returns a Container, and the typechecker doesn't know that.
    mz_container: Container = cast(
        Container,
        docker_client.containers.run(
            deps["materialized"].spec(),
            detach=True,
            network_mode="host",
        ),
    )
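
    # Run kgen to completion, publishing `--records` Avro-encoded keys and
    # values to the `bench_data` topic.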
    docker_client.containers.run(
        deps["kgen"].spec(),
        [
            f"--num-records={args.records}",
            f"--bootstrap-server={args.confluent_host}:9092",
            f"--schema-registry-url=http://{args.confluent_host}:8081",
            "--topic=bench_data",
            "--keys=avro",
            "--values=avro",
            f"--avro-schema={VALUE_SCHEMA}",
            f"--avro-distribution={VALUE_DISTRIBUTION}",
            f"--avro-key-schema={KEY_SCHEMA}",
            f"--avro-key-distribution={KEY_DISTRIBUTION}",
        ],
        network_mode="host",
    )
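
    # Materialize speaks the PostgreSQL wire protocol; autocommit keeps each
    # DDL statement (CREATE CONNECTION, CREATE SOURCE) outside an explicit
    # transaction.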
    conn = psycopg.connect(host="localhost", port=6875, user="materialize")
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute(
            f"""CREATE CONNECTION IF NOT EXISTS csr_conn
            TO CONFLUENT SCHEMA REGISTRY (
                URL 'http://{args.confluent_host}:8081'
            )""".encode()
        )
        cur.execute(
            f"""CREATE CONNECTION kafka_conn
            TO KAFKA (BROKER '{args.confluent_host}:9092', SECURITY PROTOCOL PLAINTEXT)""".encode()
        )
        cur.execute(
            """CREATE SOURCE src
            FROM KAFKA CONNECTION kafka_conn (TOPIC 'bench_data')
            FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn"""
        )
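
        # One CSV row per trial: recreate the counting view, wait until it has
        # absorbed every generated record, then record resource usage.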
        results_file = open("results.csv", "w")
        print("Rss,Vms,User Cpu,System Cpu,Wall Time", file=results_file, flush=True)
        prev = PrevStats(time.time(), 0.0, 0.0)
        for _ in range(args.trials):
            # `cnt` is a materialized view, so it must be dropped with
            # DROP MATERIALIZED VIEW rather than DROP VIEW.
            cur.execute("DROP MATERIALIZED VIEW IF EXISTS cnt")
            cur.execute("CREATE MATERIALIZED VIEW cnt AS SELECT count(*) FROM src")
            while True:
                try:
                    cur.execute("SELECT * FROM cnt")
                    row = cur.fetchone()
                    assert row is not None
                    n = row[0]
                    if n >= args.records:
                        break
                except ProgrammingError:
                    # The view may not be queryable yet; retry after a pause.
                    pass
                time.sleep(1)
            prev = print_stats(mz_container, prev, results_file)
        results_file.close()
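

# Avro schemas for the generated keys and values, plus kgen distribution
# specs, which map `<namespace>.<record>::<field>` to the numeric [min, max]
# range that kgen draws that field's values from.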
KEY_SCHEMA = json.dumps(
    {
        "name": "testrecordkey",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [{"name": "Key1", "type": "long"}, {"name": "Key2", "type": "long"}],
    }
)
KEY_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecordkey::Key1": [0, 100],
        "com.acme.avro.testrecordkey::Key2": [0, 250000],
    }
)
VALUE_SCHEMA = json.dumps(
    {
        "name": "testrecord",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [
            {"name": "Key1Unused", "type": "long"},
            {"name": "Key2Unused", "type": "long"},
            {
                "name": "OuterRecord",
                "type": {
                    "name": "OuterRecord",
                    "type": "record",
                    "fields": [
                        {
                            "name": "Record1",
                            "type": {
                                "name": "Record1",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord1",
                                        "type": {
                                            "name": "InnerRecord1",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord2",
                                        "type": {
                                            "name": "InnerRecord2",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                        {
                            "name": "Record2",
                            "type": {
                                "name": "Record2",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord3",
                                        "type": {
                                            "name": "InnerRecord3",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord4",
                                        "type": {
                                            "name": "InnerRecord4",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                    ],
                },
            },
        ],
    }
)
VALUE_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecord::Key1Unused": [0, 100],
        "com.acme.avro.testrecord::Key2Unused": [0, 250000],
        "com.acme.avro.InnerRecord1::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord2::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord3::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord4::Point": [10000, 10000000000],
    }
)


if __name__ == "__main__":
    main()