# Persist

aljoscha/danhhz/ruchirk

_Notice: The document represents a (hopefully) self-consistent snapshot of our thinking at a point in time. It's a useful tool for understanding the structure of persist and for establishing a common language, but it's a non-goal to keep it continually updated with the latest thinking. For example: we've already decided (at the request of STORAGE) to shift from the capability model for ReadHandle described here to one of a shard-global `downgrade_since`/`allow_compaction`. This has far-reaching consequences, potentially even to the number of concurrent readers we'll initially support. If you have any questions or want to know the latest thinking, please don't hesitate to reach out!_

## Summary

Persist is an implementation detail of [STORAGE]. Its "public" API is used only by STORAGE, and code outside of STORAGE should be talking to STORAGE, not persist. However, having this strong API boundary between the two allows for the two teams to execute independently.

Persist's primary abstraction is a "shard", which is a durable and definite [Time-Varying Collection (TVC)]. Persist requires that the collection's "data" is key-value structured (a `()` unit value is fine) but otherwise allows the key, value, time, and diff to be abstract in terms of the [Codec] and [Codec64] encode/decode traits. As a result, persist is independent of Materialize's internal data formats: `Row` etc.

Along with talking to the outside world, STORAGE pilots persist shards around to present a "STORAGE collection", which is a durable TVC that handles reshardings and version upgrades. It could also ensure that persist will never again see data from some `Codec`'s previous implementation of `encode`, allowing us to remove support for it from the corresponding `decode`. A persist shard is exactly a [storage shard] and they can be used interchangeably.

[STORAGE]: https://github.com/MaterializeInc/materialize/pull/10967
[Time-Varying Collection (TVC)]: https://github.com/MaterializeInc/materialize/blob/13603a6ee1c13eaa243948fc6a99e8fe2a9d0cdd/doc/developer/platform/formalism.md#time-varying-collections
[Codec]: https://github.com/MaterializeInc/materialize/blob/b844c760634fea3aecf8e15af09bb42a246a51de/src/persist-types/src/lib.rs#L23-L52
[Codec64]: https://github.com/MaterializeInc/materialize/blob/b7eae8529abf5543c1bfbfa4115b98a49f43bd9a/src/persist-types/src/lib.rs#L54-L78
[storage shard]: https://github.com/MaterializeInc/materialize/blob/e8c27d00cda02b1f9a50fbfc852d59998b936232/doc/developer/platform/architecture-storage.md#shards

## Jargon

- _update_: A differential dataflow `((key, value), timestamp, diff)` tuple. Persist requires that the data field has a `(key, value)` structure, though the value can be the unit type to mean "empty": `()`.
- _Time-Varying Collection_ (TVC): Quoting the [formalism], "A multiset of typed data whose contents may vary arbitrarily as function of some ordered time." Materialize represents TVCs as a set of updates.
- _Partial Time-Varying Collection_ (pTVC): Quoting the [formalism], "Materialize most often only has partial information about TVCs. A TVC's future is usually not yet completely known, and its deep past also may not be faithfully recorded. To represent what Materialize knows with certainty, Materialize maintains two "frontiers" for each TVC. The `upper` frontier, which communicates that for times `time` not greater or equal to `upper`, the updates at `time` are complete.
The `since` frontier, which communicates that for times `time` greater or equal to `since`, the accumulated collection at `time` will be correct."
- _definite_: Quoting the [correctness doc], "A collection is definite when all uses of any one identification will yield exactly the same time-varying collection of data (identical contents at each logical time). A handle to a collection can be "definite from `since` until `upper`" if it reflects the definite results for times greater or equal to `since` but not greater or equal to `upper`; this intentionally allows for compaction of historical detail and unavailability of future data, without ambiguity about which logical times can be described with certainty."
- _shard_: A durable and definite TVC.
- _batch_: In the spirit of differential dataflow's [Trace batch], an immutable collection of updates, along with a `Description` of those updates.
- `ShardId`: An identifier for a persist shard, as described in _definite_.
- _location_: An external source of data durability. All of persist's interactions with the outside world are abstracted through the `Blob` and `Consensus` traits, together collectively called the persist location. One valid way to think of persist is as a layer that maps simpler durable storage semantics to ones that are more native to differential dataflow.
- _blob_: A durable `&str key -> (possibly large) &[u8]` value store. The production implementation is S3 (or other cloud storage). An in-mem HashMap implementation exists for testing. A filesystem implementation may be used for a single-binary `materialized` deployment.
- _consensus_: A durable log with `&[u8]` entries, linearizing writes concurrently from multiple places.
- `SeqNo`: A _sequence number_ identifying a specific version of the internal [state machine]'s state.

[formalism]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/platform/formalism.md
[correctness doc]: https://github.com/MaterializeInc/materialize/blob/v0.13.0/doc/developer/design/20210831_correctness.md#description
[trace batch]: https://github.com/TimelyDataflow/differential-dataflow/blob/27492f26ccec54360eba8af882acac94b8a63a6c/src/trace/mod.rs#L256-L277
[state machine]: #state-machine

## Goals

- Make pTVCs definite, i.e. make it so that we can read the contents of a pTVC through some `[lower, upper)` range of times as of some known `since`.
- Make pTVCs definite both for a distributed cloud platform workload and for a local single binary workload.
- Store the contents of the pTVC in S3 / some other service offering similarly cheap cold storage.
- Checksum stored data for end-to-end defense against e.g. network errors.
- Operational simplicity: multiple {readers, writers, any other state machine/maintenance workers} can at least be turned on at the same time.
- Reader / writer liveness:
  - If multiple readers and writers are attempting to read / write at a time, at least one of them can always make progress.
  - Readers and writers are not affected by their peers being paused or restarted.
  - The collection makes progress (i.e. the collection `upper` advances) so long as one writer can make progress. Concretely, we don't need all writers to be able to write a batch in order for it to be read.
- Horizontal scalability of reads: multiple readers can read the contents of a collection linearly faster than a single reader.
- Separation of compute and storage.
- High throughput reads - saturate the network capacity of multiple writers + optionally allow for writers to keep a local cache.
- High throughput writes - the shard's write throughput is limited to the max write throughput of a single writer.
- Ease of reasoning about consensus (concretely: do not write our own distributed consensus protocol; use an off-the-shelf solution instead).
- Compaction: store the contents of a pTVC using space proportional to the set of distinct (data, time, diff) records in `[since, upper)` (plus an optional logarithmic factor).
- Low write amplification.
- Low/bounded S3 costs. Concretely: the S3 costs should be proportional to the bytes written and read by each of the writers and readers respectively. The S3 cost should not be proportional to the number of distinct read/write frontier downgrades.
- S3 operations/second should be tunable independently of the size/rate of change of the pTVC. In other words, we should have a lever that we can pull to reduce our S3 cost, in exchange for higher latency/throughput for the end user. Then the user is free to choose the performance level they want to pay for.

## Non-goals

- Replication/HA of pTVC: data is replicated in S3 using the fact that S3 itself is replicated (we're not doing any application level replication), and not replicated when running in single binary mode.
- Cross AWS region replication.
- Liveness during key-value store/consensus service failure.
- Write horizontal scalability. Multiple writers write different copies of the same shard, and storage is responsible for splitting an upstream source into multiple shards.
- Handling cases where S3 / related cloud storage services themselves lose or modify stored data.
- Low latency writes and reads. We target that small writes and reads (e.g. writes and reads to batches smaller than 1 KiB) have a median (p50) latency of under 100 ms, and a p95 latency of under 1s. Larger writes are the same minimum latency plus however long it takes to write the data to S3.
- High throughput of reader/writer creation.
- High throughput of pTVC creation.
- Hundreds/thousands of writers/readers (yet). We've initially designed for `O(tens)` of concurrent readers/writers. Scaling to `O(hundreds)` (and more) of readers is possible but left as followup work.
- Self-containment: it's acceptable to rely on S3/etcd/other external distributed consensus implementations/sqlite.
- Encryption of stored data.

## API

Persist uses a "rich client" model with some of the logic running in-process (but behind the persist abstraction) so that readers and writers can talk directly to S3.

Everything in persist is generic over the key, value, time, and diff types (hereafter `<K, V, T, D>`) to allow users control over how their data formats are encoded and decoded. `K` and `V` must implement `Codec`, which requires its implementations to be able to decode data written by previous versions of `encode`. (STORAGE can use renditions to migrate these over time to remove cruft in `decode` and keep this from accumulating indefinitely.) `T` and `D` must implement `Codec64`. In practice, `Codec64` means they're roundtripped as little-endian `i64` or `u64`. `Codec64` can be made more general if necessary, but this "serialized as 8 bytes" restriction allows us to do some performance tricks in our columnar disk format for batches of updates, as well as making a number of metadata decoding pathways much simpler.
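
For intuition, here's a minimal sketch of the `Codec64` idea for a `u64` timestamp and an `i64` diff, under the assumption that the contract boils down to a fixed 8-byte little-endian roundtrip; the actual trait definition lives at the [Codec64] link above and may differ in its details.

```rust
/// Sketch of the "serialized as 8 bytes" contract; not persist's actual trait.
pub trait Codec64: Sized {
    fn encode(&self) -> [u8; 8];
    fn decode(buf: [u8; 8]) -> Self;
}

/// A `u64` timestamp roundtrips as its little-endian byte representation.
impl Codec64 for u64 {
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
    fn decode(buf: [u8; 8]) -> Self {
        u64::from_le_bytes(buf)
    }
}

/// An `i64` diff works the same way.
impl Codec64 for i64 {
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
    fn decode(buf: [u8; 8]) -> Self {
        i64::from_le_bytes(buf)
    }
}
```

The fixed width is what enables the columnar disk format and the simpler metadata decoding mentioned above.
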
Most of the persist API has a close correspondence with named entities in the [formalism]. The formalism describes STORAGE, not persist, but given that part of STORAGE's duties is piloting around persist to account for reshardings and behavior changes across code versions, this makes sense.

- `WriteHandle` is roughly `WriteCapability`.
- `WriteHandle::append` is roughly `Append`.
- `ReadHandle` is roughly `ReadCapability`.
- `ReadHandle::listen` + `ReadHandle::snapshot` are roughly `SubscribeAt`.
- `Client::open` is roughly `AcquireCapabilities`.

Here's our best initial guess at concretely what the API looks like. As always, the source of truth will be [the code].

[the code]: https://github.com/MaterializeInc/materialize/blob/main/src/persist-client/src/lib.rs

```rust
impl Client {
    /// Provides capabilities for the durable TVC identified by `shard_id` at
    /// its current since and upper frontiers.
    ///
    /// This method is a best-effort attempt to regain control of the frontiers
    /// of a shard. Its most common uses are to recover capabilities that have
    /// expired (leases) or to attempt to read a TVC that one did not create (or
    /// otherwise receive capabilities for). If the frontiers have been fully
    /// released by all other parties, this call may result in capabilities with
    /// empty frontiers (which are useless).
    ///
    /// If `shard_id` has never been used before, initializes a new shard and
    /// returns handles with `since` and `upper` frontiers set to initial values
    /// of `Antichain::from_elem(T::minimum())`.
    pub async fn open<K, V, T, D>(
        &self,
        timeout: Duration,
        shard_id: ShardId,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), ExternalError>;
}

impl<K, V, T, D> WriteHandle<K, V, T, D> {
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper`.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s (with
    /// different [WriterId]s) may be used concurrently to write to the same
    /// shard, but in this case, the data being written must be identical (in
    /// the sense of "definite"-ness). It's intended for replicated use by
    /// source ingestion, sinks, etc.
    ///
    /// All times in `updates` must be greater or equal to `self.upper()` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See below for details.
    ///
    /// TODO: Introduce an AsyncIterator (futures::Stream) variant of this. Or,
    /// given that the AsyncIterator version would be strictly more general,
    /// alter this one if it turns out that the compiler can optimize out the
    /// overhead.
    pub async fn append<'a, I: IntoIterator<Item = ((&'a K, &'a V), T, D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current shard upper is equal to `expected_upper`.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See below for details.
    pub async fn compare_and_append<'a, I: IntoIterator<Item = ((&'a K, &'a V), T, D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<Result<(), Antichain<T>>, InvalidUsage>, ExternalError>;
}

impl<K, V, T, D> ReadHandle<K, V, T, D> {
    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This may trigger (asynchronous) compaction and consolidation in the
    /// system. A `new_since` of the empty antichain "finishes" this shard,
    /// promising that no more data will ever be read by this handle.
    ///
    /// It is possible to heartbeat a reader lease by calling this with
    /// `new_since` equal to `self.since()` (making the call a no-op).
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See below for details.
    pub async fn downgrade_since(
        &mut self,
        timeout: Duration,
        new_since: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVC's contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See below for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn listen(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
    ) -> Result<Result<Listen<K, V, T, D>, InvalidUsage>, ExternalError>;

    /// Returns a snapshot of the contents of the shard TVC at `as_of`.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. The recipient
    /// should only downgrade their read capability when they are certain they
    /// have all data through the frontier they would downgrade to.
    ///
    /// This snapshot may be split into a number of splits, each of which may be
    /// exchanged (including over the network) to load balance the processing of
    /// this snapshot. These splits are usable by anyone with access to the
    /// shard's [crate::Location]. The `len()` of the returned `Vec` is
    /// `num_splits`. If a 1:1 mapping between splits and (e.g. dataflow
    /// workers) is used, then the work of replaying the snapshot will be
    /// roughly balanced.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See below for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn snapshot(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
        num_splits: NonZeroUsize,
    ) -> Result<Result<Vec<SnapshotSplit>, InvalidUsage>, ExternalError>;
}
```

## Blob+Consensus

The persistence system is built on top of two logical primitives, `Blob` and `Consensus`.

`Blob` is a durable key-value store that maps `string keys -> arbitrary byte payloads`. It's conceptually similar to a shared hashtable, perhaps expressed as an `Arc<Mutex<HashMap<String, Vec<u8>>>>` in Rust, except durable across restarts and potentially shared across multiple processes. `Blob` supports the following operations:

- `GET(key)`: returns the value mapped to `key` if there is one.
- `PUT(key, value)`: map `key` to `value`. Note that this operation is atomic, which means that any `GET(key)` will either see the old value before the `PUT` or `value` after the `PUT`, but nothing else.
- `DELETE(key)`: remove any `value` associated with `key` if there is one, and do nothing otherwise.
- `LIST KEYS(prefix)`: list the set of `key`s currently mapped to a `value` that start with `prefix`.

Because all of our keys are write-once, modify-never, `Blob` does not have to be linearizable. Readers can simply retry reads that don't succeed. We're going to keep at least 3 different versions of `Blob`: one built on S3 that will be the main production use case, one built on top of the POSIX filesystem API for the single binary use case, and one in-memory purely for unit testing / measuring the overhead of doing IO.

`Consensus` is a linearizable, durable, totally ordered array of binary records and sequence numbers (think a durable `Vec<(SeqNo, Vec<u8>)>`) that supports the following operations:

- `HEAD`: return a recent (not necessarily the most recent) record inserted into `Consensus` and the sequence number it was inserted at.
- `CompareAndSet(sequence_number, data) -> bool`: atomically insert `data` into `Consensus` if the most recent record at insert time was inserted at `sequence_number - 1`. Returns true if the insert was successful and false otherwise.
- `SCAN(sequence_number)`: returns a list of all records inserted at sequence numbers greater than or equal to `sequence_number`.
- `TRUNCATE(sequence_number)`: atomically remove all records inserted at sequence numbers less than `sequence_number`.

We use `Consensus` to store incremental updates to persist's metadata. We need a compare-and-set primitive to guarantee a total order of metadata updates across distributed writers, and we need to maintain a history of those updates in order to periodically reconstruct the full metadata state and write it out somewhere else.
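
For orientation, the two primitives might be written down as Rust traits roughly as follows. This is a sketch only: the trait names, signatures, and error type here are illustrative assumptions (the real definitions live in the persist source), and it leans on the `async_trait` crate for async trait methods.

```rust
/// Illustrative stand-ins; the real persist code defines its own versions.
pub struct SeqNo(pub u64);
pub struct ExternalError(pub String);

/// Sketch of the durable `&str key -> &[u8] value` store described above.
#[async_trait::async_trait]
pub trait Blob {
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, ExternalError>;
    /// Atomic: concurrent `get`s see either the old value or `value`, never a mix.
    async fn put(&self, key: &str, value: Vec<u8>) -> Result<(), ExternalError>;
    async fn delete(&self, key: &str) -> Result<(), ExternalError>;
    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, ExternalError>;
}

/// Sketch of the linearized, durable log of `(SeqNo, Vec<u8>)` records.
#[async_trait::async_trait]
pub trait Consensus {
    /// A recent (not necessarily the most recent) record and its SeqNo.
    async fn head(&self) -> Result<Option<(SeqNo, Vec<u8>)>, ExternalError>;
    /// Insert `data` at `seqno` iff the most recent record is at `seqno - 1`.
    async fn compare_and_set(&self, seqno: SeqNo, data: Vec<u8>) -> Result<bool, ExternalError>;
    /// All records at sequence numbers >= `seqno`.
    async fn scan(&self, seqno: SeqNo) -> Result<Vec<(SeqNo, Vec<u8>)>, ExternalError>;
    /// Remove all records at sequence numbers < `seqno`.
    async fn truncate(&self, seqno: SeqNo) -> Result<(), ExternalError>;
}
```
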
## Intuition for Safety

One lens through which you might view all of the following details is of persist maintaining an acyclic immutable data structure. The nodes are stored in an external [blob] store, with blob keys as pointers between nodes. The "root" pointer of the data structure is changed only by going through distributed [consensus]. Each distributed actor writes blobs to a path with a unique prefix and blobs are atomically written, write-once, modify-never. This makes reasoning about caching, mutations, and idempotency of these blobs straightforward-ish.

Persist is a state machine and this immutable data structure contains its state. The distributed actors only interact through consensus: starting with the latest version of the state, applying inputs, and using `CompareAndSet` to only move the root pointer if it hasn't changed. If it has, they restart the loop.

[blob]: #blobconsensus
[consensus]: #blobconsensus

## Intuition for Liveness

Notably, our liveness is dependent on the availability of the external [blob] and [consensus] services we rely on. There's no getting around this. We'll have to select services we trust.

To reduce contention, we'll run the state update loop in an RPC server. But it's safe to run the loop from anywhere. This allows us to aggressively fail over if the RPC server misses a health check.

Readers of the immutable data structure declare which version of the root pointer they are using (by maintaining it in the state machine state) and are only allowed to progress forward through versions. This allows us to eventually reason about unreachable root versions and unreachable blobs for cleanup.

Readers and writers are both given leases on registration. This lease is extended as the reader or writer is used. If the lease expires (or the process is restarted), the reader/writer acquires a new lease under a new (internal) identifier.

## Implementation

### State Machine

Persist is a deterministic state machine. This means that persist maintains some small metadata in persistent storage describing the current status of the state machine, and accepts an ordered sequence of inputs as defined in the [API]. Each of those inputs either results in a no-op error, in which case the machine's state is unchanged, or causes the state stored in persistent storage to undergo a deterministic state transition.

[API]: #api

#### Persist has to keep track of the following state:

Shard global information:

- `upper` frontier (max of all writers' `upper`s)
- `since` frontier (min of all readers' `since`s)
- `seqno_since` frontier (the last `seqno` any reader will be allowed to read from). I believe this makes the most sense as the min of (all of the readers' `seqno_since`s, the `seqno` of the last state rollup?)

For each active (non-expired) writer:

- the writer's `upper` frontier
- the unique key prefix assigned to the writer
- the writer's lease expiration time

For each active (non-expired) reader:

- the reader's `since` frontier
- the reader's `seqno_since` frontier (the `seqno` before which it will never read any metadata information)
- the reader's lease expiration time

For all expired readers and writers:

- the expired reader / writer `ID`
- the `seqno` at which the reader/writer was expired

For all submitted, non-deleted batches:

- the writer ID that submitted the batch
- the batch's `[lower, upper)` and `since` frontiers
- the key / set of keys pointing to the batch in S3
- (maybe?) the `seqno` when the batch was introduced

For all logically deleted batches:

- the writer that deleted the batch
- the `seqno` at which the batch was logically deleted
- the key / set of keys pointing to the batch in S3

For each state rollup in the set of state metadata rollups:

- the `seqno` associated with the state rollup
- the key pointing to the state rollup in S3

For each logically deleted state rollup:

- the `seqno` at which the state rollup was deleted
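
Pulling the lists above together, the tracked state could be sketched as a plain Rust struct along these lines. This is illustrative only; the field names and shapes here are assumptions made for this sketch, not persist's actual definitions.

```rust
use std::collections::BTreeMap;
use std::time::SystemTime;

// Illustrative stand-ins for identifiers and frontiers used in this sketch.
pub struct SeqNo(u64);
pub struct WriterId(String);
pub struct ReaderId(String);
pub struct Antichain<T>(Vec<T>);

/// One batch of updates stored in Blob, as tracked by the state machine.
pub struct BatchMeta<T> {
    writer_id: WriterId,
    keys: Vec<String>,       // key / set of keys pointing to the batch in S3
    lower: Antichain<T>,
    upper: Antichain<T>,
    since: Antichain<T>,
    added_at: Option<SeqNo>, // (maybe?) the seqno when the batch was introduced
}

/// Sketch of the per-shard state tracked by persist's state machine.
pub struct ShardState<T> {
    // Shard-global frontiers.
    upper: Antichain<T>,
    since: Antichain<T>,
    seqno_since: SeqNo,
    // Active (non-expired) writers and readers.
    writers: BTreeMap<WriterId, (Antichain<T>, String, SystemTime)>, // upper, key prefix, lease expiry
    readers: BTreeMap<ReaderId, (Antichain<T>, SeqNo, SystemTime)>,  // since, seqno_since, lease expiry
    // Expired reader/writer ids and the seqno at which they were expired.
    expired: Vec<(String, SeqNo)>,
    // Live batches, plus batches logically deleted (awaiting physical deletion).
    batches: Vec<BatchMeta<T>>,
    deleted_batches: Vec<(BatchMeta<T>, SeqNo)>,
    // State rollups by seqno (blob key of each rollup), plus logically deleted ones.
    rollups: BTreeMap<u64, String>,
    deleted_rollups: Vec<SeqNo>,
}
```
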
This metadata state is stored as a sequence of incremental updates/individual state transitions in `Consensus`, and as a set of state rollups stored in S3 objects, for ease of access to historical data. The batches themselves are stored in S3 objects inside of writer-owned paths within `Blob`.

#### Allowed state transitions:

- `AddReaderWriter(lease_duration_seconds) -> Reader/Writer Id + WriteCapability(upper) and ReadCapability(since, seqno_since)`: this state transition adds a new reader and writer to the set of readers/writers, associated with the same id and with the corresponding `since`/`seqno_since`/`upper` chosen from the global values. The new readers and writers have a lease of `lease_duration_seconds`.
- `LogicallyDeleteReader(reader_id)` / `LogicallyDeleteWriter(writer_id)`: expire a reader / writer. All future requests by this reader or writer will be rejected.
- `PhysicallyDeleteReader(reader_id)` / `PhysicallyDeleteWriter(writer_id)`: remove the reader / writer from the set of logically deleted readers and writers.
- `Append(batch_key, description, writer_id)`: this transition adds a batch to the set of active batches and downgrades the writer's `upper` to `description.upper`. This transition can also downgrade the collection's `upper` if the global maximum of all writers' `upper`s increases. This transition also updates the writer's lease expiration time.
- `DowngradeReader(reader_id, new_since, new_seqno_since)`: downgrade the reader to a new `(since, seqno_since)`. This transition can also downgrade the collection's `since`/`seqno_since` if the global min of all readers' `since` / `seqno_since` increases. This transition also updates the reader's lease expiration time.
- `LogicallyDeleteBatch(batch_key, writer_id)`: this state transition is only used by garbage collector(s). This transition moves the batch located at `batch_key` into the logically deleted batches set, provided that another batch exists that covers the to-be-deleted batch's time interval.
- `PhysicallyDeleteBatch(batch_key)`: remove the batch from the list of logically deleted batches because the space has been physically reclaimed by some process.
- `CreateStateRollup(seqno_upper, state_rollup_key)`: add a new state rollup covering `[0, seqno_upper)` at `state_rollup_key`.
- `LogicallyDeleteStateRollup(state_rollup_key)`: move a state rollup to the set of logically deleted state rollups.
- `PhysicallyDeleteStateRollup(state_rollup_key)`: remove a state rollup from the set of logically deleted state rollups.

#### State machine usage:

Roughly speaking, all users of the state machine follow this structure:

- Grab the current state and `seqno` of the state machine into memory.
- Perform the requisite operations either in memory (for example, for readers downgrading their `since`) or in `Blob` (for example, for writers appending a new batch).
- Construct an incremental metadata update representing the state transition.
- Attempt to CaS this update into `Consensus` at `seqno + 1`. If you succeed, amazing! If you fail, reconstruct the current state and current `seqno` and try again from the beginning. (Note that writers don't have to continue to rewrite their batch across CaS retries.)
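
A minimal sketch of that loop, written against the illustrative `Consensus`/`SeqNo`/`ExternalError` stand-ins sketched in the Blob+Consensus section (not the real persist types); state reconstruction and validation are elided.

```rust
// Sketch of the read-modify-CaS loop described above.
async fn apply_transition<C: Consensus>(
    consensus: &C,
    transition: Vec<u8>, // an encoded incremental metadata update
) -> Result<SeqNo, ExternalError> {
    loop {
        // Grab a recent seqno; SCAN forward (and/or read a rollup) to
        // reconstruct the current in-memory state from it.
        let head = consensus.head().await?;
        let seqno = head.map_or(0, |(SeqNo(n), _)| n);
        // (Validate here that `transition` still applies to that state.)

        // Attempt to CaS the incremental update in at seqno + 1.
        if consensus.compare_and_set(SeqNo(seqno + 1), transition.clone()).await? {
            return Ok(SeqNo(seqno + 1));
        }
        // Someone else won the race: refresh and try again. Note that a
        // writer does not have to rewrite its batch in Blob across retries.
    }
}
```
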
### Incremental State

It's easiest to reason about writing out the entire state of the root of our immutable data structure. Sadly, there is an unresolvable tension between writes (which append to our list of batches) and compactions (which reduce the size of the list of batches). We don't want compaction progress to throttle writes, but if writes get ahead of compactions, we get quadratic behavior in the number of bytes we have to write to keep writing out our state. This isn't a theoretical concern; it was one of the primary problems in the first iteration of persist.

The natural way to break this tension is to maintain the state _incrementally_, so that what we're writing is proportional to the size of the change, not the entire state. To contain the problem of different code versions of persist using different logic to roll up the incremental updates, we model our state as a fixed number of TVCs. This allows us to use differential dataflow's summation logic (possibly literally using its _consolidate_updates_ method). Concretely (TODO: Adjust these to match the state described above):

- `writers: Collection<(WriterId, WriterMeta), SeqNo, i64>`
- `readers: Collection<(ReaderId, ReaderMeta), SeqNo, i64>`
- `since: Collection<((), Antichain<T>), SeqNo, i64>`
- `trace: Collection<(BlobKey, Description), SeqNo, i64>`
- `rollup: Collection<(BlobKey, ()), SeqNo, i64>`
- Note that these collections use `SeqNo` as the time dimension.
- All the keys and values (`WriterId`, `WriterMeta`, etc) implement `Codec`.
- `SeqNo` and `i64` implement `Codec64`.
- The `writers` and `readers` collections each contain at most one instance of the `WriterId`/`ReaderId` key at a given time.
- The `since` collection contains exactly one element at every time.
- The `trace` collection can contain multiple elements.
- The `rollup` collection can contain multiple elements.

As hinted by the name, the `rollup` collection contains the entire contents of the collection at the corresponding `SeqNo`. This prevents us from needing an unbounded amount of work at startup to compute the full state. New rollups are computed and added at some heuristic interval. It contains multiple elements in case someone needs to compute the state as of some `SeqNo` previous to the most recent rollup. These can be [garbage collected] over time in the same way we garbage collect blobs, as all readers progress past the `SeqNo`.

[garbage collected]: #garbage-collection
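
Because the state is modeled as TVCs with `SeqNo` times and `i64` diffs, reconstructing "the state as of a `SeqNo`" is just summation. A standalone toy illustration of that consolidation idea (differential dataflow's `consolidate_updates` is the real thing, and also handles the time dimension):

```rust
use std::collections::BTreeMap;

/// Toy consolidation: sum the diffs for identical data, dropping anything
/// that cancels out to zero.
fn consolidate<D: Ord>(updates: Vec<(D, i64)>) -> Vec<(D, i64)> {
    let mut sums: BTreeMap<D, i64> = BTreeMap::new();
    for (data, diff) in updates {
        *sums.entry(data).or_insert(0) += diff;
    }
    sums.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

fn main() {
    // The `since` collection contains exactly one element at every SeqNo.
    // Moving the since from [3] to [5] is recorded incrementally as a
    // retraction plus an addition; summing through that SeqNo recovers the
    // single current value.
    let since_updates = vec![
        ("since=[3]", 1),  // asserted at some earlier SeqNo
        ("since=[3]", -1), // retracted when the since was downgraded
        ("since=[5]", 1),  // asserted at the same SeqNo as the retraction
    ];
    assert_eq!(consolidate(since_updates), vec![("since=[5]", 1)]);
}
```
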
### Pipelining and Backpressure

A persist shard's scaling is ultimately limited by one of two things: the raw throughput of writing updates to S3 (think a high traffic source) or the number of state changes it can process (think a high traffic table or a large number of readers+writers sending progress information). Under normal operation, it's difficult for a computation over updates (e.g. decoding Avro) on a reasonable machine to saturate the throughput of writing to S3 from the same AZ. Similarly, by batching up state changes into a hot loop and running compaction out-of-band (i.e. by modeling "this compaction succeeded" as an input to the state machine), we should be able to scale to a high frequency of writes. High numbers of concurrent writers (i.e. hundreds or more) are an open question.

However, these of course may not be true under degraded conditions. If persist is not able to keep up, it [backpressure]s writers by only allowing them to have one outstanding request at a time. For maximum throughput, a `WriteHandle` user should internally buffer the next batch's updates while the previous batch is being written. If this buffer fills up, the `WriteHandle` user should in turn backpressure its own upstream system (e.g. reading from Kafka for a source).

[backpressure]: https://github.com/aphyr/distsys-class#backpressure

### Compaction

The goal for compaction is to present a sequence of batches that are:

1. Logically consolidated: they contain state proportional to the number of distinct (data, time) pairs between `[since, upper)` rather than `[lower, upper)`, even when `since >> lower`. Colloquially, we want to forget about historical details that no reader, present or future, is going to care about.
2. Physically consolidated: the collection presents the user with a sequence of a logarithmic number of batches relative to the total number of updates presented between `[since, upper)`. This helps us bound the size of the metadata required to represent an arbitrarily large collection and, more importantly, it introduces opportunities for more logical compaction. For example, if an update at `t2` cancels out an update at `t1`, the cancellation can only occur once the two updates land in the same physical batch. Otherwise, even if the update at `t1` gets forwarded to `t2` but still lives in a batch from `[lower: t0, upper: t1)`, the forwarding won't unlock the space reduction.

In differential, the compaction machinery both produces these new batches and logically removes the old batches from the data structure, and then uses reference counting to handle physical deletion of the batches. In persist, it is simpler to have one compactor that is only responsible for reading sequences of incoming batches and producing new, physically and logically compacted ones as the `since` frontiers advance, and to leave the logical and physical deletion to a separate garbage collection process.

For compaction selection, we plan to use an algorithm inspired by (and likely identical to) the one in differential's [Spine]: a size-tiered compaction, whereby some batch representing `~2^k` updates (at level `k`) is only compacted together with another batch representing `~2^k` updates (at level `k`) to produce a new batch representing `~2^(k+1)` updates. This structure lets us meet the goals stated above while performing an `O(n)` number of reads and writes.

[Spine]: https://github.com/TimelyDataflow/differential-dataflow/blob/27492f26ccec54360eba8af882acac94b8a63a6c/src/trace/implementations/spine_fueled.rs
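
To illustrate the size-tiered selection, here's a toy sketch (not Spine's or persist's actual implementation): merging pairs of batches at the same level behaves like carrying in a binary counter, which keeps the number of batches logarithmic in the total number of updates.

```rust
/// A batch at level `k` notionally holds ~2^k updates.
#[derive(Debug, Clone, Copy)]
struct Batch {
    level: u32,
    len: usize,
}

/// Insert a new batch, then repeatedly merge pairs of batches at the same
/// level into one batch at the next level up, like carrying in a binary
/// counter.
fn insert_and_maybe_merge(batches: &mut Vec<Batch>, new: Batch) {
    batches.push(new);
    loop {
        let n = batches.len();
        if n < 2 || batches[n - 1].level != batches[n - 2].level {
            return;
        }
        let a = batches.pop().unwrap();
        let b = batches.pop().unwrap();
        // In the real system this is where the compactor would read both
        // batches from Blob, consolidate them as of the current `since`, and
        // write a new batch back out.
        batches.push(Batch { level: a.level + 1, len: a.len + b.len });
    }
}

fn main() {
    let mut batches = Vec::new();
    for i in 0..8 {
        insert_and_maybe_merge(&mut batches, Batch { level: 0, len: 1 });
        println!("after insert {}: {:?}", i + 1, batches);
    }
    // After 8 single-update batches, everything has merged into one batch at
    // level 3 (~2^3 updates).
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0].level, 3);
}
```
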
### Garbage Collection

There are several places where state can accumulate in this design:

1. The sequence of (`seqno`, incremental state transition) entries grows in `Consensus`.
2. Writers write new batches into `Blob` that eventually become unused.
3. A background process writes new state rollups into `Blob` that eventually become unused.
4. Writers and readers expire their lease (writer / reader ids eventually become unused).
5. Collections themselves eventually get deleted (see the section below) when their `since`/`upper` frontiers `== []`.

Let's focus on 1, 2, and 3. Each of those can be logically deleted (the system has a redundant copy of their information somewhere) when:

1. The `Consensus` state at `seqno` will never be used again when the `seqno_since` of all readers is `> seqno`, and a state rollup exists up to a seqno `> seqno`.
2. This is slightly more complex.
   a) We know that a batch can be logically deleted when another batch exists that can cover its `[lower, upper)` at a `since >=` the collection's `since`. At that moment, we can mark the batch safe for logical deletion.
   b) If we logically delete the batch at `seqno X`, we can physically delete the batch once the `seqno_since` for the collection advances up to a `seqno` that is `> X`, and there is a state rollup at a `seqno > X`.
   c) Once the batch has been physically deleted, we can remove it from the set of batches that have been logically deleted.
3.
   a) We can logically delete a state rollup as soon as another state rollup exists with a `seqno >` this state rollup's `seqno`, at any subsequent `seqno`.
   b) If we logically delete the state rollup at `seqno X`, we can physically delete it once the `seqno_since` advances up to a `seqno > X` and there is a state rollup at `seqno > X`.
   c) Once the state rollup has been physically deleted, we can remove it from the set of state rollups that have been logically deleted, at any subsequent `seqno`.

Basically, the common pattern for 2 and 3 is:

- A. The application can observe that this element in the state is logically redundant, and mark it safe for deletion (add it to a set of to-be-deleted elements). This transition takes place at some sequence number `X`. Any users on some old version of the state can continue to use the same resource as they were before.
- B. When all readers acknowledge that they have seen the logical delete and promise to never go backwards (i.e. they have advanced their `seqno_since > X`, and we have an appropriate state rollup at a `seqno > X` so that no future reader will need the state before the logical delete), we can now delete all of the elements that were logically deleted at `seqnos <= X`.
- C. Once the object has been physically deleted, we can remove it from the set of elements to be physically deleted (at any `seqno >` when we perform the physical delete).

This is basically the same idea as epoch-based reclamation (we're maintaining a free-list per `seqno` and then freeing those batches / state rollups once that `seqno` can never be read). Arguably, 1 falls into the same pattern as well, only there's no need to do an explicit logical delete?

### Shard Deletion

We don't expose a `DeleteCollection` type of operation on writers. That would introduce a way for writers and readers to potentially interfere with each other's operations, whereas as defined above, writers are independent of other writers (no writer can invalidate another writer's write operation) and of readers (no reader can invalidate a writer's operation), and vice-versa for readers.

Instead, we know that a shard is safe to delete when its global `upper` and `since` frontiers are the empty antichain `[]`, because at that point, the shard can no longer be written to or read from. Once a shard reaches that state we can delete all of its constituent batches, reject all requests for new readers and writers, and leave only a single state rollup that acts as a tombstone indicating that the shard's frontiers have advanced to `[]`. We could even have a specific `TOMBSTONE` object which, if it exists, indicates that the shard has permanently been deleted. This lets us garbage-collect any consensus state associated with the shard as well.

### Leasing

Naively, writers and, especially, readers can hold back reclamation of resources for an arbitrarily long amount of time. This would likely be due to fault, but could also just be accidental. If we eventually support something like cross-account export of computed views, it could even be maliciousness. To prevent this, all writers and readers are leased.

Our primary aim in the implementation of leasing is to avoid needing to reason about distributed clocks. As a result, the lease information is stored in the shard state and updated through consensus, just like anything else. Notably, a lease expiration also goes through consensus. Once a lease expires, the writer or reader learns about it on its next attempted state modification and must reconnect with a new id.
The old id is permanently retired, allowing for it to eventually be gc'd. The lease is extended through normal operation of `append` for writers and `downgrade_since` for readers. Both of these have a no-op version if a pure "heartbeat" is needed: writing no data and advancing `upper` to the same value, or downgrading to the same `since`.

### RPC Server

Persist uses a "rich client" model with some of the logic running in-process (but behind the persist abstraction) so that readers and writers can talk directly to S3. To reduce contention, state changes (including things like "here's some data at an S3 key that I wrote") are then funneled through a `PersistShard` RPC service running in a separate process. Operationally, we'll run multiple copies of `PersistShard` and use some sort of fuzzy suppression to mostly keep them from contending.

Besides local caches of data from `Blob` and `Consensus`, the implementation of this service is stateless. Each request contains the `ShardId`, so that the RPC server can respond to any requests that happen to come its way. Combined, this means that the mapping between persist shards and servers is simply a function of where the requests are routed. A single copy of the server should be sufficient for most (all?) customers, but we have the option of easy horizontal scalability if necessary. However, note that if we do, we'll want to route all traffic from each individual shard to the same process, so that in the steady state, our CaS operations against `Consensus` are conflict-free. TODO: Figure out if this routing should be done at the k8s level or through something like a proxy.

In the future, we'll split compaction out into a second `PersistCompaction` RPC service for `PersistShard` to call out into. It will also be stateless, modulo caches. These would be elastically scalable, with all compaction requests load balanced between them.

### Retries and Error Handling

We chose a two-tiered error handling system, inspired by [this sled.rs blog post](http://sled.rs/errors.html). The two tiers of errors are _usage errors_ and _location errors_ (these latter ones could be called _storage errors_, but that name would already seem taken).

Usage errors indicate when the user of the API has requested an invalid operation, for example, trying to read as of `t` when the read handle has already been downgraded beyond `t`. In practice, these usually (TODO: always?) indicate a bug in the system that uses `persist`.

The second tier of errors, location or storage errors, indicate that something is wrong with one of the underlying systems (either `blob` or `consensus`). For example, the `blob` system could be down, or some datum that we are trying to read is temporarily unavailable. One might be inclined to also differentiate between transient (temporary) and permanent location errors. This seems hard, though, in the general case. For example, an unavailable `blob` might "come back", but it might stay permanently unavailable, and we don't know how to differentiate between those two cases.

An important difference between the two classes of errors is timeouts and retries. When a usage error occurs, an API call will immediately report it back. For location errors, on the other hand, we might retry within a given timeout window.
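
This is what the "clunky two-level Result" in the API sketch is for. A hypothetical caller-side sketch of how the two tiers might be handled differently (the helper, retry policy, and use of `tokio` here are illustrative assumptions, built against the sketched API above rather than real persist code):

```rust
use std::time::Duration;

// Sketch only: `WriteHandle`, `Antichain`, `InvalidUsage`, and `ExternalError`
// are the types from the API sketch above.
async fn append_with_retries<K, V, T, D, I>(
    write: &mut WriteHandle<K, V, T, D>,
    updates: I,
    new_upper: Antichain<T>,
) -> Result<(), InvalidUsage>
where
    I: IntoIterator + Clone,
{
    let timeout = Duration::from_secs(30);
    loop {
        match write.append(timeout, updates.clone(), new_upper.clone()).await {
            // Success: the shard's upper was downgraded to `new_upper`.
            Ok(Ok(())) => return Ok(()),
            // Usage error: do not retry, this indicates a bug in the caller.
            Ok(Err(invalid_usage)) => return Err(invalid_usage),
            // Location error: Blob/Consensus trouble; back off and retry.
            Err(_external) => tokio::time::sleep(Duration::from_secs(1)).await,
        }
    }
}
```
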
Another important axis along which we differentiate errors is _definiteness_ (TODO: Come up with a better term because this one is already doing a lot of work elsewhere). _Definite_ errors are those errors where we know that some operation definitely failed, while _indefinite_ errors are those errors where we cannot know. This distinction is important for write operations (all operations that change state): if an operation definitely failed, we can retry without running the risk of writing duplicate data. If, on the other hand, we get a timeout or some other indefinite error, we cannot know that the data was not written and have to abort the complete write attempt.

TODO: Figure out how to fit the two axes of errors into an ergonomic API. Maybe definite/indefinite are variants of `ExternalError`? It also feels like usage error/location error sometimes map to definite/indefinite, but only if you squint hard enough. I also think we will need to play this by ear and see what's ergonomic when we start implementing things on top. There are open questions around this topic, but we can table them for now and see what works best when working with the API to implement sources and the rest of STORAGE.

#### Open Questions:

- What kind of error do we return when a read times out? That is, a reader _thinks_ it has a lease (a `ReadHandle`) but it has timed out. It's not a usage error because the reader theoretically did use the API correctly. But in terms of UX, I think the only thing we can do is return a "cannot serve query as of