12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from materialize.mzcompose import DEFAULT_CONFLUENT_PLATFORM_VERSION
- from materialize.mzcompose.service import Service, ServiceConfig
class SchemaRegistry(Service):
    """mzcompose service definition for a Confluent Schema Registry container.

    The generated service configuration points the registry at the given
    Kafka brokers, waits for those brokers to report healthy before starting,
    and installs a curl-based health check.
    """

    def __init__(
        self,
        name: str = "schema-registry",
        aliases: list[str] = [],
        image: str = "confluentinc/cp-schema-registry",
        tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
        port: int = 8081,
        kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
        environment_extra: list[str] = [],
        depends_on_extra: list[str] = [],
        volumes: list[str] = [],
        platform: str | None = None,
    ) -> None:
        """Assemble the service configuration and hand it to ``Service``.

        Args:
            name: Service name; also exported as ``SCHEMA_REGISTRY_HOST_NAME``.
            aliases: Network aliases registered on the ``default`` network.
            image: Image repository, combined with ``tag`` as ``image:tag``.
            tag: Image tag.
            port: The single entry of the service's ``ports`` list.
            kafka_servers: ``(host, port)`` pairs. Each pair becomes a
                ``PLAINTEXT://host:port`` bootstrap server, and each host is
                added to ``depends_on`` with condition ``service_healthy``.
            environment_extra: Extra environment entries, appended after the
                built-in ones.
            depends_on_extra: Additional services to depend on with condition
                ``service_started``. An entry that repeats a Kafka host
                replaces that host's ``service_healthy`` condition.
            volumes: Volume specifications for the service.
            platform: When truthy, stored under the ``platform`` config key;
                otherwise the key is omitted.
        """
        # The loop variables are deliberately not called `port`, to avoid
        # confusion with the `port` parameter above.
        bootstrap_servers = ",".join(
            f"PLAINTEXT://{kafka_host}:{kafka_port}"
            for kafka_host, kafka_port in kafka_servers
        )

        environment = [
            # Under Docker, Kafka can be really slow, which means the default
            # Kafka connection timeout of 500ms is much too short.
            "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
            f"SCHEMA_REGISTRY_HOST_NAME={name}",
            f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
            *environment_extra,
        ]

        # Kafka brokers must be healthy; extra dependencies only need to have
        # started. Extras are applied second so they win on a name clash.
        depends_on = {
            kafka_host: {"condition": "service_healthy"}
            for kafka_host, _ in kafka_servers
        }
        depends_on.update(
            {extra: {"condition": "service_started"} for extra in depends_on_extra}
        )

        config: ServiceConfig = {
            "image": f"{image}:{tag}",
            "ports": [port],
            # `aliases` and `volumes` default to shared list objects. Store
            # copies so that an in-place edit of this config can never leak
            # into the defaults (and thereby into other instances) or into a
            # list owned by the caller.
            "networks": {"default": {"aliases": list(aliases)}},
            "environment": environment,
            "depends_on": depends_on,
            "healthcheck": {
                "test": [
                    "CMD",
                    "curl",
                    # We provide credentials in case the schema registry is
                    # configured to require HTTP authentication, as there's
                    # no health check endpoint that's excluded from
                    # authentication requirements. The credentials are
                    # safely ignored if the schema registry is not
                    # configured to require them.
                    "-fu",
                    "materialize:sekurity",
                    # NOTE(review): the probe always targets 8081, regardless
                    # of the `port` argument -- confirm this is intended.
                    "localhost:8081",
                ],
                "interval": "1s",
                "start_period": "120s",
            },
            "volumes": list(volumes),
        }

        if platform:
            config["platform"] = platform

        super().__init__(
            name=name,
            config=config,
        )
|