schema_registry.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.mzcompose import DEFAULT_CONFLUENT_PLATFORM_VERSION
  10. from materialize.mzcompose.service import Service, ServiceConfig
  11. class SchemaRegistry(Service):
  12. def __init__(
  13. self,
  14. name: str = "schema-registry",
  15. aliases: list[str] = [],
  16. image: str = "confluentinc/cp-schema-registry",
  17. tag: str = DEFAULT_CONFLUENT_PLATFORM_VERSION,
  18. port: int = 8081,
  19. kafka_servers: list[tuple[str, str]] = [("kafka", "9092")],
  20. environment_extra: list[str] = [],
  21. depends_on_extra: list[str] = [],
  22. volumes: list[str] = [],
  23. platform: str | None = None,
  24. ) -> None:
  25. bootstrap_servers = ",".join(
  26. f"PLAINTEXT://{host}:{port}" for host, port in kafka_servers
  27. )
  28. environment = [
  29. # Under Docker, Kafka can be really slow, which means the default
  30. # Kafka connection timeout of 500ms is much too slow.
  31. "SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=10000",
  32. f"SCHEMA_REGISTRY_HOST_NAME={name}",
  33. f"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS={bootstrap_servers}",
  34. *environment_extra,
  35. ]
  36. config: ServiceConfig = {
  37. "image": f"{image}:{tag}",
  38. "ports": [port],
  39. "networks": {"default": {"aliases": aliases}},
  40. "environment": environment,
  41. "depends_on": {
  42. **{host: {"condition": "service_healthy"} for host, _ in kafka_servers},
  43. **{s: {"condition": "service_started"} for s in depends_on_extra},
  44. },
  45. "healthcheck": {
  46. "test": [
  47. "CMD",
  48. "curl",
  49. # We provide credentials in case the schema registry is
  50. # configured to require HTTP authentication, as there's
  51. # no health check endpoint that's excluded from
  52. # authentication requirements. The credentials are
  53. # safely ignored if the schema registry is not
  54. # configured to require them.
  55. "-fu",
  56. "materialize:sekurity",
  57. "localhost:8081",
  58. ],
  59. "interval": "1s",
  60. "start_period": "120s",
  61. },
  62. "volumes": volumes,
  63. }
  64. if platform:
  65. config["platform"] = platform
  66. super().__init__(
  67. name=name,
  68. config=config,
  69. )