title: "Partitioning and filter pushdown" description: "Declare how collections are stored." aliases:
A few types of Materialize collections are durably written to storage: materialized views, tables, and sources.
Internally, each collection is stored as a set of runs of data, each of which is sorted and then partitioned into individual parts; those parts are written to object storage and fetched only when necessary to satisfy a query. Materialize also periodically compacts the data it stores, consolidating small parts into larger ones and discarding deleted rows.
Using the `PARTITION BY` option, you can specify the internal ordering that Materialize will use to sort, partition, and store these runs of data. A well-chosen partitioning can unlock optimizations like filter pushdown, which in turn can make queries and other operations more efficient.
{{< note >}}
The `PARTITION BY` option has no impact on the order in which records are returned by queries. If you want to return results in a specific order, use an `ORDER BY` clause on your `SELECT` statement.
{{< /note >}}
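For instance, assuming a hypothetical table `t` with a timestamp column `ts`, the ordering of results is requested in the query itself:

```sql
-- Result ordering comes from the query, not from PARTITION BY.
SELECT * FROM t ORDER BY ts DESC;
```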
The option `PARTITION BY <column list>` declares that a materialized view or table should be partitioned by the listed columns. For example, a table that stores an append-only collection of events may want to partition the data by time:

```sql
CREATE TABLE events (event_ts timestamptz, body jsonb)
WITH (
    PARTITION BY (event_ts)
);
```
This `PARTITION BY` clause declares that events with similar `event_ts` timestamps should be stored together.

When multiple columns are specified, rows are partitioned lexicographically. For example, `PARTITION BY (event_date, event_time)` would partition first by the event date; if many rows have the same `event_date`, those rows would be further partitioned by the `event_time` column.
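For instance, here is a minimal sketch of such a two-column partitioning (the table and column names are illustrative, not part of the examples below):

```sql
-- Hypothetical table partitioned lexicographically by (event_date, event_time):
-- rows are grouped first by date, then by time within each date.
CREATE TABLE daily_events (
    event_date date,
    event_time time,
    body jsonb
) WITH (
    PARTITION BY (event_date, event_time)
);
```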
Durable collections without a `PARTITION BY` option can be partitioned arbitrarily.
{{< note >}}
The `PARTITION BY` option does not mean that rows with different values for the specified columns will be stored in different parts, only that rows with similar values for those columns should be stored together.
{{< /note >}}
Materialize currently imposes some restrictions on the list of columns in the `PARTITION BY` clause. The listed columns must have one of the following types:

- `smallint`, `integer`, and `bigint`;
- `date`, `time`, `timestamp`, `timestamptz`, and `mz_timestamp`;
- `text` and `bytea`;
- `boolean` and `uuid`;
- `record` types where all field types are supported.

We intend to relax some of these restrictions in the future.
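As an illustration, a partitioning key can combine several of the supported types. This hypothetical table partitions on a date and a `bigint` tenant identifier:

```sql
-- Hypothetical table whose partitioning key mixes supported types (date, bigint).
-- Only the column types listed above may appear in the PARTITION BY clause.
CREATE TABLE tenant_events (
    event_date date,
    tenant_id bigint,
    payload jsonb
) WITH (
    PARTITION BY (event_date, tenant_id)
);
```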
Suppose that our example `events` table has accumulated years' worth of data, but we're running a query with a temporal filter that matches only rows with recent timestamps:

```sql
SELECT * FROM events WHERE mz_now() <= event_ts + INTERVAL '5min';
```

This query returns only rows with similar values for `event_ts`: timestamps in the last five minutes. Since we declared that our `events` table is partitioned by `event_ts`, all the rows that pass this filter will be stored in the same small subset of parts.
Materialize tracks a small amount of metadata for every part, including the range of possible values for many columns. When it can determine that none of the data in a part will match a filter, it will skip fetching that data from object storage. This optimization is called filter pushdown, and when you're querying with a selective filter against a large collection, it can save a great deal of time and computation.
Materialize will always try to apply filter pushdown to your query, but that filtering is usually only effective when similar rows are stored together. If you want to make sure that the filter pushdown optimization is effective for your query, you can:

- Declare a `PARTITION BY` clause on the relevant column to ensure that data with similar values for that column are stored close together.
- Check whether the filters in your query can be pushed down using an `EXPLAIN` statement. Filters that consist of arithmetic, date math, and comparisons are generally eligible for pushdown, including all the examples on this page; more complex filters might not be.

In the following example, we can be confident our temporal filter will be pushed down because it's present in the `pushdown` list at the bottom of the output:
```sql
EXPLAIN SELECT * FROM events WHERE mz_now() <= event_ts + INTERVAL '5min';
```

```
Explained Query:
  [...]
  Source materialize.public.events
    [...]
    pushdown=((mz_now() <= timestamp_to_mz_timestamp((#0 + 00:05:00))))
```
Some common functions, such as casting from a string to a timestamp, can prevent filter pushdown for a query. For similar functions that do allow pushdown, see the pushdown functions documentation.
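As a hedged illustration (the table and column names here are hypothetical, and eligibility can vary), a filter that must first cast text into a timestamp may not appear in the `pushdown` list, unlike the plain date-math filter above. You can confirm with `EXPLAIN`:

```sql
-- Hypothetical table where the event time arrives as text in `ts_text`.
-- Check the `pushdown` list in the output: a cast like this one may keep
-- the filter from being pushed down.
EXPLAIN SELECT * FROM raw_events
WHERE mz_now() <= ts_text::timestamptz + INTERVAL '5min';
```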
These examples create real objects. After you have tried the examples, make sure to drop these objects and spin down any resources you may have created.
For timeseries or "event"-type collections, it's often useful to partition the data by timestamp.
First, create a table called `events`.

```sql
-- Create a table of timestamped events. Note that the `event_ts` column is
-- first in the column list and in the partition-by clause.
CREATE TABLE events (
    event_ts timestamptz,
    content text
) WITH (
    PARTITION BY (event_ts)
);
```
Insert two records: one "older" record and one more recent.

```sql
INSERT INTO events VALUES (now()::timestamp - '5 minutes', 'hello');
INSERT INTO events VALUES (now(), 'world');
```
Within the next two minutes, run a `SELECT` statement against the data. This should return only the more recent of the two rows.

```sql
SELECT * FROM events WHERE event_ts + '2 minutes' > mz_now();
```
To verify that Materialize fetched only the parts that contain data with the recent timestamps, run an `EXPLAIN FILTER PUSHDOWN` statement.

```sql
EXPLAIN FILTER PUSHDOWN FOR
SELECT * FROM events WHERE event_ts + '2 minutes' > mz_now();
```
If you wait a few minutes longer until there are no events that match the temporal filter, you'll notice that not only does the query return zero rows, but the explain shows that we fetched zero parts.
{{< note >}}
The exact numbers you see here may vary: parts can be much larger than a single row, and the actual level of filtering may fluctuate for small datasets as data is compacted together internally. However, datasets of a few gigabytes or larger should reliably see benefits from this optimization.
{{< /note >}}
Other datasets don't have a strong timeseries component, but they do have a clear notion of type or category. For example, suppose you have a collection of music venues spread across the world that you regularly query by a single country.
First, create a table called `venues`, partitioned by country.

```sql
-- Create a table for our venue data.
-- Once again, the partition column is listed first.
CREATE TABLE venues (
    country_code text,
    id bigint,
    name text
) WITH (
    PARTITION BY (country_code)
);
```
Insert a few records with different country codes.
```sql
INSERT INTO venues VALUES ('US', 1, 'Rock World');
INSERT INTO venues VALUES ('CA', 2, 'Friendship Cove');
```
Query for venues in particular countries.
```sql
SELECT * FROM venues WHERE country_code IN ('US', 'MX');
```
Run `EXPLAIN FILTER PUSHDOWN` to check that we're filtering out parts that don't include data relevant to the query.

```sql
EXPLAIN FILTER PUSHDOWN FOR
SELECT * FROM venues WHERE country_code IN ('US', 'MX');
```
{{< note >}}
As before, we're not guaranteed to see much or any benefit from filter pushdown on small collections, but for datasets over a few gigabytes, we should reliably be able to filter down to a subset of the parts we'd otherwise need to fetch.
{{< /note >}}
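When you're finished, clean up the objects created in these examples:

```sql
-- Remove the example tables created above.
DROP TABLE events;
DROP TABLE venues;
```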