"Reclocking" a stream of data is (for the purposes of this document) translating its events from one gauge of progress to another.
One example is taking Kafka topics (whose records are stamped with progressing (partition, offset)
pairs) into Materialize's system timeline (whose records are stamped with milliseconds since the unix epoch).
Many of our sources of data come with their own understanding of forward progress. Often, these are incompatible with each other, and with native notions of time within Materialize. For example:

* Kafka topics contain multiple "parts", each of which advances through "offsets".
* Postgres sources come from a log where entries have a "log sequence number" (LSN).
* File sources have a line number.
* Materialize CDC sources have explicit timestamps.
The source-specific "gauges" of forward progress are useful, but they are not directly comparable. This document describes one framework for converting between gauges, including into (and potentially out of) Materialize's system timeline.
The primary goal of this document is to describe a formal way to translate collections that evolve with one gauge into collections that evolve with another gauge. The formalism should support the following goals:

A. Permitting the use of streams from one timeline in another timeline.
B. Describing durable state that can make sources like Kafka, Postgres, and files exactly replayable in the system timeline.
C. A BYO ("bring your own") format for users who want to provide explicit timestamp information for sources that are not presented in that timeline.
D. A way to describe in one timeline progress made through another timeline (e.g. reporting Kafka read progress in MZ system timestamps).
* This document has no opinions on the physical representation of the reclocking data.
* This document has no opinions on the implementation of the reclocking actions.
* This document has no opinions on the policies of which streams should be reclocked into which domains.
* This document has no opinions on the correct interpretation of the terms `VOLATILE` and `NONVOLATILE`.
Streams of data often provide a gauge of progress: stamping records with values and indicating which values will not be seen again. This information describes "progress" through a stream, and allows consumers of the stream to rely on the presence of certain contents. Multiple records may share the same gauge value, indicating that they should occur at the same moment. Records may not be in order of the gauge, but once a value is indicated to have passed it should not appear on a subsequent record.
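As a concrete shape for such a stream, one might model each element as either records stamped with a gauge value or a statement of progress. This is only a sketch, with placeholder type names, in the spirit of timely dataflow's capture/replay events:

```rust
/// A sketch of a progress-bearing stream element. `Time` is the gauge and
/// `Data` is the record payload; both are placeholders, not Materialize types.
enum Event<Time, Data> {
    /// Records, all stamped with the same gauge value.
    Records(Time, Vec<Data>),
    /// A promise that every future record's gauge value will be greater than
    /// or equal to some element of this antichain.
    Progress(Vec<Time>),
}
```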
### `remap`
We are going to use a `remap` collection, which is a map from a target gauge `IntoTime` to values from the source gauge `FromTime` that must themselves form an antichain.
The easiest way for me to represent this is as a differential dataflow collection,
```rust
// G's timestamp should be `IntoTime`.
// The `from` values at any time should form an antichain.
remap: Collection<G, FromTime>
```
We can also think of this as a function, or as a hash map, but we would need to figure out how to express progress through that stream as well.
The only constraint on `remap` is monotonicity: if `into1 <= into2` then `remap[into1] <= remap[into2]`, where the second inequality is the partial order on antichains.
Neither `FromTime` nor `IntoTime` need to be totally ordered.
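As a sketch of what this means concretely (with placeholder gauges, not Materialize's types: a totally ordered `u64` for each), `remap` can be viewed as a plain map from `IntoTime` to an antichain of `FromTime`s, with the monotonicity constraint checked directly:

```rust
use std::collections::BTreeMap;

// Placeholder gauges: a Postgres-like LSN for `FromTime`, and milliseconds
// since the unix epoch for `IntoTime`. With a totally ordered `FromTime`,
// each antichain has at most one element.
type FromTime = u64;
type IntoTime = u64;
type Remap = BTreeMap<IntoTime, Vec<FromTime>>; // values are antichains

/// `f1 <= f2` in the partial order on antichains: every element of `f2`
/// is greater than or equal to some element of `f1`.
fn frontier_le(f1: &[FromTime], f2: &[FromTime]) -> bool {
    f2.iter().all(|t2| f1.iter().any(|t1| t1 <= t2))
}

/// The only constraint on `remap`: if `into1 <= into2` then
/// `remap[into1] <= remap[into2]`. A `BTreeMap` iterates in key order, and
/// `IntoTime` here is totally ordered, so comparing adjacent entries suffices.
fn remap_is_monotone(remap: &Remap) -> bool {
    remap
        .values()
        .zip(remap.values().skip(1))
        .all(|(earlier, later)| frontier_le(earlier, later))
}
```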
### `reclocked`
From the `source` collection and the `remap` collection, we can define a new collection `reclocked` as:
the collection that contains at `into` all messages from `source` whose gauge value is not greater than or equal to some element of `remap[into]`.
The `reclocked` collection is in the `IntoTime` gauge, and contains messages from `source` but no longer in the `FromTime` gauge (perhaps the gauge values can be preserved as data). All source events at a given `FromTime` either all occur or all do not occur at each gauge value of `IntoTime`.
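A sketch of this definition in isolation, with placeholder types and a hypothetical helper `reclocked_at` (not an existing API): the contents of `reclocked` at some `into` are a filter of `source` against the antichain `remap[into]`:

```rust
type FromTime = u64;                // placeholder source gauge (e.g. an LSN)
type Message = (FromTime, String);  // a source record stamped with its gauge value

/// The contents of `reclocked` at some `into`: all messages from `source`
/// whose gauge value is not greater than or equal to any element of
/// `remap[into]` (i.e. the antichain has already passed them).
fn reclocked_at(source: &[Message], remap_at_into: &[FromTime]) -> Vec<Message> {
    source
        .iter()
        .filter(|(from, _)| !remap_at_into.iter().any(|f| f <= from))
        .cloned()
        .collect()
}
```

Because the filter depends only on the gauge value, all messages at the same `FromTime` are included or excluded together, which gives the "all occur or all do not occur" property above.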
One natural implementation of reclocking could determine the `remap` relation by simultaneously observing the source stream, in the `FromTime` gauge, and the target timeline, in the `IntoTime` gauge. Whenever `IntoTime` advances, we record the current values of `FromTime` as the new contents of `remap`.
The intended behavior is that the reclocked collection contains all data from gauge values that are known to be complete. To produce the reclocked stream from the source stream and the `remap` collection, we monitor the `remap` collection, which is in the `IntoTime` gauge. As `remap` advances, we can determine which `FromTime` values we must await from the source stream, and then emit those records.
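As a minimal, non-dataflow sketch of that loop (the `Reclocker` type and its methods are hypothetical): buffer source messages, and whenever `IntoTime` advances, mint a new `remap` binding from the current source frontier and emit the buffered messages the frontier has passed, stamped with the new `IntoTime`:

```rust
type FromTime = u64;                // placeholder source gauge
type IntoTime = u64;                // placeholder target gauge
type Message = (FromTime, String);  // a source record stamped with its gauge value

/// State for a hypothetical reclocking operator.
struct Reclocker {
    buffer: Vec<Message>,                  // source messages not yet emitted
    remap: Vec<(IntoTime, Vec<FromTime>)>, // bindings minted so far, in order
}

impl Reclocker {
    /// Record a newly read source message.
    fn observe(&mut self, message: Message) {
        self.buffer.push(message);
    }

    /// Called whenever `IntoTime` advances: mint a `remap` binding from the
    /// current source frontier, and emit every buffered message whose gauge
    /// value the frontier has passed, stamped with `now`.
    fn tick(
        &mut self,
        now: IntoTime,
        source_frontier: Vec<FromTime>,
    ) -> Vec<(IntoTime, Message)> {
        let mut emitted = Vec::new();
        let mut index = 0;
        while index < self.buffer.len() {
            let passed = !source_frontier.iter().any(|f| *f <= self.buffer[index].0);
            if passed {
                emitted.push((now, self.buffer.swap_remove(index)));
            } else {
                index += 1;
            }
        }
        self.remap.push((now, source_frontier));
        emitted
    }
}
```

In a dataflow setting the buffer and `remap` would live in operator state or an arrangement instead, but the logic is the same: a message is emitted at the first `IntoTime` whose recorded frontier is beyond its `FromTime`.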
The `remap` collection varies with `IntoTime`, and likely needs a non-stream representation (e.g. the capture/replay representation, an arrangement, or some other way to externalize progress information).

I'm not aware of other designs that fit the same requirements. We could of course remove any of the requirements.
Our current implementation in support of exactly-once Kafka sinks does something similar, though I believe "less intentionally". I'm not sure what I mean by that, other than that this document is meant to be prescriptive about what you can do, rather than descriptive about what does happen.
I think we still need to discuss fitness for use for the enumerated use cases.
There is not yet a prototype implementation, so there may be unforeseen issues. For example, I haven't thought through either stream completing.
The use of antichains is potentially confusing for sources. For example, Kafka's (partition, offset) pairs would be frustrating with dynamically arriving parts unless the part identifiers are both ordered and start from small values, since otherwise recording "these parts have not yet started" is non-trivial. One could use a different meaning of the antichain, for example "all records less than an antichain element", though this seems likely to be at odds with timely and differential.
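To make the frustration concrete, here is a sketch of a hypothetical Kafka-like `FromTime` in which times are comparable only within a partition:

```rust
/// A hypothetical Kafka-like gauge value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct KafkaTime {
    partition: i32,
    offset: i64,
}

impl KafkaTime {
    /// The partial order: `self <= other` only when both name the same
    /// partition and `self`'s offset is not larger.
    fn less_equal(&self, other: &KafkaTime) -> bool {
        self.partition == other.partition && self.offset <= other.offset
    }
}

// A frontier such as [(p0, 5), (p1, 3)] makes no statement about a partition
// p2 that has not yet been observed: every (p2, offset) is incomparable to
// both elements. To promise "p2 has not yet started", the antichain would
// need an element like (p2, 0) for every partition that might ever appear,
// which is only practical when partition identifiers are ordered and start
// from small values.
```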
The goal of reclocking is to translate potentially incomparable sources into a common reckoning.
One primary motivation is to allow us to record consistent moments between the input data and persistent data we might record ourselves.
Reclocking provides the basis by which we can recover from a failure, read durable `remap` information and persistent collection data, and resume operation at input offsets that allow us to continue correct operation.
Let's detail this for a simple case, where the input data are independent records.
We will reclock the input data, durably recording associations between `FromTime` and `IntoTime` in `remap`. Additionally, we will likely record the data itself, also in a collection that changes as a function of `IntoTime`.
Imagine we ensure that we commit an output collection at an `IntoTime` only after that time is committed to `remap`, and that the output contains exactly those records up through `remap`'s `FromTime` at the same `IntoTime`.
At any moment, say just after a crash, we can choose the greatest `IntoTime` available in the output collection, and resume processing from the `FromTime` present in `remap` at that `IntoTime`.
This is an example of how `remap`, durably recorded, introduces the basis for consistent recovery.
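A sketch of that recovery step, with hypothetical types for the durable state (not Materialize's storage format):

```rust
use std::collections::BTreeMap;

type FromTime = u64; // placeholder source gauge (e.g. an offset or LSN)
type IntoTime = u64; // placeholder target gauge

/// Hypothetical durable state as it exists just after a crash.
struct Durable {
    /// `remap`: the `FromTime` antichain recorded at each `IntoTime`.
    remap: BTreeMap<IntoTime, Vec<FromTime>>,
    /// The greatest `IntoTime` committed in the output collection.
    output_upper: IntoTime,
}

/// Resume point: the `IntoTime` to restart at and the `FromTime`s from which
/// to resume reading the source. Because the output is committed only after
/// its `IntoTime` is committed to `remap`, the lookup succeeds.
fn resume_from(durable: &Durable) -> Option<(IntoTime, Vec<FromTime>)> {
    durable
        .remap
        .range(..=durable.output_upper)
        .next_back()
        .map(|(into, from)| (*into, from.clone()))
}
```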
Let's detail this for a more complicated case, where the inputs are independent streams of data and transaction metadata, the latter describing which data are in each transaction.
We will reclock both of the inputs, although there are now multiple offsets across independent topics.
The `remap` collection must contain all of this information: `FromTime` for each involved input.
We would like to record `remap` and an output collection containing at each time the data up through the offsets of the reclocking collection at that time.
However, there is additional state that we must record to recover correctly: the data and transaction metadata that are not yet complete.
We can instead produce two output collections:
* A TEMP collection containing at each `IntoTime` the data and transaction records up through `remap`'s `FromTime` at this `IntoTime`, excluding any records that form a complete transaction.
* An output collection, containing at each `IntoTime` the data from transactions that are complete on records up through `remap`'s `FromTime` at this `IntoTime`.
These two collections are sufficient to summarize at each `IntoTime` all of the records up through `remap`'s `FromTime` at this `IntoTime`.
However, unlike the simpler case, the summary is not exactly the output collection, but the output collection plus some side information; "state" that we must hold on to.
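Here is a sketch of that summary for the transactional case, with hypothetical record shapes: a data record names its transaction, and the metadata record says how many data records the transaction contains. The `Temp` struct is the auxiliary "state", and the return value of `absorb` is what joins the output collection:

```rust
use std::collections::BTreeMap;

/// Hypothetical record shapes for the example.
struct DataRecord { tx_id: u64, payload: String }
struct TxMeta { tx_id: u64, expected: usize }

/// The "TEMP" collection: records read up through `remap`'s `FromTime` that
/// do not yet form a complete transaction. This is the auxiliary state that
/// must also be recorded, alongside the output collection.
#[derive(Default)]
struct Temp {
    data: BTreeMap<u64, Vec<String>>, // pending payloads, by transaction id
    meta: BTreeMap<u64, usize>,       // expected record counts, by transaction id
}

impl Temp {
    /// Absorb the records newly covered at this `IntoTime`, and return the
    /// transactions that are now complete; those move to the output
    /// collection, while everything else remains in TEMP.
    fn absorb(
        &mut self,
        data: Vec<DataRecord>,
        meta: Vec<TxMeta>,
    ) -> Vec<(u64, Vec<String>)> {
        for d in data {
            self.data.entry(d.tx_id).or_default().push(d.payload);
        }
        for m in meta {
            self.meta.insert(m.tx_id, m.expected);
        }
        // A transaction is complete once its metadata has arrived and the
        // expected number of data records are present.
        let complete: Vec<u64> = self
            .meta
            .iter()
            .filter(|&(tx, expected)| {
                self.data.get(tx).map_or(*expected == 0, |p| p.len() == *expected)
            })
            .map(|(tx, _)| *tx)
            .collect();
        complete
            .into_iter()
            .map(|tx| {
                let expected = self.meta.remove(&tx);
                debug_assert!(expected.is_some());
                (tx, self.data.remove(&tx).unwrap_or_default())
            })
            .collect()
    }
}
```

At each durable `IntoTime` we would persist both the returned output records and the remaining `Temp` contents, so that a restart can reconstruct exactly this state.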
The above approach actually fails to provide certain appealing end-to-end consistency properties we might want of "timestamp assignment" (I intentionally avoided "reclocking").
Above, a transaction is available at the first `IntoTime` for which it and its constituent data are available through `remap`'s `FromTime`.
However, the transaction may occur strictly after another transaction that is not yet complete.
We may want to change the definition of the two output collections, to respect the transaction order, to be:
* A TEMP collection containing at each `IntoTime` the data and transaction records up through `remap`'s `FromTime` for this `IntoTime`, excluding records accepted into the output.
* An output collection, containing at each `IntoTime` the data from transactions for which all less-or-equal transactions are complete on records up through `remap`'s `FromTime` for this `IntoTime`.
Notice how we restricted the emission of transactions to await all prior transactions completing. This will result in a larger TEMP collection, but an association of timestamps to transactions that respects the input order.
Our recipe for "practical reclocking" is therefore:

1. Durably maintain a `remap` collection containing `FromTime` at each `IntoTime`.

2. For each `IntoTime` that is durable in `remap`, summarize all input records up through `remap`'s `FromTime` at that `IntoTime`.

   a. The summary should contain an output collection of published records, varying as a function of `IntoTime`.

   b. The summary may optionally contain auxiliary information ("state"), varying as a function of `IntoTime`.

3. The summary should have the property that, for any `remap`, all `it` in `IntoTime` produce identical summaries when restarted with their inputs at `remap[it]` and their summary at `it`.