In Materialize today, each source and sink instance is independent. For example, each TAIL sink performs its own consolidation of the input stream. The result is a system that can correctly backfill data when a source is instantiated multiple times, but is inefficient in the steady state. In an ideal system, once two instances of a source or sink had caught up to the same point, they would share resources. To continue the example from above, two TAIL sinks over the same stream would share the underlying consolidation operator.
The proposal here is to teach Materialize to perform this resource sharing automatically and dynamically as the catalog evolves.
There are three goals here:
The last goal is in fact the motivating goal. Taken in isolation, supporting unmaterialized PostgreSQL sources does not clearly justify the complexities of dynamic resource sharing. But taken altogether, I think it becomes clear that there is a general problem here that warrants a general solution.
The design here does not even begin to address the implementation of dynamic resource sharing. Suffice it to say that the implementations will be nontrivial. We'll need separate implementation design docs for each source and sink type.
Consider the following DDL statements in Materialize today:
CREATE SOURCE kafka_src FROM KAFKA BROKER '...' TOPIC 'top' FORMAT AVRO ... ;
CREATE MATERIALIZED VIEW view1 AS SELECT col1 FROM kafka_src;
CREATE MATERIALIZED VIEW view2 AS SELECT col2 FROM kafka_src;
view1 and view2 will each instantiate kafka_src, resulting in two separate Kafka consumers that read from the top topic:
kafka_src/1 kafka_src/2
+---------------------+ +---------------------+
| librdkafka consumer | | librdkafka consumer |
+---------------------+ +---------------------+
| Avro decoding | | Avro decoding |
+---------------------+ +---------------------+
| |
view1 view2
In the steady state, this is quite wasteful. We're streaming the records from top over the network twice and decoding the Avro in each record twice. We'd prefer to run just one Kafka consumer that feeds both views:
+---------------------+
| librdkafka consumer |
+---------------------+
| Avro decoding |
+---------------------+
/ \
/ \
kafka_src/1 kafka_src/2
| |
view1 view2
If you know the complete set of columns you care about ahead of time, you can coax an efficient implementation out of Materialize today with an intermediate materialized view:
CREATE SOURCE kafka_src FROM KAFKA BROKER '...' TOPIC 'top' FORMAT AVRO ... ;
CREATE MATERIALIZED VIEW kafka_view AS SELECT col1, col2 FROM kafka_src;
CREATE VIEW view1 AS SELECT col1 FROM kafka_view;
CREATE VIEW view2 AS SELECT col2 FROM kafka_view;
This creates only one librdkafka consumer by ensuring that kafka_src is only instantiated once.
kafka_src/1
+---------------------+
| librdkafka consumer |
+---------------------+
| Avro decoding |
+---------------------+
|
kafka_view
/ \
view1 view2
This approach has two notable deficiencies.
First, it requires that you commit ahead of time to the columns you care about. What if, down the road, you realize you need a view3 that extracts col3? You have two unappealing options (both sketched in the DDL below):

1. Delete kafka_view, view1, and view2. Recreate kafka_view with col3 as well, then create view1, view2, and view3 atop the new kafka_view. This is both annoying to type and will interrupt the availability of view1 and view2.

2. Create another instantiation of kafka_src by materializing view3 atop kafka_src directly. This is ergonomic but results in a suboptimal steady state.
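Concretely, the two options might look roughly like this (a sketch only; the exact DROP statements and ordering are illustrative):

-- Option 1: tear down and rebuild the stack to include col3.
DROP VIEW view1;
DROP VIEW view2;
DROP VIEW kafka_view;
CREATE MATERIALIZED VIEW kafka_view AS SELECT col1, col2, col3 FROM kafka_src;
CREATE VIEW view1 AS SELECT col1 FROM kafka_view;
CREATE VIEW view2 AS SELECT col2 FROM kafka_view;
CREATE VIEW view3 AS SELECT col3 FROM kafka_view;

-- Option 2: materialize view3 atop kafka_src directly, creating a second
-- instance of the source.
CREATE MATERIALIZED VIEW view3 AS SELECT col3 FROM kafka_src;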
Second, this workaround is not fully generalizable. Consider the following source and views:
CREATE SOURCE kafka_src FROM KAFKA BROKER '...' TOPIC 'top' FORMAT AVRO ... ;
CREATE MATERIALIZED VIEW v1 AS SELECT sum(col1) FROM kafka_src;
CREATE MATERIALIZED VIEW v2 AS SELECT col2 FROM kafka_src;
There is no suitable intermediate materialized view that can be extracted here. (There are some gross hacks, but none that we'd feel comfortable recommending to customers.)
With dynamic resource sharing, you get all the ergonomics of creating views as you please, but a system that converges on the same efficiency you'd get if you'd pre-committed to those views. The key insight is that we can create a temporary backfill consumer when a new instance is instantiated:
steady-state primary temporary backfill
+---------------------+ +---------------------+
| librdkafka consumer | | librdkafka consumer |
+---------------------+ +---------------------+
| Avro decoding | | Avro decoding |
+---------------------+ +---------------------+
/ \ |
/ \ |
kafka_src/1 kafka_src/2 kafka_src/3
| | |
view1 view2 view3
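Reaching this state requires no new syntax. The user simply creates the new view, and Materialize spins up the temporary backfill consumer behind the scenes (a sketch of the intended user experience, continuing the example above):

CREATE MATERIALIZED VIEW view3 AS SELECT col3 FROM kafka_src;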
Once that backfill consumer has caught up to the steady-state primary
consumer—i.e., once the consumer offsets are identical—we can cut kafka_src/3
over to the primary consumer, and jettison the backfill consumer:
steady-state primary
+---------------------+
| librdkafka consumer |
+---------------------+
| Avro decoding |
+---------------------+
/ \------------------------+
/ \ \
kafka_src/1 kafka_src/2 kafka_src/3
| | |
view1 view2 view3
Note that this example is provided for illustrative purposes only. The actual implementation might choose to jettison the existing consumer and switch to the new consumer instead. See this discussion for details.
The MVP for PostgreSQL sources will not support unmaterialized sources. It's not fundamentally impossible to have multiple instances of a PostgreSQL source, but each instance is prohibitively expensive. Forcing PostgreSQL sources to be materialized is an easy way to ensure that only one instance is ever created.
Roughly speaking, PostgreSQL's replication protocol works per database, while a PostgreSQL source in Materialize corresponds to just one table in that database. If you were to instantiate a PostgreSQL source twice in Materialize, you'd wind up streaming two separate copies of the entire database to Materialize, which is certain to be surprising to users.
With dynamic resource sharing, creating a new instance of a PostgreSQL source would instead backfill the missing data for just the one table, then cut over to the already existing replication stream. This will be cheap enough that we'll be comfortable allowing PostgreSQL sources to be unmaterialized.
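To illustrate the desired end state (the PostgreSQL source DDL below is purely hypothetical; the actual syntax is out of scope for this document):

-- Hypothetical unmaterialized PostgreSQL source for a single table.
CREATE SOURCE pg_tbl FROM POSTGRES ... ;
CREATE MATERIALIZED VIEW pg_view1 AS SELECT col1 FROM pg_tbl;
-- With dynamic resource sharing, this second view backfills only the missing
-- data for the one table, then cuts over to the existing replication stream.
CREATE MATERIALIZED VIEW pg_view2 AS SELECT col2 FROM pg_tbl;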
The major downside of this approach is the implementation complexity. Cutting source instances over from a backfill resource to the steady-state resource will require a lot of fiddly logic and testing.
My contention is that this complexity is well worth it for the user experience it provides. Expecting users to predict exactly which fields they'll need from a source is unrealistic. Asking them to recreate an entire stack of views, as described in the workaround above, is acceptable in the short term but unfriendly in the long term. But letting users interactively create the sources and views they want, while converging on the optimal implementation nonetheless? That's the magic experience we strive for.
Also, the lessons learned here will be directly applicable to our future plans for multi-query optimization. Dynamically sharing resources in sources is a simplified version of dynamically sharing resources (i.e., arrangements) between views.
We'd previously talked about using SQL transactions to permit bulk creation of a set of sources and views, like so:
BEGIN;
CREATE SOURCE kafka_src FROM KAFKA BROKER '...' TOPIC 'top' FORMAT AVRO ... ;
CREATE MATERIALIZED VIEW view1 AS SELECT col1 FROM kafka_src;
CREATE MATERIALIZED VIEW view2 AS SELECT col2 FROM kafka_src;
COMMIT;
When the transaction commits, Materialize could look holistically at the objects within, and realize that only one new source instance is necessary.
But this turns out not to be any more flexible than the current workaround. You need to be willing to declare all of your sources and views in one shot, and that's not always feasible.
We might, however, want to support this feature for savvy users as a minor optimization for dynamic resource sharing. Dynamic resource sharing only guarantees that the system converges to an optimal state. Creating sources and views one by one will result in a period of inefficiency, as several backfill resources are created and then merged together. But if the user is willing to provide a batch of create statements simultaneously, we can create the optimal number of instances from the start.
In theory, arbitrary schema changes on live views would avoid the need for dynamic resource sharing in some cases. In practice we are at least several years away from supporting this, but I mention it here for posterity.
Suppose you've set things up using the workaround described above:
CREATE SOURCE kafka_src FROM KAFKA BROKER '...' TOPIC 'top' FORMAT AVRO ... ;
CREATE MATERIALIZED VIEW kafka_view AS SELECT col1, col2 FROM kafka_src;
CREATE VIEW view1 AS SELECT col1 FROM kafka_view;
CREATE VIEW view2 AS SELECT col2 FROM kafka_view;
Now suppose you want to add a new view that references col3 without reinstantiating the source. You could just replace the definition of kafka_view with a query that includes col3:
CREATE OR REPLACE VIEW kafka_view AS SELECT col1, col2, col3 FROM kafka_src;
CREATE VIEW view3 AS SELECT col3 FROM kafka_view;