debezium.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from materialize.mzcompose.service import (
  10. Service,
  11. ServiceDependency,
  12. )
  13. class Debezium(Service):
  14. def __init__(
  15. self,
  16. name: str = "debezium",
  17. port: int = 8083,
  18. redpanda: bool = False,
  19. environment: list[str] = [
  20. "CONNECT_BOOTSTRAP_SERVERS=kafka:9092",
  21. "CONNECT_GROUP_ID=connect",
  22. "CONNECT_CONFIG_STORAGE_TOPIC=connect_configs",
  23. "CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1",
  24. "CONNECT_OFFSET_STORAGE_TOPIC=connect_offsets",
  25. "CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1",
  26. "CONNECT_STATUS_STORAGE_TOPIC=connect_statuses",
  27. "CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1",
  28. # We don't support JSON, so ensure that connect uses Avro to encode
  29. # messages and CSR to record the schema.
  30. "CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter",
  31. "CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter",
  32. "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
  33. "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081",
  34. "CONNECT_OFFSET_COMMIT_POLICY=AlwaysCommitOffsetPolicy",
  35. "CONNECT_ERRORS_RETRY_TIMEOUT=60000",
  36. "CONNECT_ERRORS_RETRY_DELAY_MAX_MS=1000",
  37. ],
  38. ) -> None:
  39. depends_on: dict[str, ServiceDependency] = {
  40. "kafka": {"condition": "service_healthy"},
  41. "schema-registry": {"condition": "service_healthy"},
  42. }
  43. environment.append(f"CONNECT_REST_ADVERTISED_HOST_NAME={name}")
  44. if redpanda:
  45. depends_on = {"redpanda": {"condition": "service_healthy"}}
  46. super().__init__(
  47. name=name,
  48. config={
  49. "mzbuild": "debezium",
  50. "init": True,
  51. "ports": [port],
  52. "environment": environment,
  53. "depends_on": depends_on,
  54. "healthcheck": {
  55. "test": ["CMD", "curl", "-f", "localhost:8083"],
  56. "interval": "1s",
  57. "start_period": "120s",
  58. },
  59. },
  60. )