orchestratord.py 13 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. #
  10. # orchestratord.py — build and run environments in a local kind cluster
  11. import argparse
  12. import json
  13. import os
  14. import socket
  15. import subprocess
  16. import threading
  17. from collections.abc import Callable, Sequence
  18. from time import sleep
  19. from typing import TypeVar
  20. from urllib.parse import urlparse, urlunparse
  21. from uuid import uuid4
  22. import yaml
  23. from materialize import MZ_ROOT, ui
  24. DEV_IMAGE_TAG = "local-dev"
  25. DEFAULT_POSTGRES = (
  26. "postgres://root@postgres.materialize.svc.cluster.local:5432/materialize"
  27. )
  28. DEFAULT_MINIO = "s3://minio:minio123@persist/persist?endpoint=http%3A%2F%2Fminio.materialize.svc.cluster.local%3A9000&region=minio"
  29. def main():
  30. os.chdir(MZ_ROOT)
  31. parser = argparse.ArgumentParser(
  32. prog="orchestratord",
  33. description="""Runs orchestratord within a local kind cluster""",
  34. )
  35. parser.add_argument(
  36. "--kind-cluster-name",
  37. default=os.environ.get("KIND_CLUSTER_NAME", "kind"),
  38. )
  39. subparsers = parser.add_subparsers(required=True)
  40. parser_run = subparsers.add_parser("run")
  41. parser_run.add_argument("--dev", action="store_true")
  42. parser_run.add_argument("--namespace", default="materialize")
  43. parser_run.add_argument("--values")
  44. parser_run.add_argument("--set", action="append")
  45. parser_run.set_defaults(func=run)
  46. parser_reset = subparsers.add_parser("reset")
  47. parser_reset.add_argument("--namespace", default="materialize")
  48. parser_reset.set_defaults(func=reset)
  49. parser_environment = subparsers.add_parser("environment")
  50. parser_environment.add_argument("--dev", action="store_true")
  51. parser_environment.add_argument("--namespace", default="materialize")
  52. parser_environment.add_argument(
  53. "--environment-name",
  54. default="12345678-1234-1234-1234-123456789012",
  55. )
  56. parser_environment.add_argument("--postgres-url", default=DEFAULT_POSTGRES)
  57. parser_environment.add_argument("--s3-bucket", default=DEFAULT_MINIO)
  58. parser_environment.add_argument("--license-key-file", required=True)
  59. parser_environment.add_argument(
  60. "--external-login-password-mz-system", required=False
  61. )
  62. parser_environment.add_argument(
  63. "--enable-rbac",
  64. type=bool,
  65. default=False,
  66. required=False,
  67. )
  68. parser_environment.set_defaults(func=environment)
  69. parser_portforward = subparsers.add_parser("port-forward")
  70. parser_portforward.add_argument("--namespace", default="materialize")
  71. parser_portforward.add_argument(
  72. "--environment-name",
  73. default="12345678-1234-1234-1234-123456789012",
  74. )
  75. parser_portforward.set_defaults(func=portforward)
  76. args = parser.parse_args()
  77. args.func(args)
  78. def run(args: argparse.Namespace):
  79. acquire(
  80. "orchestratord",
  81. dev=args.dev,
  82. cluster=args.kind_cluster_name,
  83. )
  84. helm_args = [
  85. "helm",
  86. "install",
  87. "orchestratord",
  88. "misc/helm-charts/operator",
  89. "--atomic",
  90. f"--set=operator.image.tag={DEV_IMAGE_TAG}",
  91. "--create-namespace",
  92. f"--namespace={args.namespace}",
  93. ]
  94. if args.values is not None:
  95. helm_args.extend(["--values", args.values])
  96. if args.set is not None:
  97. helm_args.extend([f"--set={v}" for v in args.set])
  98. subprocess.check_call(helm_args)
  99. def reset(args: argparse.Namespace):
  100. environments = (
  101. kubectl(
  102. "get",
  103. "--all-namespaces",
  104. "materializes",
  105. '-o=jsonpath={range .items[*]}{.metadata.namespace}/{.metadata.name}{"\\n"}{end}',
  106. cluster=args.kind_cluster_name,
  107. )
  108. .decode()
  109. .splitlines()
  110. )
  111. for environment in environments:
  112. (namespace, environment_name) = environment.split("/", 1)
  113. env_kubectl = make_env_kubectl(args, namespace)
  114. mz = json.loads(
  115. env_kubectl(
  116. "get",
  117. "materialize",
  118. environment_name,
  119. "-o",
  120. "json",
  121. )
  122. )
  123. backend_secret_name = mz["spec"]["backendSecretName"]
  124. mz_system_password_secret_name = mz["spec"].get("externalLoginSecretMzSystem")
  125. env_kubectl("delete", "--wait=true", "materialize", environment_name)
  126. env_kubectl("delete", "--wait=true", "secret", backend_secret_name)
  127. if mz_system_password_secret_name is not None:
  128. env_kubectl(
  129. "delete", "--wait=true", "secret", mz_system_password_secret_name
  130. )
  131. try:
  132. subprocess.check_call(
  133. [
  134. "helm",
  135. "uninstall",
  136. "orchestratord",
  137. f"--namespace={args.namespace}",
  138. ]
  139. )
  140. kubectl(
  141. "delete",
  142. "namespace",
  143. args.namespace,
  144. cluster=args.kind_cluster_name,
  145. )
  146. except subprocess.CalledProcessError:
  147. pass
  148. def environment(args: argparse.Namespace):
  149. env_kubectl = make_env_kubectl(args)
  150. for image in ["environmentd", "clusterd", "balancerd"]:
  151. acquire(
  152. image,
  153. dev=args.dev,
  154. cluster=args.kind_cluster_name,
  155. )
  156. environmentd_image_ref = f"materialize/environmentd:{DEV_IMAGE_TAG}"
  157. try:
  158. kubectl(
  159. "get",
  160. "namespace",
  161. args.namespace,
  162. cluster=args.kind_cluster_name,
  163. )
  164. except subprocess.CalledProcessError:
  165. kubectl(
  166. "create",
  167. "namespace",
  168. args.namespace,
  169. cluster=args.kind_cluster_name,
  170. )
  171. environment_id = str(uuid4())
  172. def root_psql(cmd: str):
  173. env_kubectl(
  174. "run",
  175. "psql-setup",
  176. "--labels=app=environmentd",
  177. "--image=postgres",
  178. "--command=true",
  179. "--restart=Never",
  180. "--attach=true",
  181. "--rm=true",
  182. "--",
  183. "psql",
  184. args.postgres_url,
  185. "-c",
  186. cmd,
  187. )
  188. env_kubectl("wait", "--for=delete", "pod/psql-setup")
  189. pg_user = f"materialize_{environment_id}"
  190. pg_pass = "password"
  191. pg_db = f"materialize_{environment_id}"
  192. try:
  193. root_psql(f"""create role "{pg_user}" with nologin""")
  194. root_psql(f"""alter role "{pg_user}" with login password '{pg_pass}'""")
  195. root_psql(f"""create database "{pg_db}" with owner "{pg_user}" """)
  196. except subprocess.CalledProcessError:
  197. pass
  198. postgres_url_parts = urlparse(args.postgres_url)
  199. metadata_backend_url = urlunparse(
  200. postgres_url_parts._replace(
  201. netloc=f"{pg_user}:{pg_pass}@{postgres_url_parts.hostname}{f':{postgres_url_parts.port}' if postgres_url_parts.port is not None else ''}",
  202. path=f"/{pg_db}",
  203. )
  204. )
  205. s3_bucket_parts = urlparse(args.s3_bucket)
  206. persist_backend_url = urlunparse(
  207. s3_bucket_parts._replace(
  208. path=f"{s3_bucket_parts.path}/{environment_id}",
  209. )
  210. )
  211. with open(args.license_key_file) as f:
  212. license_key = f.read()
  213. backend_secret_name = f"materialize-backend-{environment_id}"
  214. resources = [
  215. {
  216. "apiVersion": "v1",
  217. "kind": "Secret",
  218. "metadata": {
  219. "name": backend_secret_name,
  220. },
  221. "stringData": {
  222. "metadata_backend_url": metadata_backend_url,
  223. "persist_backend_url": persist_backend_url,
  224. "license_key": license_key,
  225. },
  226. },
  227. {
  228. "apiVersion": "materialize.cloud/v1alpha1",
  229. "kind": "Materialize",
  230. "metadata": {
  231. "name": args.environment_name,
  232. },
  233. "spec": {
  234. "environmentdImageRef": environmentd_image_ref,
  235. "backendSecretName": backend_secret_name,
  236. "environmentId": environment_id,
  237. "authenticatorKind": "None",
  238. "enableRbac": args.enable_rbac,
  239. },
  240. },
  241. ]
  242. if args.external_login_password_mz_system is not None:
  243. resources[0]["stringData"][
  244. "external_login_password_mz_system"
  245. ] = args.external_login_password_mz_system
  246. resources[-1]["spec"]["authenticatorKind"] = "Password"
  247. env_kubectl("apply", "-f", "-", input=yaml.safe_dump_all(resources).encode())
  248. resource_id = get_resource_id(args)
  249. retry(
  250. lambda: env_kubectl(
  251. "wait",
  252. "--for=condition=Available",
  253. f"deployment/mz{resource_id}-console",
  254. ),
  255. exception_types=(subprocess.CalledProcessError,),
  256. )
  257. def portforward(args: argparse.Namespace):
  258. env_kubectl = make_env_kubectl(args)
  259. resource_id = get_resource_id(args)
  260. port_forward_targets = {
  261. "console/http": lambda port: f"http://localhost:{port}/",
  262. "balancerd/pgwire": lambda port: f"postgres://{os.environ['USER']}@localhost:{port}/materialize",
  263. "balancerd/http": lambda port: f"http://localhost:{port}/",
  264. "environmentd/internal-sql": lambda port: f"postgres://mz_system@localhost:{port}/materialize",
  265. "environmentd/internal-http": lambda port: f"http://localhost:{port}/",
  266. }
  267. port_forwards = {}
  268. sockets = []
  269. try:
  270. for port_forward_target in port_forward_targets:
  271. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  272. sockets.append(s)
  273. s.bind(("", 0))
  274. s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  275. port_forwards[port_forward_target] = s.getsockname()[1]
  276. finally:
  277. for s in sockets:
  278. s.close()
  279. def port_forward(target: str, port: str):
  280. target_service, target_port = target.split("/")
  281. # in theory doing a port forward to the service directly should work,
  282. # but i can't seem to get it to
  283. selector = json.loads(
  284. env_kubectl(
  285. "get",
  286. "service",
  287. f"mz{resource_id}-{target_service}",
  288. "-o=jsonpath={.spec.selector}",
  289. )
  290. )
  291. pod = env_kubectl(
  292. "get",
  293. "pod",
  294. "-o=name",
  295. *[f"--selector={k}={v}" for k, v in selector.items()],
  296. ).splitlines()[0]
  297. env_kubectl(
  298. "port-forward",
  299. pod,
  300. f"{port}:{target_port}",
  301. )
  302. threads = [
  303. threading.Thread(target=port_forward, args=[target, port])
  304. for target, port in port_forwards.items()
  305. ]
  306. for t in threads:
  307. t.start()
  308. for target, port in port_forwards.items():
  309. print(f"{target}: {port_forward_targets[target](port)}")
  310. for t in threads:
  311. t.join()
  312. def get_resource_id(args: argparse.Namespace):
  313. env_kubectl = make_env_kubectl(args)
  314. def try_get_resource_id():
  315. resource_id = (
  316. env_kubectl(
  317. "get",
  318. "materialize",
  319. args.environment_name,
  320. "-o=jsonpath={.status.resourceId}",
  321. )
  322. .decode()
  323. .strip()
  324. )
  325. assert resource_id != ""
  326. return resource_id
  327. return retry(
  328. try_get_resource_id,
  329. exception_types=(AssertionError, subprocess.CalledProcessError),
  330. )
  331. def make_env_kubectl(args: argparse.Namespace, namespace: str | None = None):
  332. def env_kubectl(*cmd_args: str, **subprocess_args):
  333. return kubectl(
  334. "-n",
  335. namespace or args.namespace,
  336. *cmd_args,
  337. cluster=args.kind_cluster_name,
  338. **subprocess_args,
  339. )
  340. return env_kubectl
  341. def acquire(image: str, dev: bool, cluster: str):
  342. args = []
  343. if dev:
  344. args.append("--dev")
  345. subprocess.check_call(["bin/mzimage", "acquire", image, *args])
  346. fingerprint = (
  347. subprocess.check_output(
  348. [
  349. "bin/mzimage",
  350. "fingerprint",
  351. image,
  352. *args,
  353. ]
  354. )
  355. .decode()
  356. .strip()
  357. )
  358. subprocess.check_call(
  359. [
  360. "docker",
  361. "tag",
  362. f"materialize/{image}:mzbuild-{fingerprint}",
  363. f"materialize/{image}:{DEV_IMAGE_TAG}",
  364. ]
  365. )
  366. kind(
  367. "load",
  368. "docker-image",
  369. f"materialize/{image}:{DEV_IMAGE_TAG}",
  370. cluster=cluster,
  371. )
  372. def kind(*args: str, cluster: str, **subprocess_args):
  373. return subprocess.check_output(
  374. ["kind", "--name", cluster, *args],
  375. **subprocess_args,
  376. )
  377. def kubectl(*args: str, cluster: str, **subprocess_args):
  378. return subprocess.check_output(
  379. ["kubectl", "--context", f"kind-{cluster}", *args],
  380. **subprocess_args,
  381. )
  382. T = TypeVar("T")
  383. def retry(
  384. f: Callable[[], T],
  385. max_attempts: int = 60,
  386. sleep_secs: int = 1,
  387. exception_types: Sequence[type[Exception]] = [AssertionError],
  388. ) -> T | None:
  389. result = None
  390. for attempt in range(max_attempts):
  391. try:
  392. result = f()
  393. break
  394. except tuple(exception_types):
  395. if attempt == max_attempts:
  396. raise
  397. sleep(sleep_secs)
  398. return result
  399. if __name__ == "__main__":
  400. with ui.error_handler("orchestratord"):
  401. main()