# Self Describing Persist Batches

- Associated:
  - [write Schema-ified data blobs #24830](https://github.com/MaterializeInc/database-issues/issues/7411)
  - [persist: schema evolution](https://github.com/MaterializeInc/database-issues/issues/4818)
  - [Table support for push sources](https://github.com/MaterializeInc/database-issues/issues/6896)
  - [[dnm] columnar: Write columnar encodings as part of a Batch](https://github.com/MaterializeInc/materialize/pull/26120)
  - [[dnm] columnar: Array support](https://github.com/MaterializeInc/materialize/pull/25848)

## The Problem

We want Persist to have an understanding of the data it contains. The main motivation is supporting schema migrations, concretely, being able to run `ALTER TABLE ... ADD COLUMN ...` or `ALTER TABLE ... DROP COLUMN ...`.

Today Persist writes Parquet files with the following schema:

```
{
  "k": bytes,
  "v": bytes,
  "t": i64,
  "d": i64,
}
```

Where `"k"` is a `Row` (technically `SourceData`) encoded as Protobuf (via the [`Codec`](https://github.com/MaterializeInc/materialize/blob/d0aa5b7d0b47e55cf4e211e507116a41cb7f8680/src/persist-types/src/lib.rs#L39) trait) and `"v"` is always the unit type `()`, which gets encoded as an empty byte array.

The root of the problem is that a `Row` is not self-describing: we need extra information (i.e. a `RelationDesc`) to know which columns the `Datum`s in a `Row` map to. Consider the following scenario:

```sql
CREATE TABLE t1 (a text, b int);
INSERT INTO t1 VALUES ('hello', 5);
ALTER TABLE t1 DROP COLUMN b;
ALTER TABLE t1 ADD COLUMN b int DEFAULT 50;
DELETE FROM t1 WHERE b = 5;
```

To properly handle the `DELETE` we need enough information in the persisted batch to realize that the `Row('hello', 5)` initially inserted should __not__ be deleted, because the column with the value `5` corresponds to the previous column named `b`.

## Success Criteria

* Batches within Persist are self-describing.
* Unblock work for the following projects:
  * Evolving the schema of a Persist shard (e.g. adding columns to tables).
  * User-defined sort order of data, i.e. `PARTITION BY`.
  * Only fetching the columns from a shard that are needed, i.e. projection pushdown.
  * Making `persist_source` faster [#25901](https://github.com/MaterializeInc/database-issues/issues/7726).

## Out of Scope

* Detailed logic for migrating the schema of a batch.
  * This design doc will touch upon how such a migration could work, but it is a large enough issue to warrant its own design doc.
* Design or implementation of any "unblocked" features.

## Solution Proposal

Require that data written to Persist (e.g. `Row`s) be transformed into [Apache Arrow](https://arrow.apache.org/) data types, then use this columnar format to write [Apache Parquet](https://parquet.apache.org/) to S3.

### Apache Arrow and Parquet

Arrow is a relatively new in-memory columnar data format. It is designed for efficient (e.g. SIMD vectorized) analytical operations and to be a standard that all "data libraries" can interface with. Persist currently collects statistics for filter pushdown using Arrow.

Parquet is a columnar format designed for efficient storage, based on the [Dremel paper from Google](https://research.google/pubs/dremel-interactive-analysis-of-web-scale-datasets-2/) and built as a [collaboration between Twitter and Cloudera for Hadoop](https://blog.twitter.com/engineering/en_us/a/2013/announcing-parquet-10-columnar-storage-for-hadoop). Its goal is space-efficient storage of data that allows fetching individual columns from a collection of rows. Crucially, it is also self-describing and supports arbitrary additional metadata.
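To make the proposal concrete, here is a minimal, hedged sketch (not Persist's actual writer) of how today's `{k, v, t, d}` layout could be expressed as Arrow arrays and written out as Parquet with the open-source `arrow` and `parquet` crates. The function name and column values are made up for illustration.

```rust
use std::error::Error;
use std::sync::Arc;

use arrow::array::{ArrayRef, BinaryArray, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

/// Write a single-update batch in the current `{k, v, t, d}` layout.
fn write_kvtd_parquet() -> Result<Vec<u8>, Box<dyn Error>> {
    // "k" is a Protobuf-encoded `SourceData`, "v" is the unit type `()`
    // encoded as an empty byte array.
    let k: ArrayRef = Arc::new(BinaryArray::from_vec(vec![&b"proto bytes"[..]]));
    let v: ArrayRef = Arc::new(BinaryArray::from_vec(vec![&b""[..]]));
    let t: ArrayRef = Arc::new(Int64Array::from(vec![42i64]));
    let d: ArrayRef = Arc::new(Int64Array::from(vec![1i64]));
    let batch = RecordBatch::try_from_iter(vec![("k", k), ("v", v), ("t", t), ("d", d)])?;

    // The parquet crate handles the Arrow -> Parquet mapping for us.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(buf)
}
```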
Parquet also has a number of nice features that we won't use immediately but could enable fairly easily:

* Tracks the size and location of columns within the Parquet file, designed for projection pushdown.
* Can collect statistics on the data in the Parquet file, which would supplement our own filter pushdown.
* Can store arbitrary metadata for a file. This could be a way we spill our own collected stats to S3.
* Dictionary encoding out of the box. For columns with repeated values (e.g. time or diff) this could result in space savings.
* Compression. Individual columns can be compressed with Snappy, gzip, Brotli, or LZ4.

> **Note:** Arrow and Parquet are pretty tightly coupled; mapping from one format to the other is already handled by any open-source library we would use. For the most part our codebase will interact with Arrow, with only the lowest layer of Persist having to know about Parquet.

### Columnar Encodings

Currently we durably persist `Datum`s using the [`ProtoDatum`](https://github.com/MaterializeInc/materialize/blob/d0aa5b7d0b47e55cf4e211e507116a41cb7f8680/src/repr/src/row.proto#L32-L69) protobuf message. Arrow's data types are not as rich as Protobuf's, so we need to figure out how exactly we would represent `Datum`s in an Arrow format. These decisions are not set in stone: we describe below how migrations to other Arrow data types would be possible, but it's less work if we get this right from the start.
For each scalar type, the sections below give its Rust representation, the Arrow array we would encode it as, and notes on the encoding.

#### Numeric

Rust representation:

```rust
struct Decimal<const N: usize>
```

Arrow array:

```
StructArray<{
  lossy: f64,
  actual: Bytes,
}>
```

Encoded as two values: a lossy `f64` for sorting and filtering, then a serialized representation of the `Decimal` as opaque bytes.
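As a concrete illustration of that shape, here is a hedged sketch building such a column with the `arrow` crate (assuming a recent version); the lossy/exact values are placeholders, not Materialize's real encoding.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, Float64Array, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() {
    // A column of two Numeric values: a lossy f64 for sorting/filtering plus
    // the exact Decimal serialized as opaque bytes (placeholder bytes here).
    let lossy: ArrayRef = Arc::new(Float64Array::from(vec![1.5, -2.25]));
    let actual: ArrayRef = Arc::new(BinaryArray::from_vec(vec![
        &b"exact bytes of 1.5"[..],
        &b"exact bytes of -2.25"[..],
    ]));

    let numeric_col = StructArray::from(vec![
        (Arc::new(Field::new("lossy", DataType::Float64, false)), lossy),
        (Arc::new(Field::new("actual", DataType::Binary, false)), actual),
    ]);
    assert_eq!(numeric_col.len(), 2);
}
```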
#### Date

Rust representation:

```rust
struct Date {
    // Days since the Postgres epoch.
    days: i32,
}
```

Arrow array: `PrimitiveArray<i32>`

Directly encode the number of days since the UNIX epoch (1970-01-01).

> **Alternative:** We could encode this as the number of days since the Postgres epoch so it would be a direct representation of the Rust type, but I'm leaning towards encoding as days since the UNIX epoch for consistency with `Timestamp`, which is also relative to the UNIX epoch. The max value supported for `Date` in Postgres is the year 5,874,897 AD, which can be represented with either offset.
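A minimal sketch of the proposed conversion, assuming the `Date { days }` struct above; the constant reflects that the Postgres epoch (2000-01-01) is 10,957 days after the UNIX epoch.

```rust
/// Days between 1970-01-01 (UNIX epoch) and 2000-01-01 (Postgres epoch).
const POSTGRES_EPOCH_OFFSET_DAYS: i32 = 10_957;

struct Date {
    /// Days since the Postgres epoch.
    days: i32,
}

/// Value stored in the Arrow `PrimitiveArray<i32>` column.
fn to_arrow_days(date: &Date) -> i32 {
    date.days + POSTGRES_EPOCH_OFFSET_DAYS
}

fn main() {
    // 2000-01-01 is day 0 in the Postgres representation.
    assert_eq!(to_arrow_days(&Date { days: 0 }), 10_957);
}
```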
#### Time

Rust representation:

```rust
struct NaiveTime {
    secs: u32,
    frac: u32,
}
```

Arrow array: `FixedSizeBinary[8]`

Represented as the `secs` field and `frac` field encoded, in that order, as big-endian.

> **Alternative:** We could represent this as the number of nanoseconds since midnight, which is a bit more general but is more costly to encode at runtime. Ideally Persist encoding is as fast as possible, so I'm leaning towards the more direct-from-Rust approach.

> **Note:** We only need 47 bits to represent this total range, leaving 19 bits unused. In the future, if we support the `TIMETZ` type we could probably also represent that in a `u64`, using these extra bits to store the timezone.
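A sketch of the proposed packing, assuming the `secs`/`frac` fields above (the function name is illustrative).

```rust
/// Pack `secs` and `frac` big-endian into the 8 bytes stored in the
/// `FixedSizeBinary[8]` column.
fn encode_time(secs: u32, frac: u32) -> [u8; 8] {
    let mut buf = [0u8; 8];
    buf[..4].copy_from_slice(&secs.to_be_bytes());
    buf[4..].copy_from_slice(&frac.to_be_bytes());
    buf
}

fn main() {
    // secs = 1, frac = 5.
    assert_eq!(encode_time(1, 5), [0, 0, 0, 1, 0, 0, 0, 5]);
}
```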
#### Timestamp

Rust representation:

```rust
struct NaiveDateTime {
    date: NaiveDate {
        // (year << 13) | day
        ymdf: i32,
    },
    time: NaiveTime,
}
```

Arrow array: `PrimitiveArray<i64>`

`chrono` (our underlying date time library) uses a more memory-efficient encoding of the date by squeezing both year and day into a single `i32`; combined with a `NaiveTime` this ends up being 12 bytes. We can represent this same range of time as the number of microseconds since the UNIX epoch in an `i64`. Postgres does something very similar, the only difference being that it uses an offset of 2000-01-01.
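A sketch of the conversion using `chrono`, assuming a recent version where `NaiveDateTime::and_utc` is available.

```rust
use chrono::NaiveDateTime;

/// Value stored in the Arrow `PrimitiveArray<i64>` column: microseconds
/// since the UNIX epoch.
fn encode_timestamp(ts: &NaiveDateTime) -> i64 {
    ts.and_utc().timestamp_micros()
}

fn main() {
    let ts = NaiveDateTime::parse_from_str("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
    // The Postgres epoch is 10,957 days worth of microseconds after the UNIX epoch.
    assert_eq!(encode_timestamp(&ts), 10_957i64 * 24 * 60 * 60 * 1_000_000);
}
```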
#### TimestampTz

Rust representation:

```rust
struct DateTime<Tz: TimeZone> {
    datetime: NaiveDateTime,
    offset: Tz::Offset,
}
```

Arrow array: `PrimitiveArray<i64>`

Just like `Timestamp`, we'll encode this as the number of microseconds since the UNIX epoch. We don't actually need to store any timezone information; instead we convert to the session timezone when the value is loaded. This is how Postgres works.
#### Interval

Rust representation:

```rust
struct Interval {
    months: i32,
    days: i32,
    micros: i64,
}
```

Arrow array: `FixedSizeBinary[16]`

Represented by encoding the `months`, `days`, and `micros` fields, in that order, as big-endian.

> **Alternative:** The smallest possible representation for an interval would be 11 bytes, or 12 if we don't want to do bit swizzling. But other than space savings I don't believe there is a benefit to this approach. In fact it would incur some computational overhead to encode, and there are no benefits from a SIMD perspective either.

> **Alternative:** We could represent `Interval`s in a `StructArray`, but we don't expose the internal details of `Interval`, so this wouldn't aid in filtering or pushdown. The only benefit of structuring an interval would be space reduction if we enable dictionary encoding.
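A sketch of the proposed packing (the function name is illustrative).

```rust
/// Pack an Interval's fields big-endian, in order, into the 16 bytes stored
/// in the `FixedSizeBinary[16]` column.
fn encode_interval(months: i32, days: i32, micros: i64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..4].copy_from_slice(&months.to_be_bytes());
    buf[4..8].copy_from_slice(&days.to_be_bytes());
    buf[8..].copy_from_slice(&micros.to_be_bytes());
    buf
}

fn main() {
    // INTERVAL '1 month 2 days 3 microseconds'
    let bytes = encode_interval(1, 2, 3);
    assert_eq!(&bytes[..4], &[0, 0, 0, 1]);
    assert_eq!(&bytes[4..8], &[0, 0, 0, 2]);
    assert_eq!(&bytes[8..], &[0, 0, 0, 0, 0, 0, 0, 3]);
}
```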
#### Jsonb

Rust representation:

```rust
// JsonbRef<'a>(Datum<'a>)
enum Value {
    Null,
    Boolean(bool),
    Number(f64),
    String(String),
    Array(Vec<Value>),
    Object(Map<String, Value>),
}
```

Arrow array: `BinaryArray`

Serialize the JSON with the existing protobuf types, i.e. `ProtoDatum`, and store this binary blob.

> **Structured Data:** An option is to structure the JSON data using an Arrow union type. What is nice about this approach is that it would allow us to do some form of projection pushdown on the JSON data. The main issue though is that Arrow does not really support recursive data types. In fact, it is impossible to statically define the above `Value` enum in Arrow. The only option is to dynamically generate a DataType/schema given a column of values, see [1] for an example of this approach. I don't believe dynamically generating the schema is a good option because it is relatively complex, and we would end up with arbitrarily deep schemas based on user-provided data. The arbitrarily deep schemas particularly concern me because they would have unpredictable performance.

> **Alternative:** An alternative to fully structuring the data is structuring it with a depth limit. For example, structuring up to X levels deep, and then binary encoding the rest. This gets us predictable performance with the ability to do limited pushdown, at the cost of code complexity. This is probably the best approach in the long term, but in my opinion the additional technical complexity makes it out of scope for the initial implementation.

> **Alternative:** Instead of serializing the JSON data with protobuf, we could use a different serialization format like [BSON](https://bsonspec.org/). This approach is nice because it gets us a path to entirely eliminating `ProtoDatum` (🔥), but I am slightly leaning away from it given Protobuf is already used so heavily in our codebase. If we do use a different serialization format we'll need to be careful about how we encode numeric data; currently our JSON `Datum`s use `ProtoNumeric`, which has very high precision. I am leaning away from this approach because we already use protobuf internally so it's well understood, and there are a few tricks we can use to greatly improve deserialization performance, e.g. zero-copy strings, lazy deserialization, and skipping fields we don't care about.

[1]: https://gist.github.com/ParkMyCar/594f647a1bc5a146bb54ca46e6e95680
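For the storage shape itself, a minimal sketch; the blobs below stand in for `ProtoDatum`-encoded JSON, which is Materialize-specific.

```rust
use arrow::array::{Array, BinaryArray};

fn main() {
    // Each element is an opaque, already-serialized JSON document
    // (ProtoDatum bytes in the real implementation).
    let jsonb_col = BinaryArray::from_vec(vec![
        &br#"{"hello": 5}"#[..],
        &br#"[1, 2, 3]"#[..],
    ]);
    assert_eq!(jsonb_col.len(), 2);
}
```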
#### UUID

Rust representation:

```rust
// From the `uuid` crate.
uuid::Uuid([u8; 16])
```

Arrow array: `FixedSizeBinary[16]`

Encode the bytes from the `Uuid` directly into a fixed size buffer.
#### Array

Rust representation:

```rust
struct Array {
    elements: DatumList,
    dims: ArrayDimensions,
}
```

Arrow array:

```
ArrayDimensions: StructArray<{
  lower_bound: i64,
  length: u64,
}>

Array: StructArray<{
  elements: VariableListArray<T>,
  dims: VariableListArray<ArrayDimensions>,
}>
```

Store all arrays (including multidimensional ones) linearly in row-major order, with their dimension metadata structured alongside. Arrays are a bit tricky: their shape must be rectangular, but the values in a column don't all need to have the same shape, and users can specify a logical lower bound other than 1. For example, the following is valid:

```sql
CREATE TABLE t1 (a int[]);
INSERT INTO t1 VALUES (ARRAY[1]), (ARRAY[ARRAY[2], ARRAY[3]]);
```

Even though column `a` is defined as a single-dimension `int[]`, it's valid to insert a multi-dimensional array. This is because arrays in Postgres are all a single type; in other words, `int[]` and `int[][][]` are the same type.

> **Alternative:** We could binary-encode the `ArrayDimensions` data, but the Arrow types aren't too complex, so it's not clear that this would be a better solution.
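To illustrate the proposed layout (types simplified; `Dim` and `EncodedArray` are stand-ins for the structured encoding above), the two rows from the SQL example would be linearized like this:

```rust
/// Simplified stand-in for one entry of `ArrayDimensions`.
#[derive(Debug, PartialEq)]
struct Dim {
    lower_bound: i64,
    length: u64,
}

/// Simplified stand-in for the structured Array encoding: the elements
/// flattened in row-major order plus one `Dim` per dimension.
#[derive(Debug, PartialEq)]
struct EncodedArray {
    elements: Vec<i32>,
    dims: Vec<Dim>,
}

fn main() {
    // ARRAY[1] -> one dimension of length 1.
    let row1 = EncodedArray {
        elements: vec![1],
        dims: vec![Dim { lower_bound: 1, length: 1 }],
    };
    // ARRAY[ARRAY[2], ARRAY[3]] -> two dimensions (2 x 1), elements row-major.
    let row2 = EncodedArray {
        elements: vec![2, 3],
        dims: vec![
            Dim { lower_bound: 1, length: 2 },
            Dim { lower_bound: 1, length: 1 },
        ],
    };
    assert_eq!(row1.elements.len(), 1);
    assert_eq!(row2.dims.len(), 2);
}
```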
#### List

Rust representation:

```rust
// DatumList<'a>.
Vec<Datum<'a>>
```

Arrow array: `VariableSizeList<T>`

A list of values.

> **Note:** Unlike `Array`, all the values in a column of `List`s must have the same number of dimensions. Also, internally [Arrow represents nested lists](https://arrow.apache.org/docs/format/Columnar.html#list-layout) in a row-major format.
#### Record

Rust representation:

```rust
Vec<(ColumnName, ColumnType)>
```

Arrow array: `StructArray`

All `Record` values in a column have the same schema, so at creation time we can define the schema of the column. This is different from JSON, where every value can have a different schema/shape.
#### Map

Rust representation:

```rust
// DatumMap<'a>.
HashMap<String, Datum<'a>>
```

Arrow array: `MapArray`

The Arrow spec does not include the concept of a map, but the `arrow2` and `arrow-rs` crates have a `MapArray` type that is a list of tuples.

> **Alternative:** We could encode maps to some binary format, e.g. proto, and store them as a binary blob. While this might be simpler, it prevents us from being able to push optimizations down into the map.
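A hedged sketch of building such a column with `arrow-rs`'s `MapBuilder` (string keys and values, made-up data; Persist's real value type would differ).

```rust
use arrow::array::{Array, MapBuilder, StringBuilder};

fn main() {
    // Each map is appended as a list of (key, value) tuples.
    let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

    // First row: {"a": "1", "b": "2"}.
    builder.keys().append_value("a");
    builder.values().append_value("1");
    builder.keys().append_value("b");
    builder.values().append_value("2");
    builder.append(true).unwrap();

    // Second row: an empty map.
    builder.append(true).unwrap();

    let map_col = builder.finish();
    assert_eq!(map_col.len(), 2);
}
```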
#### MzTimestamp

Rust representation:

```rust
struct MzTimestamp(u64);
```

Arrow array: `PrimitiveArray<u64>`

Number of milliseconds since the UNIX epoch.
#### Range

Rust representation:

```rust
struct Range<D>
```

Arrow array:

```
RangeBound: StructArray<{
  inclusive: bool,
  bound: T,
}>

Range: StructArray<{
  lower: RangeBound,
  upper: RangeBound,
}>
```

Structure the data as it is in Rust. Ranges seem pretty interesting and powerful, so Persist having an understanding of the data seems worthwhile for the long term. They could also be entirely unused (I'm not sure), in which case the complexity of encoding them in a structured way might not be worth it.

> **Alternative:** Encode a `Range` into a binary format and store it as a blob.
#### MzAclItem

Rust representation:

```rust
struct MzAclItem {
    // String
    grantee: RoleId,
    // String
    grantor: RoleId,
    // u64
    acl_mode: AclMode,
}
```

Arrow array:

```
StructArray<{
  grantee: String,
  grantor: String,
  acl_mode: u64,
}>
```

Structure the data as it is in Rust.

> **Alternative:** Encode an `MzAclItem` into a binary format and store it as a blob.
#### AclItem

Rust representation:

```rust
struct AclItem {
    // u32
    grantee: Oid,
    // u32
    grantor: Oid,
    // u64
    acl_mode: AclMode,
}
```

Arrow array:

```
StructArray<{
  grantee: u32,
  grantor: u32,
  acl_mode: u64,
}>
```

Structure the data as it is in Rust.

> **Alternative:** It would be relatively easy to stitch together the three values that make up an `AclItem` into a `FixedSizeBinary[16]`; it should even sort the same as its Rust counterpart.
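A sketch of that alternative packing, with `Oid` and `AclMode` simplified to their underlying integers; big-endian byte order is what makes the bytes sort like the numeric fields.

```rust
/// Pack an AclItem's fields big-endian into 16 bytes that sort the same way
/// as the (grantee, grantor, acl_mode) tuple.
fn encode_acl_item(grantee: u32, grantor: u32, acl_mode: u64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..4].copy_from_slice(&grantee.to_be_bytes());
    buf[4..8].copy_from_slice(&grantor.to_be_bytes());
    buf[8..].copy_from_slice(&acl_mode.to_be_bytes());
    buf
}

fn main() {
    // A higher grantee OID must sort after a lower one, regardless of the
    // other fields.
    assert!(encode_acl_item(2, 0, 0) > encode_acl_item(1, 99, u64::MAX));
}
```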
#### Int2Vector

Rust representation:

```rust
Vec<i16>
```

Arrow array: `VariableSizeList<i16>`

Structure the data as it is in Rust.