Our SQL table transaction management still shows some of its lineage from the single-binary era. At times, this conflicts with the "Platform V2" goals of use-case isolation and zero-downtime upgrades (0dt).
- Use-case isolation. Make it possible to prevent pgwire sessions from interfering with each other.
- Zero-downtime upgrades. Nothing in particular is motivating here; just don't introduce anything that interferes with 0dt.
- Don't break things that work today:
  - `MATERIALIZED VIEW`
  - `INSERT INTO ... SELECT FROM ...`
  - `UPDATE`
  - `DELETE`
- Nice-to-haves, while we're revisiting SQL tables and transactions:
  - `COPY INTO`
  - `COPY FROM`
Each table is (and will continue to be) backed by an individual persist shard with a unique ShardId address.
Currently, table uppers are advanced in periodic rounds; let X be the time since the last round of upper advancements. This allows joins of tables to be immediately served after a write, and joins of tables and sources to be served within X.
We break the problem of transaction management into two conceptual pieces, timestamp selection and atomic durability, by introducing the following model of read and write operations:

- `subscribe(table_id: ShardId, as_of: T, until: Option<T>)` with the usual meanings of `as_of` and `until`.
- `commit_at(ts: T, updates: Vec<(ShardId, Vec<(Row, i64)>)>) -> bool`, which makes the requested timestamp (and all lesser ones) immediately available for reads, or returns false if that's no longer possible (because of contention).

Timestamp selection is then concerned with snapshot vs. serializable vs. strict-serializable isolation, as well as minimizing contention; various strategies can be discussed in terms of `subscribe` and `commit_at`. Atomic durability is then an implementation of `subscribe` and `commit_at`.
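To make the shape of this model concrete, here is a minimal Rust sketch of it as an interface. Everything in it (the `TableTxns` trait, `ReadSubscription`, and the stand-in type aliases) is hypothetical and introduced only for illustration:

```rust
// Hypothetical stand-ins for the real types; `T` is the timestamp type.
type ShardId = u64;
type Row = String;
type T = u64;

/// A stream of updates to one table from `as_of` (inclusive) up to the
/// optional `until` (exclusive), with the usual meanings of those bounds.
pub struct ReadSubscription {
    pub table_id: ShardId,
    pub as_of: T,
    pub until: Option<T>,
}

/// The conceptual model: timestamp selection reasons in terms of these two
/// operations, and atomic durability is an implementation of them.
pub trait TableTxns {
    /// Subscribe to updates of `table_id` starting at `as_of`.
    fn subscribe(&self, table_id: ShardId, as_of: T, until: Option<T>) -> ReadSubscription;

    /// Atomically make `updates` durable and readable at `ts` (and all lesser
    /// times), or return false if that's no longer possible due to contention.
    fn commit_at(&mut self, ts: T, updates: Vec<(ShardId, Vec<(Row, i64)>)>) -> bool;
}
```

Timestamp selection strategies then only need to decide which `as_of` and `ts` to pass; atomic durability is whatever sits behind these two calls.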
Example: read-only single-table txn

```rust
let read_ts = oracle.read_ts();
subscribe(table_id, read_ts, Some(read_ts + 1))
```
Example: write-only single-table txn

```rust
loop {
    if commit_at(oracle.write_ts(), updates) {
        break;
    }
}
```
Example: read-then-write single-table txn

```rust
let (mut read_ts, mut write_ts) = oracle.read_and_write_ts();
let mut s = subscribe(table_id, read_ts, None);
loop {
    if commit_at(write_ts, update_fn(s, read_ts)) {
        break;
    }
    (read_ts, write_ts) = oracle.read_and_write_ts();
}
```
If `update_fn` can be computed incrementally, there are various tricks we can play to avoid re-writing all the data to s3 on each retry: for example, checking whether the inputs to `update_fn` have changed since the last `read_ts` and skipping the recomputation of `update_fn` if not.
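For example, one such trick might look like the sketch below, which reuses the previously staged updates on a retry unless new input data has appeared; `stage_updates` and `reads_changed_since` are hypothetical helpers in the same pseudocode style as the examples above:

```rust
// Hypothetical sketch: cache the staged output of `update_fn` across commit
// retries and only recompute it when its inputs have actually changed.
let (mut read_ts, mut write_ts) = oracle.read_and_write_ts();
let mut staged = stage_updates(update_fn(subscribe(table_id, read_ts, None), read_ts));
loop {
    if commit_at(write_ts, staged.clone()) {
        break;
    }
    let last_read_ts = read_ts;
    (read_ts, write_ts) = oracle.read_and_write_ts();
    if reads_changed_since(table_id, last_read_ts, read_ts) {
        // Inputs changed: redo the (potentially expensive) compute and s3 write.
        staged = stage_updates(update_fn(subscribe(table_id, read_ts, None), read_ts));
    }
    // Otherwise reuse `staged` as-is (re-timestamping it to the new write_ts
    // is elided here).
}
```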
The rest of this document concerns the implementation of the durability component; additional detail on timestamp selection is left for a separate design document.
See accompanying prototype in #20954.
Efficient atomic multi-shard writes are accomplished through a new singleton-per-environment txns shard that coordinates writes to a (potentially large) number of data shards. Data shards may be added to and removed from the set at any time.
Benefits of these txns:

- Read-then-write transactions can commit at the timestamp immediately following the read (`write_ts/commit_ts = read_ts + 1`).
- An uncommitted `Txn` can be serialized and moved between processes, and uncommitted `Txn`s can be merged, if necessary (e.g. consolidating all monitoring collections, statement logging, etc. into the periodic timestamp advancement).

Restrictions:
- All data shards must use the same codecs for `K, V, T, D`. However, each data shard may have independent `K` and `V` schemas. The txns shard inherits the `T` codec from the data shards (and uses its own `K, V, D` ones).
- Each data shard must be registered at some `register_ts` before being used in transactions. Registration is for bookkeeping only; there is no particular meaning to the timestamp other than it being a lower bound on when txns using this data shard can commit. Registration only needs to be run once-ever per data shard, but it is idempotent, so it can also be run at-least-once.

A txn is broken into three phases:
- The txn's (potentially large) updates are first staged as persist batches written directly to S3, but not yet part of any shard.
- The txn is then committed by writing pointers to those staged batches into the txns shard at a single `commit_ts`. Feel free to think of this as a WAL. This makes the txn durable (thus "definite") and also advances the logical upper of every data shard registered at a timestamp before `commit_ts`, including those not involved in the txn. However, at this point, it is not yet possible to read at the commit ts.
- Finally, the staged batches are moved into the data shards with a `compare_and_append`. We call this process applying the txn. Feel free to think of this as applying the WAL.

(Note that this means each data shard's physical upper reflects the last committed txn touching that shard, and so the logical upper may be greater than this.)
A data shard may be forgotten at some `forget_ts` to reclaim it from the txns system. This allows us to delete it (e.g. when a table is dropped). Like registration, forget is idempotent.
```rust
// Open a txns shard, initializing it if necessary.
let mut txns = TxnsHandle::open(0u64, client, ShardId::new()).await;

// Register data shards to the txn set.
let (d0, d1) = (ShardId::new(), ShardId::new());
txns.register(d0, 1u64).await.expect("not previously initialized");
txns.register(d1, 2u64).await.expect("not previously initialized");

// Commit a txn. This is durable if/when the `commit_at` succeeds, but reads
// at the commit ts will _block_ until after the txn is applied. Users are
// free to pass up the commit ack (e.g. to pgwire) to get a bit of latency
// back. NB: It is expected that the txn committer will run the apply step,
// but in the event of a crash, neither correctness nor liveness depend on
// it.
let mut txn = txns.begin();
txn.write(&d0, vec![0], 1);
txn.write(&d1, vec![1], -1);
txn.commit_at(&mut txns, 3).await.expect("ts 3 available")
    // And make it available to reads by applying it.
    .apply(&mut txns).await;

// Commit a contended txn at a higher timestamp. Note that the upper of `d1`
// is also advanced by this.
let mut txn = txns.begin();
txn.write(&d0, vec![2], 1);
txn.commit_at(&mut txns, 3).await.expect_err("ts 3 not available");
txn.commit_at(&mut txns, 4).await.expect("ts 4 available")
    .apply(&mut txns).await;

// Read data shard(s) at some `read_ts` (here `d1_read` is a persist read
// handle on `d1`, opened elsewhere).
let updates = d1_read.snapshot_and_fetch(
    vec![txns.read_cache().to_data_inclusive(&d1, 4).unwrap()].into(),
).await.unwrap();
```
The structure of the txns shard is `(ShardId, Vec<u8>)` updates.
The core mechanism is that a txn commits a set of transmittable persist batch handles as `(ShardId, <opaque blob>)` pairs at a single timestamp. This contractually both commits the txn and advances the logical upper of every data shard (not just the ones involved in the txn).
Example:

```
// A txn to only d0 at ts=1
(d0, <opaque blob A>, 1, 1)
// A txn to d0 (two blobs) and d1 (one blob) at ts=4
(d0, <opaque blob B>, 4, 1)
(d0, <opaque blob C>, 4, 1)
(d1, <opaque blob D>, 4, 1)
```
However, the new commit is not yet readable until the txn apply has run, which is expected to be promptly done by the committer, except in the event of a crash. This, in ts order, moves the batch handles into the data shards with a `compare_and_append_batch` (similar to how the multi-worker persist_sink works).
Once apply has run, we "tidy" the txns shard by retracting the update that added the batch. As a result, the contents of the txns shard at any given timestamp are exactly the set of outstanding apply work (plus registrations, see below).
Example (building on the above):

```
// Tidy for the first txn at ts=3
(d0, <opaque blob A>, 3, -1)
// Tidy for the second txn (the timestamps can be different for each
// retraction in a txn, but don't need to be)
(d0, <opaque blob B>, 5, -1)
(d0, <opaque blob C>, 6, -1)
(d1, <opaque blob D>, 6, -1)
```
To make it easy to reason about exactly which data shards are registered in the txn set at any given moment, the data shard is added to the set with a `(ShardId, <empty>)` pair. The data may not be read before the timestamp of the update (which starts at the time it was initialized, but it may later be forwarded).
Example (building on both of the above):

```
// d0 and d1 were both initialized before they were used above
(d0, <empty>, 0, 1)
(d1, <empty>, 2, 1)
```
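To make the bookkeeping concrete, below is a small self-contained sketch of how the consolidated txns shard contents can be interpreted: an empty value is a registration, a non-empty value is a committed-but-not-yet-tidied batch. The `TxnsEntry` enum and everything around it is invented for illustration, not the prototype's types.

```rust
use std::collections::BTreeMap;

type ShardId = &'static str;

/// Interpretation of one `(ShardId, Vec<u8>)` update in the txns shard: an
/// empty value registers the data shard, a non-empty value is an opaque
/// batch handle from a committed txn that has not yet been tidied away.
#[derive(Debug)]
enum TxnsEntry {
    Register(ShardId),
    CommittedBatch(ShardId, Vec<u8>),
}

fn interpret(shard_id: ShardId, value: Vec<u8>) -> TxnsEntry {
    if value.is_empty() {
        TxnsEntry::Register(shard_id)
    } else {
        TxnsEntry::CommittedBatch(shard_id, value)
    }
}

fn main() {
    // The consolidated contents of the txns shard as of ts=5 in the running
    // example above: both registrations plus the two batches whose tidy
    // retractions only land at ts=6.
    let contents: Vec<(ShardId, Vec<u8>)> = vec![
        ("d0", Vec::new()),
        ("d1", Vec::new()),
        ("d0", b"opaque blob C".to_vec()),
        ("d1", b"opaque blob D".to_vec()),
    ];
    let mut outstanding: BTreeMap<ShardId, usize> = BTreeMap::new();
    for (d, v) in contents {
        if let TxnsEntry::CommittedBatch(d, _) = interpret(d, v) {
            *outstanding.entry(d).or_default() += 1;
        }
    }
    // Prints {"d0": 1, "d1": 1}: the per-shard apply work still outstanding.
    println!("{:?}", outstanding);
}
```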
Reads of data shards are almost as straightforward as writes. A data shard may be read normally, using snapshots, subscriptions, shard_source, etc, through the most recent non-empty write. However, the upper of the txns shard (and thus the logical upper of the data shard) may be arbitrarily far ahead of the physical upper of the data shard. As a result, we do the following:
- To take a snapshot of a data shard, the `as_of` is passed through unchanged if the timestamp of that shard's latest non-empty write is past it. Otherwise, we know the times between them have no writes and can fill them with empty updates. Concretely, to read a snapshot as of `T`:
  - Read the txns shard up through and including `T`, blocking until its upper passes `T` if necessary.
  - Find, for the requested data shard, the latest non-empty write at some `T' <= T`.
  - Wait for `T'` to be applied by watching the data shard upper.
  - `compare_and_append` empty updates for `(T', T]`, which is known by the txn system to not have writes for this shard (otherwise we'd have picked a different `T'`).
  - Read the snapshot at `T` as normal.

Note that all of the above can be determined solely by information in the txns shard. In particular, non-empty writes are indicated by updates with positive diffs.
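Putting those steps together, a hedged sketch of the snapshot-read procedure might look as follows; all of the helper functions (`txns_contents_through`, `data_shard_upper`, `compare_and_append_empty`, `read_snapshot`, `backoff`) are assumed for illustration and are not the actual API:

```rust
// Hypothetical sketch; helper functions are assumed, not real API.
fn snapshot(data_id: ShardId, t: T) -> Vec<(Row, T, i64)> {
    // 1. Read the txns shard up through and including `t`, blocking until its
    //    upper passes `t` if necessary. Non-empty writes appear as
    //    positive-diff updates.
    let txns = txns_contents_through(t);
    // 2. Find the latest non-empty write to `data_id` at some `t_prime <= t`.
    let t_prime = txns.latest_nonempty_write(data_id, t);
    // 3. Wait for that write to be applied by watching the data shard upper.
    while data_shard_upper(data_id) <= t_prime {
        backoff();
    }
    // 4. Fill (t_prime, t] with empty updates; the txns shard guarantees this
    //    range has no writes for `data_id` (otherwise we'd have picked a
    //    later t_prime), so this only advances the physical upper.
    compare_and_append_empty(data_id, t_prime + 1, t + 1);
    // 5. Read the snapshot at `t` as normal.
    read_snapshot(data_id, t)
}
```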
Also note that the above is structured such that it is possible to write a timely operator with the data shard as an input, passing on all payloads unchanged and simply manipulating capabilities in response to data and txns shard progress.
Initially, an alternative was considered where we'd "translate" the `as_of` and read the snapshot at `T'`, but this had an important defect: we'd forever have to consider this when reasoning about later persist changes, such as a future consolidated shard_source operator. It's quite counter-intuitive for reads to involve writes, but I think this is fine. In particular, writing empty updates to a persist shard is a metadata-only operation (we'll need to document a new guarantee that a CaA of empty updates never results in compaction work, but this seems like a reasonable guarantee). It might result in things like GC maintenance or a CRDB write, but this is also true for registering a reader. On balance, I think this is a much better set of tradeoffs than the original plan.
Compaction of data shards is initially delegated to the txns user (the storage controller). Because txn writes intentionally never read data shards and in no way depend on the sinces, the since of a data shard is free to be arbitrarily far ahead of or behind the txns upper. Data shard reads, when run through the above process, then follow the usual rules (can read at times beyond the since but not beyond the upper).
Compaction of the txns shard relies on the following invariant that is carefully maintained: every write less than the since of the txns shard has been applied. Mechanically, this is accomplished by a critical since capability held internally by the txns system. Any txn writer is free to advance it to a time once it has proven that all writes before that time have been applied.
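A hedged sketch of how a txn writer might maintain that invariant while advancing the critical since; the types and helpers are hypothetical:

```rust
// Hypothetical sketch: a txn writer advances the txns shard's critical since
// only up to the earliest commit that is not yet known to have been applied.
fn maybe_advance_txns_since(txns_since: &mut CriticalSince, txns_contents: &TxnsContents) {
    // A conservative bound: any batch entry still present in the txns shard
    // (i.e. committed but not yet tidied) may not have been applied yet.
    let new_since = match txns_contents.earliest_untidied_write_ts() {
        Some(ts) => ts,
        // Nothing outstanding: every write below the txns upper is applied.
        None => txns_contents.upper(),
    };
    // Every write < new_since is proven applied, so the invariant holds.
    txns_since.downgrade_to(new_since);
}
```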
It is advantageous to compact the txns shard aggressively so that applied writes are promptly consolidated out, minimizing the size. For a snapshot read at `as_of`, we need to be able to distinguish when the latest write `<= as_of` has been applied. The above invariant enables this as follows:

- If `as_of <= txns_shard.since()`, then the invariant guarantees that all writes `<= as_of` have been applied, so we're free to read as described in the section above.
- Otherwise, we haven't compacted `as_of` away in the txns shard yet and still have perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied.

A data shard is removed from the txns set using a `forget` operation that writes a retraction of the registration update at some `forget_ts`. After this, the shard may be used through normal means, such as direct `compare_and_append` writes or tombstone-ing it. To prevent accidental misuse, the forget operation ensures that all writes to the data shard have been applied before writing the retraction.
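For illustration, a hedged sketch of the forget operation, with all names assumed rather than taken from the prototype:

```rust
// Hypothetical sketch of `forget`; all names are assumed, not the prototype's
// actual API.
fn forget(txns_shard: &mut TxnsShardWriter, data_id: ShardId, forget_ts: T) {
    // To prevent accidental misuse, first ensure every committed write to
    // `data_id` has been applied, i.e. no un-tidied batch entry for it
    // remains in the txns shard.
    txns_shard.apply_all_outstanding(data_id);
    // Then retract the `(data_id, <empty>)` registration update at forget_ts.
    // Like registration, forget is idempotent.
    txns_shard.append(forget_ts, vec![(data_id, Vec::new(), -1i64)]);
}
```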
The code will support repeatedly registering and forgetting the same data shard, but this is not expected to be used in normal operation.
Alternatives considered:

- Keep all tables in a single persist shard, keyed by `(GlobalId, Row)`.
- Push the atomicity down into persist's consensus layer: persist state is a mapping of `ShardId -> ShardMetadata`, maintained through the `Consensus::compare_and_set` operation, implemented in production using CRDB. We could add a `Consensus::compare_and_set_multi` which commits all or none of a set of shard updates.