kafka.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.mzcompose import (
  10. DEFAULT_CONFLUENT_PLATFORM_VERSION,
  11. )
  12. from materialize.mzcompose.service import (
  13. Service,
  14. ServiceConfig,
  15. )
  16. class Kafka(Service):
  17. def __init__(
  18. self,
  19. name: str = "kafka",
  20. image: str = "confluentinc/cp-kafka",
  21. tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
  22. ports: list[str | int] | None = None,
  23. allow_host_ports: bool = False,
  24. auto_create_topics: bool = False,
  25. broker_id: int = 1,
  26. offsets_topic_replication_factor: int = 1,
  27. advertised_listeners: list[str] = [],
  28. environment: list[str] = [
  29. "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
  30. "KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
  31. "KAFKA_MIN_INSYNC_REPLICAS=1",
  32. "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
  33. "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
  34. "KAFKA_MESSAGE_MAX_BYTES=15728640",
  35. "KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
  36. "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
  37. ],
  38. environment_extra: list[str] = [],
  39. depends_on_extra: list[str] = [],
  40. volumes: list[str] = [],
  41. platform: str | None = None,
  42. ) -> None:
  43. if not advertised_listeners:
  44. advertised_listeners = [f"PLAINTEXT://{name}:9092"]
  45. environment = [
  46. *environment,
  47. f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
  48. f"KAFKA_BROKER_ID={broker_id}",
  49. *environment_extra,
  50. ]
  51. if ports is None:
  52. ports = [l.split(":")[-1] for l in advertised_listeners]
  53. config: ServiceConfig = {
  54. "image": f"{image}:{tag}",
  55. "ports": ports,
  56. "allow_host_ports": allow_host_ports,
  57. "environment": [
  58. *environment,
  59. f"KAFKA_AUTO_CREATE_TOPICS_ENABLE={auto_create_topics}",
  60. f"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={offsets_topic_replication_factor}",
  61. ],
  62. "depends_on": {
  63. "zookeeper": {"condition": "service_healthy"},
  64. **{s: {"condition": "service_started"} for s in depends_on_extra},
  65. },
  66. "healthcheck": {
  67. "test": ["CMD", "nc", "-z", "localhost", "9092"],
  68. "interval": "1s",
  69. "start_period": "120s",
  70. },
  71. "volumes": volumes,
  72. }
  73. if platform:
  74. config["platform"] = platform
  75. super().__init__(name=name, config=config)