A description of Materialize’s consistency guarantees with respect to the operations that occur inside of Materialize. Operations include: SELECT, INSERT, UPDATE, and DELETE statements (but not TAIL).
This design document is in service of Epic #11631 and Correctness Property #1 (copied below).
Transactions in Materialize are strictly serializable with respect to the operations that occur inside of Materialize. Operations include:
- SELECT, INSERT, UPDATE, and DELETE statements (but not TAIL)
- Materialize-initiated acknowledgements of upstream sources (e.g., the commit of an offset by a Kafka source, the acknowledgement of an LSN by a PostgreSQL source, etcetera)
This document does not take any opinion on whether the changes proposed should be the default behavior or an opt-in behavior via some configuration.
Time units of milliseconds are used below for convenience to reflect current implementations. However, this document does not take any opinion on what time units should be used for the design proposed.
The goal of this design document is to describe what changes are needed to provide Strict Serializability to the operations mentioned in correctness property 1.
The following items' impact on consistency is not considered:
- TAIL
- AS OF, which allows clients to specify the timestamp of a query and does not constrain the timestamp selection of other queries.
These features allow clients to circumvent our consistency guarantees.
Strict Serializability (in simplified terms) means that you can assign a total order to all transactions in the system, and that the total order is constrained by the real-time occurrences of those transactions. In other words, if transaction A finishes in real time before transaction B starts in real time, then transaction A MUST be ordered before transaction B in the total order. Concurrent transactions can be ordered arbitrarily. For a more in-depth discussion, please read https://jepsen.io/consistency/models/strict-serializable and chapters 7 and 9 of Designing Data-Intensive Applications.
Materialize can satisfy the constraints of Strict Serializability if all reads and writes occur at specific timestamps, and these timestamps are non-decreasing in real-time order, with strict increases for writes following reads. Each timestamp is assigned to an event at some real-time moment within the event's bounds (its start and stop). Each timestamp transition is made durable before any subsequent response is issued.
We can then use these timestamps to form a total order constrained by real time:
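The following Python sketch is an illustration only. It builds one such order and assumes each completed operation is recorded as a hypothetical Op with its kind, assigned timestamp, and real-time start and end.

The sketch sorts operations by timestamp and places writes before reads at equal timestamps. This works because writes that follow reads get strictly larger timestamps, so a read that shares a write's timestamp cannot have finished before that write started. Remaining ties are broken by completion time. The respects_real_time helper checks the real-time constraint directly.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Op:
    name: str
    kind: str     # "read" or "write"
    ts: int       # timestamp assigned by the Coordinator
    start: float  # real time at which the operation started
    end: float    # real time at which the operation completed

def total_order(ops):
    # Timestamp first, then writes before reads, then completion time.
    return sorted(ops, key=lambda op: (op.ts, op.kind != "write", op.end))

def respects_real_time(order):
    # If a finished before b started, a must come before b in the order.
    position = {op.name: i for i, op in enumerate(order)}
    return all(
        position[a.name] < position[b.name]
        for a in order
        for b in order
        if a.end < b.start
    )

ops = [
    Op("w1", "write", 5, 0.0, 1.0),
    Op("r1", "read", 5, 2.0, 3.0),
    Op("w2", "write", 6, 4.0, 5.0),
]
print([op.name for op in total_order(ops)])  # ['w1', 'r1', 'w2']
```

Suppose a write were given the same timestamp as a read that finished before the write started. The check would then fail, which is exactly what the "strict increases for writes following reads" rule rules out.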
Timelines are the unit of consistency that Materialize provides and can be read about in detail in the Timeline design doc. Every database object belongs to a single timeline, and each read transaction can only include objects from a single timeline. Each timeline defines its own notion of time, and consistency guarantees are only provided within a single timeline. There can be at most a single Coordinator thread assigning timestamps per timeline (though multiple timelines can share a thread if they want). All the properties discussed below need to hold on a per-timeline basis and require no communication across timelines (except to serialize access to the catalog).
Each timeline needs some definition for the current time. For example, timelines that track real time can use the system clock for the current time. The current time can go backwards, but the timestamps assigned must be non-decreasing.
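Here is a minimal sketch of that rule. It assumes integer timestamps and a caller-supplied clock function; the Timeline class is hypothetical, not a Materialize type. Each assigned timestamp is the maximum of the current time and the last timestamp handed out.

```python
from typing import Callable

class Timeline:
    """Hands out non-decreasing timestamps from a clock that may go backwards."""

    def __init__(self, now: Callable[[], int]):
        self._now = now   # the timeline's notion of current time
        self._last = 0    # most recently assigned timestamp

    def next_timestamp(self) -> int:
        # Never return less than what was already handed out.
        self._last = max(self._last, self._now())
        return self._last

readings = iter([10, 12, 11, 15])  # the clock steps back from 12 to 11
timeline = Timeline(lambda: next(readings))
assigned = [timeline.next_timestamp() for _ in range(4)]
print(assigned)  # [10, 12, 12, 15]
```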
The document also allows user tables to exist in any timeline.
The stash is a consistent durable storage engine that provides per-key linearizability and transaction serializability. Every transaction in the stash includes an epoch number. If the epoch number in the transaction doesn't match the epoch number in the stash, then the transaction is rejected. All Coordinators will increment the epoch number in the stash during startup and include that new epoch in all stash transactions.
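The epoch rule can be modeled with a toy in-memory stash. ToyStash and EpochMismatch are hypothetical stand-ins, not the real stash API. Once a second Coordinator bumps the epoch at startup, any transaction carrying the older epoch is rejected.

```python
class EpochMismatch(Exception):
    pass

class ToyStash:
    """In-memory stand-in for the stash; every transaction carries an epoch."""

    def __init__(self):
        self.epoch = 0
        self.data = {}

    def bump_epoch(self) -> int:
        # Run by each Coordinator at startup; older epochs are fenced out.
        self.epoch += 1
        return self.epoch

    def transact(self, epoch: int, updates: dict) -> None:
        if epoch != self.epoch:
            raise EpochMismatch(f"epoch {epoch} is stale (current {self.epoch})")
        self.data.update(updates)

stash = ToyStash()
old_epoch = stash.bump_epoch()  # first Coordinator starts
new_epoch = stash.bump_epoch()  # a second Coordinator starts and takes over
stash.transact(new_epoch, {"key": "value"})
try:
    stash.transact(old_epoch, {"key": "stale"})
except EpochMismatch as err:
    print(err)  # epoch 1 is stale (current 2)
```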
All reads of any object and all writes to user tables are assigned a timestamp by the Coordinator on a single thread. The logic for determining what timestamp to return must satisfy:
- A read's timestamp is greater than or equal to the since of all objects involved in the query.
- A write's timestamp is greater than or equal to the upper of the table.
We can accomplish this by using a single global timestamp per timeline. The timestamp will be initialized to some valid initial value and be advanced periodically to some valid value that is higher than the previous value. The periodic advancements should maintain the property that the global timestamp is within some error bound of all the timestamps being assigned to data from upstream sources. All reads are given a timestamp equal to the global timestamp, which is greater than or equal to the since of all involved objects, as described in Read Capabilities on Global Timestamp. All writes to user tables are given a timestamp larger than the global timestamp, and when the write completes, the global timestamp should be advanced to the timestamp of that write.
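The rules above can be sketched as follows, assuming integer timestamps. GlobalTimestamp and its methods are hypothetical names, not Materialize's API. Per the requirement on the table's upper, the sketch also assumes a write must not land below that upper, so write_ts takes it as an argument.

```python
from typing import Iterable

class GlobalTimestamp:
    """Single global timestamp for one timeline (hypothetical sketch)."""

    def __init__(self, initial: int):
        self.value = initial

    def read_ts(self, sinces: Iterable[int]) -> int:
        # Read holds are assumed to keep every since at or below the global timestamp.
        if any(since > self.value for since in sinces):
            raise ValueError("an input was compacted past the global timestamp")
        return self.value

    def write_ts(self, table_upper: int) -> int:
        # Strictly larger than the global timestamp, and not below the table's upper.
        return max(self.value + 1, table_upper)

    def write_completed(self, ts: int) -> None:
        self.value = max(self.value, ts)

    def advance(self, candidate: int) -> None:
        # Periodic advancement; the global timestamp never moves backwards.
        self.value = max(self.value, candidate)

oracle = GlobalTimestamp(100)
r1 = oracle.read_ts([90, 100])      # 100
w = oracle.write_ts(table_upper=0)  # 101
oracle.write_completed(w)
r2 = oracle.read_ts([])             # 101: the read sees the completed write
```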
As an example, a timeline's timestamp can be initialized to the current system time and be advanced periodically to the current value of a monotonically increasing system clock. Another example is that a timeline's timestamps can be initialized to the timestamp's minimum value and can be periodically advanced in the following way: global_timestamp = max(min(uppers) - 1, global_timestamp), where uppers is the list of all uppers in the timeline. These are just potential implementations and not the only correct implementations.
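The second example rule can be written out in Python. The sketch assumes timestamps are integers and that an upper is the first timestamp at which a collection may still change; the function name is hypothetical.

```python
from typing import Sequence

def advance_from_uppers(global_ts: int, uppers: Sequence[int]) -> int:
    """global_timestamp = max(min(uppers) - 1, global_timestamp)"""
    if not uppers:
        return global_ts  # nothing to learn from; keep the current value
    # Every collection is complete for all times < its upper, so min(uppers) - 1
    # is the largest timestamp at which every collection in the timeline is complete.
    return max(min(uppers) - 1, global_ts)

ts = 0
ts = advance_from_uppers(ts, [10, 7, 12])  # 6
ts = advance_from_uppers(ts, [10, 5])      # stays 6: never moves backwards
```

Returning the unchanged value when a timeline has no collections is a choice made for this sketch.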
If some operation is given timestamp ts, then once that operation completes the global timestamp must be larger than or equal to ts. How this specific property is achieved for reads is described in Read Capabilities on Global Timestamp, and how it's achieved for writes to user tables is described in Group Commit/Write Coalesce.
The properties described in this section are sufficient to ensure Strict Serializability across one start of Materialize (i.e. not between restarts).
append command to be made durable before returning a response to the client; additionally, timestamp transitions are persisted to disk (see Global Timestamp Recovery).
After a restart, the Coordinator must reestablish the global timestamp to some value greater than or equal to the value of the previous global timestamp before the restart. This ensures that a read or write after the restart is assigned a timestamp greater than or equal to all reads and writes before the restart.
Proposal: Use a Percolator-inspired timestamp recovery protocol (see section 2.3, Timestamps). Periodically, we durably store in the stash some value greater than the current global timestamp. We never allocate a timestamp larger than or equal to the one durably stored without first updating the durably stored timestamp. When Materialize restarts, it uses the durably stored value as the initial timestamp.
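Here is a sketch of the recovery protocol. It assumes integer timestamps and models the stash entry as a plain in-memory cell; DurableCell, RecoverableOracle and the window parameter are hypothetical.

For brevity, the sketch extends the durable bound lazily, right before a timestamp would reach it, rather than on a periodic timer. Either way, the invariant is that every allocated timestamp stays strictly below the durable value.

```python
class DurableCell:
    """Stand-in for one stash entry that survives restarts."""

    def __init__(self, value: int = 0):
        self.value = value

class RecoverableOracle:
    """Invariant: every allocated timestamp is strictly below durable.value."""

    def __init__(self, durable: DurableCell, window: int = 1000):
        self.durable = durable
        self.window = window
        # On (re)start, resume from the durably stored bound, which is larger
        # than anything allocated before the restart.
        self.current = durable.value
        self._persist(self.current + window)

    def _persist(self, bound: int) -> None:
        # In the proposal this would be a stash transaction.
        self.durable.value = bound

    def allocate(self, candidate: int) -> int:
        ts = max(candidate, self.current)  # timestamps never go backwards
        if ts >= self.durable.value:
            self._persist(ts + self.window)  # make durable before handing out ts
        self.current = ts
        return ts

cell = DurableCell()
first = RecoverableOracle(cell, window=100)
before_restart = first.allocate(150)          # extends the durable bound to 250 first
second = RecoverableOracle(cell, window=100)  # simulated restart
after_restart = second.allocate(0)            # resumes at 250
```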
The properties described in this section and the Global Timestamp section are sufficient to ensure Strict Serializability across restarts.
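Write-read cycles (a write followed by a read followed by a write followed by a read, etc.) and consecutive writes can cause the global timestamp to increase in an unbounded fashion, because every write must be assigned a timestamp strictly larger than the global timestamp no matter how little real time has passed.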
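A toy model makes the growth concrete. It assumes millisecond timestamps and, for simplicity, a clock that does not advance while the writes run; the function is hypothetical. Reads in between would not slow the growth: a read only observes the global timestamp, and the next write must still exceed it.

```python
def drift_after_back_to_back_writes(num_writes: int, clock_ms: int) -> int:
    """How far the global timestamp ends up ahead of a clock that does not move.

    Models writes issued faster than one per millisecond: each write must take a
    timestamp strictly larger than the global timestamp, and the global
    timestamp is advanced to that write's timestamp when the write completes.
    """
    global_ts = clock_ms
    for _ in range(num_writes):
        write_ts = max(global_ts + 1, clock_ms)
        global_ts = write_ts
    return global_ts - clock_ms

print(drift_after_back_to_back_writes(1000, clock_ms=5000))  # 1000
```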
NOTE: Recall that each timeline has some definition of the current time (e.g., the system clock) and a current timestamp (the timestamp most recently assigned to an operation).
Proposal: All writes across all sessions per timeline should be added to a queue. If the current timestamp of the global timeline is less than or equal to the current time of the global timeline, then all writes in the queue can be executed and committed immediately. Otherwise the queue must wait until the current time is equal to or greater than the current timestamp, before executing and committing the writes in the queue. All writes in a queue are executed and committed together in a batch and assigned the same timestamp. The commits are all assigned the current global write timestamp.
NOTE: The TimestampOracle provides us the property that the global write timestamp will be higher than all previous reads.
This approach limits the per-session write throughput for user tables to 1 write transaction per time unit.
This approach guarantees that when a write completes, the global timestamp is larger than or equal to the timestamp of that write. Also, when a write completes, all previous writes and reads were at a lower timestamp than the write. Finally, it places a bound on how much faster the global timestamp can advance compared to the current time.
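Here is a single-threaded sketch of the queue. It assumes integer millisecond timestamps, and GroupCommitter is a hypothetical class. As a further assumption standing in for the TimestampOracle, the write timestamp is max(global timestamp + 1, current time).

A batch is only flushed when the global timestamp is at most the current time. So after a flush, the global timestamp is at most one unit ahead of the clock.

```python
class GroupCommitter:
    """Per-timeline write queue shared by all sessions."""

    def __init__(self, global_ts: int):
        self.global_ts = global_ts
        self.queue = []

    def submit(self, write) -> None:
        self.queue.append(write)

    def try_flush(self, now: int):
        """Commit every queued write at one timestamp, or return None if it must wait."""
        if not self.queue or self.global_ts > now:
            return None  # wait until the current time catches up
        write_ts = max(self.global_ts + 1, now)  # above every previous read
        batch, self.queue = self.queue, []
        # ... execute and durably commit every write in `batch` at write_ts ...
        self.global_ts = write_ts  # at most now + 1
        return write_ts, batch

committer = GroupCommitter(global_ts=10)
committer.submit("INSERT a")
committer.submit("INSERT b")
first = committer.try_flush(now=10)    # (11, ['INSERT a', 'INSERT b'])
committer.submit("INSERT c")
blocked = committer.try_flush(now=10)  # None: timestamp 11 is ahead of time 10
second = committer.try_flush(now=11)   # (12, ['INSERT c'])
```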
NOTE: If the client had multiple writes to a single table known ahead of time, then grouping them in a single multi-statement write transaction would increase throughput. There may be some user education needed for this.
NOTE: This would also fix the problem discussed in #12198.
Proposal: Explicitly hold read capabilities for a timestamp less than or equal to the global timestamp on all objects. All reads are assigned a timestamp equal to the global timestamp.
This approach guarantees that when a read completes, the global timestamp is equal to or larger than the timestamp of that read.
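Here is a sketch of that invariant. It assumes integer timestamps and a simple map from object name to since; ReadHolds and its methods are hypothetical. How often and how far to downgrade is one of the tradeoffs the Coordinator faces, and this sketch simply clamps any requested downgrade at the global timestamp.

```python
class ReadHolds:
    """Coordinator-side read holds, one since per object (hypothetical sketch)."""

    def __init__(self, sinces: dict):
        self.sinces = dict(sinces)

    def read_ts(self, global_ts: int, objects) -> int:
        # Strictly serializable reads always use the global timestamp.
        for obj in objects:
            if self.sinces[obj] > global_ts:
                raise RuntimeError(f"{obj} was compacted past {global_ts}")
        return global_ts

    def downgrade(self, global_ts: int, requested: int) -> None:
        # Let compaction proceed, but never past the global timestamp.
        target = min(requested, global_ts)
        for obj, since in list(self.sinces.items()):
            self.sinces[obj] = max(since, target)

holds = ReadHolds({"t": 3, "v": 5})
holds.downgrade(global_ts=10, requested=20)  # clamped to 10
ts = holds.read_ts(10, ["t", "v"])
```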
We can still allow reads that are not strictly serializable to exist at the same time. The timestamps of these reads will not be constrained to the global timestamp and will be selected using the old method of timestamp selection. This will allow clients to trade off consistency for recency and performance on a per-query basis.
Periodically, the Coordinator will want to update the read capabilities to allow compaction to happen. There are multiple dimensions to these updates, each with its own tradeoffs:
To achieve strict serializability of acknowledgements of upstream sources, acknowledgements should behave the same as a write transaction. The data being acknowledged is the write of the transaction and the acknowledgement is the commit. Materialize therefore must satisfy the following properties:
1. If data at timestamp ts has been acknowledged by Materialize, then all reads must have a timestamp greater than or equal to ts.
2. If data at timestamp ts has not been acknowledged by Materialize, then no read can have a timestamp greater than or equal to ts.
Property 1 prevents the following scenario: a client goes to an upstream source and sees that some data has been acknowledged by Materialize but then can't see that data in Materialize.
Property 2 prevents the following scenario: a client sees some data in Materialize, but doesn't see that it's been acknowledged in an upstream source.
This guarantee is actually too strong for our needs and has very negative availability ramifications. Once an acknowledgement has been sent by Materialize, no reads can be serviced until the upstream source has confirmed that it has received our acknowledgement. Before the confirmation, Materialize has no way of knowing whether the acknowledgement has been received, and any timestamp selected for a read will be susceptible to one of the negative scenarios described above. This means that if any upstream source is unresponsive to an acknowledgement, then all reads will be halted until that source is responsive again.
Property 1 alone is still a useful property, because property 1 allows us to provide guarantees like the following: A client is using synchronous replication with an upstream source. When their transaction commits in the upstream source, they are guaranteed to also be able to see the transaction in Materialize. However, they may actually see the transaction in Materialize before their transaction commits in the upstream source. This is still a powerful and useful guarantee to provide. Therefore, we should break the second bullet point into the two properties described above and indicate that we only plan to support the first property.
Proposal: When STORAGE wants to acknowledge some data that has a timestamp ts, it must wait until it knows that the global timestamp is larger than or equal to ts. STORAGE can discover this in one of the following ways (NOTE: these aren't separate proposals; they can all work together):
- STORAGE sends an ACK request to the Coordinator timestamped with ts. The Coordinator will wait for the global timestamp to advance to ts and then send a response back to STORAGE indicating that it's OK to send the acknowledgement.
- If STORAGE receives a strictly serializable read that is not using AS OF, is in the same timeline as the data being acknowledged, and has a timestamp larger than or equal to ts, then it knows that the global timestamp must have advanced to ts or greater, and it's safe to send the acknowledgement.
NOTE: There are some race conditions here, such as:
1. STORAGE sends an ACK request for ts to the Coordinator.
2. STORAGE receives a read (as described above) with a timestamp larger than or equal to ts.
3. STORAGE sends the acknowledgement for ts to an upstream source.
This is fine, and STORAGE can just ignore the ACK OK response.
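Here is a sketch of the STORAGE side, assuming integer timestamps; AckGate and its methods are hypothetical. Both mechanisms reduce to learning a lower bound on the global timestamp, and pending acknowledgements at or below that bound are released. A late ACK OK for something already released finds nothing to do, which is the race described above.

```python
class AckGate:
    """Holds back upstream acknowledgements until the global timestamp reaches them."""

    def __init__(self):
        self.known_lower_bound = None  # best known lower bound on the global timestamp
        self.pending = []              # timestamps waiting to be acknowledged
        self.acked = []                # timestamps already acknowledged upstream

    def request_ack(self, ts: int) -> None:
        self.pending.append(ts)
        # ... also send an ACK request for ts to the Coordinator ...

    def learn_global_at_least(self, ts: int) -> list:
        """Called on an ACK OK, or on a qualifying read at timestamp ts."""
        if self.known_lower_bound is None or ts > self.known_lower_bound:
            self.known_lower_bound = ts
        ready = sorted(t for t in self.pending if t <= self.known_lower_bound)
        self.pending = [t for t in self.pending if t > self.known_lower_bound]
        self.acked.extend(ready)
        # ... send the acknowledgements in `ready` to the upstream source ...
        return ready

gate = AckGate()
gate.request_ack(5)
gate.request_ack(8)
released = gate.learn_global_at_least(6)  # a read at ts 6 was observed: [5]
late_ok = gate.learn_global_at_least(5)   # late ACK OK for 5: [] (ignored)
```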
Due to network partitions or some other fault, it's possible that there are multiple Coordinators running at the same time. Materialize needs to be robust enough that this scenario will not cause any violations of Strict Serializability.
All DDL involves a stash transaction, so if a new Coordinator has taken over leadership then DDL on all old Coordinators will fail.
When a new Coordinator starts up, it will allocate a range of timestamps as described in Global Timestamp Recovery. This prevents all previous Coordinators from allocating any new timestamps.
Proposal: After the new Coordinator allocates a timestamp range, but before accepting client queries, it will advance all tables to the bottom of its allocated range. This prevents all previous Coordinators from being able to write to any table.
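Here is a simplified model of this fencing. It assumes a table append is accepted only at a timestamp at or beyond the table's current upper. The Table class is hypothetical and much simpler than Materialize's append path. Once the new Coordinator advances every table's upper to the bottom of its range, an old Coordinator can no longer append, because all of its timestamps lie below that range.

```python
class Table:
    def __init__(self):
        self.upper = 0   # first timestamp that may still be written
        self.rows = []

    def append(self, ts: int, row) -> bool:
        if ts < self.upper:
            return False  # rejected: this time is already sealed
        self.rows.append((ts, row))
        self.upper = ts + 1
        return True

    def advance_upper(self, new_upper: int) -> None:
        self.upper = max(self.upper, new_upper)

table = Table()
table.append(10, "from old Coordinator")
table.advance_upper(1000)  # new Coordinator allocated a range starting at 1000
stale = table.append(500, "late write from old Coordinator")  # False
fresh = table.append(1000, "from new Coordinator")            # True
```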
Proposal: After receiving a read query but before committing it, the Coordinator will check the stash and make sure that the epoch hasn't changed.
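Here is a minimal sketch of that check. It assumes the current epoch can be read from the stash; StashView, Fenced and serve_read are hypothetical names. The query runs first, and its result is returned only if the epoch still matches the one this Coordinator wrote at startup.

```python
from dataclasses import dataclass
from typing import Callable, TypeVar

T = TypeVar("T")

@dataclass
class StashView:
    epoch: int  # epoch currently stored in the stash

class Fenced(Exception):
    pass

def serve_read(stash: StashView, my_epoch: int, run_query: Callable[[], T]) -> T:
    result = run_query()
    # Confirm leadership before committing the read to the client.
    if stash.epoch != my_epoch:
        raise Fenced(f"epoch {my_epoch} superseded by {stash.epoch}")
    return result

stash = StashView(epoch=3)
rows = serve_read(stash, 3, lambda: [("a", 1)])
stash.epoch = 4  # a new Coordinator has started
try:
    serve_read(stash, 3, lambda: [("a", 1)])
except Fenced as err:
    print(err)  # epoch 3 superseded by 4
```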
There is a small race condition here where a new Coordinator takes leadership after the previous Coordinator confirms its leadership, but before the previous Coordinator has returned a result. The scenario would look like the following:
In this scenario the queries A and B are concurrent, and we can order them however we want without violating Strict Serializability.
upper would have to advance past the current global timestamp if the global timestamp tracked the wall clock. CockroachDB uses similar timestamps here: https://github.com/cockroachdb/cockroach/blob/711ccb5b8fba4e6a826ed6ba46c0fc346233a0f7/pkg/sql/sem/builtins/builtins.go#L8548-L8560