12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from materialize.mzcompose import (
- DEFAULT_CONFLUENT_PLATFORM_VERSION,
- )
- from materialize.mzcompose.service import (
- Service,
- ServiceConfig,
- )
- class Kafka(Service):
- def __init__(
- self,
- name: str = "kafka",
- image: str = "confluentinc/cp-kafka",
- tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
- ports: list[str | int] | None = None,
- allow_host_ports: bool = False,
- auto_create_topics: bool = False,
- broker_id: int = 1,
- offsets_topic_replication_factor: int = 1,
- advertised_listeners: list[str] = [],
- environment: list[str] = [
- "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
- "KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false",
- "KAFKA_MIN_INSYNC_REPLICAS=1",
- "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1",
- "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1",
- "KAFKA_MESSAGE_MAX_BYTES=15728640",
- "KAFKA_REPLICA_FETCH_MAX_BYTES=15728640",
- "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=100",
- ],
- environment_extra: list[str] = [],
- depends_on_extra: list[str] = [],
- volumes: list[str] = [],
- platform: str | None = None,
- ) -> None:
- if not advertised_listeners:
- advertised_listeners = [f"PLAINTEXT://{name}:9092"]
- environment = [
- *environment,
- f"KAFKA_ADVERTISED_LISTENERS={','.join(advertised_listeners)}",
- f"KAFKA_BROKER_ID={broker_id}",
- *environment_extra,
- ]
- if ports is None:
- ports = [l.split(":")[-1] for l in advertised_listeners]
- config: ServiceConfig = {
- "image": f"{image}:{tag}",
- "ports": ports,
- "allow_host_ports": allow_host_ports,
- "environment": [
- *environment,
- f"KAFKA_AUTO_CREATE_TOPICS_ENABLE={auto_create_topics}",
- f"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR={offsets_topic_replication_factor}",
- ],
- "depends_on": {
- "zookeeper": {"condition": "service_healthy"},
- **{s: {"condition": "service_started"} for s in depends_on_extra},
- },
- "healthcheck": {
- "test": ["CMD", "nc", "-z", "localhost", "9092"],
- "interval": "1s",
- "start_period": "120s",
- },
- "volumes": volumes,
- }
- if platform:
- config["platform"] = platform
- super().__init__(name=name, config=config)
|