Support the headers section of Kafka messages in sources. Headers are a string -> bytes multimap of arbitrary data. The purpose of this design document is to describe how we can implement this.
The goal is to add basic support for unstructured data from headers that can be cast/converted later in the SQL layer.
It is a non-goal to support Kafka Connect Headers, which are, as far as I can tell, dynamically-schema'd headers built on top of plain Kafka headers, similar to the dynamic schemas supported elsewhere by Kafka Connect.
Add an `INCLUDE HEADERS` syntax that allows users to declare that they want a `headers` column for their source, populated from the headers of each Kafka message. This column will provide a list of `(text, bytea)` key-value pairs, because Kafka headers can have multiple values per key and preserve their order. Users are expected to cast these `bytea` values to the types they want in a later view. This option will only be available for `ENVELOPE NONE` and `ENVELOPE UPSERT`.
Kafka source creation will be extended so that `INCLUDE` has a new `HEADERS` option, so the full syntax becomes:
```
INCLUDE (
    KEY (AS <name>)? |
    TIMESTAMP (AS <name>)? |
    OFFSET (AS <name>)? |
    PARTITION (AS <name>)? |
    TOPIC (AS <name>)? |
    HEADERS (AS <name>)?
)+
```
As an example:

```sql
CREATE MATERIALIZED SOURCE avroavro
FROM KAFKA BROKER '...' TOPIC '...'
FORMAT ...
INCLUDE HEADERS
ENVELOPE ...
```
This will add a new column named `headers` to each row produced by the Kafka source, of type `list[record { key: text, values: bytea }]`; the column name can be overridden with `AS <name>`.
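As a sketch of the intended usage pattern (assuming a source named `kafka_src` with the default `headers` column name, and that `unnest` and `convert_from` behave over lists and `bytea` values as they do in PostgreSQL):

```sql
-- Minimal sketch: extract the 'trace-id' header as text in a later view.
-- The source name and header key here are hypothetical.
CREATE VIEW messages_with_trace AS
SELECT
    m.*,
    convert_from((h).values, 'utf8') AS trace_id
FROM kafka_src AS m,
     unnest(m.headers) AS h
WHERE (h).key = 'trace-id';
```

Because headers preserve order and allow duplicate keys, a query like this can return multiple rows per message if the key repeats; callers that want exactly one value need to pick one explicitly.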
Once the planner places the `headers` column in the right spot, all we need to do is pass a boolean indicating whether headers were requested down into Kafka source creation. The `SourceDesc` will contain the extra column, and the source will pack the `SourceMessage` with the headers (or an empty list) for each record.
A list of key-value records is not very ergonomic. Over time, we can provide helper functions that improve this. Some examples:

- `headers_to_map`: convert the headers list into a map with "smallest value wins" semantics (a usage sketch follows this list).
- an `avro_decode` function: this is more complex, as it may require altering how we build dataflows.
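A hypothetical usage sketch for `headers_to_map` (the function does not exist yet, and the `->` map-access operator is assumed to behave as it does for Materialize maps today):

```sql
-- Hypothetical: headers_to_map collapses the headers list into a
-- map[text => bytea], keeping the smallest value for each duplicated key.
SELECT headers_to_map(headers) -> 'client-id' AS client_id
FROM kafka_src;
```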
An alternative would be to implement typing as part of the SQL definition, either with SQL types or some way of declaring how to obtain a schema and its `FORMAT`. This would significantly complicate the Kafka `CREATE SOURCE` statement, which is already complex.
Another alternative is to allow users to pre-declare specific header keys and provide a column for each one, as sketched below.
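If we pursued that, the syntax might look roughly like the following (purely illustrative, not part of this proposal):

```sql
-- Illustrative only: pre-declaring one header key as its own column.
CREATE MATERIALIZED SOURCE src
FROM KAFKA BROKER '...' TOPIC '...'
FORMAT ...
INCLUDE HEADER 'client-id' AS client_id
ENVELOPE ...
```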
We should also consider an option that enforces UTF-8 for header values. Should `ENVELOPE DEBEZIUM`, etc., be supported somehow?