20220819_bounded_input_reliance.md

Bounded Input Reliance

Summary

The source ingestion pipeline design doc requires that Materialize guarantee bounded input reliance: sources should not re-read data that has already been written to persist. In turn, this requires that Materialize bound the information needed to restart a source.

Jargon

Note: this section uses terminology defined in the original reclocking design doc.

  • timestamp resumption frontier: A target gauge frontier (whose type is IntoTime, or currently, an ordinary Timestamp) at which it is safe to restart a specific source.
  • offset resumption frontier: A source gauge (whose type is FromTime, e.g. a Postgres LSN or a Kafka (partition, offset)) frontier that can be safely committed to the upstream service. The details and exact semantics of this commit frontier are specific to the source type, and some sources may ignore any such frontier.
  • remap collection: A pTVC that stores the (bi-directional) mapping between a source gauge and a target gauge.
  • source: (or "upstream source" or "upstream service") a service like Kafka or Postgres that produces a stream of data.
  • SourceReader: as in "a SourceReader implementer", the Rust type that produces actual data from an upstream source.
  • Source Ingestion operator: The Timely operator that uses a SourceReader to read messages from a source. This produces "un-timestamped" batches of updates.
  • RemapOperator: The Timely operator that produces the remap collection.
  • ReclockOperator: The Timely operator that consumes the remap collection and "un-timestamped" batches to produce timestamped data for a source.

Goals

  • Primary Goal:
    • Regularly commit advancing offset resumption frontiers for all sources
  • Additional Goals:
    • Restart sources with up-to-date resumption frontiers.
    • Note that this is required, as the upstream sources could compact data that is not beyond the offset resumption frontier.
    • Compact the remap collection
    • Localize timestamp resumption frontier and offset resumption frontier management inside storaged instances (i.e. not in the storage controller).

Non-goals

  • Expose the remap collection to SQL (for now).

Overview

To accomplish the Primary Goal for a source, Materialize will semi-regularly provide an offset resumption frontier to the underlying SourceReader, which will communicate that frontier to the upstream source. After such a frontier has been exchanged, the upstream source may choose to forgo providing data from before that frontier when Materialize restarts ingestion for that source.

Source types that do not provide such an API may not be able to provide bounded input reliance.

The offset resumption frontier can be derived from the source's timestamp resumption frontier by mapping the timestamp resumption frontier through the source's remap collection.
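For the totally ordered timestamps Materialize currently uses, this mapping can be sketched as a scan over remap bindings. The (time, offset) pair representation below, meaning "all source offsets below offset were assigned timestamps before time", is a simplified illustration, not Materialize's actual remap types:

```rust
/// Map a timestamp resumption frontier to an offset resumption frontier
/// using a (hypothetical, simplified) remap collection of sorted bindings.
fn offset_resumption_frontier(remap: &[(u64, u64)], ts_frontier: u64) -> u64 {
    // Keep the offset of the last binding whose time is strictly before the
    // timestamp resumption frontier: every offset below it has already been
    // assigned a timestamp that is persisted.
    remap
        .iter()
        .take_while(|(ts, _)| *ts < ts_frontier)
        .map(|(_, offset)| *offset)
        .last()
        .unwrap_or(0)
}

fn main() {
    let remap = [(1, 0), (2, 10), (3, 25), (4, 40)];
    // Restarting at timestamp 3: bindings at times 1 and 2 are complete,
    // so offsets below 10 need not be re-read.
    assert_eq!(offset_resumption_frontier(&remap, 3), 10);
    // A frontier before any binding yields the initial offset.
    assert_eq!(offset_resumption_frontier(&remap, 0), 0);
}
```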

The timestamp resumption frontier can be calculated by combining the uppers of all persist shards that are related to this source (see the persist design doc for more information about these uppers). Currently this is only:

  • The "data" persist shard.
  • The remap collection persist shard.

In the future this will contain other shards, like "intermediate state shards". The specifics of the "combination" are unspecified in this document, but it is most likely equivalent to a Lattice::meet.
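For single-element frontiers over a totally ordered time domain, such a meet reduces to taking the minimum upper. A minimal sketch, with plain u64 timestamps standing in for the real frontier types:

```rust
/// Combine persist shard uppers into a timestamp resumption frontier.
/// For totally ordered times, the meet of single-element frontiers is the
/// minimum: we can only resume at a time every relevant shard has reached.
fn timestamp_resumption_frontier(uppers: &[u64]) -> Option<u64> {
    uppers.iter().copied().min()
}

fn main() {
    // The "data" shard has been written further than the remap shard, so
    // the remap shard's upper bounds where we can safely resume.
    let uppers = [100, 80]; // [data shard upper, remap shard upper]
    assert_eq!(timestamp_resumption_frontier(&uppers), Some(80));
}
```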

Additional Goals

Restart sources with up-to-date resumption frontiers

Because the upstream source is allowed to drop data from before the offset resumption frontier, sources must be restarted at the frontier that has been committed upstream. Because we must calculate the timestamp resumption frontier periodically (in service of constructing the offset resumption frontier), we can additionally calculate such a frontier when restarting sources, to accomplish this goal. This is already accomplished in the storage controller, but a future extension to this design may move it into storaged instances.

Compact the remap collection

Currently we never compact the remap collection, which is stored in persist. This design allows us to forget about all data from before the timestamp resumption frontier (as it will already be persisted), so we can additionally use the periodically calculated timestamp resumption frontier as a since for the remap collection.
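Under a simplified (time, offset) binding representation (illustrative, not Materialize's actual types), compacting to a since can be sketched as collapsing all bindings at or before the since into one:

```rust
/// Compact a remap collection to a `since` frontier: times at or before
/// `since` are no longer distinguishable, so their bindings collapse into a
/// single binding at `since` carrying the most recent offset.
fn compact_remap(remap: &[(u64, u64)], since: u64) -> Vec<(u64, u64)> {
    let mut compacted = Vec::new();
    if let Some(&(_, offset)) = remap.iter().take_while(|(ts, _)| *ts <= since).last() {
        // The latest binding not beyond `since`, advanced to `since` itself.
        compacted.push((since, offset));
    }
    // Bindings strictly beyond `since` are kept unchanged.
    compacted.extend(remap.iter().copied().filter(|(ts, _)| *ts > since));
    compacted
}

fn main() {
    let remap = [(1, 0), (2, 10), (3, 25), (4, 40)];
    assert_eq!(compact_remap(&remap, 3), vec![(3, 25), (4, 40)]);
}
```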

Implementation

The Primary Goal requires regularly calculating a relatively up-to-date offset resumption frontier, and communicating that frontier to ALL active SourceReaders for the given source.

The offset resumption frontier can be calculated by mapping the timestamp resumption frontier through the trace stored in the [ReclockOperator]s. The timestamp resumption frontier can be calculated, as described above, by combining the uppers of all persist shards that are related to the source.

For each source, we are going to add a new ResumptionFrontier timely operator, that periodically reads all the relevant persist shards' uppers, and combines them to produce a timestamp resumption frontier. This operator will produce no meaningful data, but will advance its output frontier to the calculated timestamp resumption frontier.
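A hypothetical sketch of that loop, with a read_uppers closure standing in for the real persist reads and plain u64 timestamps standing in for the real frontier types:

```rust
/// Core loop of a ResumptionFrontier-like operator: poll the uppers of the
/// relevant persist shards and emit a monotonically advancing timestamp
/// resumption frontier. Returns the frontiers emitted, for illustration.
fn advance_frontier<F: FnMut() -> Vec<u64>>(mut read_uppers: F, rounds: usize) -> Vec<u64> {
    let mut current = 0u64;
    let mut emitted = Vec::new();
    for _ in 0..rounds {
        // Combine the uppers: for totally ordered times this is the minimum.
        if let Some(combined) = read_uppers().into_iter().min() {
            // The output frontier only ever advances, never regresses.
            if combined > current {
                current = combined;
                emitted.push(current);
            }
        }
    }
    emitted
}

fn main() {
    // Simulated polls: the remap shard's upper lags, then catches up.
    let mut polls = vec![vec![100, 80], vec![100, 90], vec![110, 90]].into_iter();
    let result = advance_frontier(|| polls.next().unwrap(), 3);
    assert_eq!(result, vec![80, 90]);
}
```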

This timestamp resumption frontier will be an input to the core Source Ingestion operator, which, when receiving an update (in the form of the input frontier advancing), will use the trace of the timely-worker-local [ReclockOperator] (shared via an Rc and RefCell) to map it to an offset resumption frontier, passing it along to the SourceReader instance.

A flowchart of the above design:

```mermaid
graph TD;
    ResumptionOperator -- _timestamp resumption frontier_ --> s1(Source Ingestion operator 1);
    ResumptionOperator -- _timestamp resumption frontier_ --> s2(Source Ingestion operator 2);

    s1 --> rmo(RemapOperator);
    s2 --> rmo(RemapOperator);

    s1 --> rc1(ReclockOperator 1);
    s2 --> rc2(ReclockOperator 2);

    rmo --> rc1;
    rmo --> rc2;

    rc1 -- envelope processing, etc. --> ps1(persist sink 1);
    rc2 -- envelope processing, etc. --> ps2(persist sink 2);

    s1 -. shared trace .- rc1;
    s2 -. shared trace .- rc2

    ps1 --> d(data collections);
    ps2 --> d

    d -. reading from persist .-> ResumptionOperator
```

Subtleties

Frontier value management

In reality, the new timestamp resumption frontier will be passed to the Source Ingestion operator, but it must then be communicated to the SourceReader driven by that operator.

This may require some kind of coordination (most likely a tokio watch) to move the frontier into the async stream. Additionally, the SourceReader is currently turned into a boxed Stream to receive values from the source. Because we now also need to pass frontiers back, some form of bidirectional coordination may need to be introduced so we can both send and receive. This document does not specify how this should be implemented.
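One possible shape for that coordination, sketched here with a std channel and thread rather than the tokio watch and async stream the real implementation would likely use (the function name and types are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

/// Drive a reader loop that keeps the latest offset resumption frontier it
/// has received; returns the frontier it would commit upstream once the
/// sending side (the ingestion operator) shuts down.
fn run_reader(updates: Vec<u64>) -> u64 {
    let (frontier_tx, frontier_rx) = mpsc::channel::<u64>();
    let reader = thread::spawn(move || {
        let mut latest = 0u64;
        while let Ok(frontier) = frontier_rx.recv() {
            // Frontiers only advance, so keep the maximum seen so far
            // (watch-like semantics: only the newest value matters).
            latest = latest.max(frontier);
        }
        latest
    });
    // The operator side: periodically send updated frontiers.
    for f in updates {
        frontier_tx.send(f).unwrap();
    }
    drop(frontier_tx); // closing the channel lets the reader finish
    reader.join().unwrap()
}

fn main() {
    assert_eq!(run_reader(vec![10, 25, 40]), 40);
}
```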

Total-ordered time requirement

Inverting a frontier-to-frontier mapping (in this case, from the timestamp resumption frontier to the offset resumption frontier) is non-trivial if the frontier's time domain is not totally ordered. The current reclocking API manages totally ordered times, and this implementation will rely on that fact.

In the future, it may be possible to support non-totally ordered time, depending on the exact semantics of inversion we require.

Implementation of Additional Goals

Implementation of: Restart sources with up-to-date timestamp resumption frontiers

Because the ResumptionFrontier operator only emits up-to-date timestamp resumption frontiers periodically, it cannot be relied on when we are starting new source ingestion instances. Currently, we calculate the timestamp resumption frontier in the storage controller, using the same technique that the ResumptionOperator uses. In the future, we may want to move this initial calculation into storaged instances, but complexities around how dataflows are created (which rely on this frontier) prevent us from doing so easily.

Implementation of: Compact the remap collection

The ReclockOperator can be altered to take as input frontier updates from the ResumptionFrontier operator, which can drive the currently unused compact method.

Note that calls to ReclockOperator::compact can happen in any order relative to when we commit the upstream frontier.

Localize frontier management in storaged instances

The above design accomplishes this, if, in the future, we move initial timestamp resumption frontier calculation into storaged instances.

Alternatives

  • This proposed design uses persist to create a circular link between the end of source ingestion and the beginning (for example, the "data" persist shard upper used in timestamp resumption frontier calculation is produced at the very END of the source ingestion pipeline, but the ResumptionFrontier operator is at the very beginning). An alternative design could use a native feedback structure in Timely (like this) to move this data around.
    • We avoided this design because the data being fed backwards is a frontier, which makes it non-trivial to correctly feed back in Timely, which requires frontier adjustments. Additionally, the future addition of more frontiers as inputs to the timestamp resumption frontier made maintenance of this scheme seem worryingly complex.
    • We also think the additional load on persist will be negligible, and it will be fanned out across multiple storaged instances.

Open questions

  • How should SourceReader be altered to support both receiving (values) and sending (frontiers)? Does it need to be split into multiple traits?
  • How, exactly, should we share the trace between the ReclockOperator and Source Ingestion operator?
  • When are we confident we can compact the remap collection? Does this depend on some ordering on when we finalize the upstream commit?