environmentd.py 17 KB


  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. import operator
  11. import os
  12. import urllib.parse
  13. from collections.abc import Callable
  14. from kubernetes.client import (
  15. V1ConfigMap,
  16. V1ConfigMapVolumeSource,
  17. V1Container,
  18. V1ContainerPort,
  19. V1EnvVar,
  20. V1EnvVarSource,
  21. V1KeyToPath,
  22. V1LabelSelector,
  23. V1ObjectFieldSelector,
  24. V1ObjectMeta,
  25. V1PersistentVolumeClaim,
  26. V1PersistentVolumeClaimSpec,
  27. V1PodSpec,
  28. V1PodTemplateSpec,
  29. V1ResourceRequirements,
  30. V1Secret,
  31. V1SecretVolumeSource,
  32. V1Service,
  33. V1ServicePort,
  34. V1ServiceSpec,
  35. V1StatefulSet,
  36. V1StatefulSetSpec,
  37. V1Toleration,
  38. V1Volume,
  39. V1VolumeMount,
  40. )
  41. from materialize import MZ_ROOT
  42. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  43. from materialize.cloudtest.k8s.api.k8s_configmap import K8sConfigMap
  44. from materialize.cloudtest.k8s.api.k8s_secret import K8sSecret
  45. from materialize.cloudtest.k8s.api.k8s_service import K8sService
  46. from materialize.cloudtest.k8s.api.k8s_stateful_set import K8sStatefulSet
  47. from materialize.mz_version import MzVersion
  48. from materialize.mzcompose import (
  49. bootstrap_cluster_replica_size,
  50. cluster_replica_size_map,
  51. get_default_system_parameters,
  52. )
  53. class EnvironmentdSecret(K8sSecret):
  54. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
  55. super().__init__(namespace)
  56. self.secret = V1Secret(
  57. metadata=V1ObjectMeta(name="license-key"),
  58. string_data={
  59. "license_key": os.environ["MZ_CI_LICENSE_KEY"],
  60. },
  61. )
  62. class ListenersConfigMap(K8sConfigMap):
  63. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
  64. super().__init__(namespace)
  65. with open(f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth.json") as f:
  66. data = f.read()
  67. self.configmap = V1ConfigMap(
  68. metadata=V1ObjectMeta(name="listeners-config"),
  69. data={
  70. "listeners.json": data,
  71. },
  72. )
  73. class EnvironmentdService(K8sService):
  74. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
  75. super().__init__(namespace)
  76. service_port = V1ServicePort(name="sql", port=6875)
  77. http_port = V1ServicePort(name="http", port=6876)
  78. internal_port = V1ServicePort(name="internal", port=6877)
  79. internal_http_port = V1ServicePort(name="internalhttp", port=6878)
  80. self.service = V1Service(
  81. api_version="v1",
  82. kind="Service",
  83. metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}),
  84. spec=V1ServiceSpec(
  85. type="NodePort",
  86. ports=[service_port, internal_port, http_port, internal_http_port],
  87. selector={"app": "environmentd"},
  88. ),
  89. )
  90. class MaterializedAliasService(K8sService):
  91. """Some testdrive tests expect that Mz is accessible as 'materialized'"""
  92. def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
  93. super().__init__(namespace)
  94. self.service = V1Service(
  95. api_version="v1",
  96. kind="Service",
  97. metadata=V1ObjectMeta(name="materialized"),
  98. spec=V1ServiceSpec(
  99. type="ExternalName",
  100. external_name=f"environmentd.{namespace}.svc.cluster.local",
  101. ),
  102. )
  103. class EnvironmentdStatefulSet(K8sStatefulSet):
  104. def __init__(
  105. self,
  106. tag: str | None = None,
  107. release_mode: bool = True,
  108. coverage_mode: bool = False,
  109. sanitizer_mode: str = "none",
  110. log_filter: str | None = None,
  111. namespace: str = DEFAULT_K8S_NAMESPACE,
  112. minio_namespace: str = DEFAULT_K8S_NAMESPACE,
  113. cockroach_namespace: str = DEFAULT_K8S_NAMESPACE,
  114. apply_node_selectors: bool = False,
  115. ) -> None:
  116. self.tag = tag
  117. self.release_mode = release_mode
  118. self.coverage_mode = coverage_mode
  119. self.sanitizer_mode = sanitizer_mode
  120. self.log_filter = log_filter
  121. self.env: dict[str, str] = {}
  122. self.extra_args: list[str] = []
  123. self.minio_namespace = minio_namespace
  124. self.cockroach_namespace = cockroach_namespace
  125. self.apply_node_selectors = apply_node_selectors
  126. super().__init__(namespace)
  127. def generate_stateful_set(self) -> V1StatefulSet:
  128. metadata = V1ObjectMeta(name="environmentd", labels={"app": "environmentd"})
  129. label_selector = V1LabelSelector(match_labels={"app": "environmentd"})
  130. ports = [V1ContainerPort(container_port=5432, name="sql")]
  131. volume_mounts = [
  132. V1VolumeMount(
  133. name="license-key",
  134. mount_path="/license_key",
  135. ),
  136. V1VolumeMount(
  137. name="listeners-configmap",
  138. mount_path="/listeners",
  139. ),
  140. ]
  141. if self.coverage_mode:
  142. volume_mounts.append(V1VolumeMount(name="coverage", mount_path="/coverage"))
  143. container = V1Container(
  144. name="environmentd",
  145. image=self.image(
  146. "environmentd",
  147. tag=self.tag,
  148. release_mode=self.release_mode,
  149. ),
  150. args=self.args(),
  151. env=self.env_vars(),
  152. ports=ports,
  153. volume_mounts=volume_mounts,
  154. )
  155. node_selector = None
  156. if self.apply_node_selectors:
  157. node_selector = {"environmentd": "true"}
  158. taint_toleration = V1Toleration(
  159. key="environmentd",
  160. operator="Equal",
  161. value="true",
  162. effect="NoSchedule",
  163. )
  164. volumes = [
  165. V1Volume(
  166. name="license-key",
  167. secret=V1SecretVolumeSource(
  168. default_mode=292,
  169. optional=False,
  170. secret_name="license-key",
  171. items=[
  172. V1KeyToPath(
  173. key="license_key",
  174. path="license_key",
  175. )
  176. ],
  177. ),
  178. ),
  179. V1Volume(
  180. name="listeners-configmap",
  181. config_map=V1ConfigMapVolumeSource(
  182. name="listeners-config",
  183. default_mode=292,
  184. optional=False,
  185. items=[
  186. V1KeyToPath(
  187. key="listeners.json",
  188. path="listeners.json",
  189. )
  190. ],
  191. ),
  192. ),
  193. ]
  194. pod_spec = V1PodSpec(
  195. containers=[container],
  196. tolerations=[taint_toleration],
  197. node_selector=node_selector,
  198. termination_grace_period_seconds=0,
  199. volumes=volumes,
  200. )
  201. template_spec = V1PodTemplateSpec(metadata=metadata, spec=pod_spec)
  202. return V1StatefulSet(
  203. api_version="apps/v1",
  204. kind="StatefulSet",
  205. metadata=metadata,
  206. spec=V1StatefulSetSpec(
  207. service_name="environmentd",
  208. replicas=1,
  209. pod_management_policy="Parallel",
  210. selector=label_selector,
  211. template=template_spec,
  212. volume_claim_templates=self.claim_templates(),
  213. ),
  214. )
  215. def claim_templates(self) -> list[V1PersistentVolumeClaim]:
  216. claim_templates = []
  217. if self.coverage_mode:
  218. claim_templates.append(
  219. V1PersistentVolumeClaim(
  220. metadata=V1ObjectMeta(name="coverage"),
  221. spec=V1PersistentVolumeClaimSpec(
  222. access_modes=["ReadWriteOnce"],
  223. resources=V1ResourceRequirements(requests={"storage": "10Gi"}),
  224. ),
  225. )
  226. )
  227. return claim_templates
  228. def args(self) -> list[str]:
  229. s3_endpoint = urllib.parse.quote(
  230. f"http://minio-service.{self.minio_namespace}:9000"
  231. )
  232. args = [
  233. "--availability-zone=1",
  234. "--availability-zone=2",
  235. "--availability-zone=3",
  236. "--availability-zone=quickstart",
  237. "--aws-account-id=123456789000",
  238. "--aws-external-id-prefix=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
  239. "--environment-id=cloudtest-test-00000000-0000-0000-0000-000000000000-0",
  240. f"--persist-blob-url=s3://minio:minio123@persist/persist?endpoint={s3_endpoint}&region=minio",
  241. "--orchestrator=kubernetes",
  242. "--orchestrator-kubernetes-image-pull-policy=if-not-present",
  243. "--orchestrator-kubernetes-service-fs-group=999",
  244. f"--persist-consensus-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=consensus",
  245. "--unsafe-mode",
  246. # cloudtest may be called upon to spin up older versions of
  247. # Materialize too! If you are adding a command-line option that is
  248. # only supported on newer releases, do not add it here. Add it as a
  249. # version-gated argument below, using `self._meets_minimum_version`.
  250. ]
  251. if self._meets_minimum_version("0.38.0"):
  252. args += [
  253. "--clusterd-image",
  254. self.image(
  255. "clusterd",
  256. tag=self.tag,
  257. release_mode=self.release_mode,
  258. ),
  259. ]
  260. else:
  261. args += [
  262. "--storaged-image",
  263. self.image(
  264. "storaged",
  265. tag=self.tag,
  266. release_mode=self.release_mode,
  267. ),
  268. "--computed-image",
  269. self.image(
  270. "computed",
  271. tag=self.tag,
  272. release_mode=self.release_mode,
  273. ),
  274. ]
  275. if self._meets_minimum_version("0.53.0"):
  276. args += [
  277. "--bootstrap-role",
  278. "materialize",
  279. ]
  280. if self._meets_minimum_version("0.54.0"):
  281. args += [
  282. "--internal-persist-pubsub-listen-addr=0.0.0.0:6879",
  283. "--persist-pubsub-url=http://persist-pubsub",
  284. ]
  285. if self._meets_minimum_version("0.60.0-dev"):
  286. args += [
  287. # Kind sets up a basic local-file storage class based on Rancher, named `standard`
  288. "--orchestrator-kubernetes-ephemeral-volume-class=standard"
  289. ]
  290. if self._meets_minimum_version("0.63.0-dev"):
  291. args += ["--secrets-controller=kubernetes"]
  292. if self._meets_minimum_version("0.79.0-dev"):
  293. args += [
  294. f"--timestamp-oracle-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=tsoracle"
  295. ]
  296. if not self._meets_minimum_version("0.105.0-dev"):
  297. args += [
  298. f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage"
  299. ]
  300. if self._meets_minimum_version("0.118.0-dev"):
  301. args += [
  302. "--announce-egress-address=1.2.3.4/32",
  303. "--announce-egress-address=88.77.66.0/28",
  304. "--announce-egress-address=2001:db8::/60",
  305. ]
  306. else:
  307. args += [
  308. "--announce-egress-ip=1.2.3.4",
  309. "--announce-egress-ip=88.77.66.55",
  310. ]
  311. if self._meets_minimum_version("0.147.0-dev"):
  312. args.append("--listeners-config-path=/listeners/listeners.json")
  313. else:
  314. args += [
  315. "--internal-sql-listen-addr=0.0.0.0:6877",
  316. "--internal-http-listen-addr=0.0.0.0:6878",
  317. ]
  318. return args + self.extra_args
  319. def env_vars(self) -> list[V1EnvVar]:
  320. system_parameter_defaults = get_default_system_parameters()
  321. if self.log_filter:
  322. system_parameter_defaults["log_filter"] = self.log_filter
  323. if self._meets_maximum_version("0.63.99"):
  324. system_parameter_defaults["enable_managed_clusters"] = "true"
  325. value_from = V1EnvVarSource(
  326. field_ref=V1ObjectFieldSelector(field_path="metadata.name")
  327. )
  328. env = [
  329. V1EnvVar(name="MZ_TEST_ONLY_DUMMY_SEGMENT_CLIENT", value="true"),
  330. V1EnvVar(name="MZ_SOFT_ASSERTIONS", value="1"),
  331. V1EnvVar(name="MZ_POD_NAME", value_from=value_from),
  332. V1EnvVar(name="AWS_REGION", value="minio"),
  333. V1EnvVar(name="AWS_ACCESS_KEY_ID", value="minio"),
  334. V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value="minio123"),
  335. V1EnvVar(name="MZ_AWS_ACCOUNT_ID", value="123456789000"),
  336. V1EnvVar(
  337. name="MZ_AWS_EXTERNAL_ID_PREFIX",
  338. value="eb5cb59b-e2fe-41f3-87ca-d2176a495345",
  339. ),
  340. V1EnvVar(
  341. name="MZ_AWS_PRIVATELINK_AVAILABILITY_ZONES", value="use1-az1,use1-az2"
  342. ),
  343. V1EnvVar(
  344. name="MZ_AWS_CONNECTION_ROLE_ARN",
  345. value="arn:aws:iam::123456789000:role/MaterializeConnection",
  346. ),
  347. V1EnvVar(
  348. name="MZ_SYSTEM_PARAMETER_DEFAULT",
  349. value=";".join(
  350. [
  351. f"{key}={value}"
  352. for key, value in system_parameter_defaults.items()
  353. ]
  354. ),
  355. ),
  356. # Set the adapter stash URL for older environments that need it (versions before
  357. # v0.92.0).
  358. V1EnvVar(
  359. name="MZ_ADAPTER_STASH_URL",
  360. value=f"postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
  361. ),
  362. V1EnvVar(
  363. name="MZ_CLUSTER_REPLICA_SIZES",
  364. value=f"{json.dumps(cluster_replica_size_map())}",
  365. ),
  366. V1EnvVar(
  367. name="MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
  368. value=bootstrap_cluster_replica_size(),
  369. ),
  370. V1EnvVar(
  371. name="MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
  372. value=bootstrap_cluster_replica_size(),
  373. ),
  374. V1EnvVar(
  375. name="MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
  376. value=bootstrap_cluster_replica_size(),
  377. ),
  378. V1EnvVar(
  379. name="MZ_BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
  380. value=bootstrap_cluster_replica_size(),
  381. ),
  382. V1EnvVar(
  383. name="MZ_BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
  384. value=bootstrap_cluster_replica_size(),
  385. ),
  386. V1EnvVar(
  387. name="MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
  388. value=bootstrap_cluster_replica_size(),
  389. ),
  390. ]
  391. if self._meets_minimum_version("0.118.0-dev"):
  392. env += [
  393. V1EnvVar(
  394. name="MZ_ANNOUNCE_EGRESS_ADDRESS",
  395. value="1.2.3.4/32,88.77.66.0/28,2001:db8::/60",
  396. )
  397. ]
  398. else:
  399. env += [V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55")]
  400. if self._meets_minimum_version("0.140.0-dev"):
  401. env += [
  402. V1EnvVar(
  403. name="MZ_LICENSE_KEY",
  404. value="/license_key/license_key",
  405. )
  406. ]
  407. if self.coverage_mode:
  408. env.extend(
  409. [
  410. V1EnvVar(
  411. name="LLVM_PROFILE_FILE",
  412. value="/coverage/environmentd-%p-%9m%c.profraw",
  413. ),
  414. V1EnvVar(
  415. name="CI_COVERAGE_ENABLED",
  416. value="1",
  417. ),
  418. V1EnvVar(name="MZ_ORCHESTRATOR_KUBERNETES_COVERAGE", value="1"),
  419. ]
  420. )
  421. if self.sanitizer_mode != "none":
  422. env.extend(
  423. [
  424. V1EnvVar(
  425. name="CI_SANITIZER_MODE",
  426. value=self.sanitizer_mode,
  427. ),
  428. ]
  429. )
  430. for k, v in self.env.items():
  431. env.append(V1EnvVar(name=k, value=v))
  432. return env
  433. def _meets_version(self, version: str, operator: Callable, default: bool) -> bool:
  434. """Determine whether environmentd matches a given version based on a comparison operator"""
  435. if self.tag is None:
  436. return default
  437. try:
  438. tag_version = MzVersion.parse_mz(self.tag)
  439. except ValueError:
  440. return default
  441. cmp_version = MzVersion.parse_without_prefix(version)
  442. return bool(operator(tag_version, cmp_version))
  443. def _meets_minimum_version(self, version: str) -> bool:
  444. return self._meets_version(version=version, operator=operator.ge, default=True)
  445. def _meets_maximum_version(self, version: str) -> bool:
  446. return self._meets_version(version=version, operator=operator.le, default=False)