
Large Select Result Size

Context

Currently, results of SELECT queries are sent back from the cluster to environmentd via the compute protocol and these results are fully materialized (stored in memory) in environmentd before sending them out to the client.

This has several implications:

  • Sending large results "clogs up" the cluster/controller communication
  • The amount of memory we want to give to environmentd limits the size of results we can return

In practice, the above means we limit the size of results with the max_result_size parameter, and some larger customers are chafing against that limit.

Today, we always materialize the results of a query in environmentd memory before sending them on to the client. This is only required for some types of queries, though: queries that require post processing need materialization, while queries that don't could be streamed through to the client. Post processing is needed whenever an ordering has to be established on the whole result, which today means exactly those queries that have an ORDER BY. This is handled by RowSetFinishing in the code.

Below, we use streamable queries for queries that don't require post processing, and non-streamable queries for those that do require it.
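
As a rough illustration, the distinction could look like the sketch below. The Finishing struct here is a hypothetical stand-in, not the actual RowSetFinishing type from the codebase:

```rust
/// Simplified stand-in for the finishing applied to peek results
/// (the real type in the codebase is `RowSetFinishing`).
#[allow(dead_code)]
struct Finishing {
    /// Column indices to order the full result by, if any.
    order_by: Vec<usize>,
    limit: Option<usize>,
    offset: usize,
}

impl Finishing {
    /// A query is "streamable" if its finishing never needs to see the whole
    /// result at once. An ORDER BY forces a global sort, so any non-empty
    /// `order_by` makes the query non-streamable.
    fn is_streamable(&self) -> bool {
        self.order_by.is_empty()
    }
}

fn main() {
    let plain = Finishing { order_by: vec![], limit: None, offset: 0 };
    let ordered = Finishing { order_by: vec![0], limit: Some(10), offset: 0 };
    assert!(plain.is_streamable());
    assert!(!ordered.is_streamable());
}
```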

We think it is easier to lift the result size limitation for streamable queries, so we want to approach that first. There are ways to lift the limitation for non-streamable queries as well, for example by moving the post processing to the cluster side, or by sorting and merging large results within bounded memory on the environmentd side.
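
To illustrate the second option: a bounded-memory sort can spill sorted runs (to disk or blob storage) and then k-way merge them, holding only one row per run in memory at a time. A minimal, self-contained sketch of the merge step, with in-memory vectors standing in for spilled runs:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted runs into one sorted stream, holding only one
/// element per run in memory at a time. In a real implementation each run
/// would be a spilled file or blob batch read incrementally, not a Vec.
fn merge_sorted_runs(runs: Vec<Vec<i64>>) -> impl Iterator<Item = i64> {
    let mut iters: Vec<_> = runs.into_iter().map(|r| r.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    std::iter::from_fn(move || {
        // Pop the smallest head element, then refill from the same run.
        let Reverse((v, idx)) = heap.pop()?;
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        Some(v)
    })
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let merged: Vec<_> = merge_sorted_runs(runs).collect();
    assert_eq!(merged, (1..=9).collect::<Vec<_>>());
}
```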

Technical Context

There are two ways of getting query results and sending them from the cluster back to environmentd:

  1. Extracting from an arrangement (the data structure backing indexes)
  2. Extracting directly from persist

This leads to there being at least three ways a SELECT can be executed from the environmentd side:

  1. Fast-path SELECT: there is an existing arrangement (index) that we can extract our result from.
  2. Slow-path SELECT: in order to get our result we first need to render a temporary dataflow with an index that will contain the results. Then we can use the same logic as a fast-path SELECT to extract the result. Afterwards the temporary dataflow is torn down again.
  3. Fast-path persist SELECT: The query is simple enough that we can extract the result directly from a persist shard, potentially with some amount of map-filter-project applied. This happens on the clusterd side, and the result is shipped back using the same mechanism as other SELECT results.

Paths 1 and 2 imply that the result must fit into memory on the cluster side, unless we want to, say, change how results are extracted on the cluster side.
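
Schematically, the three paths could be summarized like this (the enum below is purely illustrative and does not correspond to an actual type in the codebase):

```rust
/// The three ways a SELECT is currently served, schematically.
#[allow(dead_code)]
enum PeekPath {
    /// 1. Fast path: read out of an existing arrangement (index).
    FastPath { existing_index: String },
    /// 2. Slow path: render a temporary dataflow whose index holds the
    ///    result, read it out like the fast path, then tear it down again.
    SlowPath { temporary_dataflow: String },
    /// 3. Persist fast path: read directly from a persist shard on clusterd,
    ///    with some amount of map-filter-project applied.
    FastPathPersist { shard_id: String },
}

fn main() {
    // For the first two paths the full result is staged in an arrangement,
    // so it has to fit into memory on the cluster side.
    let _path = PeekPath::FastPath { existing_index: "my_index".to_string() };
}
```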

Goals

The user facing goal is:

  • Lift result-size limitation for streamable queries

We also have technical goals, which enable potential future work and make sure we have the right abstractions in place:

  • Create re-usable building blocks for shipping data/results out-of-band; these can then be used in other places, such as for SUBSCRIBE
  • Make persist Batches more of a "first-class" citizen, eventually allowing us to decouple where and how batches are written from where and how they are appended to a shard

Non-Goals

  • Lift the limitation that results have to fit into cluster memory: for non-persist queries, the result is still first staged in a temporary arrangement, or read out of an existing arrangement, before we send it to environmentd
  • Change how results are extracted from "the dataflows" on the cluster side
  • Lift result-size limitation for non-streamable queries
  • Change result-size limitations for SUBSCRIBE
  • Change how we ship around and append data to shards, when handling INSERT
  • Change how we ship around and append data for sources

Implementation

This is purposefully high level, but the idea is that we a) need to stop sending large amounts of data through the compute protocol, and b) need to stop materializing large results in environmentd memory.

This is the approach:

  1. Peek Stash System: We create a new "peek stash" system that uses persist batches to store large query results out-of-band from the compute protocol.

  2. Dynamic Threshold: A configurable threshold (peek_response_stash_threshold_bytes) determines when to use the stash vs. sending results inline via the compute protocol (see the sketch after this list).

  3. Streaming Architecture: Results will be streamed from persist to environmentd and then to the client, avoiding full materialization in environmentd memory.
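
A hedged sketch of the threshold decision, with a hypothetical helper function and assuming we track the encoded size of the result as we extract it:

```rust
/// Where to put a peek result: inline in the compute-protocol response, or
/// stashed out-of-band as persist batches. Illustrative only.
enum ResultDestination {
    Inline,
    Stashed,
}

/// Hypothetical decision helper: once the accumulated encoded size of the
/// result crosses the configured threshold, switch to the stash. A
/// non-streamable query (e.g. one with an ORDER BY) stays inline in this
/// proposal, since lifting the limit for those is a non-goal.
fn choose_destination(
    stash_enabled: bool,
    streamable: bool,
    accumulated_bytes: usize,
    stash_threshold_bytes: usize,
) -> ResultDestination {
    if stash_enabled && streamable && accumulated_bytes > stash_threshold_bytes {
        ResultDestination::Stashed
    } else {
        ResultDestination::Inline
    }
}

fn main() {
    // With enable_compute_peek_response_stash on and a 100 MiB threshold,
    // a 1 GiB streamable result would be stashed.
    let dest = choose_destination(true, true, 1 << 30, 100 * (1 << 20));
    assert!(matches!(dest, ResultDestination::Stashed));
}
```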

Key Implementation Details:

  • New enum variant: In compute_state.rs we add PendingPeek::Stash for async stashing operations
  • Background processing: Async tasks pump rows into persist batches while the main worker thread continues other work and periodically pumps rows from the arrangement over to the async task (see the sketch after this list)
  • Configurable parameters: Multiple system variables control the behavior:

    • enable_compute_peek_response_stash: Feature flag (default: false)
    • peek_response_stash_threshold_bytes: Size threshold for using stash
    • peek_stash_batch_size: Batch size for row processing
    • peek_stash_num_batches: Number of batches to process per pump cycle
    • peek_response_stash_batch_max_runs: Max runs per persist batch, used to control consolidation on the worker, which reduces work in environmentd
  • Metrics: We add a stashed_peek_seconds histogram to track performance

  • Response handling: New PeekResponse::Stashed variant contains persist shard information that environmentd uses to stream results back
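
A simplified sketch of the pumping pattern, using a plain thread and a bounded channel as stand-ins for the real async task and persist batch writer; all names below are illustrative, with peek_stash_batch_size represented by a constant:

```rust
use std::sync::mpsc;
use std::thread;

/// Number of rows the worker hands off per pump, standing in for the
/// peek_stash_batch_size parameter.
const PUMP_BATCH_SIZE: usize = 4;

fn main() {
    // Bounded channel: the worker cannot race arbitrarily far ahead of the
    // "uploader", which keeps memory usage on the worker bounded.
    let (tx, rx) = mpsc::sync_channel::<Vec<u64>>(2);

    // Stand-in for the async task that turns row batches into persist
    // batches and uploads them to the blob store.
    let uploader = thread::spawn(move || {
        let mut uploaded = 0;
        while let Ok(batch) = rx.recv() {
            // In the real system: encode rows into a persist Batch and write
            // it under a temporary shard ID.
            uploaded += batch.len();
        }
        uploaded
    });

    // Stand-in for the worker thread: between other work, pump a bounded
    // number of rows out of the arrangement and hand them to the uploader.
    let rows: Vec<u64> = (0..10).collect();
    for chunk in rows.chunks(PUMP_BATCH_SIZE) {
        // ... the worker does other work here between pumps ...
        tx.send(chunk.to_vec()).expect("uploader is alive");
    }
    drop(tx); // Signal that the peek's rows are exhausted.

    assert_eq!(uploader.join().unwrap(), rows.len());
}
```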

Flow:

  1. Query starts execution normally
  2. If result size exceeds threshold and query is streamable, switch to stash mode
  3. Background task writes results to persist batches (under a temporary shard ID that will never turn into an actual shard)
  4. PeekResponse::Stashed sent back with shard metadata
  5. environmentd streams results from persist to the client incrementally
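
On the environmentd side, the result is then forwarded batch by batch instead of being materialized as a whole. A hedged sketch of that shape, where the batch reader and the client sink are stand-ins for the persist reader and the pgwire connection:

```rust
/// Stand-in for reading the stashed result back batch by batch; in the real
/// system this would fetch the persist batches referenced by the
/// PeekResponse::Stashed metadata.
fn stashed_batches() -> impl Iterator<Item = Vec<String>> {
    (0..3).map(|i| vec![format!("row-{i}-a"), format!("row-{i}-b")])
}

/// Stream rows to the client one batch at a time: only the current batch is
/// held in environmentd memory, never the full result.
fn stream_to_client(mut send_rows: impl FnMut(&[String])) {
    for batch in stashed_batches() {
        send_rows(&batch[..]);
        // `batch` is dropped here before the next one is fetched.
    }
}

fn main() {
    let mut total = 0;
    stream_to_client(|rows| {
        total += rows.len();
        println!("sent {} rows to the client", rows.len());
    });
    assert_eq!(total, 6);
}
```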

Future Work

A selection of the non-goals from above:

  • Lift result-size limitation for non-streamable queries
  • Change how results are extracted from "the dataflows" on the cluster side

These will require changing where and how we apply post processing. We would want to apply it on the cluster side, but that requires changing how we extract results: we wouldn't extract results from an index anymore but instead install a dataflow fragment that does the extraction and applies the post processing, for example by shipping all results to one worker before shipping them back to environmentd, possibly also via the persist blob store.
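
For reference, the post processing that would have to move is essentially the finishing step: a global sort followed by OFFSET and LIMIT. A simplified sketch on plain vectors, ignoring the actual row types:

```rust
/// The finishing step in simplified form: establish a total order over the
/// whole result, then apply OFFSET and LIMIT. Today this runs in
/// environmentd; moving it to the cluster is what the future work is about.
fn apply_finishing(
    mut rows: Vec<(i64, String)>,
    offset: usize,
    limit: Option<usize>,
) -> Vec<(i64, String)> {
    rows.sort_by_key(|(key, _)| *key); // ORDER BY key
    rows.into_iter()
        .skip(offset)
        .take(limit.unwrap_or(usize::MAX))
        .collect()
}

fn main() {
    let rows = vec![(3, "c".into()), (1, "a".into()), (2, "b".into())];
    let finished = apply_finishing(rows, 1, Some(1));
    assert_eq!(finished, vec![(2, "b".to_string())]);
}
```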

Alternatives

Add a result-specific blob store (or similar)

Don't use the persist blob store for these results. Arguably, the above idea slightly abuses the persist blob store for other purposes, but it is something we already have and it works. We could add another protocol that allows sending back larger responses out-of-band, or add a separate blob store that is tailor-made for handling results.

Using the persist blob store and the existing code around Batch has the benefit that we re-use a lot of existing infrastructure and code.

Come up with a protocol for streaming results straight from clusterd out via balancerd to the client

We would need to decide where and how we eventually apply ORDER BY, LIMIT, and friends. It would also be a lot more work, and it is uncertain how much.

Use temporary materialized views for large results

Moritz pointed out this idea: instead of implementing new code on the compute/clusterd side for stashing results as persist batches, we would do more work on the adapter side. When we expect a large query result, we don't use the normal SELECT paths but instead render a temporary materialized view (which writes results directly to persist) and then stream results from that out to the client.

I'd be open to being convinced, but the challenges I see with this are:

  • When setting off the query we don't yet know whether the result is big. The main proposal would normally send results back via the compute protocol but fall back to out-of-band when we notice that the result is too big. We can't do that with the materialized view approach.
  • The optimizer/rendering treat peeks differently from long-running dataflows (indexes and materialized views): we use the fact that the input is monotonic and that we don't have to continually maintain an arrangement across multiple times. We would lose those optimizations.
  • One of the technical goals is to create the building blocks for decoupling writing data and appending data. I think this will be useful for future user-facing projects, and we wouldn't build it now if we go with another approach.

Open Questions

  • Is this even a good idea? One could argue that Materialize is not meant to serve big multi-gigabyte results from SELECT queries, and that customers should instead use other means for extracting larger amounts of data.