materialized.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. import os
  11. import shutil
  12. import tempfile
  13. from enum import Enum
  14. from typing import Any
  15. from materialize import MZ_ROOT, docker
  16. from materialize.mz_version import MzVersion
  17. from materialize.mzcompose import (
  18. DEFAULT_CRDB_ENVIRONMENT,
  19. DEFAULT_MZ_ENVIRONMENT_ID,
  20. DEFAULT_MZ_VOLUMES,
  21. bootstrap_cluster_replica_size,
  22. cluster_replica_size_map,
  23. get_default_system_parameters,
  24. )
  25. from materialize.mzcompose.service import (
  26. Service,
  27. ServiceConfig,
  28. ServiceDependency,
  29. )
  30. from materialize.mzcompose.services.azurite import azure_blob_uri
  31. from materialize.mzcompose.services.minio import minio_blob_uri
  32. from materialize.mzcompose.services.postgres import METADATA_STORE
  class MaterializeEmulator(Service):
      """Just the Materialize Emulator with its defaults unchanged.

      Unlike ``Materialized``, this service adds no extra environment,
      command-line flags, or volumes; it only declares the ports and a
      readiness healthcheck.
      """

      def __init__(self, image: str | None = None):
          """Create the emulator service.

          Args:
              image: Docker image to run. When omitted (or empty), the
                  service is built via mzbuild under the name
                  ``materialized``.
          """
          name = "materialized"
          config: ServiceConfig = {}
          # Honor an explicitly requested image instead of silently ignoring
          # the parameter; fall back to the mzbuild image otherwise.
          if image:
              config["image"] = image
          else:
              config["mzbuild"] = name
          config.update(
              {
                  "ports": [6875, 6876, 6877, 6878, 26257],
                  "healthcheck": {
                      "test": ["CMD", "curl", "-f", "localhost:6878/api/readyz"],
                      "interval": "1s",
                      # A fully loaded Materialize can take a long time to start.
                      "start_period": "600s",
                  },
              }
          )
          super().__init__(name=name, config=config)
  class Materialized(Service):
      """mzcompose service definition for a ``materialized`` container.

      The constructor translates its keyword arguments into a compose
      ``ServiceConfig``: environment variables, command-line flags, volumes,
      dependencies, healthcheck, and resource limits.
      """

      class Size:
          # Default value of the ``default_size`` constructor argument.
          DEFAULT_SIZE = 4

      # NOTE: the list-valued defaults below are shared between calls, but they
      # are only ever read (unpacked or used as the right-hand side of ``+=``
      # on a fresh list), never mutated.
      def __init__(
          self,
          name: str | None = None,
          image: str | None = None,
          environment_extra: list[str] = [],
          volumes_extra: list[str] = [],
          depends_on: list[str] = [],
          memory: str | None = None,
          cpu: str | None = None,
          options: list[str] = [],
          persist_blob_url: str | None = None,
          default_size: int | str = Size.DEFAULT_SIZE,
          environment_id: str | None = None,
          propagate_crashes: bool = True,
          external_metadata_store: str | bool = False,
          external_blob_store: str | bool = False,
          blob_store_is_azure: bool = False,
          unsafe_mode: bool = True,
          restart: str | None = None,
          use_default_volumes: bool = True,
          ports: list[str] | None = None,
          system_parameter_defaults: dict[str, str] | None = None,
          additional_system_parameter_defaults: dict[str, str] | None = None,
          system_parameter_version: MzVersion | None = None,
          soft_assertions: bool = True,
          sanity_restart: bool = True,
          platform: str | None = None,
          healthcheck: list[str] | None = None,
          deploy_generation: int | None = None,
          force_migrations: str | None = None,
          publish: bool | None = None,
          stop_grace_period: str = "120s",
          metadata_store: str = METADATA_STORE,
          cluster_replica_size: dict[str, dict[str, Any]] | None = None,
          bootstrap_replica_size: str | None = None,
          default_replication_factor: int = 1,
          listeners_config_path: str = f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth.json",
      ) -> None:
          """Build the compose configuration for a Materialize service.

          Args:
              name: Service name, also used as the container hostname.
                  Defaults to ``"materialized"``.
              image: Docker image to run. When unset, the service is built via
                  mzbuild and the version is taken from
                  ``MzVersion.parse_cargo()``.
              environment_extra: Extra environment entries appended after the
                  built-in ones.
              volumes_extra: Extra volume specifications appended last.
              depends_on: Services that must be started before this one.
              memory: Memory limit placed under ``deploy.resources.limits``.
              cpu: CPU limit placed under ``deploy.resources.limits``.
              options: Extra command-line arguments appended last.
              persist_blob_url: Value for ``--persist-blob-url``. Overridden
                  when ``external_blob_store`` is set.
              default_size: Used to derive ``self.default_storage_size`` and
                  ``self.default_replica_size``.
              environment_id: Value for ``--environment-id``; defaults to
                  ``DEFAULT_MZ_ENVIRONMENT_ID``.
              propagate_crashes: Pass
                  ``--orchestrator-process-propagate-crashes``.
              external_metadata_store: ``True`` to use the ``metadata_store``
                  service, or a string giving the host address to use instead.
              external_blob_store: ``True`` to use the minio/azurite service,
                  or a string giving the host address to use instead.
              blob_store_is_azure: Select azurite instead of minio for the
                  external blob store.
              unsafe_mode: Pass ``--unsafe-mode``.
              restart: ``"on-failure"``, ``"on-failure:<limit>"`` or ``"no"``.
              use_default_volumes: Include ``DEFAULT_MZ_VOLUMES``.
              ports: When given, replaces the default port list and sets
                  ``allow_host_ports``.
              system_parameter_defaults: Base system parameter defaults. When
                  unset, ``get_default_system_parameters`` is consulted. The
                  passed dict is not modified.
              additional_system_parameter_defaults: Entries merged on top of
                  the base system parameter defaults.
              system_parameter_version: Version passed to
                  ``get_default_system_parameters`` in preference to the
                  version derived from ``image``.
              soft_assertions: Sets ``MZ_SOFT_ASSERTIONS`` to 1 or 0.
              sanity_restart: Add the ``sanity_restart`` label when running on
                  the ``main`` branch or a tag in Buildkite.
              platform: Compose ``platform`` value.
              healthcheck: Healthcheck test command; defaults to curling the
                  ``/api/readyz`` endpoint.
              deploy_generation: Value for ``--deploy-generation``.
              force_migrations: Value for
                  ``--unsafe-builtin-table-fingerprint-whitespace``; only
                  applied when no ``image`` is given.
              publish: Compose ``publish`` value.
              stop_grace_period: Compose ``stop_grace_period`` value.
              metadata_store: Name of the metadata store service.
              cluster_replica_size: Mapping serialized as JSON into
                  ``MZ_CLUSTER_REPLICA_SIZES``; defaults to
                  ``cluster_replica_size_map()``.
              bootstrap_replica_size: Replica size for the bootstrap clusters;
                  defaults to ``bootstrap_cluster_replica_size()``.
              default_replication_factor: Replication factor for the default,
                  system, and probe clusters.
              listeners_config_path: Host path of the listeners configuration
                  mounted at ``/listeners_config``.

          Raises:
              RuntimeError: If ``restart`` names an unknown policy.
              AssertionError: If ``listeners_config_path`` does not exist and
                  the image version requires it.

          Side effects:
              When ``MZ_CI_LICENSE_KEY`` is set in the environment, writes a
              ``license_key`` file into the current working directory.
          """
          if name is None:
              name = "materialized"

          if healthcheck is None:
              healthcheck = ["CMD", "curl", "-f", "localhost:6878/api/readyz"]

          depends_graph: dict[str, ServiceDependency] = {
              s: {"condition": "service_started"} for s in depends_on
          }

          if bootstrap_replica_size is None:
              bootstrap_replica_size = bootstrap_cluster_replica_size()

          if cluster_replica_size is None:
              cluster_replica_size = cluster_replica_size_map()

          environment = [
              "MZ_NO_TELEMETRY=1",
              "MZ_NO_BUILTIN_CONSOLE=1",
              "MZ_EAT_MY_DATA=1",
              "MZ_TEST_ONLY_DUMMY_SEGMENT_CLIENT=true",
              f"MZ_SOFT_ASSERTIONS={int(soft_assertions)}",
              # The following settings can not be baked in the default image, as they
              # are enabled for testing purposes only
              "MZ_ORCHESTRATOR_PROCESS_TCP_PROXY_LISTEN_ADDR=0.0.0.0",
              "MZ_ORCHESTRATOR_PROCESS_PROMETHEUS_SERVICE_DISCOVERY_DIRECTORY=/mzdata/prometheus",
              "MZ_BOOTSTRAP_ROLE=materialize",
              # TODO move this to the listener config?
              "MZ_INTERNAL_PERSIST_PUBSUB_LISTEN_ADDR=0.0.0.0:6879",
              "MZ_PERSIST_PUBSUB_URL=http://127.0.0.1:6879",
              "MZ_AWS_CONNECTION_ROLE_ARN=arn:aws:iam::123456789000:role/MaterializeConnection",
              "MZ_AWS_EXTERNAL_ID_PREFIX=eb5cb59b-e2fe-41f3-87ca-d2176a495345",
              # Always use the persist catalog if the version has multiple implementations.
              "MZ_CATALOG_STORE=persist",
              # Please think twice before forwarding additional environment
              # variables from the host, as it's easy to write tests that are
              # then accidentally dependent on the state of the host machine.
              #
              # To dynamically change the environment during a workflow run,
              # use Composition.override.
              "MZ_LOG_FILTER",
              "CLUSTERD_LOG_FILTER",
              f"MZ_CLUSTER_REPLICA_SIZES={json.dumps(cluster_replica_size)}",
              f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              f"MZ_BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              f"MZ_BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              f"MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
              # Note(SangJunBak): mz_system and mz_probe have no replicas by default in materialized
              # but we re-enable them here since many of our tests rely on them.
              f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
              f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
              f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
              *environment_extra,
              *DEFAULT_CRDB_ENVIRONMENT,
          ]

          image_version = None
          if not image:
              image_version = MzVersion.parse_cargo()
          elif ":" in image:
              # The tag follows the *last* colon; an earlier colon may belong
              # to a registry port (e.g. "localhost:5000/materialized:v1.2.3").
              image_version_str = image.rsplit(":", 1)[1]
              if docker.is_image_tag_of_release_version(image_version_str):
                  image_version = MzVersion.parse_mz(image_version_str)

          if system_parameter_defaults is None:
              system_parameter_defaults = get_default_system_parameters(
                  system_parameter_version or image_version
              )
          else:
              # Work on a copy so the caller's dict is not modified by the
              # assignments and merge below.
              system_parameter_defaults = dict(system_parameter_defaults)

          system_parameter_defaults["default_cluster_replication_factor"] = str(
              default_replication_factor
          )

          if additional_system_parameter_defaults is not None:
              system_parameter_defaults.update(additional_system_parameter_defaults)

          if system_parameter_defaults:
              environment += [
                  "MZ_SYSTEM_PARAMETER_DEFAULT="
                  + ";".join(
                      f"{key}={value}" for key, value in system_parameter_defaults.items()
                  )
              ]

          command = []

          if unsafe_mode:
              command += ["--unsafe-mode"]

          if not environment_id:
              environment_id = DEFAULT_MZ_ENVIRONMENT_ID
          command += [f"--environment-id={environment_id}"]

          if external_blob_store:
              blob_store = "azurite" if blob_store_is_azure else "minio"
              depends_graph[blob_store] = {"condition": "service_healthy"}
              # A string names the host explicitly; any other truthy value
              # means "use the default blob store service".
              address = (
                  external_blob_store
                  if isinstance(external_blob_store, str)
                  else blob_store
              )
              persist_blob_url = (
                  azure_blob_uri(address)
                  if blob_store_is_azure
                  else minio_blob_uri(address)
              )

          if persist_blob_url:
              command.append(f"--persist-blob-url={persist_blob_url}")

          if propagate_crashes:
              command += ["--orchestrator-process-propagate-crashes"]

          if deploy_generation is not None:
              command += [f"--deploy-generation={deploy_generation}"]

          if force_migrations is not None and image is None:
              command += [
                  f"--unsafe-builtin-table-fingerprint-whitespace={force_migrations}",
              ]
              # Make sure --unsafe-mode accompanies the unsafe flag above even
              # when the caller disabled unsafe_mode.
              if not unsafe_mode:
                  command += ["--unsafe-mode"]

          self.default_storage_size = "1" if default_size == 1 else f"{default_size}-1"

          self.default_replica_size = (
              "1"
              if default_size == 1
              else (
                  f"{default_size}-{default_size}"
                  if isinstance(default_size, int)
                  else default_size
              )
          )

          if external_metadata_store:
              # A string names the host explicitly; any other truthy value
              # means "use the metadata store service".
              address = (
                  external_metadata_store
                  if isinstance(external_metadata_store, str)
                  else metadata_store
              )
              depends_graph[metadata_store] = {"condition": "service_healthy"}
              command += [
                  f"--persist-consensus-url=postgres://root@{address}:26257?options=--search_path=consensus",
              ]
              environment += [
                  f"MZ_TIMESTAMP_ORACLE_URL=postgres://root@{address}:26257?options=--search_path=tsoracle",
                  "MZ_NO_BUILTIN_POSTGRES=1",
                  # For older Materialize versions
                  "MZ_NO_BUILTIN_COCKROACH=1",
                  # Set the adapter stash URL for older environments that need it (versions before
                  # v0.92.0).
                  f"MZ_ADAPTER_STASH_URL=postgres://root@{address}:26257?options=--search_path=adapter",
              ]

          command += [
              "--orchestrator-process-tcp-proxy-listen-addr=0.0.0.0",
              "--orchestrator-process-prometheus-service-discovery-directory=/mzdata/prometheus",
          ]

          command += options

          config: ServiceConfig = {
              # Use the service name as the hostname so that it is stable across
              # container recreation. (The default hostname is the container ID,
              # which changes when the container is recreated.) This is important
              # when using `external_metadata_store=False`, as the consensus/blob URLs
              # refer to the container's hostname, and we don't want those URLs to
              # change when the container is recreated.
              "hostname": name,
          }

          if publish is not None:
              config["publish"] = publish

          if image:
              config["image"] = image
          else:
              config["mzbuild"] = "materialized"

          if restart:
              # Accepted forms: "no", "on-failure", "on-failure:<limit>".
              policy, _, max_tries = restart.partition(":")
              if policy == "on-failure":
                  environment += ["MZ_RESTART_ON_FAILURE=1"]
                  if max_tries:
                      environment += [f"MZ_RESTART_LIMIT={max_tries}"]
              elif policy == "no":
                  pass
              else:
                  raise RuntimeError(f"unknown restart policy: {policy}")

          # Depending on the Docker Compose version, this may either work or be
          # ignored with a warning. Unfortunately no portable way of setting the
          # memory limit is known.
          if memory or cpu:
              limits = {}
              if memory:
                  limits["memory"] = memory
              if cpu:
                  limits["cpus"] = cpu
              config["deploy"] = {"resources": {"limits": limits}}

          # Sanity restarts are rather slow and rarely find a bug, running them
          # on main and releases is enough to prevent regressions.
          if sanity_restart and (
              os.getenv("BUILDKITE_BRANCH") == "main" or os.getenv("BUILDKITE_TAG")
          ):
              # Workaround for https://github.com/docker/compose/issues/11133
              config["labels"] = {"sanity_restart": True}

          if platform:
              config["platform"] = platform

          volumes = []

          if image_version is None or image_version >= "v0.147.0-dev":
              assert os.path.exists(
                  listeners_config_path
              ), f"listeners config not found: {listeners_config_path}"
              volumes.append(f"{listeners_config_path}:/listeners_config")
              environment.append("MZ_LISTENERS_CONFIG_PATH=/listeners_config")

          if image_version is None or image_version >= "v0.140.0-dev":
              if "MZ_CI_LICENSE_KEY" in os.environ:
                  # We have to take care to write the license_key file atomically
                  # so that it is always valid, even if multiple Materialized
                  # objects are created concurrently.
                  with tempfile.NamedTemporaryFile("w", delete=False) as tmp_file:
                      tmp_path = tmp_file.name
                      tmp_file.write(os.environ["MZ_CI_LICENSE_KEY"])
                  # The file is closed (and flushed) before it is moved into
                  # place under its final name.
                  os.chmod(tmp_path, 0o644)
                  shutil.move(tmp_path, "license_key")

                  environment += ["MZ_LICENSE_KEY=/license_key/license_key"]
                  volumes += [f"{os.getcwd()}/license_key:/license_key/license_key"]

          if use_default_volumes:
              volumes += DEFAULT_MZ_VOLUMES
          volumes += volumes_extra

          config.update(
              {
                  "depends_on": depends_graph,
                  "command": command,
                  "ports": [6875, 6876, 6877, 6878, 26257],
                  "environment": environment,
                  "volumes": volumes,
                  "tmpfs": ["/tmp"],
                  "healthcheck": {
                      "test": healthcheck,
                      "interval": "1s",
                      # A fully loaded Materialize can take a long time to start.
                      "start_period": "600s",
                  },
                  "stop_grace_period": stop_grace_period,
              }
          )

          if ports:
              # Caller-specified ports replace the defaults above.
              config.update(
                  {
                      "allow_host_ports": True,
                      "ports": ports,
                  }
              )

          super().__init__(name=name, config=config)
  class DeploymentStatus(Enum):
      """Possible deployment status values.

      See DeploymentStateInner for reference.
      """

      INITIALIZING = "Initializing"
      READY_TO_PROMOTE = "ReadyToPromote"
      PROMOTING = "Promoting"
      IS_LEADER = "IsLeader"
  # Healthcheck test command that requests the leader status endpoint on
  # port 6878 with curl (`-f` makes curl exit non-zero on HTTP errors).
  LEADER_STATUS_HEALTHCHECK: list[str] = ["CMD", "curl", "-f", "localhost:6878/api/leader/status"]