Kafka is unique among our supported sources (though not among all possible sources, e.g. DynamoDB Streams) in that it splits the data portion of messages into a Key part and a Value part. The Key is intended to be the equivalent of a primary key in a database, and Kafka users commonly use it that way. Following from that, the Key is semantically meaningful to the Kafka system -- it affects which partition a message goes to, and it affects old-message deletion (log compaction). We rely on the Key primarily for Upsert logic, ensuring deduplication of messages based on it.
An important aspect of the Key is that it is a separate data section -- it never shares backing storage with the Value. This means that it is possible to have data in the Key that appears nowhere in the Value, putting the responsibility on consumers or Kafka connectors to stitch the data back together.
Materialize does not support accessing the Key part of messages from our SQL layer, which has been mentioned as a pain point on several occasions. This is a design to resolve that pain point.
Allowing users to specify that a Key should be part of the dataflow, and accessible to all SQL processing.
We have two other projects that we know we would eventually like to implement that may interact with this design:
This project does not aim to resolve any aspect of those future projects, but anything in this design that conflicts with their goals should definitely be called out.
Enhance the Kafka create source syntax like so:
CREATE SOURCE <source-name> FROM KAFKA BROKER '<>'
<format>
[INCLUDE KEY [AS <key-column-name>]]
where <format> is either:
An explicit key/value format:
KEY FORMAT <specifier>
VALUE FORMAT <specifier>
A bare format that specifies a Confluent Schema Registry (CSR) config
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY ...
If INCLUDE KEY is specified, the source schema will be augmented to prepend all of the key's fields as columns in the dataflow. Format types that do not specify subfields (e.g. FORMAT TEXT) will use their standard column names (i.e. text).
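For the CSR form, the key schema is fetched from the registry alongside the value schema, so the key's Avro field names are used directly. A sketch of what that might look like (the registry URL is a placeholder):
CREATE SOURCE csr_source FROM KAFKA BROKER '...' TOPIC '...'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://registry:8081'
INCLUDE KEY;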
Alternatively, if the INCLUDE KEY AS <name> syntax is used, then the key value will get the specified name.
If any key column names collide with value columns when using the bare INCLUDE KEY syntax, an error will be raised before dataflow construction, suggesting the use of the AS <name> clause.
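For example, a topic whose key and value are both TEXT would produce two columns named text, so the bare form would be rejected (the error wording below is illustrative, not the implemented message):
CREATE SOURCE text_text FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT VALUE FORMAT TEXT
INCLUDE KEY;
-- error: key column "text" collides with value column "text"; consider INCLUDE KEY AS <name>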
The justification for this set of defaults is that it is reasonable to expect that the common case for INCLUDE KEY will not have collisions between the key and value fields: if the key fields are a subset of the value fields (as is the case in Debezium), then there isn't any reason to include the key, since all its data is already in the value.
CREATE SOURCE text_text FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT VALUE FORMAT TEXT
INCLUDE KEY AS key
ENVELOPE UPSERT;
This creates a new source. The AS key clause is required here because the TEXT key and TEXT value would otherwise both use the default column name text, which would result in a name conflict.
Usage in a view looks like:
CREATE MATERIALIZED VIEW text_text_view AS
SELECT key, text FROM text_text
WHERE ...;
CREATE SOURCE text_avro FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT
VALUE FORMAT AVRO USING SCHEMA '{"type": "record", "name": "value", "fields": [ {"name": "field", "type": "int"} ] }'
INCLUDE KEY;
Usage in a view looks like:
CREATE MATERIALIZED VIEW text_avro_view AS
SELECT text, field as interesting FROM text_avro;
CREATE SOURCE avro_avro FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT AVRO USING SCHEMA '{"type": "record", "name": "boring", "fields": [
{"name": "key_field", "type": "int"} ] }'
VALUE FORMAT AVRO USING SCHEMA '{"type": "record", "name": "value", "fields": [
{"name": "valueish", "type": "int"} ] }'
INCLUDE KEY;
Usage in a view looks like:
CREATE MATERIALIZED VIEW avro_avro_view AS
SELECT key_field, valueish FROM avro_avro; -- equivalent to SELECT *
Note that the fields in the key and value are the same, even though the record names are not:
CREATE SOURCE avro_avro FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT AVRO USING SCHEMA '{"type": "record", "name": "boring", "fields": [
{"name": "valueish", "type": "int"} ] }'
VALUE FORMAT AVRO USING SCHEMA '{"type": "record", "name": "value", "fields": [
{"name": "valueish", "type": "int"} ] }'
INCLUDE KEY AS key;
Usage in a view looks like:
CREATE MATERIALIZED VIEW avro_avro_view AS
SELECT (key).valueish, valueish FROM avro_avro;
Mostly standard for new syntax additions. We will need to wait until purify_format to provide any of the specific error detection, because Key schemas may come from the Confluent Schema Registry.
Physically unpacking key fields will happen identically regardless of whether AS <name> is specified; only the projection mapping and the names available to SQL will differ.
We could just improve documentation around getting Kafka keys into the value section using Kafka itself.
It might be possible to provide something like a kafka_key() function that introduces a demand for the key, pulling it into dataflows.
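A sketch of what usage might look like under this alternative, reusing the text_text source from the examples above (kafka_key() is hypothetical):
CREATE MATERIALIZED VIEW text_text_keyed AS
SELECT kafka_key() AS key, text FROM text_text;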
Instead of automatically extracting Key columns at all, we could provide a syntax like:
CREATE SOURCE <source-name> FROM KAFKA BROKER '<>'
<format>
[AS KEY VALUE]
which would emit the Kafka message key as a key record, but would also change the message value to be available as a value record (no longer automatically expanding its fields into columns).
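A sketch of the resulting usage, assuming the avro_avro schemas from the examples above (the view name is illustrative only):
CREATE MATERIALIZED VIEW avro_avro_kv_view AS
SELECT (key).key_field, (value).valueish FROM avro_avro;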
The downside to this approach is that a small bit of syntax dramatically changes usage patterns at the SQL layer. That has effects both on documentation, which would now have to deal with a niche alternative usage, and on existing users: changing to AS KEY VALUE would force a rewrite or an intermediate view if a user realizes that they need data from the key after already using a topic for a while.
Semantically, it would be fine to always allow access to key fields and then rely on whether that column is actually used to drive instantiation of the key.
The largest downsides that I see to this philosophy are:
SELECT *-type queries would change the semantics of which columns they present. This is especially undesirable for things like Debezium, where the key is (afaik) always a duplicated subset of the value fields.
It would probably require the key/value record syntax from the previous alternative, which would be both more annoying (for users and for docs) and an even larger backcompat change.
The original proposal suggested the syntax INCLUDE KEY [AS RECORD <name>] and always embedded the key as a field on a record whether or not the upstream type is a record, but the current proposal is just AS <name>.
The original proposal resulted in usage like so:
CREATE SOURCE text_text FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT VALUE FORMAT TEXT
INCLUDE KEY AS RECORD key;
CREATE MATERIALIZED VIEW text_text_view AS
SELECT (key).text, text FROM text_text;
Which is just sort of annoying.
We support renaming columns as part of CREATE SOURCE statements using syntax like:
CREATE SOURCE table_name(column1_name, column2_name, ..) FROM
where each column must be explicitly named or renamed, in order. Any columns not included in the renaming are optimized out and cannot be referred to in downstream views. This will always be available to users.
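For example, this existing rename syntax could be combined with INCLUDE KEY to name the prepended key column; a sketch with illustrative column names:
CREATE SOURCE renamed (my_key, my_value) FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT VALUE FORMAT TEXT
INCLUDE KEY;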
The downside to not providing an explicit key-renaming syntax is increased fragility in user views, requiring them to duplicate column names from schemas. It is also true that the syntax may be ambiguous or fragile in the face of upstream schema migrations.
We could make the AS <name> syntax required. The proposed design is based on the assumption that the common case is that key fields do not conflict with value fields, but a more conservative option is just requiring users to specify everything and do a bit more work to access the key fields.
We could default the key field for simple formats (i.e. text, bytes) to use the name key instead of text or bytes, and thereby probably reduce the need for the AS <name> syntax.
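Under that alternative, the first example might reduce to the following sketch, with the key exposed as a column named key without any AS clause:
CREATE SOURCE text_text FROM KAFKA BROKER '...' TOPIC '...'
KEY FORMAT TEXT VALUE FORMAT TEXT
INCLUDE KEY
ENVELOPE UPSERT;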