  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import json
  10. from materialize.mzcompose import DEFAULT_MZ_ENVIRONMENT_ID, DEFAULT_MZ_VOLUMES
  11. from materialize.mzcompose.service import (
  12. Service,
  13. ServiceConfig,
  14. )
  15. class Clusterd(Service):
  16. def __init__(
  17. self,
  18. name: str = "clusterd",
  19. image: str | None = None,
  20. environment_id: str | None = None,
  21. environment_extra: list[str] = [],
  22. memory: str | None = None,
  23. cpu: str | None = None,
  24. options: list[str] = [],
  25. restart: str = "no",
  26. stop_grace_period: str = "120s",
  27. scratch_directory: str = "/scratch",
  28. volumes: list[str] = [],
  29. workers: int = 1,
  30. process_names: list[str] = [],
  31. mz_service: str = "materialized",
  32. ) -> None:
  33. environment = [
  34. "CLUSTERD_LOG_FILTER",
  35. f"CLUSTERD_GRPC_HOST={name}",
  36. "MZ_SOFT_ASSERTIONS=1",
  37. "MZ_EAT_MY_DATA=1",
  38. f"CLUSTERD_PERSIST_PUBSUB_URL=http://{mz_service}:6879",
  39. *environment_extra,
  40. ]
  41. if not environment_id:
  42. environment_id = DEFAULT_MZ_ENVIRONMENT_ID
  43. environment += [f"CLUSTERD_ENVIRONMENT_ID={environment_id}"]
  44. process_names = process_names if process_names else [name]
  45. process_index = process_names.index(name)
  46. compute_timely_config = timely_config(process_names, 2102, workers, 16)
  47. storage_timely_config = timely_config(process_names, 2103, workers, 1337)
  48. environment += [
  49. f"CLUSTERD_PROCESS={process_index}",
  50. f"CLUSTERD_COMPUTE_TIMELY_CONFIG={compute_timely_config}",
  51. f"CLUSTERD_STORAGE_TIMELY_CONFIG={storage_timely_config}",
  52. ]
  53. options = [f"--scratch-directory={scratch_directory}", *options]
  54. config: ServiceConfig = {}
  55. if image:
  56. config["image"] = image
  57. else:
  58. config["mzbuild"] = "clusterd"
  59. # Depending on the Docker Compose version, this may either work or be
  60. # ignored with a warning. Unfortunately no portable way of setting the
  61. # memory limit is known.
  62. if memory or cpu:
  63. limits = {}
  64. if memory:
  65. limits["memory"] = memory
  66. if cpu:
  67. limits["cpus"] = cpu
  68. config["deploy"] = {"resources": {"limits": limits}}
  69. config.update(
  70. {
  71. "command": options,
  72. "ports": [2100, 2101, 6878],
  73. "environment": environment,
  74. "volumes": volumes or DEFAULT_MZ_VOLUMES,
  75. "restart": restart,
  76. "stop_grace_period": stop_grace_period,
  77. }
  78. )
  79. super().__init__(name=name, config=config)
  80. def timely_config(
  81. process_names: list[str],
  82. port: int,
  83. workers: int,
  84. arrangement_exert_proportionality: int,
  85. ) -> str:
  86. config = {
  87. "workers": workers,
  88. "process": 0,
  89. "addresses": [f"{n}:{port}" for n in process_names],
  90. "arrangement_exert_proportionality": arrangement_exert_proportionality,
  91. "enable_zero_copy": False,
  92. "enable_zero_copy_lgalloc": False,
  93. "zero_copy_limit": None,
  94. }
  95. return json.dumps(config)