aljoscha/danhhz/ruchirk
Notice: The document represents a (hopefully) self-consistent snapshot of our
thinking at a point in time. It's a useful tool for understanding the structure
of persist and for establishing a common language, but it's a non-goal to keep
it continually updated with the latest thinking. For example: we've already
decided (at the request of STORAGE) to shift from the capability model for
ReadHandle described here to one of a shard-global downgrade_since/allow_compaction. This has far reaching consequences,
potentially even to the number of concurrent readers we'll initially support. If
you have any questions or want to know the latest thinking, please don't
hesitate to reach out!
Persist is an implementation detail of STORAGE. Its "public" API is used only by STORAGE and code outside of STORAGE should be talking to STORAGE, not persist. However, having this strong API boundary between the two allows for the two teams to execute independently.
Persist's primary abstraction is a "shard", which is a durable and definite
Time-Varying Collection (TVC). Persist requires that the collection's "data"
is key-value structured (a ()
unit value is fine) but otherwise allows the
key, value, time, and diff to be abstract in terms of the Codec and Codec64
encode/decode traits. As a result, persist is independent of Materialize's
internal data formats: Row
etc.
Along with talking to the outside world, STORAGE pilots persist shards around to present a "STORAGE collection", which is a durable TVC that handles reshardings and version upgrades. It could also ensure that persist will never again see data from some Codec's previous implementation of encode, allowing us to remove support for it from the corresponding decode.
Some definitions:

- shard: A persist shard is exactly a storage shard and they can be used interchangeably.
- update: A ((key, value), timestamp, diff) tuple. Persist requires that the data field has a (key, value) structure, though the value can be the unit type () to mean "empty".
- upper: The upper frontier, which communicates that for times not greater or equal to upper, the updates at those times are complete.
- since: The since frontier, which communicates that for times greater or equal to since, the accumulated collection at those times will be correct.
- A shard is correct "since until upper" if it reflects the definite results for times greater or equal to since but not greater or equal to upper; this intentionally allows for compaction of historical detail and unavailability of future data, without ambiguity about which logical times can be described with certainty.
- batch: A set of updates along with the Description of those updates.
- ShardId: An identifier for a persist shard, as described in definite.
- Location: The Blob and Consensus traits, together collectively called the persist location. One valid way to think of persist is as a layer that maps simpler durable storage semantics to ones that are more native to differential dataflow.
- Blob: A durable &str key -> (possibly large) &[u8] value store. The production implementation is S3 (or other cloud storage). An in-mem HashMap implementation exists for testing. A filesystem implementation may be used for a single-binary materialized deployment.
- Consensus: A durable, totally ordered sequence of &[u8] entries, linearizing writes concurrently from multiple places.
- SeqNo: A sequence number identifying a specific version of the internal state machine's state.

Some requirements and goals:

- Writers produce batches of updates covering a [lower, upper) range of times as of some known since.
- The shard stays available for writes (its upper advances) so long as one writer can make progress. Concretely, we don't need all writers to be able to write a batch in order for it to be read.
- The number of batches needed to represent a shard is proportional to [since, upper) (plus an optional logarithmic factor).
- Persist targets O(tens) of concurrent readers/writers. Scaling to O(hundreds) (and more) of readers is possible but left as followup work.

Persist uses a "rich client" model with some of the logic running in-process (but behind the persist abstraction) so that readers and writers can talk directly to S3.
Everything in persist is generic over <Key, Value, Timestamp, Diff> (hereafter <K, V, T, D>) to allow users control over how their data formats are encoded and decoded. K and V must implement Codec, which requires its implementations to be able to decode data written by previous versions of encode. (STORAGE can use renditions to migrate these over time to remove cruft in decode and keep this from accumulating indefinitely.) T and D must implement Codec64. In practice, Codec64 means they're roundtripped as little-endian i64 or u64. Codec64 can be made more general if necessary, but this "serialized as 8 bytes" restriction allows us to do some performance tricks in our columnar disk format for batches of updates, as well as making a number of metadata decoding pathways much simpler.
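To make the shape of these traits concrete, here is a minimal sketch of what Codec and Codec64 might look like. The buffer, error, and method details are assumptions for illustration; as always, the code is the source of truth.

// A sketch only: buffer and error types here are assumptions, not the real API.
pub trait Codec: Sized {
    /// Encode self into the byte buffer.
    fn encode(&self, buf: &mut Vec<u8>);
    /// Decode a value written by this or any previous version of `encode`.
    fn decode(buf: &[u8]) -> Result<Self, String>;
}

/// Timestamps and diffs are fixed-width: always exactly 8 bytes.
pub trait Codec64: Sized {
    fn encode(&self) -> [u8; 8];
    fn decode(buf: [u8; 8]) -> Self;
}

// For example, a u64 timestamp roundtrips as little-endian bytes.
impl Codec64 for u64 {
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
    fn decode(buf: [u8; 8]) -> Self {
        u64::from_le_bytes(buf)
    }
}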
Most of the persist API has a close correspondence with named entities in the formalism. The formalism describes STORAGE, not persist, but given that part of STORAGE's duties is piloting around persist to account for reshardings and behavior changes across code versions, this makes sense.
- WriteHandle is roughly WriteCapability.
- WriteHandle::append is roughly Append.
- ReadHandle is roughly ReadCapability.
- ReadHandle::listen + ReadHandle::snapshot are roughly SubscribeAt.
- Client::open is roughly AcquireCapabilities.
Here's our best initial guess at concretely what the API looks like. As always, the source of truth will be the code.
impl Client {
    /// Provides capabilities for the durable TVC identified by `shard_id` at
    /// its current since and upper frontiers.
    ///
    /// This method is a best-effort attempt to regain control of the frontiers
    /// of a shard. Its most common uses are to recover capabilities that have
    /// expired (leases) or to attempt to read a TVC that one did not create (or
    /// otherwise receive capabilities for). If the frontiers have been fully
    /// released by all other parties, this call may result in capabilities with
    /// empty frontiers (which are useless).
    ///
    /// If `shard_id` has never been used before, initializes a new shard and
    /// returns handles with `since` and `upper` frontiers set to initial values
    /// of `Antichain::from_elem(T::minimum())`.
    pub async fn open<K, V, T, D>(
        &self,
        timeout: Duration,
        shard_id: ShardId,
    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), ExternalError>;
}
impl<K, V, T, D> WriteHandle<K, V, T, D> {
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper`.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s (with
    /// different [WriterId]s) may be used concurrently to write to the same
    /// shard, but in this case, the data being written must be identical (in
    /// the sense of "definite"-ness). It's intended for replicated use by
    /// source ingestion, sinks, etc.
    ///
    /// All times in `updates` must be greater or equal to `self.upper()` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: Introduce an AsyncIterator (futures::Stream) variant of this. Or,
    /// given that the AsyncIterator version would be strictly more general,
    /// alter this one if it turns out that the compiler can optimize out the
    /// overhead.
    pub async fn append<'a, I: IntoIterator<Item = ((&'a K, &'a V), &'a T, &'a D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current shard upper is equal to `expected_upper`.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    pub async fn compare_and_append<'a, I: IntoIterator<Item = ((&'a K, &'a V), &'a T, &'a D)>>(
        &mut self,
        timeout: Duration,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<Result<(), Antichain<T>>, InvalidUsage>, ExternalError>;
}
impl<K, V, T, D> ReadHandle<K, V, T, D> {
    /// Forwards the since frontier of this handle, giving up the ability to
    /// read at times not greater or equal to `new_since`.
    ///
    /// This may trigger (asynchronous) compaction and consolidation in the
    /// system. A `new_since` of the empty antichain "finishes" this shard,
    /// promising that no more data will ever be read by this handle.
    ///
    /// It is possible to heartbeat a reader lease by calling this with
    /// `new_since` equal to `self.since()` (making the call a no-op).
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    pub async fn downgrade_since(
        &mut self,
        timeout: Duration,
        new_since: Antichain<T>,
    ) -> Result<Result<(), InvalidUsage>, ExternalError>;

    /// Returns an ongoing subscription of updates to a shard.
    ///
    /// The stream includes all data at times greater than `as_of`. Combined
    /// with [Self::snapshot] it will produce exactly correct results: the
    /// snapshot is the TVC's contents at `as_of` and all subsequent updates
    /// occur at exactly their indicated time. The recipient should only
    /// downgrade their read capability when they are certain they have all data
    /// through the frontier they would downgrade to.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn listen(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
    ) -> Result<Result<Listen<K, V, T, D>, InvalidUsage>, ExternalError>;

    /// Returns a snapshot of the contents of the shard TVC at `as_of`.
    ///
    /// This command returns the contents of this shard as of `as_of` once they
    /// are known. This may "block" (in an async-friendly way) if `as_of` is
    /// greater or equal to the current `upper` of the shard. The recipient
    /// should only downgrade their read capability when they are certain they
    /// have all data through the frontier they would downgrade to.
    ///
    /// This snapshot may be split into a number of splits, each of which may be
    /// exchanged (including over the network) to load balance the processing of
    /// this snapshot. These splits are usable by anyone with access to the
    /// shard's [crate::Location]. The `len()` of the returned `Vec` is
    /// `num_splits`. If a 1:1 mapping between splits and (e.g.) dataflow
    /// workers is used, then the work of replaying the snapshot will be
    /// roughly balanced.
    ///
    /// The clunky two-level Result is to enable more obvious error handling in
    /// the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// TODO: If/when persist learns about the structure of the keys and values
    /// being stored, this is an opportunity to push down projection and key
    /// filter information.
    pub async fn snapshot(
        &self,
        timeout: Duration,
        as_of: Antichain<T>,
        num_splits: NonZeroUsize,
    ) -> Result<Result<Vec<SnapshotSplit>, InvalidUsage>, ExternalError>;
}
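To make the handle lifecycle concrete, here is a hypothetical usage sketch. The concrete types (K = String, V = (), T = u64, D = i64), the constant timeout, and the unwrapping of the usage-error layer are illustrative assumptions only.

async fn example(client: &Client, shard_id: ShardId) -> Result<(), ExternalError> {
    let timeout = Duration::from_secs(60);
    let (mut write, mut read) = client
        .open::<String, (), u64, i64>(timeout, shard_id)
        .await?;

    // Write one update at time 3 and advance the shard's upper to 4.
    let updates = vec![(("k1".to_owned(), ()), 3u64, 1i64)];
    write
        .append(
            timeout,
            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
            Antichain::from_elem(4),
        )
        .await?
        .expect("valid usage");

    // Read a snapshot of the shard's contents as of time 3.
    let _splits = read
        .snapshot(timeout, Antichain::from_elem(3), NonZeroUsize::new(1).unwrap())
        .await?
        .expect("valid usage");

    // Allow compaction of detail at times not greater or equal to 3.
    read.downgrade_since(timeout, Antichain::from_elem(3))
        .await?
        .expect("valid usage");
    Ok(())
}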
The persistence system is built on top of two logical primitives, Blob and Consensus.

Blob is a durable key-value store that maps string keys -> arbitrary byte payloads. It's conceptually similar to a shared hashtable, perhaps expressed as an Arc<Mutex<HashMap<String, Vec<u8>>>> in Rust, except durable across restarts and potentially shared across multiple processes.
Blob supports the following operations:

- GET(key): returns the value mapped to key if there is one.
- PUT(key, value): map key to value. Note that this operation is atomic, which means that any GET(key) will either see the old value before the PUT or value after the PUT, but nothing else.
- DELETE(key): remove any value associated with key, if there is one, and do nothing otherwise.
- LIST KEYS(prefix): list the set of keys currently mapped to a value that start with prefix.

Because all of our keys are write-once, modify-never, Blob does not have to be linearizable. Readers can simply retry reads that don't succeed.
We're going to keep at least 3 different implementations of Blob: one built on S3 that will be the main production use case, one built on top of the POSIX filesystem API for the single binary use case, and one in-memory purely for unit testing / measuring overhead of doing IO.
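Here is a sketch of Blob as a Rust trait, purely to pin down the shape of the interface; the trait name, method names, and use of async_trait are assumptions, and the real trait in the code may differ.

// A sketch of the Blob interface described above.
#[async_trait::async_trait]
pub trait Blob {
    /// Returns the value mapped to `key`, if there is one.
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, ExternalError>;
    /// Atomically maps `key` to `value`.
    async fn put(&self, key: &str, value: Vec<u8>) -> Result<(), ExternalError>;
    /// Removes any value associated with `key`, doing nothing otherwise.
    async fn delete(&self, key: &str) -> Result<(), ExternalError>;
    /// Lists the keys currently mapped to a value that start with `prefix`.
    async fn list_keys(&self, prefix: &str) -> Result<Vec<String>, ExternalError>;
}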
Consensus is a linearizable, durable, totally ordered array of binary records and sequence numbers (think a durable Vec<(SeqNo, Vec<u8>)>) that supports the following operations:

- HEAD: return a recent (not necessarily the most recent) record inserted into Consensus and the sequence number it was inserted at.
- CompareAndSet(sequence_number, data) -> bool: atomically insert data into Consensus if the most recent record at insert time was inserted at sequence_number - 1. Returns true if the insert was successful and false otherwise.
- SCAN(sequence_number): returns a list of all records inserted at sequence numbers greater than or equal to sequence_number.
- TRUNCATE(sequence_number): atomically remove all records inserted at sequence numbers less than sequence_number.

We use Consensus to store incremental updates to persist's metadata. We need a compare-and-set primitive to guarantee a total order of metadata updates across distributed writers, and we need to maintain a history of those updates in order to periodically reconstruct the full metadata state and write it out somewhere else.
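As with Blob, here is a sketch of Consensus as a Rust trait; names and exact signatures are assumptions for illustration.

// A sketch of the Consensus interface described above.
#[async_trait::async_trait]
pub trait Consensus {
    /// Returns a recent (not necessarily the most recent) record and the
    /// sequence number it was inserted at, if any records exist.
    async fn head(&self) -> Result<Option<(SeqNo, Vec<u8>)>, ExternalError>;
    /// Atomically inserts `data` iff the most recent record was inserted at
    /// `seqno - 1`. Returns true if the insert was successful.
    async fn compare_and_set(&self, seqno: SeqNo, data: Vec<u8>) -> Result<bool, ExternalError>;
    /// Returns all records inserted at sequence numbers >= `seqno`.
    async fn scan(&self, seqno: SeqNo) -> Result<Vec<(SeqNo, Vec<u8>)>, ExternalError>;
    /// Atomically removes all records inserted at sequence numbers < `seqno`.
    async fn truncate(&self, seqno: SeqNo) -> Result<(), ExternalError>;
}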
One lens through which you might view all of the following details is of persist maintaining an acyclic immutable data structure. The nodes are stored in an external blob store, with blob keys as pointers between nodes. The "root" pointer of the data structure is changed only by going through distributed consensus.
Each distributed actor writes blobs to a path with a unique prefix and blobs are atomically written, write-once, modify-never. This makes reasoning about caching, mutations, and idempotency of these blobs straightforward-ish.
Persist is a state machine and this immutable data structure contains its state.
The distributed actors only interact through consensus: starting with the latest version of the state, applying inputs, and using CompareAndSet to only move the root pointer if it hasn't changed. If it has, they restart the loop.
Notably, our liveness is dependent on the availability of the external blob and consensus services we rely on. There's no getting around this. We'll have to select services we trust.
To reduce contention, we'll run the state update loop in an RPC server. But it's safe to run the loop from anywhere. This allows us to aggressively fail over if the RPC server misses a health check.
Readers of the immutable data structure declare which version of the root pointer they are using (by maintaining it in the state machine state) and are only allowed to progress forward though versions. This allows us to eventually reason about unreachable root versions and unreachable blobs for cleanup.
Readers and Writers are both given leases on registration. This lease is extended as the reader or writer is used. If the lease expires (or the process is restarted), the reader/writer acquires a new lease under a new (internal) identifier.
Persist is a deterministic state machine. This means that persist maintains some small metadata in persistent storage describing the current status of the state machine, and accepts an ordered sequence of inputs as defined in the API. Each of those inputs either results in a no-op or an error, in which case the machine's state is unchanged, or else the state stored in persistent storage undergoes a deterministic state transition.
Shard-global information:

- upper frontier (the max of all writers' uppers)
- since frontier (the min of all readers' sinces)
- seqno_since frontier (the last seqno any reader will be allowed to read from). I believe this makes the most sense as the min of (all of the readers' seqno_sinces, the seqno of the last state rollup?)

For each active (non-expired) writer:

- upper frontier

For each active (non-expired) reader:

- since frontier
- seqno_since frontier (the seqno before which it will never read any metadata information)

For all expired readers and writers:

- ID
- seqno at which the reader/writer was expired

For all submitted, non-deleted batches:

- [lower, upper) and since frontiers
- seqno when the batch was introduced

For all logically deleted batches:

- seqno at which the batch was logically deleted

For each state rollup in the set of state metadata rollups:

- seqno associated with the state rollup

For each logically deleted state rollup:

- seqno at which the state rollup was deleted.

This metadata state is stored as a sequence of incremental updates/individual state transitions in Consensus, and a set of state rollups stored in S3 objects, for ease of access to historical data. The batches themselves are stored in S3 objects inside of writer-owned paths within Blob.
The state transitions are:

- AddReaderWriter(lease_duration_seconds) -> Reader/Writer Id + WriteCapability(upper) and ReadCapability(since, seqno_since): this state transition adds a new reader and writer to the set of readers/writers, associated with the same id and with the corresponding since/seqno_since/upper chosen from the global values. The new readers and writers have a lease of lease_duration_seconds.
- LogicallyDeleteReader(reader_id) / LogicallyDeleteWriter(writer_id): expire a reader / writer. All future requests by this reader or writer will be rejected.
- PhysicallyDeleteReader(reader_id) / PhysicallyDeleteWriter(writer_id): remove the reader / writer from the set of logically deleted readers and writers.
- Append(batch_key, description, writer_id): this transition adds a batch to the set of active batches and downgrades the writer's upper to description.upper. This transition can also downgrade the collection's upper if the global maximum of all writers' uppers increases. This transition also updates the writer's lease expiration time.
- DowngradeReader(reader_id, new_since, new_seqno_since): downgrade the reader to a new (since, seqno_since). This transition can also downgrade the collection's since/seqno_since if the global min of all readers' sinces/seqno_sinces increases. This transition also updates the reader's lease expiration time.
- LogicallyDeleteBatch(batch_key, writer_id): this state transition is only used by garbage collector(s). This transition moves the batch located at batch_key into the logically deleted batches set, provided that a) another batch exists that covers the batch to be deleted's time interval.
- PhysicallyDeleteBatch(batch_key): remove the batch from the list of logically deleted batches because the space has been physically reclaimed by some process.
- CreateStateRollup(seqno_upper, state_rollup_key): add a new state rollup covering [0, seqno_upper) at state_rollup_key.
- LogicallyDeleteStateRollup(state_rollup_key): move a state rollup to the set of logically deleted state rollups.
- PhysicallyDeleteStateRollup(state_rollup_key): remove a state rollup from the set of logically deleted state rollups.
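To summarize the shape of these inputs, here is a hypothetical Rust enum of the transitions above; the field names and types are illustrative assumptions, not the real code.

// A sketch of the state machine's inputs; types like ReaderId, WriterId,
// Description, Antichain, and SeqNo are the ones referenced elsewhere in this
// document.
enum StateTransition<T> {
    AddReaderWriter { lease_duration: Duration },
    LogicallyDeleteReader { reader_id: ReaderId },
    LogicallyDeleteWriter { writer_id: WriterId },
    PhysicallyDeleteReader { reader_id: ReaderId },
    PhysicallyDeleteWriter { writer_id: WriterId },
    Append { batch_key: String, description: Description<T>, writer_id: WriterId },
    DowngradeReader { reader_id: ReaderId, new_since: Antichain<T>, new_seqno_since: SeqNo },
    LogicallyDeleteBatch { batch_key: String, writer_id: WriterId },
    PhysicallyDeleteBatch { batch_key: String },
    CreateStateRollup { seqno_upper: SeqNo, state_rollup_key: String },
    LogicallyDeleteStateRollup { state_rollup_key: String },
    PhysicallyDeleteStateRollup { state_rollup_key: String },
}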
Roughly speaking, all users of the state machine follow this structure:

- Read the current state and seqno of the state machine into memory.
- Do some work, either in memory (for example, for readers downgrading their since) or in Blob (for example, for writers appending a new batch).
- Attempt to CompareAndSet the resulting state transition into Consensus at seqno + 1. If you succeed, amazing! If you fail, reconstruct the current state and current seqno and try again from the beginning. (Note that writers don't have to continue to rewrite their batch across CaS retries.)
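Here is a minimal sketch of that read-modify-CompareAndSet loop, reusing the hypothetical Consensus and StateTransition sketches from earlier; State, State::fetch, State::step, and State::encode are made-up helpers, and SeqNo is assumed to be a newtype around u64.

async fn apply_input<C: Consensus>(
    consensus: &C,
    input: StateTransition<u64>,
) -> Result<(SeqNo, State), ExternalError> {
    // Read the current state and seqno of the state machine into memory.
    let (mut seqno, mut state) = State::fetch(consensus).await?;
    loop {
        // Apply the input to produce the candidate next state.
        let new_state = state.step(&input);
        // Attempt to commit the transition at seqno + 1.
        if consensus
            .compare_and_set(SeqNo(seqno.0 + 1), new_state.encode())
            .await?
        {
            return Ok((SeqNo(seqno.0 + 1), new_state));
        }
        // Someone else won the race: refresh state and seqno, then retry.
        // (A writer does not have to rewrite its batch blob across retries.)
        let (latest_seqno, latest_state) = State::fetch(consensus).await?;
        seqno = latest_seqno;
        state = latest_state;
    }
}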
It's easiest to reason about writing out the entire state of the root of our immutable data structure. Sadly, there is an unresolvable tension between writes (which append to our list of batches) and compactions (which reduce the size of the list of batches). We don't want compaction progress to throttle writes, but if writes get ahead of compactions, we get quadratic behavior in the number of bytes we have to write to keep writing out our state. This isn't a theoretical concern; it was one of the primary problems in the first iteration of persist.
The natural way to break this tension is to maintain the state incrementally, so that what we're writing is proportional to the size of the change, not the entire state. To contain the problem of different code versions of persist using different logic to roll up the incremental updates, we model our state as a fixed number of TVCs. This allows us to use differential dataflow's summation logic (possibly literally using its consolidate_updates method).
Concretely (TODO: Adjust these to match the state described above):
writers: Collection<(WriterId, WriterMeta), SeqNo, i64>
readers: Collection<(ReaderId, ReaderMeta), SeqNo, i64>
since: Collection<((), Antichain<T>), SeqNo, i64>
trace: Collection<(BlobKey, Description<T>), SeqNo, i64>
rollup: Collection<(BlobKey, ()), SeqNo, i64>
Some notes on these collections:

- They all use SeqNo as the time dimension.
- The data types (WriterId, WriterMeta, etc) implement Codec.
- SeqNo and i64 implement Codec64.
- The writers and readers collections each contain at most one instance of the WriterId/ReaderId key at a given time.
- The since collection contains exactly one element at every time.
- The trace collection can contain multiple elements.
- The rollup collection can contain multiple elements.

As hinted by the name, the rollup collection contains the entire contents of the collection at the corresponding SeqNo. This prevents us from needing an unbounded amount of work at startup to compute the full state. New rollups are computed and added at some heuristic interval. It contains multiple elements in case someone needs to compute the state as of some SeqNo previous to the most recent rollup. These can be garbage collected over time in the same way we garbage collect blobs, as all readers progress past the SeqNo.
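As a sketch of how the full state at a given SeqNo might be reconstructed, start from a rollup and sum in the incremental updates, consolidating away anything that cancels. K stands in for any of the collections' data types, and SeqNo is assumed to be Ord and Copy.

use std::collections::BTreeMap;

fn state_at<K: Ord + Clone>(
    rollup: Vec<(K, i64)>,           // full contents as of the rollup's SeqNo
    incremental: &[(K, SeqNo, i64)], // updates at SeqNos after the rollup
    as_of: SeqNo,
) -> Vec<(K, i64)> {
    let mut sums: BTreeMap<K, i64> = rollup.into_iter().collect();
    for (k, seqno, diff) in incremental {
        if *seqno <= as_of {
            *sums.entry(k.clone()).or_insert(0) += *diff;
        }
    }
    // Consolidate: drop entries whose diffs have summed to zero.
    sums.into_iter().filter(|(_, d)| *d != 0).collect()
}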
A persist shard's scaling is ultimately limited by one of two things: the raw throughput of writing updates to S3 (think a high traffic source) or the number of state changes it can process (think a high traffic table or a large number of readers+writers sending progress information).
Under normal operation, it's difficult for a computation over updates (e.g. decoding Avro) on a reasonable machine to saturate the throughput of writing to S3 from the same AZ. Similarly, by batching up state changes into a hot loop and running compaction out-of-band (i.e. by modeling "this compaction succeeded" as an input to the state machine), we should be able to scale to a high frequency of writes. A high number of concurrent writers (i.e. hundreds or more) is an open question.
However, these of course may not hold under degraded conditions. If persist is not able to keep up, it backpressures writers by only allowing them to have one outstanding request at a time. For maximum throughput, a WriteHandle user should internally buffer the next batch's updates while the previous batch is being written. If this buffer fills up, the WriteHandle user should in turn backpressure its own upstream system (e.g. reading from Kafka for a source).
The goal for compaction is to present a sequence of batches that are [since, upper) rather than [lower, upper), even when since >> upper. Colloquially, we want to forget about historical details that no reader present or future is going to care about.

We also want to physically merge batches together, so that a bounded number of batches covers [since, upper). This helps us bound the size of the metadata required to represent an arbitrarily large collection and, more importantly, it introduces opportunities for more logical compaction. For example, if an update at t2 cancels out an update at t1, the cancellation can only occur once the two updates land in the same physical batch. Otherwise, even if the update at t1 gets forwarded to t2 but still lives in a batch from [lower: t0, upper: t1), the forwarding won't unlock the space reduction.

In differential, the compaction machinery both produces these new batches and logically removes the old batches from the data structure, and then uses reference counting to handle physical deletion of the batches. In persist, it is simpler to have one compactor that is only responsible for reading sequences of incoming batches and producing new, physically and logically compacted ones as the since frontiers advance, and to leave the logical and physical deletion to a separate garbage collection process.
For compaction selection, we plan to use an algorithm inspired by (and likely identical to) the one in differential's Spine: a size-tiered compaction, whereby some batch representing ~2^k updates (at level k) is only compacted together with another batch representing ~2^k updates (at level k) to produce a new batch representing ~2^(k+1) updates. This structure lets us meet the goals stated above while performing an O(n) number of reads and writes.
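A sketch of that size-tiered selection rule, with a toy Batch type that tracks only what's needed for the illustration (the real machinery also tracks Descriptions, since frontiers, etc.):

struct Batch {
    level: u32,        // roughly log2 of the number of updates in the batch
    keys: Vec<String>, // blob keys holding the batch's data
}

/// Returns the indices of two batches at the same level, if any exist.
fn select_merge(batches: &[Batch]) -> Option<(usize, usize)> {
    for i in 0..batches.len() {
        for j in (i + 1)..batches.len() {
            if batches[i].level == batches[j].level {
                return Some((i, j));
            }
        }
    }
    None
}

/// Merging two level-k batches produces one level-(k + 1) batch.
fn merge(a: Batch, b: Batch) -> Batch {
    assert_eq!(a.level, b.level);
    Batch {
        level: a.level + 1,
        keys: a.keys.into_iter().chain(b.keys).collect(),
    }
}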
There are several places where state can accumulate in this design:

1. The sequence of per-seqno incremental state transitions grows in Consensus.
2. Batches written to Blob that eventually become unused.
3. State rollups written to Blob that eventually become unused.
4. The shard itself, once its since/upper frontiers == [].

Let's focus on 1, 2, and 3. Each of those can be logically deleted (the system has a redundant copy of their information somewhere) when:
1. The Consensus state at a given seqno will never be used again when the seqno_since of all readers > seqno, and a state rollup exists up to a seqno greater than that seqno.
2. For batches, this is slightly more complex.
a) We know that a batch can be logically deleted when another batch exists that can cover its [lower, upper) at a since >= the collection's since. At that moment, we can mark the batch safe for logical deletion.
b) If we logically delete the batch at seqno X, we can physically delete the batch once the seqno_since for the collection advances up to a seqno that is > X, and there is a state rollup at a seqno > X.
c) Once the batch has been physically deleted, we can remove it from the set of batches that have been logically deleted.
3. State rollups follow a similar pattern.
a) We can logically delete a state rollup as soon as another state rollup exists with a seqno > this state rollup's seqno, at any subsequent seqno.
b) If we logically delete the state rollup at seqno X, we can physically delete it once the seqno_since advances up to a seqno > X and there is a state rollup at a seqno > X.
c) Once the state rollup has been physically deleted, we can remove it from the set of state rollups that have been logically deleted, at any subsequent seqno.
Basically, the common pattern for 2 and 3 is:

A. Logically delete the object at some seqno X. Any users on some old version of the state can continue to use the same resource as they were before.

B. When all readers acknowledge that they have seen the logical delete and promise to never go backwards (ie they have advanced their seqno_since > X, and we have an appropriate state rollup at a seqno > X so that no future reader will need the state before the logical delete), we can now delete all of the elements that were logically deleted at seqnos <= X.

C. Once the object has been physically deleted, we can remove it from the set of elements to be physically deleted (at any seqno > when we perform the physical delete).

This is basically the same idea as epoch based reclamation (we're maintaining a free-list per seqno and then freeing those batches / state rollups once that seqno can never be read).
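The physical-deletion condition in step B boils down to a single predicate; a sketch, assuming seqnos are totally ordered:

/// True when an object logically deleted at `deleted_at` can be physically
/// deleted: every reader's seqno_since has passed it and a newer rollup exists.
fn can_physically_delete(deleted_at: SeqNo, seqno_since: SeqNo, latest_rollup: SeqNo) -> bool {
    seqno_since > deleted_at && latest_rollup > deleted_at
}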
Arguably, 1 falls into the same pattern as well, only there’s no need to do an explicit logical delete?
We don't expose a DeleteCollection type of operation on writers. That would introduce a way for writers and readers to potentially interfere with each other's operations, whereas as defined above, writers are independent of other writers (no writer can invalidate another writer's write operation) and of other readers (no reader can invalidate another writer's operation), and vice-versa for readers.
Instead, we know that a shard is safe to delete when its global upper and since frontiers are the empty antichain [], because at that point, the shard can no longer be written to or read from. Once a shard reaches that state we can delete all of its constituent batches, reject all requests for new readers and writers and leave only a single state rollup that acts as a tombstone indicating that the shard's frontiers have advanced to []. We could even have a specific TOMBSTONE object which, if it exists, indicates that the shard has permanently been deleted. This lets us garbage-collect any consensus state associated with the shard as well.
Naively, writers and, especially, readers can hold back reclamation of resources for an arbitrarily long amount of time. This would likely be due to fault, but could also just be accidental. If we eventually support something like cross-account export of computed views, it could even be maliciousness. To prevent this, all writers and readers are leased.
Our primary aim in the implementation of leasing is to avoid needing to reason about distributed clocks. As a result, the lease information is stored in the shard state and updated through consensus, just like anything else. Notably, a lease expiration also goes through consensus. Once a lease expires, the writer or reader learns about it on its next attempted state modification and must reconnect with a new id. The old id is permanently retired, allowing for it to eventually be gc'd.
The lease is extended through normal operation of append for writers and downgrade_since for readers. Both of these have a no-op version if a pure "heartbeat" is needed: writing no data and advancing upper to the same value, or downgrading to the same since.
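For example, a combined heartbeat might look like the following sketch; it assumes T: Clone, that upper() and since() return the handles' current frontiers (as referenced in the API docs above), and it elides error handling.

async fn heartbeat<K, V, T: Clone, D>(
    write: &mut WriteHandle<K, V, T, D>,
    read: &mut ReadHandle<K, V, T, D>,
    timeout: Duration,
) -> Result<(), ExternalError> {
    // Writer heartbeat: append no data and leave the upper unchanged.
    let current_upper = write.upper().clone();
    let empty: Vec<((&K, &V), &T, &D)> = Vec::new();
    write.append(timeout, empty, current_upper).await?.expect("no-op is valid usage");
    // Reader heartbeat: downgrade since to its current value.
    let current_since = read.since().clone();
    read.downgrade_since(timeout, current_since).await?.expect("no-op is valid usage");
    Ok(())
}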
Persist uses a "rich client" model with some of the logic running in-process
(but behind the persist abstraction) so that readers and writers can talk
directly to S3. To reduce contention, state changes (including things like
"here's some data at an S3 key that I wrote") are then funneled through a
PersistShard
RPC service running in a separate process. Operationally, we'll
run multiple copies of PersistShard
and use some sort of fuzzy suppression to
mostly keep them from contending.
Besides local caches of data from Blob
and Consensus
, the implementation of
this service is stateless. Each request contains the ShardId
, so that the RPC
server can respond to any requests that happen to come its way. Combined, this
means that the mapping between persist shards and servers is simply a function
of where the requests are routed.
A single copy of the server should be sufficient for most (all?) customers, but
we have the option of easy horizontal scalability if necessary. However, note
that if we do, we'll want to route all traffic from each individual Shard to the
same process, so that in the steady state, our CaS operations against
Consensus
are conflict-free. TODO: Figure out if this routing should be done
at the k8s level or through something like a proxy.
In the future, we'll split compaction out into a second PersistCompaction
RPC
service for PersistShard
to call out into. It will also be stateless, modulo
caches. These would be elastically scalable with all compaction requests load
balanced between them.
We chose a two-tiered error handling system, inspired by the sled.rs blog post on errors (see http://sled.rs/errors.html). The two tiers of errors are usage errors and location errors (these latter ones could be called storage errors, but that name already seems taken).
Usage errors indicate when the user of the API has requested an invalid
operation. For example, trying to read as of t
when the read handle has
already been downgraded beyond t
. In practice, these usually (TODO: always?)
indicate a bug in the system that uses persist
.
The second tier of errors, location or storage errors, indicate that something
is wrong with one of the underlying systems (either blob
or log
). For
example, the blob
system could be down or some datum that we are trying to
read is temporarily unavailable. One might be inclined to also differentiate
between transient (temporary) and permanent location errors. This seems hard,
though, in the general case. For example, an unavailable blob
might "come
back", but it might stay permanently unavailable, and we don't know how to
differentiate between those two cases.
An important difference between the two classes of errors is timeouts and retries. When a usage error occurs an API call will immediately report it back. For location errors, on the other hand, we might retry within a given timeout window.
Another important axis along which we differentiate errors is definiteness (TODO: Come up with a better term because this one is already doing a lot of work elsewhere). Definite errors are those errors where we know that some operation definitely failed, while indefinite errors are those errors where we cannot know. This distinction is important for write operations (all operations that change state): if an operation definitely failed, we can retry without running the risk of writing duplicate data. If, on the other hand, we get a timeout or some other indefinite errors, we cannot know that the data was not written and have to abort the complete write attempt.
TODO: Figure out how to fit the two axes of errors into an ergonomic API. Maybe
definite/indefinite are variants of ExternalError
? It also feels like usage
error/location error sometimes map to definite/indefinite, but only if you
squint hard enough. I also think we will need to play this by ear, see what’s
ergonomic when we start implementing things on top.
There are open questions around this topic, but we can table them for now and see what works best when working with the API to implement sources and the rest of STORAGE.
One example: a reader wants to read at a time still covered by its read capability (ReadHandle), but the capability has timed out. It's not a usage error because the reader theoretically did use the API correctly. But in terms of UX, I think the only thing we can do is return a "cannot serve query as of ..." error.
A writer (something that writes data into a persistent shard) needs to uphold the property that multiple incarnations of a writer must write the same (in terms of definite) data. This needs to hold both across time (think a writer that restarts after a failure) and across multiple concurrent writers. We think that a restarting writer will have to request a new writer ID, so the two cases boil down to the same thing.
Upholding this property is complicated by the fact that materialize (or STORAGE) wants to assign timestamps to data (updates) before it is written to persistence. Table updates and data coming from sources need to be timestamped. Current thinking is that these timestamps are a realtime-esque timestamp. We will sketch how we can achieve this for the two initially planned use cases, tables and sources.
For tables, we seem to have two main options: YOLO mode and WAL mode.
In YOLO mode the part of the system that accepts table updates can assign a timestamp to the update and then issue a write request to persistence. That write request might or might not succeed. The writer can report success if it gets back a success response from persist. The writer might not get a response, due to reasons, but that doesn’t mean that the write surely failed. If the write seems to have failed, the writer cannot simply try and write again, using the same writer id, because that could cause the update to be inserted again. It would have to request a new writer ID, and could then try writing the same update (with the same timestamp) again.
In WAL mode, CompareAndAppend
is used to ensure that each update (with an
attached timestamp) is written to a WAL exactly once. An arbitrary number of
writers could then go and read from that WAL and try to append them to the
persistent shard. This is safe because the data is made definite in the WAL but
still requires that writers (using the same writer ID) don’t retry writes if
they don’t get a success message back. A restarted writer must still use a new
writer ID, to ensure that we don’t write duplicates.
The part that reads from the WAL and writes to persistence can be thought of as a source, where the WAL is the source system (see below) and we don’t need to do timestamping.
A source needs to roughly do this: read updates from the external source system, assign timestamps to them, and write the timestamped updates to a persist shard.
The difficulty here is ensuring that a given source instance writes the same (in terms of definiteness) updates as all the other instances. Both when it comes to instances across time, after restarts, and with concurrently running instances. To solve this, we need two things: source messages must come with some kind of identifier that we can use to look up a timestamp that we attach (think Kafka partition-offset pairs), let’s call this source timestamp and we need a definite collection of timestamp bindings where we look up the assigned timestamp for a given source timestamp. We will sketch two different approaches for this. There might be more.
For both approaches, the collection of timestamp bindings works like this: the schema of bindings is (<source timestamp>, <ts>), and a binding (s_ts, ts) tells us that any update with a source timestamp that is less-equal s_ts gets assigned the timestamp ts, unless there is another binding (with a lower source timestamp) that already covers a given source timestamp. So the collection of timestamp bindings defines ranges of source timestamps that get assigned the same timestamp.
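A sketch of that lookup rule, simplifying the source timestamp to a single u64 (in reality it might be, e.g., a Kafka (partition, offset) pair):

/// `bindings` is the collection of (s_ts, ts) pairs, kept sorted by s_ts.
fn assigned_ts(bindings: &[(u64, u64)], source_ts: u64) -> Option<u64> {
    // The binding that applies is the one with the smallest s_ts that is
    // greater or equal to source_ts.
    bindings
        .iter()
        .find(|(s_ts, _)| source_ts <= *s_ts)
        .map(|(_, ts)| *ts)
}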
Both approaches have in common that there needs to be durable consensus about what the timestamp bindings for given source updates should be. The two sketched approaches differ in how those bindings are minted, though. We will call the component that reads from an external source, applies bindings, and writes the timestamped data to a persist shard a source writer.
This approach assumes that it is possible to figure out the "high watermarks" of available data, in terms of their source timestamp. The centralized minter can then mint bindings of the current "real-time clock" and write them down. This writing down needs some kind of consensus/exactly once to ensure that there is a consistent collection of bindings that all the source writers use.
All the source writers need to do with this model is observe the minted bindings, read updates from the source, attach bindings to updates and write them to persistence.
With this approach, each source writer can read data from the source and whenever it encounters a source timestamp for which it doesn’t know the binding, it will "ask" a centralized component for a binding. That component can be centralized as in a centralized RPC service, or it can be a "rich client" that uses an external consensus system to mint a binding. The API for this minting service is:
GetBinding(SourceTimestamp) -> (SourceTimestamp, Timestamp)
: Let the minter
know that there is data at least up to the given source timestamps. The minter
can either figure out that it already has a binding that covers this and return
it. Or mint a new binding, record it (with consistent/exactly-once semantics)
and return that.
This way, we ensure that a) restarting source writers write the same data (with the same timestamps), and b) multiple concurrent source writers also write the same data with the same timestamp. Both because minting timestamps goes through some sort of consensus.
The format here is a bit different. We spell out questions that an Ops Team might have and give our best answers:
- How do we monitor persist in production? Through the existing MetricsRegistry machinery from mz, along with distributed tracing, enabled by our use of tracing.

- What will persist cost to run? I think our costs in the cloud platform world will have 2 major drivers - S3 costs for Blob and service costs for Consensus and the various readers and writers. I'm going to ignore financial costs in the single binary on-prem world because we are not responsible for any financial costs there.
For S3 there are three main levers of cost: storage (priced per GB-month), requests (priced per PUT/GET), and data transfer (priced per GB).
It’s hard for me to make intuitive sense of these units. Instead, I believe one appropriate way to build intuition here is to ask "what can we afford for $100 / year"? I think a year is the appropriate time quanta to think about because Materialize will be priced annually, and I think $100 is a reasonable monetary unit to think about because the smallest possible contracts will be on the order of ~single digit thousands per year, so ~single digit hundreds of dollars is the smallest possible "significant" cost denomination. Again, this is more to build intuition than anything else.
In S3 $100/year lets you, individually:
This structure helps me ask and answer "in our current design is there any way a
user could 10x our costs in a manner that we couldn’t justify a price increase
for?". For example, a user could decide to store 10x as much data. I would argue
that refusing to store past a certain amount of data without a price increase is
totally reasonable. However, consider the following situation: a user has 1
source ingesting data at 10 MB/s. Then say they delete this source and replace
it with 10 sources each ingesting data at 1 MB/s. In this world, assuming the
aggregate amount of data stored is the same, we’ve still managed to 10x the
number of writes required / <time interval>
because we write batches for
different collections separately. There’s a number of technical ways to mitigate
this cost increase (e.g increase the write batching time interval as the number
of sources increases) and a number of product ways to prevent it (e.g. limit the
number of sources a user can have), and it's unclear what the best thing to do is
generally. It just stands out as the biggest "potential cost surprise"
currently.
An alternate design stores the state entirely in Blob
and only uses
Consensus
to tick forward the root pointer. Instead of explicitly listing
pointers to every batch of data in the state, a skip-list is used and the
batches point at batches from previous points in time. The remaining state is
small enough to write out in its entirety at each update.
The primary benefit of this is fewer constraints on our implementation of
Consensus
along with reduced complexity in our incremental state
representation. The primary drawback is that compaction of the skip-list
representation of batches requires rewriting them in place (logically any of
these replacements are interchangeable but the physical bytes are different),
which makes it more difficult to reason about correctness in edge-case
scenarios.
A reasonable concern is the overhead of running one of these state machines per shard, especially in the case of many low-traffic tables and/or sources. However, multiplexing many shards onto a shared state machine is a hard problem. It would make state changes decently more difficult to reason about, but an even more difficult problem is migrating a shard from one state machine to another.
We're initially deciding to punt on this for a few reasons. First, a number of the tradeoffs here depend quite a bit on specifics of the operations side of persist, which we'll have a much more specific handle on once we get something running in production. Second, this is exactly the sort of problem that we can solve by eating the cost of the overhead in the next 6-12 months and revisiting later.