As part of the platform v2 work (specifically use-case isolation) we want to develop a scalable and isolated query/control layer that is made up of multiple processes that interact with distributed primitives at the moments where coordination is required.
The query/control layer is the point where different parts come together. For our purposes, it comprises the Adapter, which is responsible for translating user queries into commands for other parts, and the Controllers, which are responsible for driving around clusters and what runs on them. A central component here is the Coordinator, which mediates between the other components, owns the durable state of an environment, and ensures correctness (roughly, strict serializability).
With our current architecture, ensuring correctness requires that the Coordinator employ a "single-threaded" event loop to sequentialize, among other things, processing of all user queries and their responses. This is a blocker for having a scalable and isolated query/control layer: all workloads are competing for execution time on the single event loop.
We talk about threads, tasks, and loops below, but those should be understood as logical concepts rather than physical implementation details, though there is of course a connection and we do talk about the current physical implementation. We will propose a separate design doc that lays out the post-platform-v2 physical architecture, having to do with processes and threads and how they interact. The logical architecture described here is what will enable that physical architecture, and we focus on the arguments for why it provides correctness.
In order to understand the proposal below we first need to understand what the Coordinator (a central component of the query/control layer) is doing and why it currently requires a singleton event loop for correctness. I will therefore first explore the Coordinator in Background. Then I will give my proposed logical architecture design. To close things out, I will describe a roadmap, highlighting interesting incremental value that we can deliver along the way.
The choice of splitting the design into a logical and a physical architecture is meant to mirror architecture-db.md, but we are focusing on the query/control layer, also called ADAPTER in those earlier design docs.
It is interesting to note that the proposed design is very much in line with the design principles laid out a while ago in architecture-db.md:
The approach to remove scaling limits is "decoupling". Specifically, we want to remove as much artificial coupling as possible from the design of Materialize Platform. We will discuss the decoupling of both
- the "logical architecture" (which modules are responsible for what) and
- the "physical architecture" (where and how does code run to implement the above).
[...]
Decoupling is enabled primarily through the idea of a "definite collection". A definite collection is a durable, explicitly timestamped changelog of a data collection. Any two independent users of a definite collection can be sure they are seeing the same contents at any specific timestamp.
Definite collections, and explicit timestamping generally, allow us to remove many instances of explicitly sequenced behavior. If reads and writes (and other potential actions) have explicit timestamps, these actions do not require sequencing to ensure their correct behavior. The removal of explicit sequencing allows us to decouple the physical behaviors of various components.
The value I add here is that we take the design one step further and double down on those design ideas for a scalable and isolated serving layer, aka ADAPTER.
At a high level, the Coordinator is the component that glues all the other components together. It is the sole holder of interesting resources, such as: the controller(s), which are used to drive around what COMPUTE and STORAGE do; the Catalog (which I use as a proxy for "all persistent environment state" here); and the Timestamp Oracle. The Coordinator mediates access to those resources by only allowing operations that flow through its event loop to access and modify them. Among other things, handling of client queries is sequentialized by the main loop, as is handling of responses/status updates from the controllers/clusters.
The messages/operations that pass through the event loop fall into these categories:

1. Internal Commands
2. Controller Responses
3. External Commands (client queries)

As we will see later, an important part of the current design is that these messages have priorities: the Coordinator has to work off all Internal Commands (1.) before processing Controller Responses (2.), and all Controller Responses (2.) before External Commands (3.).
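As a rough illustration of this priority scheme (a sketch, not the actual Coordinator code; the channel and message types are assumptions), a biased select over three channels processes internal commands first, then controller responses, then client commands:

```rust
// A minimal sketch of a prioritized single event loop, assuming three
// hypothetical message channels; the real Coordinator loop differs in detail.
use tokio::sync::mpsc::UnboundedReceiver;

struct InternalCommand;
struct ControllerResponse;
struct ExternalCommand;

async fn serve(
    mut internal_rx: UnboundedReceiver<InternalCommand>,
    mut controller_rx: UnboundedReceiver<ControllerResponse>,
    mut external_rx: UnboundedReceiver<ExternalCommand>,
) {
    loop {
        tokio::select! {
            // `biased` polls the branches top to bottom, giving internal
            // commands priority over controller responses, and controller
            // responses priority over external (client) commands.
            biased;
            Some(cmd) = internal_rx.recv() => handle_internal(cmd),
            Some(resp) = controller_rx.recv() => handle_controller_response(resp),
            Some(cmd) = external_rx.recv() => handle_external(cmd),
            else => break,
        }
    }
}

fn handle_internal(_cmd: InternalCommand) { /* ... */ }
fn handle_controller_response(_resp: ControllerResponse) { /* ... */ }
fn handle_external(_cmd: ExternalCommand) { /* ... */ }
```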
When we consider the commands that the Coordinator issues to the Controller(s), we can differentiate two categories:
- Deterministic commands that derive from Catalog state, for example for `CREATE INDEX`, where the fact that the index exists is first written down in the Catalog and then a corresponding `CreateDataflow` command is sent to the controller.
- Ephemeral/query-like commands, for example peeks (`SELECT`), for COMPUTE, and writes to tables, for STORAGE. These are ephemeral because they don't derive from the Catalog and therefore would not survive a restart of `environmentd`.

For the first category, the Coordinator acts as a deterministic translator of Catalog contents, or changes to the Catalog, to Controller commands.
We can differentiate external commands using similar categories:
- DDL: queries that change what collections exist, and therefore the Catalog, for example `CREATE TABLE`.
- DQL: `SELECT`-style queries that read from collections.
- DML: `INSERT`-style queries that insert data into user collections.

The mapping from DDL, etc. to the types of commands does not fit 100%, but it's a useful shorthand so we'll keep using it.
Clusters send these types of responses back to the controller:
- Peek responses, carrying the results of a peek.
- Subscribe responses, carrying batches of updates for active subscribes.
- Frontier updates for the collections they maintain (identified by `GlobalId`).

Processing of all responses has to wait its turn on the single event loop: the Controller will signal that it is ready to process responses and then wait. The (compute) Controller will enqueue peek/subscribe responses to be processed on the event loop, which will eventually send results back over pgwire.
The current design uses the sequencing of the single event loop, along with the priorities of which messages to process, to uphold our correctness guarantees: external commands and the internal commands their processing spawns are brought into a total order, and Coordinator/Controller state can only be modified by one operation at a time. This "trivially" makes for a linearizable system.
For example, think of a scenario where multiple concurrent clients/connections create collections (DDL) and query or write to those collections (DQL and DML). Say we have two clients, `c1` and `c2`, and they run these operations in order, that is, they have a side-channel by which they know which earlier operations succeeded and only start subsequent operations once the previous one has been reported as successful:

1. `c1`: `CREATE TABLE foo [..]`
2. `c2`: `INSERT INTO foo VALUES [..]`
3. `c1`: `SELECT * FROM foo`
At a high level, processing of these queries goes like this (abridged):

1. `CREATE TABLE foo [..]` comes in
[...]

Notice how working off commands in sequence (again, on the single event loop) and the rules about message priority make it so that users don't observe anomalies: External message #8 cannot be processed before the internal messages spawned by the `INSERT` are worked off. And the fact that Catalog state (and other state, including talking to Controller(s)) can only be modified from one "thread" makes it so that the Peek will observe the correct state when it is being processed.
Potential anomalies here would be:

- The `INSERT` statement returns an error saying `foo` doesn't exist.
- The `SELECT` statement returns an error saying `foo` doesn't exist.
- The `SELECT` statement returns but doesn't contain the previously inserted values.

Any design that aims at replacing the single event loop has to take these things into consideration and prevent anomalies!
These are the assumptions that went into the design proposed below. Some of these derive from Background, above.
For now, this is a mostly unordered list, but I will add more structure based on feedback.
Observations, from the above section on Coordinator Background:
Assumptions:
- Frontier updates (uppers) reported back by clusters only ever advance the `since`, in the absence of other read holds. They are used when trying to Peek at the "freshest possible" timestamp. But there are no guarantees about when they are a) sent back from the cluster to the controller, and b) when they are received by the controller. Meaning they are not a concern in our consistency/linearizability story.

We use terms and ideas from architecture-db.md and formalism.md, so if you haven't read those recently, now's a good time to brush up on them.
This section can be seen as an extension of architecture-db.md, but we drill down into the innards of ADAPTER and slightly change how we drive around COMPUTE and STORAGE. At least we deviate from how they're driven around in the current physical design of the Coordinator.
> [!NOTE]
> Please keep in mind that this section is about the logical architecture. It might seem verbose or over- or under-specified, but it is not necessarily indicative of a performant implementation of the design.
We introduce (or make explicit) two new components:

- TIMESTAMP ORACLE
- CATALOG
And we also change how some of the existing components work.
The TIMESTAMP ORACLE can be considered a service that can be called from any component that needs it, and it provides linearizable timestamps and/or records writes as applied.
- `ReadTs(timeline) -> Timestamp`: returns the current read timestamp. This timestamp will be greater than or equal to all prior values of `ApplyWrite()`, and strictly less than all subsequent values of `WriteTs()`.
- `WriteTs(timeline) -> Timestamp`: allocates and returns monotonically increasing write timestamps. This timestamp will be strictly greater than all prior values of `ReadTs()` and `WriteTs()`.
- `ApplyWrite(timeline, timestamp)`: marks a write at time `timestamp` as completed. All subsequent values of `ReadTs()` will be greater than or equal to the given write `timestamp`.
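For illustration, a minimal Rust sketch of this interface might look as follows (names, types, and the synchronous shape are assumptions, not the actual implementation):

```rust
// A minimal sketch of the TIMESTAMP ORACLE interface described above.
// The concrete types and (a)synchronicity are assumptions for illustration.

/// A logical timestamp; in practice this would be Materialize's timestamp type.
type Timestamp = u64;

trait TimestampOracle {
    /// Returns the current read timestamp for `timeline`.
    ///
    /// Greater than or equal to all prior `apply_write` timestamps, and
    /// strictly less than all subsequent `write_ts` results.
    fn read_ts(&self, timeline: &str) -> Timestamp;

    /// Allocates and returns a monotonically increasing write timestamp.
    ///
    /// Strictly greater than all prior `read_ts` and `write_ts` results.
    fn write_ts(&mut self, timeline: &str) -> Timestamp;

    /// Marks a write at `timestamp` as completed. All subsequent `read_ts`
    /// results will be greater than or equal to `timestamp`.
    fn apply_write(&mut self, timeline: &str, timestamp: Timestamp);
}
```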
CATALOG can also be considered a shared service whose methods can be called from any component that needs it. And multiple actors can append data to it.
Changes to the catalog are versioned in the `"catalog"` timeline, which is tracked separately from other timelines in TIMESTAMP ORACLE. Any successful append of updates to the catalog implicitly does an `ApplyWrite("catalog", new_upper)` on TIMESTAMP ORACLE.

- `CompareAndAppend(expected_upper, new_upper, updates) -> Result<(), AppendError>`: appends the given updates to the catalog iff the expected upper matches its current upper.
- `SubscribeAt(timestamp) -> Subscribe`: returns a snapshot of the catalog at `timestamp`, followed by an ongoing stream of subsequent updates.
- `SnapshotAt(timestamp)`: returns an in-memory snapshot of the catalog at `timestamp`.

A snapshot corresponds to our current in-memory representation of the catalog that the Coordinator holds. This can be (and will be, in practice!) implemented using `SubscribeAt` to incrementally keep an in-memory representation of the catalog up to date with changes. However, it is logically a different operation.
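Again for illustration only, a Rust sketch of this service could look roughly like this (the update, error, and handle types are simplified assumptions):

```rust
// A minimal sketch of the CATALOG service interface described above.
// `CatalogUpdate`, `AppendError`, `Snapshot`, and `Subscribe` are simplified
// placeholders, not the actual catalog types.

type Timestamp = u64;

/// A differential catalog change: an item added (+1) or removed (-1) at a time.
struct CatalogUpdate {
    item: String,
    time: Timestamp,
    diff: i64,
}

enum AppendError {
    /// The expected upper didn't match; the caller must sync to the current
    /// upper and retry.
    UpperMismatch { current_upper: Timestamp },
}

/// An in-memory view of the catalog as of some timestamp.
struct Snapshot {/* ... */}

/// A snapshot plus an ongoing stream of subsequent updates.
struct Subscribe {/* ... */}

trait Catalog {
    /// Appends `updates` iff `expected_upper` matches the current upper. A
    /// successful append implicitly does `ApplyWrite("catalog", new_upper)`
    /// on TIMESTAMP ORACLE.
    fn compare_and_append(
        &mut self,
        expected_upper: Timestamp,
        new_upper: Timestamp,
        updates: Vec<CatalogUpdate>,
    ) -> Result<(), AppendError>;

    /// Returns a snapshot of the catalog at `timestamp`, followed by an
    /// ongoing stream of subsequent updates.
    fn subscribe_at(&self, timestamp: Timestamp) -> Subscribe;

    /// Returns an in-memory snapshot of the catalog at `timestamp`.
    fn snapshot_at(&self, timestamp: Timestamp) -> Snapshot;
}
```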
The controllers remain largely as they have been previously designed, but we introduce a distinction between deterministic controller commands and ephemeral/query-like controller commands. See Controller Commands above.
The controllers can be seen as consuming a stream of deterministic controller commands in the newly introduced `"catalog"` timeline, with `AdvanceCatalogFrontier` downgrading its frontier.

Ephemeral/query-like commands gain a new `catalog_ts` parameter which tells the controller that it can only process them once the input frontier (of catalog updates) has sufficiently advanced.

These changes make it so that the controller knows up to which CATALOG timestamp it has received commands, and they ensure that peeks (or other query-like operations) are only served once we know that controller state is at least up to date with the catalog state as of which the peek was processed. Note that we say the Controller has to be at least up to date with the `catalog_ts`; it is okay for its catalog frontier to be beyond that. This has the effect that in-flight peeks can end up in a situation where a collection that we are trying to query doesn't exist anymore, and we therefore have to report back an error. This is in line with current Materialize behavior.

As we will see below in the section on ADAPTER, we replace explicit sequencing by decoupling using pTVCs and timestamps.

- `Peek(global_id, catalog_ts, ..)`: performs the peek, but only once the catalog frontier has advanced far enough. It is important to note that the new `catalog_ts` parameter is different from an `as_of`/`ts`, which refers to the timeline/timestamp of the object being queried.
- `AdvanceCatalogFrontier(timestamp)`: tells the controller that it has seen commands corresponding to catalog changes up to `timestamp`.

(Most existing commands have been omitted for brevity!)
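To make the gating concrete, here is an illustrative sketch (the command and controller types are simplified assumptions, not the actual controller API; the catalog frontier is treated as an exclusive, upper-style frontier):

```rust
// A sketch of catalog-frontier gating in a compute controller.
use std::collections::VecDeque;

type Timestamp = u64;
type GlobalId = u64;

enum ComputeCommand {
    /// Deterministic command, derived from a catalog change.
    CreateDataflow { id: GlobalId },
    /// All catalog-derived commands up to `timestamp` have been sent.
    AdvanceCatalogFrontier { timestamp: Timestamp },
    /// Ephemeral command: `catalog_ts` is the catalog timestamp the peek was
    /// planned against; `as_of` is in the queried collection's own timeline.
    Peek { id: GlobalId, catalog_ts: Timestamp, as_of: Timestamp },
}

struct ComputeController {
    /// Exclusive frontier of catalog-derived commands processed so far.
    catalog_frontier: Timestamp,
    /// Peeks waiting for the catalog frontier to advance far enough.
    pending_peeks: VecDeque<(GlobalId, Timestamp, Timestamp)>,
}

impl ComputeController {
    fn handle(&mut self, cmd: ComputeCommand) {
        match cmd {
            ComputeCommand::CreateDataflow { .. } => {
                // ... install the dataflow on the cluster ...
            }
            ComputeCommand::AdvanceCatalogFrontier { timestamp } => {
                self.catalog_frontier = self.catalog_frontier.max(timestamp);
                self.process_ready_peeks();
            }
            ComputeCommand::Peek { id, catalog_ts, as_of } => {
                if catalog_ts < self.catalog_frontier {
                    // Controller state is at least as current as the catalog
                    // state the peek was planned against: serve it now (or
                    // report an error if the collection no longer exists).
                    self.issue_peek(id, as_of);
                } else {
                    self.pending_peeks.push_back((id, catalog_ts, as_of));
                }
            }
        }
    }

    fn process_ready_peeks(&mut self) {
        while self
            .pending_peeks
            .front()
            .map_or(false, |&(_, catalog_ts, _)| catalog_ts < self.catalog_frontier)
        {
            let (id, _catalog_ts, as_of) = self.pending_peeks.pop_front().expect("nonempty");
            self.issue_peek(id, as_of);
        }
    }

    fn issue_peek(&mut self, _id: GlobalId, _as_of: Timestamp) {
        // ... forward the peek to a replica at `as_of` ...
    }
}
```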
ADAPTER spawns tasks for handling client connections as they show up. Additionally, it spawns one task per controller that runs a loop that synthesizes commands for the controller based on differential CATALOG changes. To do this, the task subscribes to CATALOG, filters on changes that are relevant for the given controller, sends commands to the controller while preserving the order in which they are observed in the stream of changes, and synthesizes `AdvanceCatalogFrontier` commands when the catalog timestamp advances. These commands are the commands that deterministically derive from CATALOG state (see Background section, above).
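A sketch of such a per-controller task might look as follows; the batch/channel shapes and the `is_compute_relevant` helper are illustrative assumptions, not the actual implementation:

```rust
// A sketch of the per-controller command-synthesis task: it consumes catalog
// changes (here as batches of updates plus a new frontier), filters the ones
// relevant for COMPUTE, forwards them as commands in order, and then advances
// the controller's catalog frontier. Types and channel shapes are assumptions.
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

type Timestamp = u64;

struct CatalogUpdate {
    item: String,
    time: Timestamp,
    diff: i64,
}

/// A batch of catalog changes, complete up to (but not including) `new_frontier`.
struct CatalogBatch {
    updates: Vec<CatalogUpdate>,
    new_frontier: Timestamp,
}

enum ComputeCommand {
    CreateDataflow { item: String, catalog_time: Timestamp },
    DropDataflow { item: String, catalog_time: Timestamp },
    AdvanceCatalogFrontier { timestamp: Timestamp },
}

async fn compute_command_loop(
    mut catalog_changes: UnboundedReceiver<CatalogBatch>,
    compute: UnboundedSender<ComputeCommand>,
) {
    while let Some(batch) = catalog_changes.recv().await {
        for update in batch.updates {
            // Only changes to compute objects (indexes, materialized views,
            // ...) are relevant for the compute controller. The order of the
            // observed changes is preserved.
            if !is_compute_relevant(&update.item) {
                continue;
            }
            let cmd = if update.diff > 0 {
                ComputeCommand::CreateDataflow { item: update.item, catalog_time: update.time }
            } else {
                ComputeCommand::DropDataflow { item: update.item, catalog_time: update.time }
            };
            let _ = compute.send(cmd);
        }
        // Tell the controller it has seen all catalog-derived commands up to
        // the new frontier, allowing it to release gated peeks.
        let _ = compute.send(ComputeCommand::AdvanceCatalogFrontier {
            timestamp: batch.new_frontier,
        });
    }
}

fn is_compute_relevant(_item: &str) -> bool {
    // Placeholder: decide based on the kind of catalog item.
    true
}
```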
When processing client requests, ADAPTER uses TIMESTAMP ORACLE to get the latest read timestamp for CATALOG, then fetches a CATALOG snapshot as of at least that timestamp, and continues to process the request using the catalog snapshot as the source of truth. It's important to note that the catalog timestamp is not correlated with the timestamps of the data collections that are being queried. Most of the time, the catalog read timestamp will not advance; it only advances when DDL causes the catalog to change.
When handling DDL-style client queries, ADAPTER uses TIMESTAMP ORACLE to get a write timestamp and tries to append the required changes to the CATALOG. If there is a conflict it has to get a new timestamp, get the latest CATALOG snapshot, and try again. ADAPTER does not explicitly instruct the controllers to do anything that is required by these changes and it does not wait for controllers to act on those changes before returning to the client. Controllers are expected to learn about these changes and act upon them from their "private" command loop.
Client queries that spawn ephemeral controller commands (mostly DQL and DML-style queries) are timestamped with the timestamp (in the catalog timeline) as of which the query was processed. This ensures that these ephemeral controller commands are consistent with what the controller must know as of that timestamp.
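Putting these pieces together, a connection task in ADAPTER might handle the read and DDL paths roughly like this (a sketch against the hypothetical interfaces above; all helper names are illustrative, not actual Materialize APIs):

```rust
// A sketch of request handling in an ADAPTER connection task. All types and
// helpers here (TimestampOracle, Catalog, plan_and_send_peek, ...) are the
// hypothetical ones from the earlier sketches, stubbed out for brevity.

type Timestamp = u64;
struct TimestampOracle;
struct Catalog;
struct Snapshot;
struct CatalogUpdate;
enum AppendError { UpperMismatch { current_upper: Timestamp } }

impl TimestampOracle {
    fn read_ts(&self, _timeline: &str) -> Timestamp { 0 }
    fn write_ts(&mut self, _timeline: &str) -> Timestamp { 0 }
}
impl Catalog {
    fn snapshot_at(&self, _ts: Timestamp) -> Snapshot { Snapshot }
    fn compare_and_append(
        &mut self,
        _expected_upper: Timestamp,
        _new_upper: Timestamp,
        _updates: Vec<CatalogUpdate>,
    ) -> Result<(), AppendError> { Ok(()) }
}

/// DQL path: read-only queries never go through the catalog write path.
fn handle_select(oracle: &TimestampOracle, catalog: &Catalog) {
    // Latest read timestamp in the "catalog" timeline; this only advances
    // when DDL changes the catalog.
    let catalog_ts = oracle.read_ts("catalog");
    // Plan against a snapshot that is at least as current as `catalog_ts`.
    let snapshot = catalog.snapshot_at(catalog_ts);
    // The ephemeral Peek is stamped with `catalog_ts`, so the controller only
    // serves it once its catalog frontier has advanced far enough. The peek's
    // `as_of` is chosen in the timeline of the queried collection, which is
    // independent of the catalog timeline.
    plan_and_send_peek(&snapshot, catalog_ts);
}

/// DDL path: append to the catalog, retrying on conflicts. We neither
/// instruct the controllers directly nor wait for them; they pick up the
/// change from their own command loop.
fn handle_ddl(oracle: &mut TimestampOracle, catalog: &mut Catalog) {
    loop {
        let write_ts = oracle.write_ts("catalog");
        let updates = plan_catalog_updates(write_ts);
        match catalog.compare_and_append(write_ts, write_ts + 1, updates) {
            Ok(()) => return,
            // Someone else changed the catalog concurrently: get a fresh
            // timestamp and snapshot, and try again.
            Err(AppendError::UpperMismatch { .. }) => continue,
        }
    }
}

fn plan_and_send_peek(_snapshot: &Snapshot, _catalog_ts: Timestamp) {}
fn plan_catalog_updates(_write_ts: Timestamp) -> Vec<CatalogUpdate> { Vec::new() }
```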
Previous design documents describe physical implementations of the newly required components:
As a follow-up, we will propose a design doc for the post-platform-v2 physical, distributed architecture.
These milestones gradually move us from our current architecture to a design where the query/control layer (aka. the adapter, coordinator, and controllers) is less and less tightly coupled and therefore supports better scalability and isolation.
This milestone is about developing the components that we will need to decouple the components of the query/control layer. The current abstractions around the catalog and timestamp oracle are not shareable between multiple actors. Similarly, table writes cannot be done concurrently by multiple actors.
Epics:
Once we have the new components developed in Milestone 1 we can start incrementally pulling responsibilities out of the "single-threaded" Coordinator event loop. Individual benefits of this are:

- Peek responses can be sent back over `pgwire` proactively, bypassing the event loop. And, most importantly, other responsibilities will not negatively affect the processing of Peeks.

At the end of this milestone, we will have an isolated and vertically scalable query/control layer. The current implementation cannot scale up by adding more resources to the single process/machine because the single event loop is a bottleneck. Once responsibilities are decoupled, we can scale up vertically.
Once the controllers are decoupled from the Coordinator via the Catalog pTVC, we can start moving them out into their own processes or move them alongside the cluster/replica processes. Similarly, we can start moving query processing to other processes.
TBD!