title: "Striim Cloud" description: "How to ingest Striim Change Data Capture (CDC) data into Materialize using the Kafka source" aliases:
Striim is a real-time data integration platform that offers a variety of connectors for databases, messaging systems, and other data sources. This guide walks through the steps to ingest Striim Change Data Capture (CDC) data into Materialize using the Kafka source.
{{< note >}} We are in touch with the Striim team to build a direct CDC connector to Materialize. If you're interested in this integration, please contact our team! {{</ note >}}
As is, integrating Striim Cloud with Materialize requires using a message broker (like Apache Kafka) as an intermediary, as well as a schema registry (like Confluent Schema Registry) configured for Avro schema management.
Ensure that you have:

- A Striim Cloud account.
- A Kafka cluster that both Striim and Materialize can reach, to act as the intermediary message broker.
- A schema registry (like Confluent Schema Registry) configured for Avro schema management.
Follow Striim's guidance to enable and configure CDC from your source relational database.
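As an example, if your source is PostgreSQL, Striim's CDC readers rely on logical replication, so the database typically needs to be prepared along these lines (a minimal sketch; the user name and password are placeholders, and the exact requirements for your database are covered in Striim's documentation):

```sql
-- Assumption: a self-managed PostgreSQL source. Striim's PostgreSQL CDC reader
-- uses logical replication, which requires wal_level = 'logical' (restart required).
ALTER SYSTEM SET wal_level = 'logical';

-- Create a dedicated user for Striim with replication privileges.
CREATE USER striim_cdc WITH REPLICATION PASSWORD '<password>';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim_cdc;
```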
1. In Striim, create a `KafkaWriter` using the version that corresponds to your target Kafka broker.

2. Select the stream created in the previous step as the **Input stream**.

3. Configure the Kafka broker details and the target topic name.

4. Under **Advanced settings** > **Kafka Config**, add the `value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer` configuration to serialize records using the Confluent wire format.

5. Add a **MESSAGE KEY** field to specify the primary key of the table. This configures the `KafkaWriter` to use the table's primary key as the Kafka message key.

6. From the **Formatter** dropdown menu, select `AvroFormatter` to serialize records using the Avro format.

7. Configure the **Schema Registry URL** and its authentication credentials (if necessary).

8. Set the format to `Table`.

9. Leave the **Schema Registry Subject Name** and **Schema Registry Subject Name Mapping** fields empty.
Some data types might not be correctly supported in Striim's Avro handling (e.g. `bigint`). If you run into Avro serialization issues with the `KafkaWriter`, please reach out to Striim support or the Striim community.
Once your Striim Cloud service is configured for CDC, start the app to begin streaming changes from your source relational database into the specified Kafka cluster. Next, you'll configure Materialize to consume this data.
{{< note >}}
If you are prototyping and already have a cluster to host your Kafka source
(e.g. `quickstart`), you don't need to create a new cluster. For production
scenarios, we recommend separating your workloads into multiple clusters for
resource isolation.
{{< /note >}}
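If you do need a dedicated cluster for ingestion, a minimal sketch (the cluster name and size below are assumptions; pick a size appropriate to your workload):

```sql
-- Create a dedicated cluster to host the Kafka source.
CREATE CLUSTER striim_ingest (SIZE = '50cc');
```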
In the SQL Shell, or your preferred SQL client connected to Materialize, use the `CREATE CONNECTION` command to create connection objects with access and authentication details to your Kafka cluster and schema registry:
```sql
CREATE SECRET kafka_password AS '<your-password>';
CREATE SECRET csr_password AS '<your-password>';

CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER '<broker-url>',
    SECURITY PROTOCOL = 'SASL_PLAINTEXT',
    SASL MECHANISMS = 'SCRAM-SHA-256', -- or `PLAIN` or `SCRAM-SHA-512`
    SASL USERNAME = '<your-username>',
    SASL PASSWORD = SECRET kafka_password
    -- To connect through an SSH bastion instead, add `SSH TUNNEL ssh_connection`
    -- here (this requires creating the SSH tunnel connection beforehand).
);

CREATE CONNECTION csr_connection TO CONFLUENT SCHEMA REGISTRY (
    URL '<schema-registry-url>',
    USERNAME = '<your-username>',
    PASSWORD = SECRET csr_password
);
```
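Optionally, use the `VALIDATE CONNECTION` command to check that Materialize can reach the broker and the schema registry with these details before creating the source:

```sql
-- Optional sanity check on both connection objects.
VALIDATE CONNECTION kafka_connection;
VALIDATE CONNECTION csr_connection;
```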
Use the `CREATE SOURCE` command to connect Materialize to your Kafka broker and schema registry using the connections you created in the previous step:
```sql
CREATE SOURCE src
  FROM KAFKA CONNECTION kafka_connection (TOPIC '<topic-name>')
  KEY FORMAT TEXT
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_connection
    VALUE STRATEGY ID <id>
  ENVELOPE UPSERT;
```
**Fetching the `VALUE STRATEGY ID`**
Striim uses nonstandard subject names in the schema registry, which prevents Materialize from finding the schema using the default configuration. You must manually fetch the schema identifier and pass it to the `VALUE STRATEGY ID` option shown above. To get this value, open a terminal and use the following command:
```bash
curl <schema-registry-url>:8081/subjects/<schema-name>/versions/latest | jq .id
```
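Once the source is created with the fetched identifier, you can run a quick query to confirm that data is flowing from Striim into Materialize (a minimal sketch; `src` is the source name used above):

```sql
-- Spot-check that records from the Striim-managed topic are being ingested.
SELECT * FROM src LIMIT 10;
```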