Aside from reading and writing streaming data, Materialize may create or destroy external state for particular sources and sinks. The goal of this design doc is to align on a single way to create and destroy that external state (in either `coord` or `dataflow`).

Some Materialize sources and sinks need to create, maintain, and destroy state external to Materialize. Examples, discussed below, include Kafka sinks and Postgres sources (which create upstream replication slots).
In the future, additional sources and sinks will also require the ability to create, maintain, and destroy external state.
Furthermore, external state is not necessarily associated 1:1 with a source. Materialize currently spawns as many source instances as needed from a given source, typically one per downstream materialized view. Since it's the source instances that do the actual communication with external systems, there may be instance-specific state that needs to be maintained. For example, Postgres source instances each need a separate replication slot to read data from.
The more sources and sinks we add, the more valuable it becomes to implement them uniformly. Even if we don't move forward with this design, it would be great to get on the same page about how we should manage external state as we all implement new features.
Kafka sinks, which are the only completed objects that manage external state at the moment, do so in `coord` (via its `sink_builder`). This proposal would not require any changes there.

Postgres sources, on the other hand, create upstream replication slots in the `dataflow` crate. If we wanted to move forward with this proposal, that detail of Postgres sources would need to be rewritten.
Currently, there are two crates where we could handle external state: `coord` and `dataflow`. I propose that all logic that manages external state should live in `coord`. (`dataflow` is explored as an alternative later on.) As per its documentation, the `coord` crate is responsible for coordinating client requests with the dataflow layer. This is primarily handled via the Coordinator, which is currently responsible for:
```rust
//! * Launching the dataflow workers.
//! * Periodically allowing the dataflow workers to compact existing data.
//! * Executing SQL queries from clients by parsing and planning them, sending
//!   the plans to the dataflow layer, and then streaming the results back to
//!   the client.
//! * Assigning timestamps to incoming source data.
```
This proposal would add the additional responsibility:

```rust
//! * Creating, tracking, and destroying external state that must be handled
//!   by Materialize.
```
`coord` and the Coordinator are uniquely positioned to create, track, and destroy external state because the Coordinator is the only entity that has access to Materialize's inner Catalog. For this reason, if we want to persist information about managed external state, that information must live in `coord`.
To make this work, I propose adding functions like the following to `CatalogItem` (similar to [Petros' proposal here]):
```rust
/// Returns an op that indicates any external state to be created before
/// we create an item in the catalog.
pub async fn create_state_op(&self) -> Option<ExternalStateOp> {}

/// Returns an op that indicates any external state to be destroyed once
/// `catalog_transact` succeeds.
pub async fn destroy_state_op(&self) -> Option<ExternalStateOp> {}
```
The `ExternalStateOp` enum will be similar to our Catalog `Op`, but it will be responsible for holding the details necessary to create or destroy external state. `ExternalStateOp` would reside in the `dataflow-types` crate so that it can access the connector-specific details required to create or destroy that state. But the Coordinator, which fetches the `ExternalStateOp`s when `CREATE`ing or `DROP`ping objects, will be responsible for executing them.
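For illustration, here is a hedged sketch of what `ExternalStateOp` could look like. The variant names and fields are hypothetical, chosen to cover the Postgres replication slot case discussed above; the real enum would grow as more connectors manage external state.

```rust
// Hypothetical sketch only: variant names and fields are illustrative, not an
// existing Materialize API. Each variant carries just enough connector-specific
// detail for the Coordinator to perform the external work.
pub enum ExternalStateOp {
    /// Create a replication slot in an upstream Postgres database.
    CreatePgReplicationSlot {
        conn: String,      // upstream connection string
        slot_name: String, // slot to create
    },
    /// Drop a replication slot in an upstream Postgres database.
    DropPgReplicationSlot {
        conn: String,
        slot_name: String,
    },
    // Ops for other connectors (e.g. Kafka sink topics) would be added here.
}
```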
I propose that `ExternalStateOp` should have a function like the following:
```rust
/// Creates or destroys external state as indicated by the `ExternalStateOp`.
pub async fn execute(&self) -> Result<(), anyhow::Error> {
    match self {
        // One arm per variant, each performing the external work it
        // describes, e.g. creating or dropping a replication slot or topic.
        _op => { /* do something external */ }
    }
    Ok(())
}
```
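To make "do something external" concrete for the Postgres case, here is a minimal sketch of a helper that a `DropPgReplicationSlot`-style arm could call, using `tokio-postgres`. The helper name and error handling are illustrative assumptions, not existing Materialize code.

```rust
use anyhow::Context;
use tokio_postgres::NoTls;

/// Hypothetical helper for a `DropPgReplicationSlot`-style arm: connect to the
/// upstream database and drop the named replication slot.
async fn drop_replication_slot(conn: &str, slot_name: &str) -> Result<(), anyhow::Error> {
    let (client, connection) = tokio_postgres::connect(conn, NoTls)
        .await
        .context("connecting to upstream Postgres")?;
    // The connection object drives the wire protocol; run it in the background.
    tokio::spawn(async move {
        let _ = connection.await;
    });
    client
        // A real implementation would need to escape `slot_name` and handle
        // the slot still being in use.
        .simple_query(&format!("SELECT pg_drop_replication_slot('{}')", slot_name))
        .await
        .context("dropping replication slot")?;
    Ok(())
}
```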
That `execute` function will be called by the Coordinator in `catalog_transact` once the object in question has been successfully created or dropped. For a sketch of what this would look like, check out this WIP PR.
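As a minimal sketch of that call site, assuming the Coordinator has already gathered the relevant ops for the items being created or dropped (the function below is illustrative, not Materialize's actual `catalog_transact`):

```rust
// Hedged sketch, not real coordinator code: run a batch of external-state ops
// (e.g. the destroy ops once `catalog_transact` has succeeded) and surface any
// failure to the client.
async fn run_external_state_ops(ops: Vec<ExternalStateOp>) -> Result<(), anyhow::Error> {
    for op in ops {
        // Each op knows how to create or destroy its own external state.
        op.execute().await?;
    }
    Ok(())
}
```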
And, as we look to the (potentially distant) future when Materialize will be running a single Coordinator and multiple `dataflow`s, each `dataflow` not knowing about the existence of the others, I think that this proposal becomes more compelling. We avoid the distributed systems problem of figuring out which `dataflow` should create/destroy things and how they should communicate that information. Instead, the Coordinator, which is aware of the fact that there are multiple `dataflow`s running the same thing, can simply handle that responsibility.
An alternative approach is to let the `dataflow` crate be responsible for creating and destroying external state. We could do this by introducing new `Create` and `Drop` traits that sources and sinks would have to implement. For sources and sinks that don't require external state, their implementations of `Create` and `Drop` would simply be no-ops. For other sources and sinks, the logic required to create and destroy external state would be implemented in these traits.
A bonus of this approach is that the logic to create or destroy external state related to a database object would be located near that object's definition.
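Under this alternative, the traits might look roughly like the following sketch. The names, the synchronous signatures, and the default no-op implementations are assumptions for illustration, not a settled design.

```rust
use anyhow::Error;

// Hedged sketch of the alternative design: hypothetical traits that each
// source/sink connector in the `dataflow` crate would implement. Connectors
// without external state get the default no-op behavior.
pub trait CreateExternalState {
    /// Create any external state this connector needs before it can run.
    fn create_external_state(&self) -> Result<(), Error> {
        Ok(())
    }
}

pub trait DropExternalState {
    /// Destroy any external state this connector owns when it is dropped.
    fn drop_external_state(&self) -> Result<(), Error> {
        Ok(())
    }
}
```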
A challenge of this approach is that if the `dataflow` crate fails to create or destroy external state, we will be stuck with an erroring dataflow. The user will have to manually `DROP` this object and recreate it. This is not a challenge if the external state management is handled in `coord`, where errors surface sooner.
What if external state must be created at the `dataflow` layer? (Where it's not possible to create it earlier?)

This design document was discussed on 5/5/2021 in the External Integrations weekly team sync. The following thoughts and issues were raised.
The Coordinator may spin up a source that in turn spins up multiple source instances. If each of those source instances requires its own state, and we want to store external state information in `coord`, we will need to enable one of two things. The team agreed that the second approach, storing instance-specific state alongside the existing `Connector` information, was preferable, even if it means updating the interface of some `dataflow` functions.
However, storing source instance state introduces a bit of persistence complexity. Whatever instance-specific state we
store in the Catalog must be stable across versions or versioned.
Note: the same may be true for sink instances of some type, but was not discussed.
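For concreteness, here is a hedged sketch of what such instance-specific state could look like if stored beside the `Connector` information. The type and field names are hypothetical, and the serde derives only gesture at the stability/versioning concern above.

```rust
use serde::{Deserialize, Serialize};

// Hypothetical sketch, not an existing Materialize type: per-instance external
// state kept in the catalog next to the connector definition. Because it is
// persisted, its serialized form must stay stable across releases (or carry an
// explicit version).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceInstanceExternalState {
    /// Which instance of the source this state belongs to.
    pub instance_id: u64,
    /// Connector-specific detail, e.g. the replication slot name a Postgres
    /// source instance created upstream.
    pub detail: String,
}
```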
It was brought up that if there were an issue with a source or sink at the `dataflow` layer, the dataflow operators would not be empowered to clean up state themselves or communicate this back to the Coordinator. While this is a valid objection, it matches current behavior. If there is an issue with a dataflow, the user will have to manually `DROP` it. Moving forward, as we provide more useful errors, we may want to revisit this; there are a couple of ways that we could approach doing so.
It was proposed that we create a new `pg_replication_slots` table in Materialize's Catalog in order to track the external state of Postgres sources. (This suggestion is Postgres source specific, breaking a bit with the goal of this design doc, but it helps us think through more concrete solutions.) `pg_replication_slots` would be like Materialize's timestamps table.

Alternatively, we could create a new table like `CREATE TABLE source_instance_data (source_id, data)` to achieve the same goal without making the table bespoke.
It was proposed that, instead of the proposal in this document, we create a new `CatalogItem` type to track and store information relevant to external state. Unfortunately, `CatalogItem` is a specific type of resolvable object, and information about external state does not match that pattern. We will not be moving forward with this alternate proposal.
It was brought up that we want to move the Timestamper from `coord` into `dataflow`, which is the opposite of what this document proposes for external state. The reason is that each BYO source must share the same Timestamper, and it would be preferable for each source to get its own thread. This was a valid proposal for the Timestamper, but it had two challenges.
It was raised that the async work to create or destroy state might be slow -- will we block? The answer to this is yes!
After the discussion with the External Integrations team, we agreed that managing external state belongs in `coord` for the reasons listed above. In order to implement this, we've agreed that we will add a new struct that tracks external state and sits beside our current `Connector` information. It is essential that this new state information be directly tied to individual instances of sources and sinks.
When creating objects in Materialize that create external state, the Coordinator will be responsible for instantiating this state and passing the required information to the `dataflow` layer. When dropping objects, the Coordinator will be responsible for using this state information to actually destroy the external state.
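A rough sketch of what that create path could look like, reusing the hypothetical types sketched earlier; the function name, parameters, and flow are illustrative only.

```rust
// Hedged sketch of the agreed create flow; every name below is illustrative
// rather than existing Materialize code.
async fn create_source_external_state(
    create_op: ExternalStateOp,
    instance_id: u64,
    slot_name: String,
) -> Result<SourceInstanceExternalState, anyhow::Error> {
    // Create the upstream state first; if this fails, the CREATE fails early
    // in coord rather than producing an erroring dataflow.
    create_op.execute().await?;
    // Record what was created so the Coordinator can pass the details to the
    // dataflow layer now, and destroy the state on a later DROP.
    Ok(SourceInstanceExternalState {
        instance_id,
        detail: slot_name,
    })
}
```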
A first attempt at managing external state this way was made in #6714, but has not yet been merged.