123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from kubernetes.client import (
- V1Container,
- V1ContainerPort,
- V1Deployment,
- V1DeploymentSpec,
- V1EnvVar,
- V1LabelSelector,
- V1ObjectMeta,
- V1PodSpec,
- V1PodTemplateSpec,
- V1Service,
- V1ServicePort,
- V1ServiceSpec,
- )
- from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
- from materialize.cloudtest.k8s.api.k8s_deployment import K8sDeployment
- from materialize.cloudtest.k8s.api.k8s_resource import K8sResource
- from materialize.cloudtest.k8s.api.k8s_service import K8sService
- class DebeziumDeployment(K8sDeployment):
- def __init__(
- self, namespace: str, redpanda_namespace: str, apply_node_selectors: bool
- ) -> None:
- super().__init__(namespace)
- ports = [V1ContainerPort(container_port=8083, name="debezium")]
- env = [
- V1EnvVar(
- name="BOOTSTRAP_SERVERS", value=f"redpanda.{redpanda_namespace}:9092"
- ),
- V1EnvVar(name="CONFIG_STORAGE_TOPIC", value="connect_configs"),
- V1EnvVar(name="OFFSET_STORAGE_TOPIC", value="connect_offsets"),
- V1EnvVar(name="STATUS_STORAGE_TOPIC", value="connect_statuses"),
- # We don't support JSON, so ensure that connect uses AVRO to encode messages and CSR to
- # record the schema
- V1EnvVar(
- name="KEY_CONVERTER", value="io.confluent.connect.avro.AvroConverter"
- ),
- V1EnvVar(
- name="VALUE_CONVERTER", value="io.confluent.connect.avro.AvroConverter"
- ),
- V1EnvVar(
- name="CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL",
- value=f"http://redpanda.{redpanda_namespace}:8081",
- ),
- V1EnvVar(
- name="CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL",
- value=f"http://redpanda.{redpanda_namespace}:8081",
- ),
- V1EnvVar(
- name="CONNECT_OFFSET_COMMIT_POLICY", value="AlwaysCommitOffsetPolicy"
- ),
- V1EnvVar(name="CONNECT_ERRORS_RETRY_TIMEOUT", value="60000"),
- V1EnvVar(name="CONNECT_ERRORS_RETRY_DELAY_MAX_MS", value="1000"),
- ]
- container = V1Container(
- name="debezium", image="debezium/connect:1.9.6.Final", env=env, ports=ports
- )
- node_selector = None
- if apply_node_selectors:
- node_selector = {"supporting-services": "true"}
- pod_spec = V1PodSpec(
- containers=[container],
- node_selector=node_selector,
- )
- template = V1PodTemplateSpec(
- metadata=V1ObjectMeta(namespace=namespace, labels={"app": "debezium"}),
- spec=pod_spec,
- )
- selector = V1LabelSelector(match_labels={"app": "debezium"})
- spec = V1DeploymentSpec(replicas=1, template=template, selector=selector)
- self.deployment = V1Deployment(
- api_version="apps/v1",
- kind="Deployment",
- metadata=V1ObjectMeta(name="debezium", namespace=namespace),
- spec=spec,
- )
- class DebeziumService(K8sService):
- def __init__(
- self,
- namespace: str,
- ) -> None:
- super().__init__(namespace)
- ports = [
- V1ServicePort(name="debezium", port=8083),
- ]
- self.service = V1Service(
- metadata=V1ObjectMeta(
- name="debezium", namespace=namespace, labels={"app": "debezium"}
- ),
- spec=V1ServiceSpec(
- type="NodePort", ports=ports, selector={"app": "debezium"}
- ),
- )
- def debezium_resources(
- debezium_namespace: str = DEFAULT_K8S_NAMESPACE,
- redpanda_namespace: str = DEFAULT_K8S_NAMESPACE,
- apply_node_selectors: bool = False,
- ) -> list[K8sResource]:
- return [
- DebeziumDeployment(
- debezium_namespace,
- redpanda_namespace=redpanda_namespace,
- apply_node_selectors=apply_node_selectors,
- ),
- DebeziumService(debezium_namespace),
- ]
|