20210401_volatile_sources.md 14 KB

Volatile sources

Summary

Our users want to connect Materialize to upstream sources that do not provide guarantees about what data they will present. These sources are in tension with the goal of providing a deterministic, replayable system.

This design doc proposes to:

  • Introduce "volatile source" as a term of art that describes a source that does not meet a specific standard of reliability.

  • Use the terminology consistently in the docs site and error messages to educate our users about volatile sources.

  • Restrict the use of volatile sources in contexts where they could impede future improvements to the system's determinism and correctness.

Goals

The only goal is to stabilize the PubNub and SSE sources.

Both of these are volatile sources with some worrying correctness properties, but their existence will massively improve the "wow factor" for demos of Materialize Cloud.

Description

Terminology

A "volatile source" is any source that cannot reliabily present the same data with the same timestamps across different instantiations.

A "instantiation" of a source occurs whenever:

  • A new index is created on the source or on an unmaterialized view that references the source.
  • An ad-hoc query or TAIL references the source directly.

Restarting Materialize will cause all sources to be reinstantiated. Referencing a materialized view that in turn depends on a source does not create a new instantiation of a source.

Examples of volatile sources:

  • Any Kinesis source. Since Kinesis streams have mandatory retention periods (by default 24 hours), a new instantiation of the source will not see any data that has expired.
  • A PubNub demo source, since new instantiations always start from the tail of the stream.
  • An append-only Kafka source with a non-infinite retention policy.
  • A file source, where the file is periodically overwritten.

Examples of nonvolatile sources:

  • A PubNub source with retention, though we don't presently support this.
  • An append-only Kafka source, when the topic has infinite retention and no compaction policy.
  • An upsert Kafka source, when the topic has infinite retention but may have a key compaction policy.
  • A file source, when the file is treated as append only.

It's important to note that volatility is a property of how a source is configured, rather than fundamental to a source type. Kinesis is the rare exception. All other sources can be either volatile and non-volatile modes, depending on how the source is configured.

Discussion

Volatile sources impede Materialize's goal of providing users with correct, deterministic results. If you restart Materialize, we want to guarantee that you will see exactly the same data at exactly the same timestamps, and there is ongoing work to facilitate that. If you've created a volatile source, there is simply no possibility that we'll be able to provide that guarantee, because the upstream source may have garbage collected the data that you saw last time you ran Materialize.

Several forthcoming initiatives depend on this determinism guarantee, however:

  • Exactly-once sinks
  • Active-active replication
  • Dataflow transformations that consider source instances to be logically equivalent

My contention is that it is wholly unrealistic to expect our users to only use nonvolatile sources with Materialize. The streaming world runs on inconsistency and nondurability. To play in this space, we need to interoperate seamlessly with volatile sources, and then offer a large, inviting off ramp into the Materialize world of determinism and consistency.

The proposal here, then, is to bake the notion of volatility into the system. We start by marking volatile sources as such. Any view that depends on such a source becomes volatile. Any sink created from a volatile source or view becomes volatile. The optimizer will need to be careful not to treat different instantiations of a volatile source or view as logically equivalent. Exactly once sinks and other features that only work on nonvolatile sources can look for and reject volatile sources.

Design

To start, I propose that we observe a simple rule:

  • All PubNub, SSE, and Kinesis sources are marked as volatile.
  • All other sources are marked as having unknown volatility (NULL).

In the future, we'll want to allow users to mark Kafka, S3, and file sources as volatile at their option, to capture when their configuration of the source is volatile. See Future work below.

The only user-visible change at the moment will be an additional column in SHOW SOURCES that indicates whether a source is volatile

> SHOW SOURCES;
 name        | type | volatile | materialized
-------------+------+----------+-------------
 pubnub_src  | user | true     | false
 kafka_src   | user | NULL     | false

and similarly for SHOW VIEWS:

> SHOW VIEWS;
 name         | type | volatile | materialized
--------------+------+----------+-------------
 pubnub_view  | user | true     | true
 kafka_view   | user | NULL     | true

In the docs, we'll want to add a page about "Volatile sources" that explains what they are, why they are good for demos, and why they are bad for production applications that care about correctness. We can add a warning callout on PubNub, SSE, and Kinesis source pages that links to the volatile source page.

The big "gotcha" with volatile sources is that different instantiations can have different data within the same materialized process. Consider a sequence like the following:

CREATE SOURCE market_orders_raw FROM PUBNUB
SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe'
CHANNEL 'pubnub-market-orders';

CREATE MATERIALIZED VIEW market_orders_1 AS
  SELECT text::jsonb AS val FROM market_orders_raw;

-- Imagine the user makes a cup of coffee now.

CREATE MATERIALIZED VIEW market_orders_2 AS
  SELECT text::jsonb AS val FROM market_orders_raw;

The data in market_orders_1 and market_orders_2 will not be equivalent, because they'll each have instantiated separate copies of the source at separate times. market_orders_1 will contain some historical data that market_orders_2 does not.

The more I think about it, the more this seems like reasonable, even desirable behavior from a volatile source, provided we can educate users sufficiently.

Implications

Exactly-once sinks

Exactly-once sinks will need to error if they are created on a volatile source or view. This will be easy to check in the coordinator. The error message might read:

ERROR:  exactly-once sinks cannot be created from a volatile {source|view}
DETAIL: Data in volatile {sources|views} is not consistent across restarts,
        which is at odds with exactly-once semantics. To learn more, visit
        https://materialize.com/s/volatility.

Active-active replication

Active-active replication will rely on the determinism of the dataflow computation to compute identical results across two separate instances of Materialize. That is at odds with volatile sources, which will provide the two copies of Materialize with two different sets of data.

One easy solution is to simply ban volatile sources when using Materialize in this hypothetical distributed mode.

Alternatively, we could say "you got what you asked for", and message the deficiencies accordingly.

Optimizer

The optimizer will need to be careful to treat different instances of volatile sources and views as logically different views. This is a burden for the optimizer team, but hopefully a small one. The check to see whether two sources or views is roughly:

fn are_objects_equivalent(metadata: &Metadata, id1: GlobalId, id2: GlobalId) -> bool {
    id1 == id2 && !metadata.is_volatile(id1)
}

Other to-be-developed features

There is no obligation for new features to play well with volatile sources. In other words, it is acceptable for engineers to treat volatile sources as second-class citizens in Materialize.

For example, if you add a new sink type, there is no obligation to make it work with volatile sources if doing so would be difficult. If you add a new optimization that would be substantially complicated by volatile sources, there is no obligation to do that hard work; you can just disable the optimization for volatile sources instead.

That said, our backwards compatibility still applies to volatile sources. It is not okay to break existing, working features involving volatile sources (modulo extenuating circumstances).

Future work

Toggleable volatility

In the future, I believe we'll want to allow users to explicitly mark a source as nonvolatile or volatile at their option, e.g. via a flag to CREATE SOURCE:

CREATE SOURCE [NONVOLATILE | VOLATILE] src
FROM KAFKA BROKER ...

Correctly choosing the defaults here will require a good deal of thought, though, so I'd like to hold off on this discussion for now. The proposal above of treating all Kafka, S3, and file sources as nonvolatile is equivalent to the system's current behavior.

Another benefit of deferring this toggleability is that computing the volatility bit as proposed in this doc does not require storing any additional state, as it is a pure function of the source type. If we decide we don't like the concept of volatility, we can excise it without breaking backwards compatibility.

Volatility removal

Source persistence may one day permit the conversion of a volatile source into a nonvolatile one. If we can write down the data we see as it flows into Materialize, then it doesn't matter if the upstream source garbage collects it.

Expanded volatility bits

We might discover that a single volatility bit is too coarse. For example, exactly-once sinks could learn to support volatile sources that provide the guarantee that you will never see a message more than once.

This proposal leaves the door open for slicing "volatility" into multiple dimensions. This is another good reason to punt on toggleable volatility. Once volatility specifications are part of CREATE SOURCE, we'll have to worry about backwards compatibility; until then, we have a lot of latitude to refine our notion of volatility.

Automated volatility violation detection

Relying on users to properly declare their sources as volatile or nonvolatile is error prone. Unfortunately this problem is inherent to interfacing with external systems; our users are responsibile for configuring and operating those systems according to some specification, and then telling us what that specification is. Given a nonvolatile Kafka source, for example, there is simply no way for Materialize to prevent a user from adding a short retention policy to the underlying Kafka topic that will make the source volatile.

Still, in many cases, Materialize could assist in detecting volatility violations. We could spot-check the data across instances of the same source occasionally, for example, and report any errors that we notice. We could periodically query the Kafka broker for its retention settings and surface any misconfigurations. We could remember a file's inode and size, and complain if either has changed after Materialize restarts.

Alternatives

Forced materialization

One considered alternative was forcing all PubNub and SSE sources to be materialized. This neatly sidesteps the volatility within a single materialized binary, because it guarantees there is only one instantiation of each volatile source.

This severely limits Materialize's expressivity, though. One use case we expect to be very common in demos is to only materialize the last n seconds of data, which requires both an unmaterialized source and a materialized view:

CREATE SOURCE market_orders_raw FROM PUBNUB SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe' CHANNEL 'pubnub-market-orders';

CREATE MATERIALIZED VIEW market_orders AS
  SELECT *
  FROM (
      SELECT
        (text::jsonb)->>'bid_price' AS bid_price,
        (text::jsonb)->>'order_quantity' AS order_quantity,
        (text::jsonb)->>'symbol' AS symbol,
        (text::jsonb)->>'trade_type' AS trade_type,
        ((text::jsonb)->'timestamp')::bigint * 1000 AS timestamp_ms
      FROM market_orders_raw
    )
  WHERE mz_now() BETWEEN timestamp_ms AND timestamp_ms + 30000

One-shot sources

One other discussed alternative was to introduce a "one-shot" source, which could be used in exactly one materialized view:

CREATE MATERIALIZED VIEW market_orders AS
 WITH market_orders_raw AS SOURCE (
     PUBNUB SUBSCRIBE KEY 'sub-c-4377ab04-f100-11e3-bffd-02ee2ddab7fe' CHANNEL 'pubnub-market-orders';
  )
  SELECT *
  FROM (
      SELECT
        (text::jsonb)->>'bid_price' AS bid_price,
        (text::jsonb)->>'order_quantity' AS order_quantity,
        (text::jsonb)->>'symbol' AS symbol,
        (text::jsonb)->>'trade_type' AS trade_type,
        ((text::jsonb)->'timestamp')::bigint * 1000 AS timestamp_ms
      FROM market_orders_raw
    )
  WHERE mz_now() BETWEEN timestamp_ms AND timestamp_ms + 30000

The idea here is to force you to type out the SOURCE definition every time you want to use it, to make it more clear that you will get different data in different views that use that source. But this design does not fit well into the existing catalog infrastructure, which maintains a very clear separation between sources and views. And even with this design, I still think we'd want to introduce the notion of volatility, because we'll want to exactly-once sinks and active-active replication to know that market_orders does not uphold the usual consistency properties.

Wait for volatility removal

We could, in principle, reject the idea of volatile sources, and wait for source persistence to unlock volatility removal. Then the story for using volatile sources with Materialize would be to require using them with source persistence, so that they become nonvolatile.

Source persistence is likely 1-2 years away, though, especially an implementation that works seamlessly with compaction. It's also not clear to me whether every user who wants a volatile source would be ok with requiring persistence for those sources.