Users have regularly requested access to certain pieces of Kafka (and other) per-message metadata within views. The most common requests have been for the timestamp and offset fields.
The proposal here is to extend our syntax to allow users to opt in to including message metadata on a per-source basis.
The goal is to give users easy access to commonly requested metadata without any backwards-compatibility concerns.
Generic support for headers is out of scope. The Kafka API allows users to attach arbitrary per-message headers. As far as we know we haven't received requests for this yet, and the Kafka API does not provide enough information for us to automatically decode header values (they are just bytes), so the exact shape of what we would give to users requires more design work.
We will extend the existing `INCLUDE` syntax to take additional optional metadata
field specifiers: `TIMESTAMP`, `OFFSET`, `PARTITION`, and `TOPIC`. These
are all the remaining fields in a Kafka message that are not headers.
Each field in the field list can take an optional `AS <name>` clause, in case of
conflicts with the key or value names.
With this, the full syntax for the `INCLUDE` clause becomes:

```
INCLUDE (
    KEY (AS <name>)? |
    TIMESTAMP (AS <name>)? |
    OFFSET (AS <name>)? |
    PARTITION (AS <name>)? |
    TOPIC (AS <name>)?
)+
```
An example of just the new syntax looks like:

```
INCLUDE TIMESTAMP
```
And an example of it combined with the existing `INCLUDE KEY` syntax:

```
INCLUDE KEY, TIMESTAMP, OFFSET, PARTITION, TOPIC
```
and with the renaming syntax:

```
INCLUDE KEY AS mykey, TIMESTAMP AS ts, OFFSET AS "Off"
```
Each of the new fields will become a column in the dataflow, with an appropriate type:
- `TIMESTAMP` will become a nullable `TIMESTAMP WITH TIME ZONE` (javadoc)
- `OFFSET` will become a nullable `BIGINT` (javadoc)
- `PARTITION` will become an `INTEGER` (javadoc)
- `TOPIC` will become a `STRING` (javadoc)

Envelopes will have varying support for metadata fields, due variously to their
inherent properties and engineering effort. The key constraint is that any
envelope that knows how to do its own retractions (i.e. the Debezium and
Materialize envelopes) requires that the full retraction be present in the data.

Essentially, retraction-aware envelopes provide materialized with information
that is morally equivalent to:
```
// "insert" event
{
    "retract": null,
    "insert": [1, "a"]
}

// "update" event
{
    "retract": [1, "a"],
    "insert": [1, "b"]
}
```
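The translation these envelopes perform can be sketched as follows. This is a Python illustration with hypothetical event and row shapes, not Materialize's actual implementation:

```python
def envelope_updates(event):
    """Translate one retraction-aware event into (row, diff) updates.

    `event` is a dict shaped like the examples above; the (row, diff)
    pairs mirror what a differential dataflow would consume.
    """
    updates = []
    if event["retract"] is not None:
        # Retract the old version of the row.
        updates.append((tuple(event["retract"]), -1))
    if event["insert"] is not None:
        # Insert the new version of the row.
        updates.append((tuple(event["insert"]), +1))
    return updates

# The "insert" event produces only an insertion...
assert envelope_updates({"retract": None, "insert": [1, "a"]}) == [((1, "a"), 1)]
# ...while the "update" event retracts the old row and inserts the new one.
assert envelope_updates({"retract": [1, "a"], "insert": [1, "b"]}) == [
    ((1, "a"), -1),
    ((1, "b"), 1),
]
```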
The retraction-aware envelopes understand the semantics of these events and will correctly provide retraction/insertion messages to their dataflows.
The problem arises once we add metadata: each message is then guaranteed to be unique:
```
// "insert" event
{
    "retract": null,
    "insert": [1, "a"],
    "offset": 100
}

// "update" event
{
    "retract": [1, "a"],
    "insert": [1, "b"],
    "offset": 704
}
```
If we include the offset from the insert event then the row in Materialize
becomes `[1, "a", 100]`. When we see the update event, the event itself does not
have enough information to provide a retraction: the naive retraction would be
to try to `DELETE [1, "a", 704]`, but that row does not exist and the attempt
would therefore cause errors in dataflows.
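This failure mode can be made concrete with a small sketch (Python; `naive_updates` and the event shapes are illustrative assumptions, not real code):

```python
from collections import Counter

def naive_updates(event):
    """Naively append the event's own offset to both halves of the event."""
    out = []
    if event["retract"] is not None:
        out.append((tuple(event["retract"]) + (event["offset"],), -1))
    if event["insert"] is not None:
        out.append((tuple(event["insert"]) + (event["offset"],), +1))
    return out

# Accumulate row multiplicities, as a dataflow collection would.
collection = Counter()
for ev in [
    {"retract": None, "insert": [1, "a"], "offset": 100},
    {"retract": [1, "a"], "insert": [1, "b"], "offset": 704},
]:
    for row, diff in naive_updates(ev):
        collection[row] += diff

# The insert created [1, "a", 100], but the update retracted [1, "a", 704]:
assert collection[(1, "a", 100)] == 1   # never retracted
assert collection[(1, "a", 704)] == -1  # retraction of a row that never existed
```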
This can be worked around by maintaining UPSERT-like state for each envelope, but it is unclear how necessary that work is, so the next section describes the intended initial state and the future work that is possible or required.
- `ENVELOPE NONE`: works inherently; no additional work required.
- `UPSERT` and `DEBEZIUM UPSERT`: both require approximately the same amount
  of effort as `ENVELOPE NONE` and should be part of the initial implementation.
Envelopes `DEBEZIUM` and `MATERIALIZE` on data streams that have primary-key
semantics (i.e. where rows are guaranteed to be unique according to some key
which is a subset of the data) can both be made to work by creating a shadow
upsert-like map from previously-seen entry keys to the metadata that they were
originally inserted with. For `ENVELOPE DEBEZIUM` this would just be a memory
saving over `DEBEZIUM UPSERT`, since Materialize would not need to store all the
data from each row, just the metadata.
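Under primary-key semantics, that shadow map might look like the following sketch (Python; `key_fn`, `keyed_updates`, and the event shapes are assumptions for illustration only):

```python
def keyed_updates(event, key_fn, shadow):
    """Use remembered metadata, not the current event's, for retractions."""
    out = []
    if event["retract"] is not None:
        key = key_fn(event["retract"])
        old_offset = shadow.pop(key)  # metadata from the original insert
        out.append((tuple(event["retract"]) + (old_offset,), -1))
    if event["insert"] is not None:
        key = key_fn(event["insert"])
        shadow[key] = event["offset"]  # remember for a later retraction
        out.append((tuple(event["insert"]) + (event["offset"],), +1))
    return out

shadow = {}
key_fn = lambda row: row[0]  # assume the first column is the primary key
updates = []
for ev in [
    {"retract": None, "insert": [1, "a"], "offset": 100},
    {"retract": [1, "a"], "insert": [1, "b"], "offset": 704},
]:
    updates.extend(keyed_updates(ev, key_fn, shadow))

# The retraction now matches the row exactly as it was originally inserted.
assert updates == [((1, "a", 100), 1), ((1, "a", 100), -1), ((1, "b", 704), 1)]
```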
It is unclear what the use case for the combination of these envelopes with metadata is, so we are not expecting this to be worked on until we receive use cases that need them.
Self-retracting formats that operate on streams of data where rows do not have primary-key semantics cannot be made to work without input from the user -- if the same row was inserted three times, with three different offsets, which one do you retract?
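The ambiguity can be illustrated with a small sketch (Python; the rows and offsets are hypothetical):

```python
from collections import Counter

# With no primary key, the same row can legitimately appear many times,
# each occurrence carrying a different offset:
collection = Counter({
    (1, "a", 100): 1,
    (1, "a", 250): 1,
    (1, "a", 390): 1,
})

# A retraction event for [1, "a"] identifies a row, not an occurrence:
# there is no principled way to pick which offset-carrying row to delete.
candidates = [row for row in collection if row[:2] == (1, "a")]
assert len(candidates) == 3  # three equally valid retraction targets
```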
Including `TOPIC` may seem silly, since each source comes from exactly one topic. Since this proposal is opt-in, though, it costs us and users nothing unless it is used, and one could imagine users wanting it in `JOIN`s or `UNION`s between sources.
We have discussed including metadata fields by default and relying on projection pushdown to eliminate unused fields.
This has several drawbacks, though:

- If a message already contains a field named `timestamp`, sources would
  immediately become unusable (with an "ambiguous column" error).
- Projection pushdown relies on users not writing `SELECT *` all the way from
  source to final materialization.