The source ingestion pipeline design doc requires that Materialize guarantee bounded input reliance: sources should not re-read data that has already been written to persist. This in turn requires that Materialize bound the information needed to restart a source.
Note: this section uses terminology defined in the original reclocking design doc.
- **Timestamp resumption frontier**: the `IntoTime` (or, currently, an ordinary `Timestamp`) frontier that is safe to restart a specific source at.
- **Offset resumption frontier**: the `FromTime` (e.g. a Postgres LSN or a Kafka `(partition, offset)`) frontier that can be safely committed to the upstream service. The details and exact semantics of this commit frontier are specific to the source type, and some sources may ignore any such frontier.
- **`SourceReader`**: as in "a `SourceReader` implementer", the Rust type that produces actual data from an upstream source.
- **Source Ingestion operator**: the Timely operator that uses a `SourceReader` to read messages from a source. This produces "un-timestamped" batches of updates.
- **`RemapOperator`**: the Timely operator that produces the remap collection.
- **`ReclockOperator`**: the Timely operator that consumes the remap collection and "un-timestamped" batches to produce timestamped data for a source.

Unless otherwise noted, these operators run in `storaged` instances (i.e. not in the storage controller).

To accomplish the Primary Goal for a source, Materialize will semi-regularly provide an offset resumption frontier to the underlying `SourceReader`, which will communicate that frontier to the upstream source. After such a frontier has been exchanged, the upstream source may choose to forgo providing data from before that frontier when Materialize restarts ingestion for that source.

Source types that do not provide such an API may not be able to provide bounded input reliance.
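As a sketch, the hand-off could look like the following; the trait shape and the method name `commit_offset_frontier` are assumptions for illustration, not Materialize's actual `SourceReader` API:

```rust
// Hypothetical sketch of handing an offset resumption frontier to a
// SourceReader implementer. Names here are assumptions, not the real API.
trait SourceReader {
    /// Receive an offset resumption frontier. The implementer may forward it
    /// upstream (e.g. a Kafka consumer-group offset commit); the upstream
    /// source is then free to drop data from before it on restart. Sources
    /// without such an API may simply ignore the call.
    fn commit_offset_frontier(&mut self, offset_frontier: u64);
}

/// A Kafka-like mock that just remembers the last committed offset.
struct MockReader {
    committed: Option<u64>,
}

impl SourceReader for MockReader {
    fn commit_offset_frontier(&mut self, offset_frontier: u64) {
        self.committed = Some(offset_frontier);
    }
}

fn main() {
    let mut reader = MockReader { committed: None };
    reader.commit_offset_frontier(42);
    assert_eq!(reader.committed, Some(42));
}
```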
The offset resumption frontier can be derived from the source's timestamp resumption frontier, by mapping the timestamp resumption frontier through the source's remap collection.
The timestamp resumption frontier can be calculated by combining the `upper`s of all persist shards that are related to this source (see the persist design doc for more information about these `upper`s). Currently, this is only the source's data shard.
In the future this will contain other shards like "intermediate state shards".
The specifics of the "combination" are unspecified in this document, but it is most likely equivalent to a `Lattice::meet`.
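A minimal sketch of that combination, assuming totally ordered timestamps so that each frontier is a single element (with `None` standing in for the empty frontier of a shard that will never advance again):

```rust
/// The meet (greatest lower bound) of two totally ordered frontiers:
/// the minimum element, treating the empty frontier (`None`) as the top
/// of the lattice.
fn meet(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (Some(x), None) | (None, Some(x)) => Some(x),
        (None, None) => None,
    }
}

/// The timestamp resumption frontier is the meet of all related shard uppers.
fn timestamp_resumption_frontier(uppers: &[Option<u64>]) -> Option<u64> {
    uppers.iter().copied().fold(None, meet)
}

fn main() {
    // A data shard at upper 7 and a hypothetical intermediate state shard
    // at upper 5: it is only safe to restart at 5.
    assert_eq!(timestamp_resumption_frontier(&[Some(7), Some(5)]), Some(5));
}
```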
Because the upstream source is allowed to drop data from before the offset resumption frontier, sources must be restarted at the frontier that has been committed upstream. Because we must calculate the timestamp resumption frontier periodically (in service of constructing the offset resumption frontier), we can additionally calculate such a frontier when restarting sources, to accomplish this goal. This is already accomplished in the storage controller, but a future extension to this design may move it into storaged instances.
Currently we never compact the remap collection, which is stored in persist. This design allows us to forget about all data from before the timestamp resumption frontier (as it will already be persisted), so we can additionally use the periodically calculated timestamp resumption frontier as a `since` for the remap collection.
The Primary Goal requires regularly calculating a relatively up-to-date offset resumption frontier, and communicating that frontier to ALL active `SourceReader`s for the given source.
The offset resumption frontier can be calculated by mapping the timestamp resumption frontier through the trace stored in the `ReclockOperator`s. The timestamp resumption frontier can be calculated, as described above, by combining the `upper`s of all persist shards that are related to the source.
For each source, we are going to add a new `ResumptionFrontier` Timely operator that periodically reads all the relevant persist shards' `upper`s and combines them to produce a timestamp resumption frontier. This operator will produce no meaningful data, but will advance its output frontier to the calculated timestamp resumption frontier.
This timestamp resumption frontier will be an input to the core Source Ingestion operator, which, upon receiving an update (in the form of the input frontier advancing), will use the trace of the timely-worker-local `ReclockOperator` (shared via an `Rc` and `RefCell`) to map it to an offset resumption frontier, passing that frontier along to the `SourceReader` instance.
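A sketch of that worker-local sharing and lookup, using a hypothetical remap trace of `(into_time, from_offset)` bindings (the real trace and timestamp types differ):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical remap trace: (into_time, from_offset) bindings, kept sorted
// by into_time. Shared between the ReclockOperator and the Source Ingestion
// operator on the same timely worker via Rc<RefCell<_>>.
type RemapTrace = Rc<RefCell<Vec<(u64, u64)>>>;

/// Map a timestamp resumption frontier to an offset resumption frontier:
/// the offset recorded by the greatest binding not beyond `ts_frontier`.
fn offset_resumption_frontier(trace: &RemapTrace, ts_frontier: u64) -> Option<u64> {
    trace
        .borrow()
        .iter()
        .copied()
        .take_while(|(into_ts, _)| *into_ts <= ts_frontier)
        .last()
        .map(|(_, offset)| offset)
}

fn main() {
    let trace: RemapTrace = Rc::new(RefCell::new(vec![(0, 0), (10, 100), (20, 250)]));
    // Timestamps up to 15 were bound at offset 100, so restart there.
    assert_eq!(offset_resumption_frontier(&trace, 15), Some(100));
}
```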
A flowchart of the above design:

```mermaid
graph TD;
    ResumptionOperator -- _timestamp resumption frontier_ --> s1(Source Ingestion operator 1);
    ResumptionOperator -- _timestamp resumption frontier_ --> s2(Source Ingestion operator 2);
    s1 --> rmo(RemapOperator);
    s2 --> rmo(RemapOperator);
    s1 --> rc1(ReclockOperator 1);
    s2 --> rc2(ReclockOperator 2);
    rmo --> rc1;
    rmo --> rc2;
    rc1 -- envelope processing, etc. --> ps1(persist sink 1);
    rc2 -- envelope processing, etc. --> ps2(persist sink 2);
    s1 -. shared trace .- rc1;
    s2 -. shared trace .- rc2;
    ps1 --> d(data collections);
    ps2 --> d;
    d -. reading from persist .-> ResumptionOperator
```
In reality, the new timestamp resumption frontier will be passed to the Source Ingestion operator, but needs to be communicated to the `SourceReader` driven by that operator. This may require some kind of coordination (most likely a tokio `watch` channel) to move the frontier into the async stream. Additionally, the `SourceReader` is currently turned into a boxed `Stream` to receive values from the source. Because we now need to pass frontiers back, we may need some kind of coordination introduced so we can both send and receive. This document does not specify how this should be implemented.
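One possible shape for that coordination, sketched here with a std-only "latest value" cell standing in for a real tokio `watch` channel (which would be the more likely choice in the async stream):

```rust
use std::sync::{Arc, Mutex};

/// A single-slot "latest value" cell: the ingestion operator publishes the
/// newest offset resumption frontier, and the SourceReader side reads the
/// most recent value when convenient. Intermediate values may be skipped,
/// which is fine because only the latest frontier matters.
#[derive(Clone)]
struct FrontierCell(Arc<Mutex<Option<u64>>>);

impl FrontierCell {
    fn new() -> Self {
        FrontierCell(Arc::new(Mutex::new(None)))
    }

    /// Called by the Source Ingestion operator when its input frontier advances.
    fn publish(&self, frontier: u64) {
        *self.0.lock().unwrap() = Some(frontier);
    }

    /// Called by the SourceReader side; returns the latest published frontier.
    fn latest(&self) -> Option<u64> {
        *self.0.lock().unwrap()
    }
}

fn main() {
    let cell = FrontierCell::new();
    let reader_side = cell.clone();
    cell.publish(10);
    cell.publish(25); // the reader may never observe 10
    assert_eq!(reader_side.latest(), Some(25));
}
```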
Inverting a frontier-to-frontier mapping (in this case, from the timestamp resumption frontier to the offset resumption frontier) is non-trivial if the frontier's time domain is not totally ordered. The current reclocking API manages totally ordered times, and this implementation will rely on that fact.
In the future, it may be possible to support non-totally ordered time, depending on the exact semantics of inversion we require.
Because the `ResumptionFrontier` operator will only periodically emit an up-to-date timestamp resumption frontier, it cannot be relied on when starting new source ingestion instances. Currently, we calculate the timestamp resumption frontier in the storage controller, using the same technique that the `ResumptionFrontier` operator uses. In the future, we may want to move this initial calculation into `storaged` instances, but complexities around how dataflows are created (which rely on this frontier) prevent us from doing so easily.
The `ReclockOperator` can be altered to take as input frontier updates from the `ResumptionFrontier` operator, which can drive the currently unused `compact` method.

Note that the call to `ReclockOperator::compact` can happen in any order relative to committing the upstream frontier.
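A hedged sketch of what such compaction could look like, reusing a hypothetical `(into_time, from_offset)` representation of the remap collection (not the real types):

```rust
/// Compact a remap trace up to `since`: bindings strictly before the last
/// binding at-or-before `since` can be dropped, because restarts never need
/// to reclock data from before the timestamp resumption frontier. We keep
/// that last binding itself, since it is still needed to reclock the first
/// offsets read after a restart.
fn compact(remap: &mut Vec<(u64, u64)>, since: u64) {
    if let Some(keep_from) = remap.iter().rposition(|(ts, _)| *ts <= since) {
        remap.drain(..keep_from);
    }
}

fn main() {
    let mut remap = vec![(0, 0), (10, 100), (20, 250), (30, 400)];
    compact(&mut remap, 20);
    // Everything before the binding at time 20 is forgotten.
    assert_eq!(remap, vec![(20, 250), (30, 400)]);
}
```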
The above design accomplishes this if, in the future, we move the initial timestamp resumption frontier calculation into `storaged` instances.
The `upper` used in the timestamp resumption frontier calculation is produced at the very *end* of the source ingestion pipeline, but the `ResumptionFrontier` operator is at the very beginning. An alternative design could use a native feedback structure in Timely (like this) to move this data around.
- Can the `SourceReader` be altered to support both receiving (values) and sending (frontiers)? Does it need to be split into multiple traits?
- What is the best way to share the trace between the `ReclockOperator` and the Source Ingestion operator?