
Persist

aljoscha/danhhz/ruchirk

Notice: The document represents a (hopefully) self-consistent snapshot of our thinking at a point in time. It's a useful tool for understanding the structure of persist and for establishing a common language, but it's a non-goal to keep it continually updated with the latest thinking. For example: we've already decided (at the request of STORAGE) to shift from the capability model for ReadHandle described here to one of a shard-global downgrade_since/allow_compaction. This has far-reaching consequences, potentially even to the number of concurrent readers we'll initially support. If you have any questions or want to know the latest thinking, please don't hesitate to reach out!

Summary

Persist is an implementation detail of STORAGE. Its "public" API is used only by STORAGE and code outside of STORAGE should be talking to STORAGE, not persist. However, having this strong API boundary between the two allows for the two teams to execute independently.

Persist's primary abstraction is a "shard", which is a durable and definite Time-Varying Collection (TVC). Persist requires that the collection's "data" is key-value structured (a () unit value is fine) but otherwise allows the key, value, time, and diff to be abstract in terms of the Codec and Codec64 encode/decode traits. As a result, persist is independent of Materialize's internal data formats: Row etc.

Along with talking to the outside world, STORAGE pilots persist shards around to present a "STORAGE collection", which is a durable TVC that handles reshardings and version upgrades. It could also ensure that persist will never again see data from some Codec's previous implementation of encode, allowing us to remove support for it from the corresponding decode.

A persist shard is exactly a storage shard and they can be used interchangeably.

Jargon

  • update: A differential dataflow ((key, value), timestamp, diff) tuple. Persist requires that the data field has a (key, value) structure, though the value can be the unit type to mean "empty": ().
  • Time-Varying Collection: (TVC) Quoting the formalism, "A multiset of typed data whose contents may vary arbitrarily as function of some ordered time." Materialize represents TVCs as a set of updates.
  • Partial Time-Varying Collection: (pTVC) Quoting the formalism, "Materialize most often only has partial information about TVCs. A TVC's future is usually not yet completely known, and its deep past also may not be faithfully recorded. To represent what Materialize knows with certainty, Materialize maintains two "frontiers" for each TVC. The upper frontier, which communicates that for times time not greater or equal to upper, the updates at time are complete. The since frontier, which communicates that for times time greater or equal to since, the accumulated collection at time will be correct."
  • definite: Quoting the correctness doc, "A collection is definite when all uses of any one identification will yield exactly the same time-varying collection of data (identical contents at each logical time). A handle to a collection can be "definite from since until upper" if it reflects the definite results for times greater or equal to since but not greater or equal to upper; this intentionally allows for compaction of historical detail and unavailability of future data, without ambiguity about which logical times can be described with certainty."
  • shard: A durable and definite TVC.
  • batch: In the spirit of differential dataflow's Trace batch, an immutable collection of updates, along with a Description of those updates.
  • ShardId: An identifier for a persist shard, as described in definite.
  • location: An external source of data durability. All of persist's interactions with the outside world are abstracted through the Blob and Consensus traits, together collectively called the persist location. One valid way to think of persist is as a layer that maps simpler durable storage semantics to ones that are more native to differential dataflow.
  • blob: A durable &str key -> (possibly large) &[u8] value store. The production implementation is S3 (or other cloud storage). An in-mem HashMap implementation exists for testing. A filesystem implementation may be used for a single-binary materialized deployment.
  • consensus: A durable log with &[u8] entries, linearizing writes concurrently from multiple places.
  • SeqNo: A sequence number identifying a specific version of the internal state machine's state.
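
To make the update, TVC, and since/upper jargon concrete, here is a minimal sketch. It assumes totally ordered `u64` timestamps and `String` keys and values in place of the generic types; `Update`, `contents_at`, and `is_definite_at` are illustrative names, not part of persist.

```rust
use std::collections::BTreeMap;

/// One differential dataflow update: ((key, value), timestamp, diff).
/// Assumption: concrete String/u64/i64 types stand in for <K, V, T, D>.
type Update = ((String, String), u64, i64);

/// Accumulates a set of updates into the collection's contents at `as_of`:
/// every update at a time <= `as_of` contributes its diff.
fn contents_at(updates: &[Update], as_of: u64) -> BTreeMap<(String, String), i64> {
    let mut acc: BTreeMap<(String, String), i64> = BTreeMap::new();
    for ((k, v), t, d) in updates {
        if *t <= as_of {
            *acc.entry((k.clone(), v.clone())).or_insert(0) += d;
        }
    }
    // Records whose diffs cancel out are not part of the multiset.
    acc.retain(|_, d| *d != 0);
    acc
}

/// A pTVC is only known with certainty at times `t` with since <= t < upper.
fn is_definite_at(since: u64, upper: u64, t: u64) -> bool {
    since <= t && t < upper
}
```

With an insert at time 0 and a matching retraction at time 2, `contents_at` includes the record at time 1 and omits it at time 2.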

Goals

  • Make pTVCs definite, i.e. make it so that we can read the contents of a pTVC through some [lower, upper) range of times as of some known since
  • Make pTVCs definite both for a distributed cloud platform workload, and for a local single binary workload.
  • Store the contents of the pTVC in S3 / some other service offering similarly cheap cold storage.
  • Checksum stored data for end-to-end defense against e.g. network errors.
  • Operational simplicity: multiple {readers, writers, any other state machine/maintenance workers} can at least be turned on at the same time.
  • Reader / writer liveness:
    • If multiple readers and writers are attempting to read / write at a time, at least one of them can always make progress.
    • Readers and writers are not affected by their peers being paused or restarted.
    • The collection makes progress (i.e. the collection upper advances) so long as one writer can make progress. Concretely, we don’t need all writers to be able to write a batch in order for it to be read.
  • Horizontal scalability of reads: multiple readers can read the contents of a collection linearly faster than a single reader.
  • Separation of compute and storage.
  • High throughput reads - saturate the network capacity of multiple readers + optionally allow for readers to keep a local cache.
  • High throughput writes - the shard’s write throughput is limited to the max write throughput of a single writer.
  • Ease of reasoning about consensus (Concretely: do not write our own distributed consensus protocol. Use an off the shelf solution instead.)
  • Compaction: store the contents of a pTVC using space proportional to the set of distinct (data, time, diff) records in [since, upper) (plus an optional logarithmic factor).
  • Low write amplification.
  • Low/bounded S3 costs. Concretely: the S3 costs should be proportional to the bytes written and read by each of the writers and readers respectively. The S3 cost should not be proportional to the number of distinct read/write frontier downgrades.
  • S3 operations/second should be tunable independently of the size/rate of change of the pTVC. In other words, we should have a lever that we can pull to reduce our S3 cost, in exchange for higher latency/lower throughput for the end user. Then the user is free to choose the performance level they want to pay for.

Non-goals

  • Replication/HA of pTVC: data is replicated in S3 using the fact that S3 itself is replicated (we’re not doing any application level replication), and not replicated when running in single binary mode.
  • Cross AWS region replication.
  • Liveness during key-value store/consensus service failure.
  • Write horizontal scalability. Multiple writers write different copies of the same shard, and storage is responsible for splitting an upstream source into multiple shards.
  • Handling cases where S3 / related cloud storage services themselves lose or modify stored data.
  • Low latency writes and reads. We target that small writes and reads (e.g. writes and reads to batches smaller than 1 KiB) have a median (p50) latency of under 100 ms, and a p95 latency of under 1s. Larger writes are the same minimum latency plus however long it takes to write the data to S3.
  • High throughput of reader/writer creation.
  • High throughput of pTVC creation.
  • Hundreds/thousands of writers/readers (yet). We’ve initially designed for O(tens) of concurrent readers/writers. Scaling to O(hundreds) (and more) of readers is possible but left as followup work.
  • Self-containment: acceptable to rely on S3/etcd/other external distributed consensus implementation/sqlite.
  • Encryption of stored data

API

Persist uses a "rich client" model with some of the logic running in-process (but behind the persist abstraction) so that readers and writers can talk directly to S3.

Everything in persist is generic over <Key, Value, Timestamp, Diff> (hereafter <K, V, T, D>) to allow users control over how their data formats are encoded and decoded. K and V must implement Codec, which requires its implementations to be able to decode data written by previous versions of encode. (STORAGE can use renditions to migrate these over time to remove cruft in decode and keep this from accumulating indefinitely.) T and D must implement Codec64. In practice, Codec64 means they're roundtripped as little-endian i64 or u64. Codec64 can be made more general if necessary, but this "serialized as 8 bytes" restriction allows us to do some performance tricks in our columnar disk format for batches of updates, and it makes a number of metadata decoding pathways much simpler.
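
As a rough illustration of the "serialized as 8 bytes" restriction, a trait of the following shape would be enough for T and D. `Codec64Sketch` and its method names are hypothetical; the real Codec64 trait in the code may look different.

```rust
/// Hypothetical sketch of an 8-byte codec trait. The actual `Codec64` in the
/// persist code is the source of truth and may differ.
trait Codec64Sketch: Sized {
    /// Encodes `self` as exactly 8 little-endian bytes.
    fn encode(&self) -> [u8; 8];
    /// Decodes a value previously produced by `encode`.
    fn decode(buf: [u8; 8]) -> Self;
}

impl Codec64Sketch for u64 {
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
    fn decode(buf: [u8; 8]) -> Self {
        u64::from_le_bytes(buf)
    }
}

impl Codec64Sketch for i64 {
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
    fn decode(buf: [u8; 8]) -> Self {
        i64::from_le_bytes(buf)
    }
}
```

Because every encoded value has the same width, a column of timestamps or diffs can be stored as a flat byte buffer and indexed without any per-record length prefix.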

Most of the persist API has a close correspondence with named entities in the formalism. The formalism describes STORAGE, not persist, but given that part of STORAGE's duties is piloting around persist to account for reshardings and behavior changes across code versions, this makes sense.

  • WriteHandle is roughly WriteCapability.
  • WriteHandle::append is roughly Append
  • ReadHandle is roughly ReadCapability.
  • ReadHandle::listen + ReadHandle::snapshot are roughly SubscribeAt
  • Client::open is roughly AcquireCapabilities

Here's our best initial guess at concretely what the API looks like. As always, the source of truth will be the code.

impl Client {
    /// Provides capabilities for the durable TVC identified by `shard_id` at
    /// its current since and upper frontiers.
    ///
    /// This method is a best-effort attempt to regain control of the frontiers
    /// of a shard. Its most common uses are to recover capabilities that have
    /// expired (leases) or to attempt to read a TVC that one did not create (or
    /// otherwise receive capabilities for). If the frontiers have been fully
    /// released by all other parties, this call may result in capabilities with
    /// empty frontiers (which are useless).
    ///
    /// If `shard_id` has never been used before, initializes a new shard and
    /// returns handles with `since` and `upper` frontiers set to initial values
    /// of `Antichain::from_elem(T::minimum())`.
    pub async fn open<K, V, T, D>(
        &self,
        timeout: Duration,
        shard_id: ShardId,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), ExternalError>
}

impl<K, V, T, D> WriteHandle<K, V, T, D> {
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper`.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s (with
    /// different [WriterId]s) may be used concurrently to write to the same
    /// shard, but in this case, the data being written must be identical (in
    /// the sense of "definite"-ness).  It's intended for replicated use by
    /// source ingestion, sinks, etc.
    ///
    /// All times in `updates` must be greater or equal to `self.upper()` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: Introduce an AsyncIterator (futures::Stream) variant of this. Or,
    /// given that the AsyncIterator version would be strictly more general,
    /// alter this one if it turns out that the compiler can optimize out the
    /// overhead.
    pub async fn append<'a, I: IntoIterator<Item = ((&'a K, &'a V), &'a T, &'a D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current shard upper is equal to `expected_upper`.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky three-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    pub async fn compare_and_append<'a, I: IntoIterator<Item = ((&'a K, &'a V), &'a T, &'a D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<Result<(), Antichain<T>>, InvalidUsage>, ExternalError>;
}

impl<K, V, T, D> ReadHandle<K, V, T, D> {
    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This may trigger (asynchronous) compaction and consolidation in the
    /// system. A `new_since` of the empty antichain "finishes" this shard,
    /// promising that no more data will ever be read by this handle.
    ///
    /// It is possible to heartbeat a reader lease by calling this with
    /// `new_since` equal to `self.since()` (making the call a no-op).
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    pub async fn downgrade_since(
        &mut self,
        timeout: Duration,
        new_since: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVCs contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn listen(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
    ) -> Result<Result<Listen<K, V, T, D>, InvalidUsage>, ExternalError>;

    /// Returns a snapshot of the contents of the shard TVC at `as_of`.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. The recipient
    /// should only downgrade their read capability when they are certain they
    /// have all data through the frontier they would downgrade to.
    ///
    /// This snapshot may be split into a number of splits, each of which may be
    /// exchanged (including over the network) to load balance the processing of
    /// this snapshot. These splits are usable by anyone with access to the
    /// shard's [crate::Location]. The `len()` of the returned `Vec` is
    /// `num_splits`. If a 1:1 mapping between splits and (e.g. dataflow
    /// workers) is used, then the work of replaying the snapshot will be
    /// roughly balanced.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn snapshot(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
        num_splits: NonZeroUsize,
    ) -> Result<Result<Vec<SnapshotSplit>, InvalidUsage>, ExternalError>;
}
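
To illustrate how a caller might read the nested Result returned by compare_and_append, here is a toy, synchronous, in-memory model. `ToyShard` is hypothetical, uses plain `u64` times instead of `Antichain<T>`, and drops the timeout; the `InvalidUsage` and `ExternalError` structs are stand-ins for the real error types.

```rust
/// Hypothetical stand-ins for the error types named in the API.
#[derive(Debug, PartialEq)]
struct InvalidUsage(String);
#[derive(Debug, PartialEq)]
struct ExternalError(String);

/// A toy, single-process shard over totally ordered `u64` times.
struct ToyShard {
    upper: u64,
    updates: Vec<((String, String), u64, i64)>,
}

impl ToyShard {
    /// Outer Err: the external service failed (never produced by this toy).
    /// Middle Err: the caller passed arguments that can never be valid.
    /// Inner Err: the shard upper did not match; carries the current upper.
    fn compare_and_append(
        &mut self,
        updates: Vec<((String, String), u64, i64)>,
        expected_upper: u64,
        new_upper: u64,
    ) -> Result<Result<Result<(), u64>, InvalidUsage>, ExternalError> {
        if new_upper < expected_upper {
            return Ok(Err(InvalidUsage("new_upper is before expected_upper".into())));
        }
        let out_of_bounds = updates
            .iter()
            .any(|(_, t, _)| *t < expected_upper || *t >= new_upper);
        if out_of_bounds {
            return Ok(Err(InvalidUsage(
                "update time outside [expected_upper, new_upper)".into(),
            )));
        }
        if self.upper != expected_upper {
            // Another writer got there first: report where the shard is now.
            return Ok(Ok(Err(self.upper)));
        }
        self.updates.extend(updates);
        self.upper = new_upper;
        Ok(Ok(Ok(())))
    }
}
```

A caller can use `?` on the outer layer to bubble up service failures, treat the middle layer as a bug in its own code, and use the inner `Err(current_upper)` to decide whether to retry at the new upper.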

Blob+Consensus

The persistence system is built on top of two logical primitives, Blob and Consensus.

Blob is a durable key-value store that maps string keys -> arbitrary byte payloads. It's conceptually similar to a shared hashtable, perhaps expressed as an Arc<Mutex<HashMap<String, Vec<u8>>>> in Rust, except durable across restarts and potentially shared across multiple processes.

Blob supports the following operations:

  • GET(key) : returns the value mapped to key if there is one.
  • PUT(key, value) : map key to value. Note that this operation is atomic, which means that any GET(key) will either see the old value before the PUT or the new value after the PUT but nothing else.
  • DELETE(key) : remove any value associated with key, if there is one and do nothing otherwise.
  • LIST KEYS(prefix): list the set of keys currently mapped to a value that start with prefix.

Because all of our keys are write-once, modify never, Blob does not have to be linearizable. Readers can simply retry reads that don’t succeed.
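
The four operations map naturally onto a small trait. The sketch below is one possible shape, backed by the shared hashtable mentioned above; the trait and method names are illustrative, and the real Blob trait (which is async and fallible) is defined in the code.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical, synchronous sketch of the Blob operations.
trait Blob {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&self, key: &str, value: Vec<u8>);
    fn delete(&self, key: &str);
    fn list_keys(&self, prefix: &str) -> Vec<String>;
}

/// In-memory implementation, suitable only for tests.
#[derive(Clone, Default)]
struct MemBlob {
    data: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl Blob for MemBlob {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.data.lock().unwrap().get(key).cloned()
    }

    fn put(&self, key: &str, value: Vec<u8>) {
        // The lock makes the PUT atomic with respect to concurrent GETs.
        self.data.lock().unwrap().insert(key.to_string(), value);
    }

    fn delete(&self, key: &str) {
        // Deleting a missing key is a no-op.
        self.data.lock().unwrap().remove(key);
    }

    fn list_keys(&self, prefix: &str) -> Vec<String> {
        let guard = self.data.lock().unwrap();
        let mut keys: Vec<String> = guard
            .keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect();
        keys.sort();
        keys
    }
}
```

Cloning a `MemBlob` shares the same underlying map, which mimics several handles pointing at one bucket.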

We’re going to keep at least 3 different versions of Blob, one built on S3 that will be the main production use case, one built on top of the POSIX filesystem API for the single binary use case, and one in-memory purely for unit testing / measuring overhead of doing IO.

Consensus is a linearizable, durable, totally ordered array of binary records and sequence numbers (think a durable Vec<(SeqNo, Vec<u8>)>) that supports the following operations:

  • HEAD: return a recent (not necessarily the most recent) record inserted into Consensus and the sequence number it was inserted at.
  • CompareAndSet(sequence_number, data) -> bool: Atomically insert data into Consensus if the most recent record at insert time was inserted at sequence_number - 1. Returns true if the insert was successful and false otherwise.
  • SCAN(sequence_number): returns a list of all records inserted at sequence numbers greater than or equal to sequence_number.
  • TRUNCATE(sequence_number): atomically remove all records inserted at sequence numbers less than sequence_number.

We use Consensus to store incremental updates to persist’s metadata. We need a compare-and-set primitive to guarantee a total order of metadata updates across distributed writers, and we need to maintain a history of those updates in order to periodically reconstruct the full metadata state, and write it out somewhere else.
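
A single-process sketch of these four operations follows. `MemConsensus` is a hypothetical name, and two details are assumptions of the sketch: the first record goes in at sequence number 0, and `head` always returns the most recent record (the description above only requires a recent one).

```rust
type SeqNo = u64;

/// Hypothetical in-memory Consensus, for illustration only.
#[derive(Default)]
struct MemConsensus {
    /// Retained records, in insertion order.
    log: Vec<(SeqNo, Vec<u8>)>,
    /// The only sequence number the next insert may use.
    next: SeqNo,
}

impl MemConsensus {
    /// Returns the most recent retained record, if any.
    fn head(&self) -> Option<(SeqNo, Vec<u8>)> {
        self.log.last().cloned()
    }

    /// Inserts `data` at `seqno` iff the most recent insert was at `seqno - 1`.
    fn compare_and_set(&mut self, seqno: SeqNo, data: Vec<u8>) -> bool {
        if seqno != self.next {
            return false;
        }
        self.log.push((seqno, data));
        self.next += 1;
        true
    }

    /// Returns all retained records at sequence numbers >= `from`.
    fn scan(&self, from: SeqNo) -> Vec<(SeqNo, Vec<u8>)> {
        self.log.iter().filter(|(s, _)| *s >= from).cloned().collect()
    }

    /// Removes all records at sequence numbers < `seqno`.
    fn truncate(&mut self, seqno: SeqNo) {
        self.log.retain(|(s, _)| *s >= seqno);
    }
}
```

Tracking `next` separately from the log keeps sequence numbers strictly increasing even after a truncate removes old records.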

Intuition for Safety

One lens through which you might view all of the following details is of persist maintaining an acyclic immutable data structure. The nodes are stored in an external blob store, with blob keys as pointers between nodes. The "root" pointer of the data structure is changed only by going through distributed consensus.

Each distributed actor writes blobs to a path with a unique prefix and blobs are atomically written, write-once, modify-never. This makes reasoning about caching, mutations, and idempotency of these blobs straightforward-ish.

Persist is a state machine and this immutable data structure contains its state. The distributed actors only interact through consensus: starting with the latest version of the state, applying inputs, and using CompareAndSet to only move the root pointer if it hasn't changed. If it has, they restart the loop.

Intuition for Liveness

Notably, our liveness is dependent on the availability of the external blob and consensus services we rely on. There's no getting around this. We'll have to select services we trust.

To reduce contention, we'll run the state update loop in an RPC server. But it's safe to run the loop from anywhere. This allows us to aggressively fail over if the RPC server misses a health check.

Readers of the immutable data structure declare which version of the root pointer they are using (by maintaining it in the state machine state) and are only allowed to progress forward through versions. This allows us to eventually reason about unreachable root versions and unreachable blobs for cleanup.

Readers and Writers are both given leases on registration. This lease is extended as the reader or writer is used. If the lease expires (or the process is restarted), the reader/writer acquires a new lease under a new (internal) identifier.

Implementation

State Machine

Persist is a deterministic state machine. This means that persist maintains some small metadata in persistent storage describing the current status of the state machine, and accepts an ordered sequence of inputs as defined in the API. Each of those inputs either results in a no-op error, in which case the machine’s state is unchanged, or causes the state stored in persistent storage to undergo a deterministic state transition.

Persist has to keep track of the following state:

Shard global information:

  • upper frontier (max of all writers uppers)
  • since frontier (min of all readers sinces)
  • seqno_since frontier (the last seqno any reader will be allowed to read from) I believe this makes the most sense as the min of (all of the readers seqno_since, the seqno of the last state rollup?)

Foreach active (non-expired) writer:

  • the writer’s upper frontier
  • the unique key prefix assigned to the writer
  • the writer’s lease expiration time

Foreach active (non-expired) reader:

  • the reader’s since frontier
  • the reader’s seqno_since frontier (the seqno before which it will never read any metadata information)
  • the reader’s lease expiration time

Forall expired readers and writers:

  • the expired reader / writer ID
  • the seqno at which the reader/writer was expired

Forall submitted non-deleted batches:

  • the writer ID that submitted the batch
  • the batch’s [lower, upper), and since frontiers
  • The key / set of keys pointing to the batch in S3
    • (maybe?) the seqno when the batch was introduced

Forall logically deleted batches:

  • the writer that deleted the batch
  • the seqno at which the batch was logically deleted
  • the key / set of keys pointing to the batch in S3

Foreach state rollup in the set of state metadata rollups:

  • the seqno associated with the state rollup
  • the key pointing to the state rollup in S3.

Foreach logically deleted state rollup:

  • the seqno at which the state rollup was deleted.

This metadata state is stored in a sequence of incremental updates/individual state transitions in Consensus, and a set of state rollups stored in S3 objects, for ease of access to historical data. The batches themselves are stored in S3 objects inside of writer-owned paths within Blob.

Allowed state transitions:

  • AddReaderWriter(lease_duration_seconds) -> Reader/Writer Id + WriteCapability (upper) and ReadCapability(since, seqno_since) - this state transition adds a new reader and writer to the set of readers/writers associated with the same id and the corresponding since/seqno_since/upper chosen from the global values. The new readers and writers have a lease of lease_duration_seconds.

  • LogicallyDeleteReader(reader_id) / LogicallyDeleteWriter(writer_id): expire a reader / writer. All future requests by this reader or writer will be rejected.

  • PhysicallyDeleteReader(reader_id) / PhysicallyDeleteWriter(writer_id): remove the reader / writer from the set of logically deleted readers and writers.

  • Append(batch_key, description, writer_id): this transition adds a batch to the set of active batches and downgrades the writer’s upper to description.upper. This transition can also downgrade the collection’s upper if the global maximum of all writers’ uppers increases. This transition also updates the writer’s lease expiration time.

  • DowngradeReader(reader_id, new_since, new_seqno_since): Downgrade the reader to a new (since, seqno_since). This transition can also downgrade the collection’s since/seqno_since (if the global min of all readers’ since/seqno_since increases). This transition also updates the reader’s lease expiration time.

  • LogicallyDeleteBatch(batch_key, writer_id): This state transition is only used by garbage collector(s). This transition moves the batch located at batch_key into the logically deleted batches set, provided that another batch exists that covers the time interval of the batch to be deleted.

  • PhysicallyDeleteBatch(batch_key): remove the batch from the list of logically deleted batches because the space has been physically reclaimed by some process.

  • CreateStateRollup(seqno_upper, state_rollup_key): add a new state rollup from [0, seqno_upper) at state_rollup_key

  • LogicallyDeleteStateRollup(state_rollup_key): move a state rollup to the set of logically deleted state rollups

  • PhysicallyDeleteStateRollup(state_rollup_key): remove a state rollup from the set of logically deleted state rollups.

State machine usage:

Roughly speaking, all users of the state machine follow this structure:

  • Grab the current state and seqno of the state machine into memory
  • Perform the requisite operations either in memory (for example, for readers downgrading their since) or in Blob (for example, for writers appending a new batch)
  • Construct an incremental metadata update representing the state transition
  • Attempt to CaS this update into Consensus at seqno + 1. If you succeed, amazing! If you fail, reconstruct the current state and current seqno and try again from the beginning. (Note that writers don’t have to continue to rewrite their batch across CaS retries)
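
The four steps above can be sketched as a retry loop. Everything here is a simplified stand-in: `Log` plays the role of Consensus (entry i lives at seqno i + 1, and the empty log is seqno 0), the state is just the set of batch keys, and a `Mutex` replaces the network.

```rust
use std::collections::BTreeSet;
use std::sync::Mutex;

/// A single illustrative state transition: add a batch key to the trace.
#[derive(Clone, Debug)]
enum Transition {
    AddBatch(String),
}

/// Hypothetical stand-in for Consensus.
#[derive(Default)]
struct Log {
    entries: Vec<Transition>,
}

impl Log {
    fn seqno(&self) -> u64 {
        self.entries.len() as u64
    }

    /// Rebuilds the current state by replaying every transition.
    fn replay(&self) -> BTreeSet<String> {
        self.entries
            .iter()
            .map(|Transition::AddBatch(key)| key.clone())
            .collect()
    }

    /// Succeeds iff the most recent entry was inserted at `seqno - 1`.
    fn compare_and_set(&mut self, seqno: u64, t: Transition) -> bool {
        if seqno != self.seqno() + 1 {
            return false;
        }
        self.entries.push(t);
        true
    }
}

/// Returns a seqno at which `key` is known to be part of the state.
fn add_batch(log: &Mutex<Log>, key: &str) -> u64 {
    // The batch itself would be written to Blob once, before this loop.
    loop {
        // 1. Grab the current state and seqno.
        let (seqno, state) = {
            let guard = log.lock().unwrap();
            (guard.seqno(), guard.replay())
        };
        // 2. In-memory work: nothing to do if the batch is already there.
        if state.contains(key) {
            return seqno;
        }
        // 3. Construct the incremental update.
        let transition = Transition::AddBatch(key.to_string());
        // 4. Attempt the CaS at seqno + 1; on failure, start over.
        if log.lock().unwrap().compare_and_set(seqno + 1, transition) {
            return seqno + 1;
        }
    }
}
```

The lock is released between steps 1 and 4 on purpose, so that concurrent callers can interleave and exercise the retry path.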

Incremental State

It's easiest to reason about writing out the entire state of the root of our immutable data structure. Sadly, there is an unresolvable tension between writes (which append to our list of batches) and compactions (which reduce the size of the list of batches). We don't want compaction progress to throttle writes, but if writes get ahead of compactions, we get quadratic behavior in the number of bytes we have to write to keep writing out our state. This isn't a theoretical concern, it was one of the primary problems in the first iteration of persist.

The natural way to break this tension is to maintain the state incrementally, so that what we're writing is proportional to the size of the change, not the entire state. To contain the problem of different code versions of persist using different logic to roll up the incremental updates, we model our state as a fixed number of TVCs. This allows us to use differential dataflow's summation logic (possibly literally using its consolidate_updates function).

Concretely (TODO: Adjust these to match the state described above):

  • writers: Collection<(WriterId, WriterMeta), SeqNo, i64>
  • readers: Collection<(ReaderId, ReaderMeta), SeqNo, i64>
  • since: Collection<((), Antichain<T>), SeqNo, i64>
  • trace: Collection<(BlobKey, Description<T>), SeqNo, i64>
  • rollup: Collection<(BlobKey, ()), SeqNo, i64>
  • Note that these collections use SeqNo as the time dimension.
  • All the keys and values (WriterId, WriterMeta, etc) implement Codec.
  • SeqNo and i64 implement Codec64.
  • The writers and readers collections each contain at most one instance of the WriterId/ReaderId key at a given time.
  • The since collection contains exactly one element at every time.
  • The trace collection can contain multiple elements.
  • The rollup collection can contain multiple elements.

As hinted by the name, the rollup collection contains the entire contents of the collection at the corresponding SeqNo. This prevents us from needing an unbounded amount of work at startup to compute the full state. New rollups are computed and added at some heuristic interval. It contains multiple elements in case someone needs to compute the state as of some SeqNo previous to the most recent rollup. These can be garbage collected over time in the same way we garbage collect blobs as all readers progress past the SeqNo.
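
As a sketch of how a rollup plus incremental updates could yield the state at a given SeqNo, consider only the trace collection, with `String` blob keys and the Description omitted. The names `Rollup`, `TraceUpdate`, and `state_as_of` are hypothetical, and the summation is hand-rolled rather than calling into differential dataflow.

```rust
use std::collections::BTreeMap;

type SeqNo = u64;

/// (blob key, seqno, diff): +1 adds a batch to the trace, -1 removes it.
type TraceUpdate = (String, SeqNo, i64);

/// The fully consolidated trace as of `seqno`.
struct Rollup {
    seqno: SeqNo,
    trace: BTreeMap<String, i64>,
}

/// Starts from `rollup` and sums in only the updates at seqnos in
/// (rollup.seqno, as_of], so startup work is bounded by the rollup interval.
fn state_as_of(rollup: &Rollup, updates: &[TraceUpdate], as_of: SeqNo) -> BTreeMap<String, i64> {
    assert!(rollup.seqno <= as_of, "rollup is newer than the requested seqno");
    let mut trace = rollup.trace.clone();
    for (key, seqno, diff) in updates {
        if *seqno > rollup.seqno && *seqno <= as_of {
            *trace.entry(key.clone()).or_insert(0) += diff;
        }
    }
    // Keys whose diffs sum to zero have been removed from the trace.
    trace.retain(|_, diff| *diff != 0);
    trace
}
```

Replaying from an empty rollup at seqno 0 and replaying from a later rollup give the same answer, which is what lets old incremental updates be truncated once a rollup covers them.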

Pipelining and Backpressure

A persist shard's scaling is ultimately limited by one of two things: the raw throughput of writing updates to S3 (think a high traffic source) or the number of state changes it can process (think a high traffic table or a large number of readers+writers sending progress information).

Under normal operation, it's difficult for a computation over updates (e.g. decoding Avro) on a reasonable machine to saturate the throughput of writing to S3 from the same AZ. Similarly, by batching up state changes into a hot loop and running compaction out-of-band (i.e. by modeling "this compaction succeeded" as an input to the state machine), we should be able to scale to a high frequency of writes. High numbers of concurrent writers (i.e. hundreds or more) is an open question.

However, these of course may not be true under degraded conditions. If persist is not able to keep up, it backpressures writers by only allowing them to have one outstanding request at a time. For maximum throughput, a WriteHandle user should internally buffer the next batch's updates while the previous batch is being written. If this buffer fills up, the WriteHandle user should in turn backpressure its own upstream system (e.g. reading from kafka for a source).
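
One way a WriteHandle user might structure that buffering is a bounded channel with room for exactly one pending batch, sketched below with the standard library. `BufferedWriter` and `Offer` are hypothetical names; in a real source, the receiving end would be drained by a task that calls append on each batch.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Assumption: concrete String/u64/i64 types stand in for <K, V, T, D>.
type Batch = Vec<((String, String), u64, i64)>;

/// Outcome of handing a batch to the writer.
enum Offer {
    /// The batch was buffered and will be written next.
    Accepted,
    /// The buffer is full (or the writer is gone); the caller gets the batch
    /// back and should stop pulling from its own upstream for now.
    Backpressure(Batch),
}

struct BufferedWriter {
    tx: SyncSender<Batch>,
}

impl BufferedWriter {
    /// The channel holds one batch: the "next" batch while the previous
    /// one is still being written.
    fn new() -> (Self, Receiver<Batch>) {
        let (tx, rx) = sync_channel(1);
        (BufferedWriter { tx }, rx)
    }

    fn offer(&self, batch: Batch) -> Offer {
        match self.tx.try_send(batch) {
            Ok(()) => Offer::Accepted,
            Err(TrySendError::Full(b)) => Offer::Backpressure(b),
            Err(TrySendError::Disconnected(b)) => Offer::Backpressure(b),
        }
    }
}
```

Returning the rejected batch to the caller keeps ownership clear: nothing is dropped, and the caller decides when to retry.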

Compaction

The goal for compaction is to present a sequence of batches that are

  1. Logically consolidated: they contain state proportional to the number of distinct (data, time) pairs in [since, upper) rather than [lower, upper), even when since >> lower. Colloquially, we want to forget about historical details that no reader present or future is going to care about.
  2. Physically consolidated: the collection presents the user with a sequence of a logarithmic number of batches relative to the total number of updates presented between [since, upper). This helps us bound the size of the metadata required to represent an arbitrarily large collection and more importantly, it introduces opportunities for more logical compaction. For example, if an update at t2 cancels out an update at t1, the cancellation can only occur once the two updates land in the same physical batch. Otherwise, even if the update at t1 gets forwarded to t2, but still lives in a batch from [lower: t0, upper: t1), the forwarding won’t unlock the space reduction.

In differential, the compaction machinery both produces these new batches and logically removes the old batches from the data structure, and then uses reference counting to handle physical deletion of the batches. In persist, it is simpler to have one compactor that is only responsible for reading sequences of incoming batches and producing new, physically and logically compacted ones as the since frontiers advance and leave the logical and physical deletion to a separate garbage collection process.

For compaction selection, we plan to use an algorithm inspired by (and likely identical to) the one in differential's Spine: a size-tiered compaction, whereby some batch representing ~2^k updates (at level k) is only compacted together with another batch representing ~2^k updates (at level k) to produce a new batch representing ~2^(k+1) updates (at level k+1). This structure lets us meet the goals stated above while performing an O(n) number of reads and writes.
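The size-tiered rule can be illustrated with a small sketch. This is not differential's Spine or persist's compactor: `Batch`, `Spine`, and `level_of` are hypothetical names, a batch is reduced to its update count, and merging is assumed to never cancel updates (so a merged batch is exactly the sum of its inputs).

```rust
/// A batch summarized only by how many updates it holds (hypothetical; a real
/// batch would point at data in Blob).
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    len: usize,
}

/// Level k holds batches of roughly 2^k updates: floor(log2(len)).
fn level_of(len: usize) -> u32 {
    if len <= 1 {
        0
    } else {
        usize::BITS - 1 - len.leading_zeros()
    }
}

/// Hypothetical spine: batches ordered oldest (largest) first, with strictly
/// decreasing levels.
#[derive(Debug, Default)]
struct Spine {
    batches: Vec<Batch>,
    merges: usize,
}

impl Spine {
    /// Append a new batch, merging it with the newest existing batch for as
    /// long as that batch is at the same (or a lower) level.
    fn insert(&mut self, batch: Batch) {
        let mut incoming = batch;
        while let Some(top_len) = self.batches.last().map(|b| b.len) {
            if level_of(top_len) > level_of(incoming.len) {
                break;
            }
            self.batches.pop();
            // Assumption: no cancellation, so lengths simply add up.
            incoming = Batch { len: top_len + incoming.len };
            self.merges += 1;
        }
        self.batches.push(incoming);
    }
}

fn main() {
    let mut spine = Spine::default();
    for _ in 0..7 {
        spine.insert(Batch { len: 1 });
    }
    println!("{:?} after {} merges", spine.batches, spine.merges);
}
```

In this sketch, the number of live batches stays logarithmic in the number of updates, and n single-update inserts trigger fewer than n merges in total.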

Garbage Collection

There are several places where state can accumulate in this design:

  1. The sequence of (seqno, incremental state transition) pairs grows in Consensus.
  2. Writers write new batches into Blob that eventually become unused.
  3. A background process writes new state rollups into Blob that eventually become unused.
  4. Writer and reader leases expire (writer / reader ids eventually become unused).
  5. Collections themselves eventually get deleted (see below section) when their since/upper frontiers == []

Let's focus on 1, 2, and 3. Each of those can be logically deleted (the system has a redundant copy of their information somewhere) when:

  1. The Consensus state at some seqno X will never be used again once the seqno_since of all readers is > X and a state rollup exists at a seqno > X.

  2. This is slightly more complex.

    a) We know that a batch can be logically deleted when another batch exists that can cover its [lower, upper) at a since >= the collection’s since. At that moment, we can mark the batch safe for logical deletion.

    b) If we logically delete the batch at seqno X, we can physically delete the batch once the seqno_since for the collection advances up to a seqno that is > X, and there is a state rollup at a seqno > X.

    c) Once the batch has been physically deleted, we can remove it from the set of batches that have been logically deleted.

  3.  

    a) We can logically delete a state rollup as soon as another state rollup exists with a larger seqno. The logical delete itself can be recorded at any subsequent seqno.

    b) If we logically delete the state rollup at seqno X, we can physically delete it once the seqno_since advances up to a seqno > X and there is a state rollup at seqno > X.

    c) Once the state rollup has been physically deleted, we can remove it from the set of state rollups that have been logically deleted at any subsequent seqno.

Basically, the common pattern for 2 and 3 is:

  • A. The application observes that an element in the state is logically redundant and marks it safe for deletion (adds it to a set of to-be-deleted elements). This transition takes place at some sequence number X. Any users on an older version of the state can continue to use the same resource as before.

  • B. When all readers acknowledge that they have seen the logical delete and promise to never go backwards (i.e. they have advanced their seqno_since > X, and we have an appropriate state rollup at a seqno > X so that no future reader will need the state before the logical delete), we can physically delete all of the elements that were logically deleted at seqnos <= X.

  • C. Once the object has been physically deleted, we can remove it from the set of elements to be physically deleted (at any seqno > when we perform the physical delete).

This is basically the same idea as epoch based reclamation (we’re maintaining a free-list per seqno and then freeing those batches / state rollups once that seqno can never be read).
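A minimal sketch of that per-seqno free-list, assuming blob keys are plain strings and seqnos are `u64`. `PendingDeletes` and its methods are hypothetical names for illustration, not persist's API; the reclaim condition is the one stated in step B above.

```rust
use std::collections::BTreeMap;

/// Hypothetical free-list: blob keys that were logically deleted, grouped by
/// the seqno at which the logical delete was recorded.
#[derive(Debug, Default)]
struct PendingDeletes {
    by_seqno: BTreeMap<u64, Vec<String>>,
}

impl PendingDeletes {
    /// Step A: record that `key` became logically redundant at `seqno`.
    fn logically_delete(&mut self, seqno: u64, key: &str) {
        self.by_seqno.entry(seqno).or_default().push(key.to_string());
    }

    /// Steps B and C: return every key that is now safe to physically delete
    /// and forget it. An entry recorded at seqno X is safe once
    /// `seqno_since > X` and a state rollup exists at a seqno > X.
    fn reclaim(&mut self, seqno_since: u64, latest_rollup: u64) -> Vec<String> {
        let bound = seqno_since.min(latest_rollup);
        // `split_off` keeps entries with seqno < bound in `self.by_seqno` and
        // returns the rest, so swap the two halves.
        let keep = self.by_seqno.split_off(&bound);
        let freed = std::mem::replace(&mut self.by_seqno, keep);
        freed.into_values().flatten().collect()
    }
}

fn main() {
    let mut pending = PendingDeletes::default();
    pending.logically_delete(3, "batch-a");
    pending.logically_delete(5, "rollup-b");
    pending.logically_delete(7, "batch-c");
    println!("{:?}", pending.reclaim(6, 10));
}
```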

Arguably, 1 falls into the same pattern as well, only there’s no need to do an explicit logical delete?

Shard Deletion

We don’t expose a DeleteCollection type of operation on writers. That would introduce a way for writers and readers to interfere with each other's operations, whereas, as defined above, writers are independent of other writers (no writer can invalidate another writer’s write operation) and of readers (no reader can invalidate a writer’s operation), and vice versa for readers.

Instead, we know that a shard is safe to delete when its global upper and since frontiers are the empty antichain [], because at that point, the shard can no longer be written to or read from. Once a shard reaches that state we can delete all of its constituent batches, reject all requests for new readers and writers and leave only a single state rollup that acts as a tombstone indicating that the shard’s frontiers have advanced to []. We could even have a specific TOMBSTONE object which, if it exists, indicates that the shard has permanently been deleted. This lets us garbage-collect any consensus state associated with the shard as well.

Leasing

Naively, writers and, especially, readers can hold back reclamation of resources for an arbitrarily long amount of time. This would likely be due to a fault, but could also just be accidental. If we eventually support something like cross-account export of computed views, it could even be malicious. To prevent this, all writers and readers are leased.

Our primary aim in the implementation of leasing is to avoid needing to reason about distributed clocks. As a result, the lease information is stored in the shard state and updated through consensus, just like anything else. Notably, a lease expiration also goes through consensus. Once a lease expires, the writer or reader learns about it on its next attempted state modification and must reconnect with a new id. The old id is permanently retired, allowing for it to eventually be gc'd.

The lease is extended through normal operation of append for writers and downgrade_since for readers. Both of these have a no-op version if a pure "heartbeat" is needed: writing no data and advancing upper to the same value, or downgrading to the same since.

RPC Server

Persist uses a "rich client" model with some of the logic running in-process (but behind the persist abstraction) so that readers and writers can talk directly to S3. To reduce contention, state changes (including things like "here's some data at an S3 key that I wrote") are then funneled through a PersistShard RPC service running in a separate process. Operationally, we'll run multiple copies of PersistShard and use some sort of fuzzy suppression to mostly keep them from contending.

Besides local caches of data from Blob and Consensus, the implementation of this service is stateless. Each request contains the ShardId, so that the RPC server can respond to any requests that happen to come its way. Combined, this means that the mapping between persist shards and servers is simply a function of where the requests are routed.

A single copy of the server should be sufficient for most (all?) customers, but we have the option of easy horizontal scalability if necessary. However, note that if we do, we'll want to route all traffic from each individual Shard to the same process, so that in the steady state, our CaS operations against Consensus are conflict-free. TODO: Figure out if this routing should be done at the k8s level or through something like a proxy.
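Whichever layer ends up doing the routing, the property we need is that a given ShardId always maps to the same server. A minimal sketch of one way to get that, assuming shard ids are strings and servers are addressed by index; `route` and `fnv1a` are hypothetical helpers, and FNV-1a is just a stand-in for any hash that is stable across processes.

```rust
/// 64-bit FNV-1a, used here only because it is stable across processes and
/// restarts. Any stable hash would do.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// Hypothetical router: pick the index of the server that should receive all
/// requests for `shard_id`. Returns None if there are no servers.
fn route(shard_id: &str, num_servers: usize) -> Option<usize> {
    if num_servers == 0 {
        return None;
    }
    Some((fnv1a(shard_id.as_bytes()) % num_servers as u64) as usize)
}

fn main() {
    for shard in ["shard-a", "shard-b", "shard-c"] {
        println!("{} -> server {:?}", shard, route(shard, 3));
    }
}
```

Plain modulo hashing reshuffles most shards whenever the number of servers changes. Because the servers are stateless apart from caches, that only costs cache warmth and some transient CaS contention, not correctness.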

In the future, we'll split compaction out into a second PersistCompaction RPC service for PersistShard to call out into. It will also be stateless, modulo caches. These would be elastically scalable with all compaction requests load balanced between them.

Retries and Error Handling

We chose a two-tiered error handling system, inspired by this sled.rs blog post. The two tiers of errors are usage errors and location errors (these latter ones could be called storage errors, but that name would already seem taken).

Usage errors indicate when the user of the API has requested an invalid operation. For example, trying to read as of t when the read handle has already been downgraded beyond t. In practice, these usually (TODO: always?) indicate a bug in the system that uses persist.

The second tier of errors, location or storage errors, indicate that something is wrong with one of the underlying systems (either Blob or Consensus). For example, the blob system could be down or some datum that we are trying to read is temporarily unavailable. One might be inclined to also differentiate between transient (temporary) and permanent location errors. This seems hard, though, in the general case. For example, an unavailable blob might "come back", but it might stay permanently unavailable, and we don't know how to differentiate between those two cases.

An important difference between the two classes of errors is timeouts and retries. When a usage error occurs an API call will immediately report it back. For location errors, on the other hand, we might retry within a given timeout window.

Another important axis along which we differentiate errors is definiteness (TODO: Come up with a better term because this one is already doing a lot of work elsewhere). Definite errors are those errors where we know that some operation definitely failed, while indefinite errors are those errors where we cannot know. This distinction is important for write operations (all operations that change state): if an operation definitely failed, we can retry without running the risk of writing duplicate data. If, on the other hand, we get a timeout or some other indefinite error, we cannot know that the data was not written and have to abort the complete write attempt.

TODO: Figure out how to fit the two axes of errors into an ergonomic API. Maybe definite/indefinite are variants of ExternalError? It also feels like usage error/location error sometimes map to definite/indefinite, but only if you squint hard enough. I also think we will need to play this by ear, see what’s ergonomic when we start implementing things on top.
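One possible shape for the two axes, purely as a strawman to react to. Every type and variant name below is hypothetical, and the retry policy in `on_write_error` is just the rules from the paragraphs above written down, not a decided design.

```rust
/// Tier 1: the caller asked for something invalid. Reported immediately.
#[derive(Debug, Clone, PartialEq)]
enum UsageError {
    /// Hypothetical variant: a read as of `as_of` after the handle's since
    /// was already downgraded beyond it.
    AsOfBeforeSince { as_of: u64, since: u64 },
}

/// Tier 2: something is wrong with an underlying system. The variants carry
/// the definiteness axis.
#[derive(Debug, Clone, PartialEq)]
enum ExternalError {
    /// We know the operation did not take effect.
    Definite(String),
    /// We cannot know whether the operation took effect (e.g. a timeout).
    Indefinite(String),
}

#[derive(Debug, Clone, PartialEq)]
enum Error {
    Usage(UsageError),
    External(ExternalError),
}

/// What a write path does in response to an error.
#[derive(Debug, Clone, Copy, PartialEq)]
enum WriteAction {
    ReportImmediately,
    RetryWithinTimeout,
    AbortWrite,
}

fn on_write_error(err: &Error) -> WriteAction {
    match err {
        // Usage errors are deterministic; retrying cannot help.
        Error::Usage(_) => WriteAction::ReportImmediately,
        // Definitely failed: retrying cannot write duplicate data.
        Error::External(ExternalError::Definite(_)) => WriteAction::RetryWithinTimeout,
        // Unknown outcome: a retry might duplicate the write, so give up.
        Error::External(ExternalError::Indefinite(_)) => WriteAction::AbortWrite,
    }
}

fn main() {
    let errors = [
        Error::Usage(UsageError::AsOfBeforeSince { as_of: 3, since: 5 }),
        Error::External(ExternalError::Definite("blob rejected put".into())),
        Error::External(ExternalError::Indefinite("request timed out".into())),
    ];
    for err in &errors {
        println!("{:?} -> {:?}", err, on_write_error(err));
    }
}
```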

There are open questions around this topic, but we can table them for now and see what works best when working with the API to implement sources and the rest of STORAGE.

Open Questions:

  • What kind of error do we return when a read times out. That is, a reader thinks it has a lease (a ReadHandle) but it has timed out. It’s not a usage error because the reader theoretically did use the API correctly. But in terms of UX, I think the only thing we can do is return a "cannot serve query as of
  • Are invalid configurations (think wrong S3 address, some etcd misconfiguration, wrong password) a usage error or location error? I'd tend towards usage error, but don't know how hard that is to thread through. In my head I also see usage errors as static errors: you can know that they will occur by looking at how the API is used, so they are deterministic. Location errors, in contrast, are non-deterministic: they can happen, but never have to. Then again, a "wrong password" error could be considered temporary, and can be fixed by changing the password in the system or in the given configuration.
  • What about errors where we notice that we messed up? A corrupted meta, or some sanity check failing. Do these fall into the two other classes of error? Do we panic?
  • Should we try and do something more sophisticated for handling retries? Configurable back-off strategy, number of retries, timeout? I'm inclined to keep just a timeout, and punt to the higher-level system, because it keeps persist simple.
Example Usage/Usage in Sources/Usage in Tables

    A writer (something that writes data into a persistent shard) needs to uphold the property that multiple incarnations of a writer must write the same (in terms of definiteness) data. This needs to hold both across time (think a writer that restarts after a failure) and across multiple concurrent writers. We think that a restarting writer will have to request a new writer ID, so the two cases boil down to the same thing.

    Upholding this property is complicated by the fact that Materialize (or STORAGE) wants to assign timestamps to data (updates) before it is written to persistence. Table updates and data coming from sources need to be timestamped. Current thinking is that these timestamps are a realtime-esque timestamp. We will sketch how we can achieve this for the two initially planned use cases, tables and sources.

    Tables

    For tables, we seem to have two main options: YOLO mode and WAL mode.

    In YOLO mode the part of the system that accepts table updates can assign a timestamp to the update and then issue a write request to persistence. That write request might or might not succeed. The writer can report success if it gets back a success response from persist. The writer might not get a response, due to reasons, but that doesn’t mean that the write surely failed. If the write seems to have failed, the writer cannot simply try and write again, using the same writer id, because that could cause the update to be inserted again. It would have to request a new writer ID, and could then try writing the same update (with the same timestamp) again.

    In WAL mode, CompareAndAppend is used to ensure that each update (with an attached timestamp) is written to a WAL exactly once. An arbitrary number of writers could then go and read from that WAL and try to append them to the persistent shard. This is safe because the data is made definite in the WAL but still requires that writers (using the same writer ID) don’t retry writes if they don’t get a success message back. A restarted writer must still use a new writer ID, to ensure that we don’t write duplicates.

    The part that reads from the WAL and writes to persistence can be thought of as a source, where the WAL is the source system (see below) and we don’t need to do timestamping.

    Sources

    A source needs to roughly do this:

    • Read from an external system
    • Attach a timestamp to raw data
    • Write the timestamped data to persistence (a shard)

    The difficulty here is ensuring that a given source instance writes the same (in terms of definiteness) updates as all the other instances, both across time (after restarts) and across concurrently running instances. To solve this, we need two things. First, source messages must come with some kind of identifier that we can use to look up the timestamp that we attach (think Kafka partition-offset pairs); let’s call this the source timestamp. Second, we need a definite collection of timestamp bindings in which we look up the assigned timestamp for a given source timestamp. We will sketch two different approaches for this. There might be more.

    For both approaches, the collection of timestamp bindings works like this: the schema of bindings is (<source timestamp>, <ts>) and a binding (s_ts, ts) tells us that any update with a source timestamp that is less-equal s_ts gets assigned the timestamp ts, unless there is another binding (with a lower source timestamp) that already covers a given source timestamp. So the collection of timestamp bindings defines ranges of source timestamps that get assigned the same timestamp.
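The lookup rule amounts to "find the binding with the smallest s_ts that is >= the update's source timestamp". A minimal sketch, assuming a source timestamp is a single `u64` offset (e.g. one Kafka partition) and the bindings are already available in memory; `timestamp_for` is a hypothetical helper.

```rust
use std::collections::BTreeMap;

/// Look up the timestamp for `source_ts` in a map of s_ts -> ts bindings.
/// Returns None if no binding covers `source_ts` yet.
fn timestamp_for(bindings: &BTreeMap<u64, u64>, source_ts: u64) -> Option<u64> {
    // The first binding at or above `source_ts` is the lowest one covering it.
    bindings.range(source_ts..).next().map(|(_, ts)| *ts)
}

fn main() {
    let mut bindings = BTreeMap::new();
    bindings.insert(10, 1000); // source timestamps 0..=10 get ts 1000
    bindings.insert(25, 2000); // source timestamps 11..=25 get ts 2000
    for source_ts in [0, 10, 11, 25, 26] {
        println!("{} -> {:?}", source_ts, timestamp_for(&bindings, source_ts));
    }
}
```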

    Both approaches have in common that there needs to be durable consensus about what the timestamp bindings for given source updates should be. The two sketched approaches differ in how those bindings are minted, though. We will call the component that reads from an external source, applies bindings, and writes the timestamped data to a persist shard a source writer.

    Centralized Timestamp Minter

    This approach assumes that it is possible to figure out the "high watermarks" of available data, in terms of their source timestamp. The centralized minter can then mint bindings of the current "real-time clock" and write them down. This writing down needs some kind of consensus/exactly once to ensure that there is a consistent collection of bindings that all the source writers use.

    All the source writers need to do with this model is observe the minted bindings, read updates from the source, attach bindings to updates and write them to persistence.

    Distributed Minting (but still with consensus, so also centralized in a way) (title of this section TBD… 😅)

    With this approach, each source writer can read data from the source and whenever it encounters a source timestamp for which it doesn’t know the binding, it will "ask" a centralized component for a binding. That component can be centralized as in a centralized RPC service, or it can be a "rich client" that uses an external consensus system to mint a binding. The API for this minting service is:

    GetBinding(SourceTimestamp) -> (SourceTimestamp, Timestamp): Let the minter know that there is data at least up to the given source timestamp. The minter either finds that it already has a binding that covers this source timestamp and returns it, or it mints a new binding, records it (with consistent/exactly-once semantics), and returns that.

    This way, we ensure that a) restarting source writers write the same data (with the same timestamps), and b) multiple concurrent source writers also write the same data with the same timestamp. Both hold because minting timestamps goes through some sort of consensus.
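A rough sketch of the GetBinding behavior described above. Heavy assumptions: the in-memory `BTreeMap` stands in for the durable, consensus-backed collection of bindings, source timestamps are single `u64` offsets, the clock reading is passed in as `now`, and the minter never hands out a timestamp lower than one it has already minted. `Minter` and `get_binding` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Hypothetical minter. In a real system every insert into `bindings` would
/// go through consensus so that all source writers observe the same bindings.
#[derive(Debug, Default)]
struct Minter {
    bindings: BTreeMap<u64, u64>, // s_ts -> ts
}

impl Minter {
    /// Return the binding covering `source_ts`, minting one if none exists.
    fn get_binding(&mut self, source_ts: u64, now: u64) -> (u64, u64) {
        // An existing binding at or above `source_ts` already covers it.
        if let Some((s_ts, ts)) = self.bindings.range(source_ts..).next() {
            return (*s_ts, *ts);
        }
        // Otherwise mint a new binding. Assumption: timestamps never go
        // backwards, even if the clock does.
        let ts = self
            .bindings
            .values()
            .next_back()
            .map_or(now, |last| now.max(*last));
        self.bindings.insert(source_ts, ts);
        (source_ts, ts)
    }
}

fn main() {
    let mut minter = Minter::default();
    println!("{:?}", minter.get_binding(10, 1000)); // mints a new binding
    println!("{:?}", minter.get_binding(7, 5000)); // already covered
    println!("{:?}", minter.get_binding(25, 2000)); // mints a new binding
}
```

Two source writers that ask about the same source timestamp get the same answer regardless of when they ask, which is the property that lets them write identical updates.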

    Operations/Deployment

    The format here is a bit different. We spell out questions that an Ops Team might have and give our best answers:

    • Q: How is configuration passed to persist?
    • A: Flags on the binary, like all the other binaries.
    • Q: What metrics (if any) do we want to expose? How are metrics exposed?
    • A: We reuse the MetricsRegistry machinery from mz. Along with distributed tracing, enabled by our use of tracing?
    • Q: What extra systems do we need to keep running?
    • A: Initial version would seem to need S3 and etcd (or something like it). Naively, those come "for free" with our current cloud setup but I don't know what burden they put on the ops team.
    • Q: Do we need any local storage? (i.e. a persistent volume, not S3 or etcd).
    • A: We don’t, at least nothing that would be required for correctness. At most, "local" disks would be used for caching, though likely not in the first versions (This is important for the ops team because persistent volumes are a PITA to work with.)
    • Q: Can you also include network information (i.e. a list of ports and their uses)?
    • A: I don't think we (persist) need to expose any extra ports. But might be good to also record this here. TODO: Include any ports/info once the design has solidified.
    • Q: More long-term, should there be a standalone (cli-)tool that can be used to go and investigate the persist (meta-)data? This could be helpful for debugging when things go sideways.
    • A: Yes.

    Discarded/Not important/Obvious:

    • Long-term (and I'm just spitballing here): Do we eventually want to allow on-the-fly reconfiguration? Say changing timeouts, credentials. Or should configuration changes always require restarts. I can envision a configuration service that is periodically updated/checked.
    • If (in version 0) there is no centralized persist runtime/controller running anywhere, who reports the metrics? All the individual writers/readers. But who reports collection-level metrics, like registered writers/readers, frontiers, etc.? The answer might be the RPC server or the thing that runs compaction.
    • What cruft do we expect to accrue in the underlying systems, and how do we think this can be cleaned up? Think failed blob writes, blobs that have been written but never got looped into meta. Same thing probably happens for meta updates. Will those cleanup tasks also eventually be performed by the thing that does compactions in the future? Do we eventually need a tool that can go and find garbage?
      • For the cleanup, I generally prefer the thing that writes stuff to be responsible for cleaning up failed writes. In some cases, we can do this with S3 lifecycle rules, like to remove failed multi-part uploads (this is already enabled). We may need some separate cleanup process, though, for things where the writer may get deleted (ie: a source that gets deleted?).
      • I'm not a huge fan of cronjobs or K8S jobs. If we need a separate process to do cleanup, I'd prefer a long-running service.
      • My understanding is that we have more or less standardized on deploying through Kubernetes, so I’m wondering if we can use K8s-native mechanisms for this too.
    • If managing the k8s dns->pod thing is cheap enough that we can have 1 dns entry per shard (no idea if this is reasonable), then this makes it trivial operationally to move shards around between rpc servers. We likely always start with 1 and if/when a shard gets enough traffic that we split it out, start a new pod running the rpc server and, once it's up, update the dns entry. Maybe bounce the old one to clear the caches of that shard's data.

    Cost model in S3

    I think our costs in the cloud platform world will have 2 major drivers: S3 costs for Blob and service costs for Consensus and the various readers and writers. I’m going to ignore financial costs in the single binary on-prem world because we are not responsible for any financial costs there.

    For S3 there are three main levers of cost:

    • Cost of data storage: 1 GiB of data costs $0.023 / month to store.
    • Cost of write operations + list operations - 1000 put/list/delete operations cost $0.005.
    • Cost of GET operations - 1000 GETs cost $0.0004

    It’s hard for me to make intuitive sense of these units. Instead, I believe one appropriate way to build intuition here is to ask "what can we afford for $100 / year"? I think a year is the appropriate unit of time to think about because Materialize will be priced annually, and I think $100 is a reasonable monetary unit to think about because the smallest possible contracts will be on the order of ~single digit thousands per year, so ~single digit hundreds of dollars is the smallest possible "significant" cost denomination. Again, this is more to build intuition than anything else.

    In S3 $100/year lets you, individually:

    • Store 362 GiB of data for the whole year.
    • Perform 20 MM put/list/delete operations over the year. There are slightly more than 31 MM seconds in a year, so this works out to ~0.63 put/list/delete operations / second in aggregate over the entire year.
    • Perform 250 MM get operations over the year. By the same division as above, that's ~7.9 get operations / second in aggregate over the entire year.
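The arithmetic behind these figures, as a small sketch that can be rerun when prices change. The constants are the prices quoted above; the function and constant names are made up for illustration, and a year is taken to be 365 days.

```rust
// Prices as quoted in the list of cost levers above.
const STORAGE_USD_PER_GIB_MONTH: f64 = 0.023;
const WRITE_USD_PER_1000_OPS: f64 = 0.005; // put/list/delete
const GET_USD_PER_1000_OPS: f64 = 0.0004;
const SECONDS_PER_YEAR: f64 = 365.0 * 24.0 * 60.0 * 60.0;

/// GiB that can be stored for a full year with `budget_usd`.
fn gib_storable_for_year(budget_usd: f64) -> f64 {
    budget_usd / (STORAGE_USD_PER_GIB_MONTH * 12.0)
}

/// Number of operations `budget_usd` pays for at a given per-1000 price.
fn ops_affordable(budget_usd: f64, usd_per_1000_ops: f64) -> f64 {
    budget_usd / usd_per_1000_ops * 1000.0
}

fn main() {
    let budget = 100.0;
    let writes = ops_affordable(budget, WRITE_USD_PER_1000_OPS);
    let gets = ops_affordable(budget, GET_USD_PER_1000_OPS);
    println!("storage: {:.1} GiB for the year", gib_storable_for_year(budget));
    println!("writes: {:.0} total, {:.3} / second", writes, writes / SECONDS_PER_YEAR);
    println!("gets: {:.0} total, {:.3} / second", gets, gets / SECONDS_PER_YEAR);
}
```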

    This structure helps me ask and answer "in our current design is there any way a user could 10x our costs in a manner that we couldn’t justify a price increase for?". For example, a user could decide to store 10x as much data. I would argue that refusing to store past a certain amount of data without a price increase is totally reasonable. However, consider the following situation: a user has 1 source ingesting data at 10 MB/s. Then say they delete this source and replace it with 10 sources each ingesting data at 1 MB/s. In this world, assuming the aggregate amount of data stored is the same, we’ve still managed to 10x the number of writes required / <time interval> because we write batches for different collections separately. There are a number of technical ways to mitigate this cost increase (e.g. increase the write batching time interval as the number of sources increases) and a number of product ways to prevent it (e.g. limit the number of sources a user can have), and it's unclear what's the best thing to do generally. It just stands out as the biggest "potential cost surprise" currently.

    Alternatives

    Store the state in Blob instead of Consensus

    An alternate design stores the state entirely in Blob and only uses Consensus to tick forward the root pointer. Instead of explicitly listing pointers to every batch of data in the state, a skip-list is used and the batches point at batches from previous points in time. The remaining state is small enough to write out in its entirety at each update.

    The primary benefit of this is fewer constraints on our implementation of Consensus along with reduced complexity in our incremental state representation. The primary drawback is that compaction of the skip-list representation of batches requires rewriting them in place (logically any of these replacements are interchangeable but the physical bytes are different), which makes it more difficult to reason about correctness in edge-case scenarios.

    Open questions

    Multi-Shards per State

    A reasonable concern is the overhead of running one of these state machines per shard, especially in the case of many low-traffic tables and/or sources. However, this is a hard problem. It would make state changes decently more difficult to reason about, but an even more difficult problem is migrating a shard from one state to another.

    We're initially deciding to punt on this for a few reasons. First, a number of the tradeoffs here depend quite a bit on specifics of the operations side of persist, which we'll have a much more specific handle on once we get something running in production. Second, this is exactly the sort of problem that we can solve by eating the cost of the overhead in the next 6-12 months and revisiting later.