Our users want to connect Materialize to upstream sources that do not provide guarantees about what data they will present. These sources are in tension with the goal of providing a deterministic, replayable system.
This design doc proposes to:

1. Introduce "volatile source" as a term of art that describes a source that does not meet a specific standard of reliability.
2. Use the terminology consistently in the docs site and error messages to educate our users about volatile sources.
3. Restrict the use of volatile sources in contexts where they could impede future improvements to the system's determinism and correctness.
The only goal is to stabilize the PubNub and SSE sources.
Both of these are volatile sources with some worrying correctness properties, but their existence will massively improve the "wow factor" for demos of Materialize Cloud.
A "volatile source" is any source that cannot reliably present the same data with the same timestamps across different instantiations.

An "instantiation" of a source occurs whenever a statement like `TAIL` references the source directly. Restarting Materialize will cause all sources to be reinstantiated. Referencing a materialized view that in turn depends on a source does not create a new instantiation of the source.
Examples of volatile sources: PubNub sources, SSE sources, Kinesis sources, and a Kafka source whose underlying topic has a short retention policy.

Examples of nonvolatile sources: Kafka, S3, and file sources whose upstream data is retained indefinitely.
It's important to note that volatility is a property of how a source is configured, rather than fundamental to a source type; Kinesis, which is always volatile, is the rare exception. All other sources can operate in either a volatile or a nonvolatile mode, depending on how the source is configured.
Volatile sources impede Materialize's goal of providing users with correct, deterministic results. If you restart Materialize, we want to guarantee that you will see exactly the same data at exactly the same timestamps, and there is ongoing work to facilitate that. If you've created a volatile source, there is simply no possibility that we'll be able to provide that guarantee, because the upstream source may have garbage collected the data that you saw last time you ran Materialize.
Several forthcoming initiatives depend on this determinism guarantee, however, including the exactly-once sinks and active-active replication discussed below.
My contention is that it is wholly unrealistic to expect our users to only use nonvolatile sources with Materialize. The streaming world runs on inconsistency and nondurability. To play in this space, we need to interoperate seamlessly with volatile sources, and then offer a large, inviting off-ramp into the Materialize world of determinism and consistency.
The proposal here, then, is to bake the notion of volatility into the system. We start by marking volatile sources as such. Any view that depends on such a source becomes volatile. Any sink created from a volatile source or view becomes volatile. The optimizer will need to be careful not to treat different instantiations of a volatile source or view as logically equivalent. Exactly-once sinks and other features that only work on nonvolatile sources can look for and reject volatile sources.
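The propagation rule above can be sketched as a recursive walk over object dependencies. This is an illustrative sketch, not Materialize's actual catalog code; the `Catalog` type and its field names are assumptions for demonstration.

```rust
use std::collections::HashMap;

// Illustrative stand-ins; Materialize's real catalog types are richer.
type GlobalId = u64;

struct Catalog {
    /// For each object, the objects it directly depends on.
    depends_on: HashMap<GlobalId, Vec<GlobalId>>,
    /// The sources that were marked volatile at creation time.
    volatile_sources: Vec<GlobalId>,
}

impl Catalog {
    /// An object is volatile if it is a volatile source, or if anything
    /// it transitively depends on (for a view or sink) is volatile.
    fn is_volatile(&self, id: GlobalId) -> bool {
        self.volatile_sources.contains(&id)
            || self
                .depends_on
                .get(&id)
                .map_or(false, |deps| deps.iter().any(|&dep| self.is_volatile(dep)))
    }
}
```

Under this rule, a sink over a view over a volatile source comes out volatile, while objects that reach only nonvolatile sources do not.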
To start, I propose that we observe a simple rule: PubNub, SSE, and Kinesis sources are always volatile, while all other sources have unknown volatility (represented as NULL).

In the future, we'll want to allow users to mark Kafka, S3, and file sources as volatile at their option, to capture when their configuration of the source is volatile. See Future work below.
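A minimal sketch of this rule, assuming a simplified stand-in for Materialize's source connector types (the enum here is illustrative, not the real type), computes the volatility bit as a pure function of the connector:

```rust
// Illustrative stand-in for Materialize's source connector types.
#[derive(Debug, PartialEq)]
enum SourceConnector {
    PubNub,
    Sse,
    Kinesis,
    Kafka,
    S3,
    File,
}

/// PubNub, SSE, and Kinesis sources are always volatile; the volatility
/// of all other source types is unknown, represented as `None` and
/// surfaced to users as NULL.
fn volatility(connector: &SourceConnector) -> Option<bool> {
    match connector {
        SourceConnector::PubNub | SourceConnector::Sse | SourceConnector::Kinesis => {
            Some(true)
        }
        _ => None, // unknown; reported as NULL in SHOW SOURCES
    }
}
```

Because the bit is a pure function of the source type, no additional catalog state is needed, which keeps the concept easy to excise later if we abandon it.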
The only user-visible change at the moment will be an additional column in `SHOW SOURCES` that indicates whether a source is volatile:
> SHOW SOURCES;
name       | type | volatile | materialized
-----------+------+----------+-------------
pubnub_src | user | true     | false
kafka_src  | user | NULL     | false
and similarly for `SHOW VIEWS`:
> SHOW VIEWS;
name        | type | volatile | materialized
------------+------+----------+-------------
pubnub_view | user | true     | true
kafka_view  | user | NULL     | true
In the docs, we'll want to add a page about "Volatile sources" that explains what they are, why they are good for demos, and why they are bad for production applications that care about correctness. We can add a warning callout on PubNub, SSE, and Kinesis source pages that links to the volatile source page.
The big "gotcha" with volatile sources is that different instantiations can have
different data within the same materialized
process. Consider a sequence like
the following:
CREATE SOURCE market_orders_raw FROM PUBNUB
SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe'
CHANNEL 'pubnub-market-orders';
CREATE MATERIALIZED VIEW market_orders_1 AS
SELECT text::jsonb AS val FROM market_orders_raw;
-- Imagine the user makes a cup of coffee now.
CREATE MATERIALIZED VIEW market_orders_2 AS
SELECT text::jsonb AS val FROM market_orders_raw;
The data in market_orders_1
and market_orders_2
will not be equivalent,
because they'll each have instantiated separate copies of the source at
separate times. market_orders_1
will contain some historical data that
market_orders_2
does not.
The more I think about it, the more this seems like reasonable, even desirable behavior from a volatile source, provided we can educate users sufficiently.
Exactly-once sinks will need to error if they are created on a volatile source or view. This will be easy to check in the coordinator. The error message might read:
ERROR: exactly-once sinks cannot be created from a volatile {source|view}
DETAIL: Data in volatile {sources|views} is not consistent across restarts,
which is at odds with exactly-once semantics. To learn more, visit
https://materialize.com/s/volatility.
Active-active replication will rely on the determinism of the dataflow computation to compute identical results across two separate instances of Materialize. That is at odds with volatile sources, which will provide the two copies of Materialize with two different sets of data.
One easy solution is to simply ban volatile sources when using Materialize in this hypothetical distributed mode.
Alternatively, we could say "you got what you asked for", and message the deficiencies accordingly.
The optimizer will need to be careful to treat different instances of volatile sources and views as logically different views. This is a burden for the optimizer team, but hopefully a small one. The check to see whether two sources or views are equivalent is roughly:
fn are_objects_equivalent(metadata: &Metadata, id1: GlobalId, id2: GlobalId) -> bool {
    id1 == id2 && !metadata.is_volatile(id1)
}
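A self-contained version of that check, with a minimal stand-in `Metadata` type (an assumption for illustration; the real catalog metadata is richer), can be exercised directly:

```rust
use std::collections::HashSet;

type GlobalId = u64;

/// Minimal stand-in for catalog metadata: just the set of volatile ids.
struct Metadata {
    volatile_ids: HashSet<GlobalId>,
}

impl Metadata {
    fn is_volatile(&self, id: GlobalId) -> bool {
        self.volatile_ids.contains(&id)
    }
}

/// Two references are interchangeable only if they name the same object
/// and that object is not volatile: two instantiations of a volatile
/// source may contain different data even within one process.
fn are_objects_equivalent(metadata: &Metadata, id1: GlobalId, id2: GlobalId) -> bool {
    id1 == id2 && !metadata.is_volatile(id1)
}
```

Note that the check returns false even for two references to the *same* volatile id, which is exactly the property that prevents the optimizer from merging two instantiations of one volatile source.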
There is no obligation for new features to play well with volatile sources. In other words, it is acceptable for engineers to treat volatile sources as second-class citizens in Materialize.
For example, if you add a new sink type, there is no obligation to make it work with volatile sources if doing so would be difficult. If you add a new optimization that would be substantially complicated by volatile sources, there is no obligation to do that hard work; you can just disable the optimization for volatile sources instead.
That said, our backwards compatibility policy still applies to volatile sources. It is not okay to break existing, working features involving volatile sources (modulo extenuating circumstances).
In the future, I believe we'll want to allow users to explicitly mark a source as nonvolatile or volatile at their option, e.g. via a flag to `CREATE SOURCE`:
CREATE SOURCE [NONVOLATILE | VOLATILE] src
FROM KAFKA BROKER ...
Correctly choosing the defaults here will require a good deal of thought, though, so I'd like to hold off on this discussion for now. The proposal above of treating all Kafka, S3, and file sources as nonvolatile is equivalent to the system's current behavior.
Another benefit of deferring this toggleability is that computing the
volatility
bit as proposed in this doc does not require storing any additional
state, as it is a pure function of the source type. If we decide we don't like
the concept of volatility, we can excise it without breaking backwards
compatibility.
Source persistence may one day permit the conversion of a volatile source into a nonvolatile one. If we can write down the data we see as it flows into Materialize, then it doesn't matter if the upstream source garbage collects it.
We might discover that a single volatility bit is too coarse. For example, exactly-once sinks could learn to support volatile sources that provide the guarantee that you will never see a message more than once.
This proposal leaves the door open for slicing "volatility" into multiple
dimensions. This is another good reason to punt on
toggleable volatility. Once volatility specifications
are part of CREATE SOURCE
, we'll have to worry about backwards compatibility;
until then, we have a lot of latitude to refine our notion of volatility.
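If we do eventually slice volatility into dimensions, one speculative shape for it is a struct of independent guarantees rather than a single bit. The dimension names below are invented for illustration, not part of the proposal:

```rust
/// Hypothetical decomposition of volatility into independent guarantees.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Guarantees {
    /// Same data at the same timestamps across instantiations.
    replayable: bool,
    /// No message is ever delivered more than once.
    no_duplicates: bool,
}

/// An exactly-once sink could accept a source that is not replayable,
/// as long as the source never delivers duplicate messages.
fn allows_exactly_once_sink(g: Guarantees) -> bool {
    g.no_duplicates
}
```

This mirrors the example above: exactly-once sinks could learn to tolerate some volatile sources by checking the one guarantee they actually need.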
Relying on users to properly declare their sources as volatile or nonvolatile is error-prone. Unfortunately this problem is inherent to interfacing with external systems; our users are responsible for configuring and operating those systems according to some specification, and then telling us what that specification is. Given a nonvolatile Kafka source, for example, there is simply no way for Materialize to prevent a user from adding a short retention policy to the underlying Kafka topic that will make the source volatile.
Still, in many cases, Materialize could assist in detecting volatility violations. We could spot-check the data across instances of the same source occasionally, for example, and report any errors that we notice. We could periodically query the Kafka broker for its retention settings and surface any misconfigurations. We could remember a file's inode and size, and complain if either has changed after Materialize restarts.
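The file spot-check, for example, could be sketched as follows (Unix-only, using inode numbers; the function names are illustrative, not an existing Materialize API):

```rust
use std::fs;
use std::io;
use std::os::unix::fs::MetadataExt;

/// A fingerprint of a file source, recorded when the source is created.
#[derive(Debug, PartialEq)]
struct FileFingerprint {
    inode: u64,
    size: u64,
}

fn fingerprint(path: &str) -> io::Result<FileFingerprint> {
    let meta = fs::metadata(path)?;
    Ok(FileFingerprint {
        inode: meta.ino(),
        size: meta.len(),
    })
}

/// After a restart, returns true if the file still matches the fingerprint
/// taken earlier; false suggests the file was replaced or truncated, a
/// volatility violation worth surfacing to the user.
fn unchanged(path: &str, earlier: &FileFingerprint) -> io::Result<bool> {
    Ok(fingerprint(path)? == *earlier)
}
```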
One considered alternative was forcing all PubNub and SSE sources to be
materialized. This neatly sidesteps the volatility within a single
materialized
binary, because it guarantees there is only one instantiation
of each volatile source.
This severely limits Materialize's expressivity, though. One use case we expect to be very common in demos is to only materialize the last n seconds of data, which requires both an unmaterialized source and a materialized view:
CREATE SOURCE market_orders_raw FROM PUBNUB
SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe'
CHANNEL 'pubnub-market-orders';

CREATE MATERIALIZED VIEW market_orders AS
SELECT *
FROM (
    SELECT
        (text::jsonb)->>'bid_price' AS bid_price,
        (text::jsonb)->>'order_quantity' AS order_quantity,
        (text::jsonb)->>'symbol' AS symbol,
        (text::jsonb)->>'trade_type' AS trade_type,
        ((text::jsonb)->'timestamp')::bigint * 1000 AS timestamp_ms
    FROM market_orders_raw
)
WHERE mz_now() BETWEEN timestamp_ms AND timestamp_ms + 30000
One other discussed alternative was to introduce a "one-shot" source, which could be used in exactly one materialized view:
CREATE MATERIALIZED VIEW market_orders AS
WITH market_orders_raw AS SOURCE (
    PUBNUB SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe'
    CHANNEL 'pubnub-market-orders'
)
SELECT *
FROM (
    SELECT
        (text::jsonb)->>'bid_price' AS bid_price,
        (text::jsonb)->>'order_quantity' AS order_quantity,
        (text::jsonb)->>'symbol' AS symbol,
        (text::jsonb)->>'trade_type' AS trade_type,
        ((text::jsonb)->'timestamp')::bigint * 1000 AS timestamp_ms
    FROM market_orders_raw
)
WHERE mz_now() BETWEEN timestamp_ms AND timestamp_ms + 30000
The idea here is to force you to type out the SOURCE
definition every time you
want to use it, to make it more clear that you will get different data in
different views that use that source. But this design does not fit well into the
existing catalog infrastructure, which maintains a very clear separation between
sources and views. And even with this design, I still think we'd want to
introduce the notion of volatility, because we'll want exactly-once sinks and
active-active replication to know that market_orders
does not uphold the
usual consistency properties.
We could, in principle, reject the idea of volatile sources, and wait for source persistence to unlock volatility removal. Then the story for using volatile sources with Materialize would be to require using them with source persistence, so that they become nonvolatile.
Source persistence is likely 1-2 years away, though, especially an implementation that works seamlessly with compaction. It's also not clear to me whether every user who wants a volatile source would be ok with requiring persistence for those sources.