123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- import json
- from materialize.mzcompose import DEFAULT_MZ_ENVIRONMENT_ID, DEFAULT_MZ_VOLUMES
- from materialize.mzcompose.service import (
- Service,
- ServiceConfig,
- )
class Clusterd(Service):
    """mzcompose service definition for a ``clusterd`` process.

    The constructor assembles the container environment, command line and
    compose-level settings, then hands them to ``Service``.
    """

    def __init__(
        self,
        name: str = "clusterd",
        image: str | None = None,
        environment_id: str | None = None,
        environment_extra: list[str] = [],
        memory: str | None = None,
        cpu: str | None = None,
        options: list[str] = [],
        restart: str = "no",
        stop_grace_period: str = "120s",
        scratch_directory: str = "/scratch",
        volumes: list[str] = [],
        workers: int = 1,
        process_names: list[str] = [],
        mz_service: str = "materialized",
    ) -> None:
        """Build the service configuration.

        The list-typed defaults are only ever read, never mutated, so sharing
        them between calls is harmless.

        Args:
            name: Service name; also exported as ``CLUSTERD_GRPC_HOST``.
            image: Explicit image to run. When falsy, the ``clusterd``
                mzbuild image is used instead.
            environment_id: Value for ``CLUSTERD_ENVIRONMENT_ID``; falls back
                to ``DEFAULT_MZ_ENVIRONMENT_ID`` when falsy.
            environment_extra: Extra environment entries appended after the
                built-in ones.
            memory: Optional memory limit placed under
                ``deploy.resources.limits``.
            cpu: Optional CPU limit placed under ``deploy.resources.limits``.
            options: Extra command-line arguments, placed after
                ``--scratch-directory``.
            restart: Compose ``restart`` setting.
            stop_grace_period: Compose ``stop_grace_period`` setting.
            scratch_directory: Value for the ``--scratch-directory`` option.
            volumes: Volumes for the service; ``DEFAULT_MZ_VOLUMES`` is used
                when empty.
            workers: Worker count written into both timely configurations.
            process_names: Names of all processes, used to build the timely
                address lists. Defaults to ``[name]`` when empty and must
                contain ``name``.
            mz_service: Host name used in ``CLUSTERD_PERSIST_PUBSUB_URL``.

        Raises:
            ValueError: If ``name`` is not one of ``process_names``.
        """
        environment = [
            "CLUSTERD_LOG_FILTER",
            f"CLUSTERD_GRPC_HOST={name}",
            "MZ_SOFT_ASSERTIONS=1",
            "MZ_EAT_MY_DATA=1",
            f"CLUSTERD_PERSIST_PUBSUB_URL=http://{mz_service}:6879",
            *environment_extra,
        ]

        if not environment_id:
            environment_id = DEFAULT_MZ_ENVIRONMENT_ID
        environment += [f"CLUSTERD_ENVIRONMENT_ID={environment_id}"]

        process_names = process_names if process_names else [name]
        # Fail with an actionable message instead of list.index()'s generic
        # "x is not in list" when the service is missing from its own
        # process list.
        if name not in process_names:
            raise ValueError(
                f"service name {name!r} must be one of "
                f"process_names {process_names!r}"
            )
        process_index = process_names.index(name)

        compute_timely_config = timely_config(process_names, 2102, workers, 16)
        storage_timely_config = timely_config(process_names, 2103, workers, 1337)

        environment += [
            f"CLUSTERD_PROCESS={process_index}",
            f"CLUSTERD_COMPUTE_TIMELY_CONFIG={compute_timely_config}",
            f"CLUSTERD_STORAGE_TIMELY_CONFIG={storage_timely_config}",
        ]

        options = [f"--scratch-directory={scratch_directory}", *options]

        config: ServiceConfig = {}

        if image:
            config["image"] = image
        else:
            config["mzbuild"] = "clusterd"

        # Depending on the Docker Compose version, this may either work or be
        # ignored with a warning. Unfortunately no portable way of setting the
        # memory limit is known.
        if memory or cpu:
            limits: dict[str, str] = {}
            if memory:
                limits["memory"] = memory
            if cpu:
                limits["cpus"] = cpu
            config["deploy"] = {"resources": {"limits": limits}}

        config.update(
            {
                "command": options,
                "ports": [2100, 2101, 6878],
                "environment": environment,
                # Store a copy so that later in-place edits of this service's
                # volumes cannot alter the shared DEFAULT_MZ_VOLUMES or the
                # caller's list.
                "volumes": list(volumes or DEFAULT_MZ_VOLUMES),
                "restart": restart,
                "stop_grace_period": stop_grace_period,
            }
        )

        super().__init__(name=name, config=config)
def timely_config(
    process_names: list[str],
    port: int,
    workers: int,
    arrangement_exert_proportionality: int,
) -> str:
    """Return a JSON-encoded timely configuration string.

    Args:
        process_names: Host names of the participating processes; each one
            becomes a ``"<name>:<port>"`` entry in ``addresses``.
        port: Port appended to every address.
        workers: Value of the ``workers`` field.
        arrangement_exert_proportionality: Value of the field of the same
            name.

    Returns:
        The configuration serialized with ``json.dumps``.
    """
    addresses = [f"{host}:{port}" for host in process_names]

    # Keyword order here fixes the key order of the serialized JSON.
    settings = dict(
        workers=workers,
        # Always written as 0; the caller exports the real process index
        # separately (CLUSTERD_PROCESS).
        process=0,
        addresses=addresses,
        arrangement_exert_proportionality=arrangement_exert_proportionality,
        enable_zero_copy=False,
        enable_zero_copy_lgalloc=False,
        zero_copy_limit=None,
    )
    return json.dumps(settings)
|