title: "PostgreSQL CDC using Kafka and Debezium" description: "How to propagate Change Data Capture (CDC) data from a PostgreSQL database to Materialize using Kafka and Debezium" aliases:
{{< warning >}} You can use Debezium to propagate Change Data Capture (CDC) data to Materialize from a PostgreSQL database, but we strongly recommend using the native PostgreSQL source instead. {{</ warning >}}
{{< guided-tour-blurb-for-ingest-data >}}
Change Data Capture (CDC) allows you to track and propagate changes in a
PostgreSQL database to downstream consumers based on its Write-Ahead Log
(WAL). In this guide, we'll cover how to use Materialize to create and
efficiently maintain real-time views with incrementally updated results
on top of CDC data.
You can use Debezium and the Kafka source to propagate CDC data from
PostgreSQL to Materialize in the unlikely event that using the native
PostgreSQL source is not an option. Debezium captures row-level changes
resulting from `INSERT`, `UPDATE`, and `DELETE` operations in the upstream
database and publishes them as events to Kafka using Kafka Connect-compatible
connectors.
**Minimum requirements:** PostgreSQL 11+
Before deploying a Debezium connector, you need to ensure that the upstream database is configured to support logical replication.
{{< tabs >}} {{< tab "Self-hosted">}}
As a superuser:

1. Check the `wal_level` configuration setting:

   ```sql
   SHOW wal_level;
   ```

   The default value is `replica`. For CDC, you'll need to set it to `logical`
   in the database configuration file (`postgresql.conf`); a minimal example of
   making this change follows these steps. Keep in mind that changing the
   `wal_level` requires a restart of the PostgreSQL instance and can affect
   database performance.

2. Restart the database so all changes can take effect.
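For reference, a minimal sketch of making this change without editing
`postgresql.conf` by hand; `ALTER SYSTEM` persists the setting to
`postgresql.auto.conf`, and the restart is still required:

```sql
-- Run as a superuser; takes effect only after a restart.
ALTER SYSTEM SET wal_level = logical;
```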
{{< /tab >}}
{{< tab "AWS RDS">}}
We recommend following the AWS RDS documentation for detailed information on logical replication configuration and best practices.
As a superuser (`rds_superuser`):

1. Create a custom RDS parameter group and associate it with your instance.
   You will not be able to set custom parameters on the default RDS parameter
   groups.

2. In the custom RDS parameter group, set the `rds.logical_replication` static
   parameter to `1`.

3. Add the egress IP addresses associated with your Materialize region to the
   security group of the RDS instance. You can find these addresses by querying
   the `mz_egress_ips` table in Materialize (see the example query after these
   steps).

4. Restart the database so all changes can take effect.
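For reference, listing the egress IPs only takes a quick query in a SQL
session connected to your Materialize region:

```sql
-- Run in Materialize, not in the upstream database.
SELECT * FROM mz_egress_ips;
```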
{{< /tab >}}
{{< tab "AWS Aurora">}}
{{< note >}} Aurora Serverless (v1) does not support logical replication, so it's not possible to use this service with Materialize. {{</ note >}}
We recommend following the AWS Aurora documentation for detailed information on logical replication configuration and best practices.
As a superuser:

1. Create a DB cluster parameter group for your instance using the following
   settings:

   - Set **Parameter group family** to your version of Aurora PostgreSQL.
   - Set **Type** to **DB Cluster Parameter Group**.

2. In the DB cluster parameter group, set the `rds.logical_replication` static
   parameter to `1`.

3. In the DB cluster parameter group, set reasonable values for the
   `max_replication_slots`, `max_wal_senders`, `max_logical_replication_workers`,
   and `max_worker_processes` parameters based on your expected usage (see the
   illustrative values after these steps).

4. Add the egress IP addresses associated with your Materialize region to the
   security group of the DB instance. You can find these addresses by querying
   the `mz_egress_ips` table in Materialize.

5. Restart the database so all changes can take effect.
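As a rough starting point, values along these lines can accommodate a small
number of replication consumers; these are illustrative assumptions, not
recommendations, so size them to your workload:

```
max_replication_slots = 10
max_wal_senders = 10
max_logical_replication_workers = 4
max_worker_processes = 8
```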
{{< /tab >}}
{{< tab "Azure DB">}}
We recommend following the Azure DB for PostgreSQL documentation for detailed information on logical replication configuration and best practices.
1. In the Azure portal, or using the Azure CLI, enable logical replication for
   the PostgreSQL instance (see the example command after these steps).

2. Add the egress IP addresses associated with your Materialize region to the
   list of allowed IP addresses under the "Connections security" menu. You can
   find these addresses by querying the `mz_egress_ips` table in Materialize.

3. Restart the database so all changes can take effect.
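As a sketch of the CLI route for a single server deployment (the resource
group and server names are placeholders, and the parameter name can differ
for other deployment options):

```bash
# Placeholder names: adjust the resource group and server to your setup.
az postgres server configuration set \
  --resource-group my-resource-group \
  --server-name my-postgres-server \
  --name azure.replication_support \
  --value logical
```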
{{< /tab >}}
{{< tab "Cloud SQL">}}
We recommend following the Cloud SQL for PostgreSQL documentation for detailed information on logical replication configuration and best practices.
As a superuser (`cloudsqlsuperuser`):

1. In the Google Cloud Console, enable logical replication by setting the
   `cloudsql.logical_decoding` configuration parameter to `on` (see the
   example command after these steps).

2. Add the egress IP addresses associated with your Materialize region to the
   list of allowed IP addresses. You can find these addresses by querying the
   `mz_egress_ips` table in Materialize.

3. Restart the database so all changes can take effect.
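If you prefer the `gcloud` CLI, something along these lines sets the flag.
The instance name is a placeholder, and keep in mind that `--database-flags`
replaces all flags currently set on the instance, so include any existing ones:

```bash
gcloud sql instances patch my-postgres-instance \
  --database-flags=cloudsql.logical_decoding=on
```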
{{< /tab >}}
{{< /tabs >}}
Once logical replication is enabled:
1. Grant enough privileges to ensure Debezium can operate in the database. The
   specific privileges will depend on how much control you want to give to the
   replication user, so we recommend following the Debezium documentation (a
   minimal sketch of one possible role setup follows these steps).
2. If a table that you want to replicate has a primary key defined, you can
   use your default replica identity value. If a table you want to replicate
   has no primary key defined, you must set the replica identity value to
   `FULL`:

   ```sql
   ALTER TABLE repl_table REPLICA IDENTITY FULL;
   ```

   This setting determines the amount of information that is written to the WAL
   in `UPDATE` and `DELETE` operations. Setting it to `FULL` will include the
   previous values of all the table's columns in the change events.

   As a heads up, you should expect a performance hit in the database from
   increased CPU usage. For more information, see the PostgreSQL documentation.
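For reference, a minimal sketch of a dedicated replication user. The role
name, password, database, and schema below are placeholder assumptions; follow
the Debezium documentation and your own security requirements for the real
grants:

```sql
-- Placeholder names: adjust role, password, database, and schema to your setup.
CREATE ROLE debezium_repl WITH REPLICATION LOGIN PASSWORD 'secret';
GRANT CONNECT ON DATABASE postgres TO debezium_repl;
GRANT USAGE ON SCHEMA public TO debezium_repl;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_repl;
```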
**Minimum requirements:** Debezium 1.5+
Debezium is deployed as a set of Kafka Connect-compatible connectors, so you first need to define a PostgreSQL connector configuration and then start the connector by adding it to Kafka Connect.
{{< warning >}}
If you deploy the PostgreSQL Debezium connector in Confluent Cloud,
you must override the default value of `After-state only` to `false`.
{{</ warning >}}
{{< tabs >}} {{< tab "Debezium 1.5+">}}
Create a connector configuration file and save it as `register-postgres.json`:
```json
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "pg_repl",
    "table.include.list": "public.table1",
    "publication.autocreate.mode": "filtered",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schemas.enable": false
  }
}
```
You can read more about each configuration property in the Debezium documentation.
By default, the connector writes events for each table to a Kafka topic
named `serverName.schemaName.tableName`. With the configuration above, events
for `public.table1` land in the `pg_repl.public.table1` topic.
Start the PostgreSQL Debezium connector using the configuration file:

```bash
export CURRENT_HOST='<your-host>'

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://$CURRENT_HOST:8083/connectors/ -d @register-postgres.json
```
Check that the connector is running:

```bash
curl http://$CURRENT_HOST:8083/connectors/your-connector/status
```
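A healthy connector reports `RUNNING` states for itself and its task, along
the lines of this (abridged) Kafka Connect status response:

```json
{
  "name": "your-connector",
  "connector": { "state": "RUNNING", "worker_id": "..." },
  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "..." }],
  "type": "source"
}
```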
The first time it connects to a PostgreSQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:

```bash
/usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --topic pg_repl.public.table1
```
{{< /tab >}} {{< tab "Debezium 2.0+">}}
Beginning with Debezium 2.0.0, Confluent Schema Registry support is not included in the Debezium containers. To enable the Confluent Schema Registry for a Debezium container, install the following Confluent Avro converter JAR files into the Connect plugin directory:

- `kafka-connect-avro-converter`
- `kafka-connect-avro-data`
- `kafka-avro-serializer`
- `kafka-schema-serializer`
- `kafka-schema-registry-client`
- `common-config`
- `common-utils`

You can read more about this in the Debezium documentation.
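As a rough sketch of what this can look like for a container-based deployment
(the image tag and the local directory holding the pre-downloaded JARs are
assumptions; `/kafka/connect` is the default plugin directory in the Debezium
Connect images):

```dockerfile
# A sketch, not an official image: bake pre-downloaded Confluent Avro
# converter JARs into the Connect plugin directory.
FROM quay.io/debezium/connect:2.4
COPY confluent-avro-jars/ /kafka/connect/confluent-avro/
```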
Create a connector configuration file and save it as `register-postgres.json`:
```json
{
  "name": "your-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "pg_repl",
    "schema.include.list": "public",
    "table.include.list": "public.table1",
    "publication.autocreate.mode": "filtered",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter.schemas.enable": false
  }
}
```
You can read more about each configuration property in the Debezium documentation.
By default, the connector writes events for each table to a Kafka topic
named `topicPrefix.schemaName.tableName`. With the configuration above, events
for `public.table1` land in the `pg_repl.public.table1` topic.
Start the PostgreSQL Debezium connector using the configuration file:

```bash
export CURRENT_HOST='<your-host>'

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://$CURRENT_HOST:8083/connectors/ -d @register-postgres.json
```
Check that the connector is running:

```bash
curl http://$CURRENT_HOST:8083/connectors/your-connector/status
```
The first time it connects to a PostgreSQL server, Debezium takes a consistent snapshot of the tables selected for replication, so you should see that the pre-existing records in the replicated table are initially pushed into your Kafka topic:

```bash
/usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --topic pg_repl.public.table1
```
{{< /tab >}} {{< /tabs >}}
{{< debezium-json >}}
Debezium emits change events using an envelope that contains detailed
information about upstream database operations, like the `before` and `after`
values for each record. To create a source that interprets the
Debezium envelope in Materialize:
```sql
CREATE SOURCE kafka_repl
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
```
By default, the source will be created in the active cluster; to use a
different cluster, use the `IN CLUSTER` clause.
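For example, assuming a dedicated cluster named `ingest_cluster` already
exists (the name is a placeholder):

```sql
CREATE SOURCE kafka_repl
  IN CLUSTER ingest_cluster
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'pg_repl.public.table1')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
  ENVELOPE DEBEZIUM;
```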
This allows you to replicate tables with `REPLICA IDENTITY DEFAULT`, `INDEX`,
or `FULL`.
{{% ingest-data/ingest-data-kafka-debezium-view %}}
{{% ingest-data/ingest-data-kafka-debezium-index %}}