title: "CockroachDB CDC using Kafka and Changefeeds" description: "How to propagate Change Data Capture (CDC) data from a CockroachDB database to Materialize" menu: main:
parent: "crdb"
name: "Using Kafka and Changefeeds"
identifier: "crdb-kafka-changefeeds"
weight: 5
{{< tip >}} {{< guided-tour-blurb-for-ingest-data >}} {{< /tip >}}
Change Data Capture (CDC) allows you to track and propagate changes in a CockroachDB database to downstream consumers. In this guide, we’ll cover how to use Materialize to create and efficiently maintain real-time views with incrementally updated results on top of CockroachDB CDC data.
[//]: # "TODO(morsapaes) Add Before you begin section for consistency and details like the minimum required Cockroach version to follow this."
[//]: # "TODO(morsapaes) Add more detailed steps and best practices, including checking if rangefeeds are already enabled (true for CockroachDB serverless), creating a dedicated user for replication, granting it the appropriate permissions, using CDC queries to reduce the amount of data sent over the wire, and so on."
As a first step, you must ensure rangefeeds are enabled in your CockroachDB instance so you can create changefeeds for the tables you want to replicate to Materialize.
As a user with the `admin` role, enable the `kv.rangefeed.enabled` cluster setting:

```sql
SET CLUSTER SETTING kv.rangefeed.enabled = true;
```
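To confirm the change took effect, you can read the setting back. This is an optional sanity check using standard CockroachDB syntax:

```sql
SHOW CLUSTER SETTING kv.rangefeed.enabled;
```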
[//]: # "TODO(morsapaes) Instructions to create a changefeed vary depending on whether users are on CockroachDB core or enterprise."
Changefeeds capture row-level changes resulting from `INSERT`, `UPDATE`, and `DELETE` operations against CockroachDB tables and publish them as events to Kafka (or another Kafka API-compatible broker). You can then use the Kafka source to consume these changefeed events into Materialize, making the data available for transformation.
Create a changefeed for each table you want to replicate:
```sql
CREATE CHANGEFEED FOR TABLE my_table
  INTO 'kafka://broker:9092'
  WITH format = avro,
       confluent_schema_registry = 'http://registry:8081',
       diff,
       envelope = wrapped;
```
We recommend creating changefeeds using the Avro format (`format = avro`) and the default diff envelope (`envelope = wrapped`), which is compatible with the message format Materialize expects. Each table will produce data to a dedicated Kafka topic, which can then be consumed by Materialize.
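Once the changefeed is created, you can check that it is running from a CockroachDB SQL session. This is an optional sanity check, assuming a CockroachDB version that supports the statement:

```sql
-- Lists changefeed jobs along with their status and sink URI.
SHOW CHANGEFEED JOBS;
```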
For detailed instructions on configuring your CockroachDB instance for CDC, refer to the CockroachDB documentation.
{{< note >}}
If you are prototyping and already have a cluster to host your Kafka source (e.g. `quickstart`), you can skip this step. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation.
{{< /note >}}
{{% kafka/cockroachdb/create-a-cluster %}}
[//]: # "TODO(morsapaes) Incorporate all options for network security and authentication later on. Starting with a simplified version that is consistent with the PostgreSQL/MySQL guides and can be used to model other Kafka-relate integration guides after."
Now that you've created an ingestion cluster, you can connect Materialize to your Kafka broker and start ingesting data. The exact steps depend on your authentication and networking configurations, so refer to the `CREATE CONNECTION` documentation for further guidance.
In the SQL Shell, or your preferred SQL client connected to Materialize, use the `CREATE SECRET` command to securely store the credentials to connect to your Kafka broker and, optionally, schema registry:
```sql
CREATE SECRET kafka_ssl_key AS '<BROKER_SSL_KEY>';
CREATE SECRET kafka_ssl_crt AS '<BROKER_SSL_CRT>';
-- Only needed if the schema registry uses SSL client authentication:
CREATE SECRET csr_ssl_key AS '<CSR_SSL_KEY>';
CREATE SECRET csr_ssl_crt AS '<CSR_SSL_CRT>';
CREATE SECRET csr_password AS '<CSR_PASSWORD>';
```
Use the `CREATE CONNECTION` command to create a connection object with access and authentication details for Materialize to use:
```sql
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER '<host>',
    SSL KEY = SECRET kafka_ssl_key,
    SSL CERTIFICATE = SECRET kafka_ssl_crt
);
```
If you're using a schema registry, create an additional connection object:
```sql
CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL '<csr_url>',
    SSL KEY = SECRET csr_ssl_key,
    SSL CERTIFICATE = SECRET csr_ssl_crt,
    USERNAME = 'foo',
    PASSWORD = SECRET csr_password
);
```
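Optionally, before creating a source, you can ask Materialize to check that the connection details are valid and the broker and registry are reachable. This is a minimal sketch using the `VALIDATE CONNECTION` command, assuming it is available in your Materialize version:

```sql
VALIDATE CONNECTION kafka_connection;
VALIDATE CONNECTION csr_connection;
```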
Use the `CREATE SOURCE` command to connect Materialize to your Kafka broker and start ingesting data from the target topic:
```sql
CREATE SOURCE kafka_repl
  IN CLUSTER ingest_kafka
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'my_table')
  -- CockroachDB's default envelope structure for changefeed messages is
  -- compatible with the Debezium format, so you can use ENVELOPE DEBEZIUM
  -- to interpret the data.
  ENVELOPE DEBEZIUM;
```
By default, the source will be created in the active cluster; to use a different cluster, use the `IN CLUSTER` clause.
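For example, a variant of the statement above without the `IN CLUSTER` clause (same connection and topic; shown only for contrast, so run one or the other, not both) would be created in the active cluster:

```sql
CREATE SOURCE kafka_repl
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'my_table')
  ENVELOPE DEBEZIUM;
```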
{{% kafka/cockroachdb/check-the-ingestion-status %}}
{{% ingest-data/ingest-data-kafka-debezium-view %}}
{{% ingest-data/ingest-data-kafka-debezium-index %}}
With Materialize ingesting your CockroachDB data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
- Explore your data with `SHOW SOURCES` and `SELECT`.
- Compute real-time results in memory with `CREATE VIEW` and `CREATE INDEX` or in durable storage with `CREATE MATERIALIZED VIEW`.
- Serve results to a PostgreSQL-compatible SQL client or driver with `SELECT` or `SUBSCRIBE` (see the sketch after this list), or to an external message broker with `CREATE SINK`.
- Check out the tools and integrations supported by Materialize.
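As a quick illustration of the serving step, you can stream changes directly to your SQL client with `SUBSCRIBE`. This is a minimal sketch that subscribes to the `kafka_repl` source created above; in practice, you would typically subscribe to a view or materialized view built on top of it:

```sql
-- Emits every change to the query results as it happens.
SUBSCRIBE (SELECT * FROM kafka_repl);
```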