debezium.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from kubernetes.client import (
  10. V1Container,
  11. V1ContainerPort,
  12. V1Deployment,
  13. V1DeploymentSpec,
  14. V1EnvVar,
  15. V1LabelSelector,
  16. V1ObjectMeta,
  17. V1PodSpec,
  18. V1PodTemplateSpec,
  19. V1Service,
  20. V1ServicePort,
  21. V1ServiceSpec,
  22. )
  23. from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
  24. from materialize.cloudtest.k8s.api.k8s_deployment import K8sDeployment
  25. from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
  26. from materialize.cloudtest.k8s.api.k8s_service import K8sService
  class DebeziumDeployment(K8sDeployment):
      """Single-replica deployment of the ``debezium/connect`` container.

      The assembled ``V1Deployment`` is stored on ``self.deployment``.
      """

      def __init__(
          self, namespace: str, redpanda_namespace: str, apply_node_selectors: bool
      ) -> None:
          """Assemble the Debezium deployment object.

          Args:
              namespace: Namespace set on the deployment and on the pod
                  template metadata.
              redpanda_namespace: Namespace used to build the
                  ``redpanda.<namespace>`` host names for the bootstrap
                  servers (port 9092) and the schema registry (port 8081).
              apply_node_selectors: When true, the pod spec carries the node
                  selector ``supporting-services=true``; otherwise no node
                  selector is set.
          """
          super().__init__(namespace)

          bootstrap_servers = f"redpanda.{redpanda_namespace}:9092"
          schema_registry_url = f"http://redpanda.{redpanda_namespace}:8081"
          avro_converter = "io.confluent.connect.avro.AvroConverter"

          # (name, value) pairs, in the order they are handed to the container.
          settings = [
              ("BOOTSTRAP_SERVERS", bootstrap_servers),
              ("CONFIG_STORAGE_TOPIC", "connect_configs"),
              ("OFFSET_STORAGE_TOPIC", "connect_offsets"),
              ("STATUS_STORAGE_TOPIC", "connect_statuses"),
              # JSON is not supported on our side, so connect must encode
              # messages as AVRO and record the schemas in CSR.
              ("KEY_CONVERTER", avro_converter),
              ("VALUE_CONVERTER", avro_converter),
              ("CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL", schema_registry_url),
              ("CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL", schema_registry_url),
              ("CONNECT_OFFSET_COMMIT_POLICY", "AlwaysCommitOffsetPolicy"),
              ("CONNECT_ERRORS_RETRY_TIMEOUT", "60000"),
              ("CONNECT_ERRORS_RETRY_DELAY_MAX_MS", "1000"),
          ]
          env_vars = [V1EnvVar(name=key, value=val) for key, val in settings]

          debezium_container = V1Container(
              name="debezium",
              image="debezium/connect:1.9.6.Final",
              env=env_vars,
              ports=[V1ContainerPort(container_port=8083, name="debezium")],
          )

          pod_template = V1PodTemplateSpec(
              metadata=V1ObjectMeta(namespace=namespace, labels={"app": "debezium"}),
              spec=V1PodSpec(
                  containers=[debezium_container],
                  node_selector=(
                      {"supporting-services": "true"} if apply_node_selectors else None
                  ),
              ),
          )

          # The selector must match the labels placed on the pod template above.
          deployment_spec = V1DeploymentSpec(
              replicas=1,
              template=pod_template,
              selector=V1LabelSelector(match_labels={"app": "debezium"}),
          )

          self.deployment = V1Deployment(
              api_version="apps/v1",
              kind="Deployment",
              metadata=V1ObjectMeta(name="debezium", namespace=namespace),
              spec=deployment_spec,
          )
  class DebeziumService(K8sService):
      """``NodePort`` service exposing port 8083 of pods labelled ``app=debezium``.

      The assembled ``V1Service`` is stored on ``self.service``.
      """

      def __init__(
          self,
          namespace: str,
      ) -> None:
          """Assemble the Debezium service object.

          Args:
              namespace: Namespace set on the service metadata.
          """
          super().__init__(namespace)

          service_metadata = V1ObjectMeta(
              name="debezium", namespace=namespace, labels={"app": "debezium"}
          )
          service_spec = V1ServiceSpec(
              type="NodePort",
              ports=[V1ServicePort(name="debezium", port=8083)],
              selector={"app": "debezium"},
          )

          self.service = V1Service(metadata=service_metadata, spec=service_spec)
  def debezium_resources(
      debezium_namespace: str = DEFAULT_K8S_NAMESPACE,
      redpanda_namespace: str = DEFAULT_K8S_NAMESPACE,
      apply_node_selectors: bool = False,
  ) -> list[K8sResource]:
      """Return the resources that make up Debezium: its deployment and service.

      Args:
          debezium_namespace: Namespace for both the deployment and the service.
          redpanda_namespace: Namespace forwarded to the deployment, which uses
              it to build the Redpanda host names.
          apply_node_selectors: Forwarded to the deployment unchanged.

      Returns:
          A two-element list: the deployment first, then the service.
      """
      deployment = DebeziumDeployment(
          debezium_namespace,
          redpanda_namespace=redpanda_namespace,
          apply_node_selectors=apply_node_selectors,
      )
      service = DebeziumService(debezium_namespace)
      return [deployment, service]