# Materialize Formalism

Materialize is a system that maintains views over changing data. At Materialize's heart are *time-varying collections* (TVCs): multisets of typed *data* whose contents may vary arbitrarily as a function of some ordered *time*. Materialize records, transforms, and presents the contents of time-varying collections. Its primary function is to provide the contents of a time-varying collection at specific times with absolute certainty.

This document details the **intended behavior** of the system. It does not aim to describe the **current behavior** of the system. Various liberties are taken in the names of commands and types, in the interest of simplicity. Not all commands here may be found in the implementation, nor all implementation commands found here. You may wish to consult the [VLDB](https://vldb.org/pvldb/vol13/p1793-mcsherry.pdf) and [CIDR](https://github.com/TimelyDataflow/differential-dataflow/blob/master/differentialdataflow.pdf) papers on differential dataflow, as well as the [DD library](https://github.com/TimelyDataflow/differential-dataflow).

# In a nutshell

A *time-varying collection* (TVC) is a map from partially ordered *times* to *collection versions*, where each collection version is a multiset of typed *pieces of data*. Because the past and future of a TVC may be unknown, Materialize represents a TVC as a *partial TVC* (pTVC), which is an accurate representation of a TVC during a range of times bounded by two *frontiers*: a lower frontier `since` and an upper frontier `upper`.

To avoid storing and processing the entire collection version for every single time, Materialize uses sets of *updates* as a differential representation of a pTVC. These updates are triples `[data, time, diff]`: the piece of data being changed, the time of the change, and the change in multiplicity of that data. Materialize can *compact* a pTVC by merging updates from the past, and *append* new updates to a pTVC as they arrive. Compaction advances the `since` frontier of the pTVC, and appending advances the `upper` frontier. Both compaction and appending preserve correctness: these differential representations of a pTVC are faithful to the "real" TVC between `since` and `upper`.

Materialize identifies each TVC by a unique `GlobalId`, and maintains a mutable map from each `GlobalId` to the current pTVC representing that `GlobalId`'s TVC.

Internally, Materialize is separated into three layers: *storage*, *compute*, and *adapter*. The storage layer records updates from (e.g.) external data sources. The compute layer provides materialized views: new TVCs which are derived from other TVCs in some way. The ability to read from and write to these layers is granted by first-class *capability* objects, which constrain when pTVCs can be compacted and appended to.

Users issue SQL queries to Materialize, but the storage and compute layers are defined in terms of low-level operations on update triples. The *adapter* translates these SQL queries into low-level commands. Since SQL does not (generally) include an explicit model of time, the adapter picks timestamps for SQL statements, within a given *timeline*, using a stateful *timestamp oracle*. To map table names to `GlobalId`s and capabilities, Materialize maintains a *source* mapping. SQL indices are maintained by the compute layer. Finally, Materialize's adapter supports SQL transactions.

# In depth

## Data

The term *piece of data*, in this document, means a single, opaque piece of arbitrarily typed data. For example, one might use a piece of data to represent a single row in an SQL database, or a single key-value pair in a key-value store. In this document, we often denote a piece of data as `data`.

However, we want to emphasize that as far as updates are concerned, each piece of data is an opaque, atomic object rather than a composite type. When we describe an update to a row, that data includes the entire new row, not just the columns inside it which changed.

We say "piece of data" instead of "datum" because "datum" has a different meaning in Materialize's internals: it refers to a cell within a row. In the CIDR differential dataflow paper, a piece of data is called a "record".

## Times

A *Timeline* is a unique ID (for disambiguating e.g. two sets of integers) together with a set of *Materialize times* `T` and two operators, a *meet* (greatest lower bound) and a *join* (least upper bound), which together form a [lattice](https://en.wikipedia.org/wiki/Lattice_(order)).

```
meet(t1, t2) => t3
join(t1, t2) => t3
```

As with all lattices, both `meet` and `join` must be associative, commutative, and idempotent.

A *time* is an element of `T`. We define a partial order `<=t` over times in the usual way for join semilattices: for any two times `t1` and `t2` in `T`, `t1 <=t t2` iff `join(t1, t2) = t2`. We also define a strict partial order over times `<t`: `t1 <t t2` iff `t1 <=t t2` and `t1 != t2`.

A *frontier* is a set of mutually incomparable times, and describes a lower bound on times: we say a time `t` is *later than or equal to* a frontier `f` when `t` is later than or equal to some time in `f`. The empty frontier `{}` is the *final* frontier: no time is later than or equal to it.
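To make this concrete, here is a minimal illustrative sketch (in Python, used for examples throughout this document; this is not Materialize code). It assumes one possible timeline in which times are pairs of integers ordered coordinate-wise: `meet` and `join` are elementwise minimum and maximum, and `<=t` is derived from `join` exactly as above.

```python
# A sketch (not Materialize code): times as pairs of integers, ordered
# coordinate-wise, form a lattice under elementwise min and max.

def meet(t1, t2):
    """Greatest lower bound: elementwise minimum."""
    return tuple(min(a, b) for a, b in zip(t1, t2))

def join(t1, t2):
    """Least upper bound: elementwise maximum."""
    return tuple(max(a, b) for a, b in zip(t1, t2))

def lte(t1, t2):
    """The derived partial order <=t: t1 <=t t2 iff join(t1, t2) = t2."""
    return join(t1, t2) == t2

assert lte((1, 2), (3, 4))              # comparable: (1,2) <=t (3,4)
assert not lte((1, 4), (3, 2))          # incomparable in either direction
assert not lte((3, 2), (1, 4))
assert join((1, 4), (3, 2)) == (3, 4)   # a least upper bound still exists
```

Note that `(1, 4)` and `(3, 2)` are incomparable under `<=t`, yet still have a well-defined `join`: this is exactly what lets partially ordered times form frontiers.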
## Collection versions

A *collection version* is a multiset of pieces of data: a map from each piece of data to an integer *multiplicity*, written `{data => multiplicity}`. We write `multiplicity(cv, data)` for the multiplicity of `data` in collection version `cv`. Multiplicities may be negative: partially ordered delivery of updates to collection versions requires commutativity. However, users generally don't see collection versions with negative multiplicities.

The *empty* collection version `cv0` is an empty multiset.

## Time-varying collections

A *time-varying collection* (TVC) represents the entire history of a collection which changes over time. At any time, it has an effective value: a collection version. Each TVC is associated with a single timeline, which governs which times it contains, and how those times relate to one another. Whenever we discuss a Materialize time in the context of a TVC, we mean a time in that TVC's timeline.

We model a TVC as a pair `[timeline, collection-versions]`, where `collection-versions` is a function which maps each time in `timeline` to a single collection version.

A *read* of a TVC `tvc` at time `t` means the collection version stored in `tvc` for `t`. We write this as `read(tvc, t) => collection-version`.

## Partial time-varying collections

At any wall-clock time a TVC is (in general) unknowable. Some of its past changes have been forgotten or were never recorded, some information has yet to arrive, and its future is unwritten. A *partial time-varying collection* (pTVC) is a partial representation of a corresponding TVC which is accurate for some range of times. We model this as a timeline, a function from times to collection versions, and two frontiers, *since* and *upper*: `ptvc = [timeline, collection-versions, since, upper]`. Informally, the `since` frontier tells us which versions have been lost to the past, and the `upper` frontier tells us which versions have yet to be learned. At times between `since` and `upper`, a pTVC should be identical to its corresponding TVC.

![A diagram showing a pTVC as a small region of a larger TVC](assets/tvc-ptvc.jpeg)

We think of `since` as the "read frontier": times not later than or equal to `since` cannot be correctly read. We think of `upper` as the "write frontier": times later than or equal to `upper` may still be written to the TVC.

A *read* of a pTVC `ptvc` at time `t` is the collection version in `ptvc` which corresponds to `t`. We write this as `read(ptvc, t) => collection-version`. We say that a read from a pTVC is *correct* when it yields exactly the same collection version as would be read from the pTVC's corresponding TVC: `read(ptvc, t) = read(tvc, t)`.

All pTVCs ensure an important correctness invariant. Consider a pTVC `ptvc` corresponding to some TVC `tvc`, with `since` and `upper` frontiers. For all times `t` such that `t` is later than or equal to `since`, and `t` is *not* later than or equal to `upper`, reads of `ptvc` at `t` are correct: `read(ptvc, t) = read(tvc, t)`. In the context of a pTVC, we say that such a time `t` is *between* `since` and `upper`.

![A diagram showing past, readable, and future times in a pTVC.](assets/since-readable-upper.jpeg)

In this diagram, times are shown as a two-dimensional lattice forming a partial order; time generally flows left to right, and the precise order is shown by arrows. The `since` frontier is formed by two times labeled "since"; the `upper` frontier is formed by two times labeled "upper". Times later than or equal to `upper` are drawn in orange: these times are in the unknown future of the pTVC. Times not later than or equal to `since` are drawn in gray: these times are in the forgotten past. Only times in blue, those between `since` and `upper`, can be correctly read.
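Continuing the pair-of-integers sketch above, the readable window between `since` and `upper` might be checked as follows. The specific frontiers and helper names here are inventions for the example, not part of Materialize's API.

```python
# A sketch continuing the pair-of-integers times above. A frontier is a
# set of mutually incomparable times; a time is later than or equal to
# a frontier when it is later than or equal to any element of it.

def lte(t1, t2):
    return all(a <= b for a, b in zip(t1, t2))

def later_or_equal(t, frontier):
    return any(lte(f, t) for f in frontier)

def readable(t, since, upper):
    """True when reads at t are guaranteed correct: t is between
    since and upper."""
    return later_or_equal(t, since) and not later_or_equal(t, upper)

since = {(2, 0), (0, 2)}   # hypothetical read frontier
upper = {(5, 3), (3, 5)}   # hypothetical write frontier

assert readable((2, 2), since, upper)        # between since and upper
assert not readable((0, 0), since, upper)    # in the forgotten past
assert not readable((6, 6), since, upper)    # in the unknown future
```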
## Update-set representations of TVCs and pTVCs

Because the set of times is in general very large, and because TVCs usually change incrementally, it is usually more efficient to represent a TVC as a set of *updates* which encode changes between collection versions.

An *update* is a triple `[data, time, diff]`. The `data` specifies what element of the collection version is changing, the `time` gives the time at which the change took place, and the `diff` is the change in multiplicity for that element.

Given a collection version `cv1` and an update `u = [data, time, diff]`, we can *apply* `u` to `cv1` to produce a new collection version `cv2`: `apply(cv1, u) => cv2`. `cv2` is exactly `cv1`, but with `multiplicity(cv2, data) = multiplicity(cv1, data) + diff`.

We can also apply a countable set of updates to a collection version by applying each update in turn. We write this `apply(collection, updates)`. Since `apply` is commutative, the order does not matter. In practice these are computers, and all sets are countable. (If you're wondering about uncountable sets, the [Möbius inversion](https://en.wikipedia.org/wiki/M%C3%B6bius_inversion_formula) might be helpful.)

Given a time `t` and two collection versions `cv1` and `cv2`, we can find their *difference*: a minimal set of updates which, when applied to `cv1`, yields `cv2`.

```
diff(t, cv1, cv2) = {
  u = [data, t, diff] |
    (data in cv1 or data in cv2) and
    (diff = multiplicity(cv2, data) - multiplicity(cv1, data)) and
    (diff != 0)
}
```
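As an illustrative sketch, `apply` and `diff` might look as follows, with collection versions represented as Python dictionaries from data to multiplicity. This representation is an assumption for the example, not Materialize's actual implementation.

```python
# A sketch (not Materialize's implementation): collection versions as
# dicts from opaque data to integer multiplicities.

def apply(cv, updates):
    """Apply a set of [data, time, diff] updates to a collection version."""
    out = dict(cv)
    for data, _time, diff in updates:
        out[data] = out.get(data, 0) + diff
        if out[data] == 0:
            del out[data]          # keep the representation minimal
    return out

def diff(t, cv1, cv2):
    """A minimal update set taking cv1 to cv2 at time t."""
    return [
        (data, t, cv2.get(data, 0) - cv1.get(data, 0))
        for data in set(cv1) | set(cv2)
        if cv2.get(data, 0) != cv1.get(data, 0)
    ]

cv1 = {"apple": 2}
cv2 = {"apple": 1, "banana": 1}
# The round-trip property that makes diff a faithful difference:
assert apply(cv1, diff(3, cv1, cv2)) == cv2
```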
### Isomorphism between update and time-function representations

Imagine we have a TVC represented as a set of updates, and wish to read the collection version at time `t`. We can do this by applying all updates which occurred at or before `t` to the empty collection version. This lets us go from an update-set representation to a time-function representation.

```
read(updates, t) = apply(cv0, { u = [data, time, diff] |
                                u in updates and time <=t t })
```

Similarly, given a time-function representation `tvc`, we can obtain an update-set representation by considering each point in time and finding the difference between the version at that time and the sum of all updates from all prior times. If we take Materialize times as the natural numbers, this is simple: the update set for a given time `t` is simply the difference between the version at `t - 1` and the version at `t`.

```
updates(tvc, t) = diff(t, read(tvc, t - 1), read(tvc, t))
```

In general times may be partially ordered, which means there might be *multiple* prior times; there is no single diff. Instead, we must sum the updates from all prior times, and diff that sum with the collection version at `t`:

```
updates(tvc, t) = diff(t, apply(cv0, prior-updates(tvc, t)), read(tvc, t))
```

The updates prior to `t` are simply the union of the updates for all times strictly prior to `t`:

```
prior-updates(tvc, t) = ⋃ updates(tvc, prior-t) for all prior-t <t t
```
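For totally ordered times (say, the natural numbers), the update-set-to-time-function direction can be sketched directly. This is illustrative Python, not Materialize code; the update set and times are assumptions for the example.

```python
# A sketch of reading a collection version from an update-set
# representation: sum every update whose time is <=t the read time.

def lte(t1, t2):
    # Natural-number times for brevity; any partial order works.
    return t1 <= t2

def read(updates, t):
    cv = {}
    for data, time, diff in updates:
        if lte(time, t):
            cv[data] = cv.get(data, 0) + diff
            if cv[data] == 0:
                del cv[data]
    return cv

updates = [("a", 1, +1), ("b", 2, +1), ("a", 3, -1)]
assert read(updates, 1) == {"a": 1}
assert read(updates, 2) == {"a": 1, "b": 1}
assert read(updates, 3) == {"b": 1}   # the retraction at time 3 applies
```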
## Storage

The Storage layer records and durably maintains pTVCs, which it identifies by `GlobalId`s, and mediates access to them through capabilities. It presents the following commands:

* `CreateSource(id, description) -> (ReadCapability, WriteCapability)`: binds to `id` the TVC described by `description`. The command returns capabilities naming `id`, with frontiers set to initial values chosen by the Storage layer.

  The collection associated with `id` is based on `description`, but it is up to the Storage layer to determine what the source's contents will be. A standard example is a Kafka topic, whose contents are "added" to the underlying collection as they are observed. The Storage layer only needs to ensure that it can reliably produce the updates between `since` and `upper`; otherwise, the nature of the content is up for discussion.

  It is an error to re-use a previously used `id`.

* `SubscribeAt(ReadCapability(id, frontier))`: returns a snapshot of `id` at `frontier`, followed by an ongoing stream of subsequent updates.

  This command returns the contents of `id` as of `frontier` once they are known, and updates thereafter once they are known. The snapshot and stream contain in-line statements of the subscription's `upper` frontier: those times for which updates may still arrive. The subscription will produce exactly correct results: the snapshot is the TVC's contents at `frontier`, and all subsequent updates occur at exactly their indicated time. This call may block, or not be promptly responded to, if `frontier` is greater than or equal to the current `upper` of `id`.

  The subscription can be canceled by either endpoint, and the recipient should only downgrade their read capability when they are certain they have all data through the frontier they would downgrade to.

  A subscription can be constructed with additional arguments that change how the data is returned to the user. For example, the user may ask not to receive the initial snapshot, rather than receive and then discard it. Or the user may provide filtering and projection that can be applied before the data are transmitted.

* `CloneCapability(Capability) -> Capability'`: creates an independent copy of the given capability, with the same `id` and `frontier`.

* `DowngradeCapability(Capability, frontier')`: downgrades the given capability to `frontier'`, leaving its `id` unchanged. Every time in the new `frontier'` must be greater than or equal to the current `frontier`.

* `DropCapability(Capability)`: downgrades a capability to the final frontier `{}`, rendering it useless.

* `Append(WriteCapability(id, frontier), updates, new_frontier)`: applies `updates` to `id` and downgrades `frontier` to `new_frontier`. All times in `updates` must be greater than or equal to `frontier` and not greater than or equal to `new_frontier` (see the sketch at the end of this section). It is probably an error to call this with `new_frontier` equal to `frontier`, as it would mean `updates` must be empty. This is the analog of the `INSERT` statement, though `updates` are signed and general enough to support `DELETE` and `UPDATE` at the same time.

* `AcquireCapabilities(id) -> (ReadCapability, WriteCapability)`: provides capabilities for `id` at its current `since` and `upper` frontiers.

  This method is a best-effort attempt to regain control of the frontiers of a collection. Its most common uses are to recover capabilities that have expired (leases) or to attempt to read a TVC that one did not create (or otherwise receive capabilities for). If the frontiers have been fully released by all other parties, this call may result in capabilities with empty frontiers (which are useless).

The Storage layer provides its output through `SubscribeAt` and `AcquireCapabilities`, which reveal the contents and frontiers of the TVC.

All commands are durable upon the return of the invocation.
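To illustrate the frontier rules that `Append` must enforce, here is a small Python sketch. The function and field names are hypothetical, and single-element frontiers of integer times are used for brevity; this is not Materialize's API.

```python
# A sketch of Append's frontier checks: every update time must be
# later than or equal to the current write frontier, and NOT later
# than or equal to new_frontier.

def lte(t1, t2):
    return t1 <= t2          # natural-number times for brevity

def later_or_equal(t, frontier):
    return any(lte(f, t) for f in frontier)

def append(ptvc, write_frontier, updates, new_frontier):
    for _data, time, _diff in updates:
        assert later_or_equal(time, write_frontier), "time before frontier"
        assert not later_or_equal(time, new_frontier), "time not yet sealed"
    ptvc["updates"].extend(updates)
    ptvc["upper"] = new_frontier      # appending advances upper
    return new_frontier               # the downgraded write frontier

ptvc = {"updates": [], "upper": {0}}
frontier = append(ptvc, {0}, [("a", 0, +1), ("b", 1, +1)], {2})
assert ptvc["upper"] == {2} and frontier == {2}
```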
## Compute

The Compute layer presents one additional API command. The layer also intercepts commands for identifiers it introduces.

* `MaintainView([ReadCapabilities], view, [WriteCapabilities]) -> [ReadCapabilities]`: installs a view maintenance computation described by `view`.

  Each view is described by a set of input read capabilities, identifying collections and a common frontier. The `view` itself describes a computation that produces a number of output collections. Optional write capabilities allow the maintained view to be written back to Storage. The method returns read capabilities for each of its outputs.

The Compute layer retains the read capabilities it has been offered, and only downgrades them once it is certain they are not needed. For example, the Compute layer will not downgrade its input read capabilities past the `since` of its outputs. The outputs of a dataflow may or may not make their way to durable TVCs in the Storage layer.

All commands are durable upon return of the invocation.

### Constraints

A view introduces *constraints* on the capabilities and frontiers of its input and output identifiers.

* The view holds a write capability for each output that does not advance beyond the input write capabilities. This prevents the output from announcing it is complete through times for which the input may still change. The output write capabilities are advanced through timely dataflow, which tracks outstanding work and can confirm completeness.

* The view holds a read capability for each input that does not advance beyond the output read capabilities. This prevents the compaction of input updates into a state that prevents recovery of the outputs. Outputs may need to be recovered in the case of failure, but also in other, less dramatic query-migration scenarios.

These constraints are enforced using the capability abstractions. The Compute layer acquires and maintains read capabilities for collections managed by it, and by the Storage layer.

# Adapter

The Adapter layer translates SQL statements into commands for the Storage and Compute layers. The most significant differences between SQL statements and commands to the Storage and Compute layers are:

1. The absence of explicit timestamps in SQL statements. A `SELECT` statement does not indicate *when* it should be run, or against which version of its input data. The Adapter layer introduces timestamps to these commands, in a way that provides the appearance of sequential execution.

2. The use of user-defined and reused names in SQL statements rather than `GlobalId` identifiers. SQL statements reference user-assigned names that may be rebound over time. The meaning, and even the validity, of a query depends on the current association of user-defined names to `GlobalId`s. The Adapter layer maintains a map from user-assigned names to `GlobalId` identifiers, and introduces new `GlobalId`s as appropriate.

Generally, SQL is "more ambiguous" than the Storage and Compute layers, and the Adapter layer must resolve that ambiguity.

The SQL commands are a mix of DDL (data definition language) and DML (data manipulation language). In Materialize today, the DML commands are timestamped, and the DDL commands are largely not (although their creation of `GlobalId` identifiers is essentially sequential). That DDL commands are not timestamped in the same sense as a pTVC is a known potential source of apparent consistency issues. While it may be beneficial to think of the Adapter's state as a pTVC, changes to its state are not meant to cascade through established view definitions.

Commands acknowledged by the Adapter layer to a user are durably recorded in a total order. Any user can rely on all future behavior of the system reflecting any acknowledged command.

## Timestamp Oracle

The Adapter assigns timestamps to DML commands using a *timestamp oracle*. The timestamp oracle is a stateful, linearizable object which (at a high gloss) provides a simple API:

```
AssignTimestamp(command) -> timestamp
```

Since the timestamp oracle is linearizable, all calls to `AssignTimestamp` appear to take place in a total order, with each call taking effect at some point between its invocation and completion. Along that linearization order, the returned timestamps satisfy several laws:

1. The timestamps never decrease.
2. The timestamps of INSERT/UPDATE/DELETE statements are strictly greater than those of preceding SELECT statements.
3. The timestamps of SELECT statements are greater than or equal to the read capabilities of their input collections.
4. The timestamps of INSERT/UPDATE/DELETE statements are greater than or equal to the write capabilities of their target collections.

The timestamps from the timestamp oracle do not need to strictly increase, and any events that have the same timestamp are *concurrent*. Concurrent events (at the same timestamp) first modify collections and then read collections: all writes at a timestamp are visible to all reads at the same timestamp. A toy sketch of these laws appears at the end of this section.

Some DML operations, like UPDATE and DELETE, require a read-write transaction which prevents other writes from intervening. In these and other non-trivial cases, we rely on the total order of timestamps to provide the apparent total order of system execution.

---

Several commands support an optional `AS OF <time>` clause, which explicitly specifies the timestamp at which the command should run, rather than leaving the choice to the timestamp oracle.
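To make the timestamp oracle's laws concrete, here is a toy, single-threaded Python sketch. The names are hypothetical, frontiers are simplified to single integer timestamps, and a real oracle must additionally be linearizable and durable; this is an illustration of laws 1 through 4 above, not Materialize's implementation.

```python
# A toy, single-threaded sketch of the timestamp-oracle laws.
# Integer timestamps stand in for frontiers.

class TimestampOracle:
    def __init__(self):
        self.ts = 0                       # law 1: never decreases
        self.read_since_last_write = False

    def assign_timestamp(self, kind, input_since=0, target_upper=0):
        if kind == "select":
            # Law 3: reads land at or past their inputs' read frontier.
            self.ts = max(self.ts, input_since)
            self.read_since_last_write = True
        else:  # insert / update / delete
            # Law 4: writes land at or past their target's write frontier.
            self.ts = max(self.ts, target_upper)
            # Law 2: strictly greater than any preceding SELECT.
            if self.read_since_last_write:
                self.ts += 1
                self.read_since_last_write = False
        return self.ts

oracle = TimestampOracle()
t1 = oracle.assign_timestamp("select")
t2 = oracle.assign_timestamp("insert")
t3 = oracle.assign_timestamp("select")
# The write strictly follows the earlier read; the later read may share
# the write's timestamp, at which point the write is visible to it.
assert t1 < t2 <= t3
```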