---
title: "Kafka and Redpanda"
description: "How to export results from Materialize to Kafka/Redpanda."
aliases:
---
Materialize bundles a native connector that allows writing data to Kafka and Redpanda.
For details on the connector, including syntax, supported formats, and examples,
refer to `CREATE SINK`.
{{< tip >}}
Redpanda uses the same syntax as Kafka `CREATE SINK`.
{{</ tip >}}
If the specified Kafka topic does not exist, Materialize will attempt to create it using the broker's default number of partitions, replication factor, compaction policy, and retention policy, unless you provide specific overrides in the connection options.
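As a minimal sketch, the statement below creates a sink that overrides the broker's defaults for partition count and replication factor. The names `my_sink`, `my_view`, `kafka_connection`, and `my-topic` are hypothetical, and the sketch assumes the `TOPIC PARTITION COUNT` and `TOPIC REPLICATION FACTOR` sink options; check the `CREATE SINK` reference for the exact syntax your Materialize version supports.

```sql
-- Hypothetical object names; assumes kafka_connection was created
-- with CREATE CONNECTION ... TO KAFKA.
CREATE SINK my_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'my-topic',
    -- Overrides for the broker defaults, used if Materialize
    -- has to create the topic.
    TOPIC PARTITION COUNT 6,
    TOPIC REPLICATION FACTOR 3
  )
  FORMAT JSON
  ENVELOPE DEBEZIUM;
```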
If the connection's progress topic does not exist, Materialize will attempt to
create it with a single partition, the broker's default replication factor,
compaction enabled, and both size- and time-based retention disabled. You can
override the replication factor with the `PROGRESS TOPIC REPLICATION FACTOR`
option when creating the connection with `CREATE CONNECTION`.
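For example, the following sketch creates a Kafka connection that sets both the progress topic name and its replication factor. The connection name, broker address, and topic name are hypothetical placeholders, and a real connection usually also needs authentication and TLS options.

```sql
-- Hypothetical names and broker address; add the security options
-- your Kafka cluster requires.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'broker.example.com:9092',
    -- Optional: if omitted, the default
    -- _materialize-progress-{REGION ID}-{CONNECTION ID} name is used.
    PROGRESS TOPIC 'my-progress-topic',
    -- Replication factor used if Materialize creates the progress topic.
    PROGRESS TOPIC REPLICATION FACTOR 3
);
```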
To customize topic-level configuration, including compaction settings and other
values, use the `TOPIC CONFIG` option in the connection options to set any
relevant Kafka topic configs.
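A minimal sketch of `TOPIC CONFIG`, assuming it accepts a `MAP` of Kafka topic config names to string values. `cleanup.policy` and `min.compaction.lag.ms` are standard Kafka topic configs; the object names, the `id` key column, and the values are hypothetical.

```sql
CREATE SINK my_sink
  FROM my_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'my-topic',
    -- Kafka topic-level configs for the data topic.
    TOPIC CONFIG MAP[
      'cleanup.policy' => 'compact',
      'min.compaction.lag.ms' => '3600000'  -- 1 hour
    ]
  )
  KEY (id)
  FORMAT JSON
  ENVELOPE UPSERT;
```

Compaction pairs naturally with the upsert envelope here, since Kafka retains only the latest value for each key.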
If you manually create the topic or progress topic in Kafka before running
`CREATE SINK`, observe the following guidance:
Topic | Configuration | Guidance |
---|---|---|
Data topic | Partition count | Your choice, based on your performance and ordering requirements. |
Data topic | Replication factor | Your choice, based on your durability requirements. |
Data topic | Compaction | Your choice, based on your downstream applications' requirements. If using the Upsert envelope, enabling compaction is typically the right choice. |
Data topic | Retention | Your choice, based on your downstream applications' requirements. |
Progress topic | Partition count | Must be set to 1. Using multiple partitions can cause Materialize to violate its exactly-once guarantees. |
Progress topic | Replication factor | Your choice, based on your durability requirements. |
Progress topic | Compaction | We recommend enabling compaction to avoid accumulating unbounded state. Disabling compaction may cause performance issues, but will not cause correctness issues. |
Progress topic | Retention | Must be disabled. Enabling retention can cause Materialize to violate its exactly-once guarantees. |
Progress topic | Tiered storage | We recommend disabling tiered storage to allow for more aggressive data compaction. Fully compacted data requires minimal storage, typically only tens of bytes per sink, making it cost-effective to maintain directly on local disk. |
{{< warning >}} {{% kafka-sink-drop %}} {{</ warning >}}
By default, Kafka sinks provide exactly-once processing guarantees, which ensure that messages are neither duplicated nor dropped in failure scenarios.
To achieve this, Materialize stores some internal metadata in an additional
progress topic. This topic is shared among all sinks that use a particular
Kafka connection. You can specify the name of the progress topic when creating
a connection; otherwise, a default name of
`_materialize-progress-{REGION ID}-{CONNECTION ID}` is used. In either case,
Materialize will attempt to create the topic if it does not exist. The contents
of this topic are not user-specified.
Exactly-once semantics are an end-to-end property of a system, but Materialize only controls the initial produce step. To ensure end-to-end exactly-once message delivery, you should ensure that:

- Unclean leader election is disabled on the Kafka brokers (`unclean.leader.election.enable=false`).
- Downstream consumers only read committed messages (`isolation.level=read_committed`).

For more details, see the Kafka documentation.
By default, Materialize assigns a partition to each message using the following strategy:

- Messages that have no key are all sent to partition 0.
To configure a custom partitioning strategy, use the `PARTITION BY` option.
This option lets you specify a SQL expression that computes a hash for each
message, which determines the partition assigned to that message:
```sql
-- General syntax.
CREATE SINK ... INTO KAFKA CONNECTION <name> (PARTITION BY = <expression>) ...;

-- Example.
CREATE SINK ... INTO KAFKA CONNECTION <name> (
  PARTITION BY = kafka_murmur2(name || address)
) ...;
```
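The expression must have a type that can be cast to `uint8`.

Materialize uses the computed hash value to assign a partition to each message as follows:

1. If the hash is `NULL` or computing the hash produces an error, assign
   partition 0.
2. Otherwise, divide the hash value by the partition count and assign the
   remainder as the partition (i.e., `partition_id = hash % partition_count`).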
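To build intuition for this rule, a query along the following lines previews the partition each row would receive. It is an illustration only: it assumes a hypothetical relation `my_view` with text columns `name` and `address`, a topic with 6 partitions, and that `kafka_murmur2` returns a non-negative integer. Materialize performs this computation itself when writing to the sink, using the topic's actual partition count.

```sql
-- Illustration only: mirrors partition_id = hash % partition_count
-- for a hypothetical 6-partition topic.
SELECT
    name,
    address,
    -- A NULL hash (e.g. if name or address is NULL) maps to partition 0.
    COALESCE(kafka_murmur2(name || address) % 6, 0) AS partition_id
FROM my_view;
```

Unlike the sink, this query fails outright if the hash expression errors, rather than assigning partition 0.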
Materialize provides several hash functions that are commonly used in Kafka partition assignment:

- `crc32`
- `kafka_murmur2`
- `seahash`
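For instance, the sketch below partitions an upsert sink by a hash of its key column using `seahash`. The sink, view, connection, topic, and `customer_id` column names are hypothetical, and the sketch assumes `seahash` accepts a `text` argument (cast the column to `text` or `bytea` as needed).

```sql
CREATE SINK customers_sink
  FROM customers_view
  INTO KAFKA CONNECTION kafka_connection (
    TOPIC 'customers',
    -- With a fixed partition count, rows with the same customer_id
    -- always land in the same partition.
    PARTITION BY = seahash(customer_id::text)
  )
  KEY (customer_id)
  FORMAT JSON
  ENVELOPE UPSERT;
```

Hashing the key column keeps all updates for a given key in one partition, which preserves their relative order for downstream consumers.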
For a full example of using the `PARTITION BY` option, see Custom
partitioning.
{{< include-md file="shared-content/kafka-transaction-markers.md" >}}