Currently, results of `SELECT` queries are sent back from the cluster to
`environmentd` via the compute protocol, and these results are fully
materialized (stored in memory) in `environmentd` before being sent out to the
client. This has several implications, among them that `environmentd` limits
the size of results we can return: in practice, we limit result sizes using a
`max_result_size` parameter, and some larger customers are chafing against
that.
Today, we always materialize the results of a query in `environmentd` memory
before sending them on to the client. This is only required for some types of
queries, though: queries that require post-processing need materialization,
while queries that don't could be streamed through to the client. Queries that
need an ordering established on the whole result require post-processing;
today, those are only the queries that have an `ORDER BY`. This is handled by
`RowSetFinishing` in the code.
Below, we use _streamable queries_ for queries that don't require
post-processing, and _non-streamable queries_ for those that do.
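To make the distinction concrete, here is a minimal Rust sketch of that
decision, using a simplified stand-in for `RowSetFinishing` (the real struct
lives in the Materialize codebase and has more fields): a query is streamable
exactly when no ordering has to be established on the whole result.

```rust
/// Simplified, illustrative stand-in for the real `RowSetFinishing`.
struct RowSetFinishing {
    /// Ordering to establish on the whole result; empty means no `ORDER BY`.
    order_by: Vec<ColumnOrder>,
    /// `LIMIT`, if any.
    limit: Option<usize>,
    /// `OFFSET`.
    offset: usize,
}

/// One `ORDER BY` column.
struct ColumnOrder {
    column: usize,
    desc: bool,
}

impl RowSetFinishing {
    /// Streamable queries need no ordering over the whole result, so rows can
    /// be forwarded to the client as they arrive instead of being
    /// materialized first.
    fn is_streamable(&self) -> bool {
        self.order_by.is_empty()
    }
}

fn main() {
    let plain = RowSetFinishing { order_by: vec![], limit: None, offset: 0 };
    let ordered = RowSetFinishing {
        order_by: vec![ColumnOrder { column: 0, desc: false }],
        limit: Some(100),
        offset: 0,
    };
    assert!(plain.is_streamable());
    assert!(!ordered.is_streamable());
}
```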
We think it is easier to lift the result size limitation for streamable
queries, so we want to approach that first. But there are ways to lift the
limitation for non-streamable queries as well, for example by moving the
post-processing to the cluster side, or by implementing an approach where we
sort and merge large results within bounded memory on the `environmentd` side.
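As a sanity check that the bounded-memory idea is workable, here is a
self-contained sketch (not Materialize code) of the merge step: individually
sorted runs, which in practice would be spilled to and streamed from blob
storage, are merged while holding only one head element per run in memory.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several individually sorted runs into one sorted stream. Memory use
/// is bounded by one element per run (plus the run iterators themselves),
/// regardless of total result size.
fn merge_sorted_runs<T: Ord>(runs: Vec<Vec<T>>) -> impl Iterator<Item = T> {
    let mut iters: Vec<std::vec::IntoIter<T>> =
        runs.into_iter().map(|r| r.into_iter()).collect();
    // Min-heap of (head element, run index), via `Reverse`.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(head) = it.next() {
            heap.push(Reverse((head, idx)));
        }
    }
    std::iter::from_fn(move || {
        // Pop the smallest head, then refill from the run it came from.
        let Reverse((head, idx)) = heap.pop()?;
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        Some(head)
    })
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    let merged: Vec<_> = merge_sorted_runs(runs).collect();
    assert_eq!(merged, (1..=9).collect::<Vec<_>>());
}
```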
There are two ways of getting query results and sending them from the cluster
back to `environmentd`: we either install a temporary dataflow that computes
the results and sends them back to `environmentd`, or we read out of an
existing arrangement.

This leads to there being at least three ways a `SELECT` can be executed from
the `environmentd` side; in the last of these, the result is computed by a
dataflow on the `clusterd` side and is shipped back using the same mechanism
that is used for shipping back other `SELECT` results. The first two ways
imply that the result must fit into memory on the cluster side, unless we
want to, say, change how results can be extracted on the cluster side.
The user-facing goal is lifting the result size limitation.

And we have technical goals as well, enabling potential future work and making
sure we have the right abstractions in place. Purposefully high level, but the
idea is that we a) need to stop sending large amounts of data through the
compute protocol, and b) need to stop materializing large results in
`environmentd` memory.
This is the approach:

1. **Peek stash system**: We create a new "peek stash" system that uses
   persist batches to store large query results out-of-band from the compute
   protocol.
2. **Dynamic threshold**: A configurable threshold
   (`peek_response_stash_threshold_bytes`) determines when to use the stash
   vs. sending results inline via the compute protocol (see the sketch after
   this list).
3. **Streaming architecture**: Results will be streamed from persist to
   `environmentd` and then to the client, avoiding full materialization in
   `environmentd` memory.
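A hedged sketch of how the threshold decision might look on the worker: the
two config field doc-comments name the actual system variables from this
design, while the struct and function themselves are hypothetical, not the
actual Materialize code.

```rust
/// Illustrative config mirroring the system variables named in this design.
struct PeekStashConfig {
    /// `enable_compute_peek_response_stash`
    enabled: bool,
    /// `peek_response_stash_threshold_bytes`
    threshold_bytes: usize,
}

/// Which way a worker ships a peek result back.
enum ResponseRoute {
    /// Small result: rows go inline via the compute protocol.
    Inline,
    /// Large result: rows are written to persist batches and only a stash
    /// reference travels over the compute protocol.
    Stash,
}

/// Decide the route once the encoded size of the accumulated result is known.
fn choose_route(cfg: &PeekStashConfig, result_bytes: usize) -> ResponseRoute {
    if cfg.enabled && result_bytes > cfg.threshold_bytes {
        ResponseRoute::Stash
    } else {
        ResponseRoute::Inline
    }
}
```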
In more detail:

- In `compute_state.rs` we add `PendingPeek::Stash` for async stashing
  operations.
- **Configurable parameters**: Multiple system variables control the behavior:
  - `enable_compute_peek_response_stash`: feature flag (default: `false`)
  - `peek_response_stash_threshold_bytes`: size threshold for using the stash
  - `peek_stash_batch_size`: batch size for row processing
  - `peek_stash_num_batches`: number of batches to process per pump cycle
  - `peek_response_stash_batch_max_runs`: max runs per persist batch, for
    controlling consolidation on the worker, which reduces work in
    `environmentd`
- **Metrics**: We add a `stashed_peek_seconds` histogram to track performance.
- **Response handling**: A new `PeekResponse::Stashed` variant contains the
  persist shard information that `environmentd` uses to stream results back:
  `PeekResponse::Stashed` is sent back with shard metadata, and `environmentd`
  streams results from persist to the client incrementally (see the sketch
  below).
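The following sketch shows the intended shape of that flow. The `Stashed`
variant name comes from this design; all surrounding types and functions are
hypothetical stand-ins rather than the actual compute-protocol definitions.

```rust
/// Hypothetical, simplified response type; the real `PeekResponse` differs
/// in detail.
enum PeekResponse {
    /// Rows shipped inline through the compute protocol.
    Rows(Vec<Row>),
    /// Rows stashed out-of-band: the response carries only the metadata that
    /// `environmentd` needs to read the result back out of persist.
    Stashed {
        shard_id: String,
        batches: Vec<BatchDescription>,
    },
    Error(String),
}

struct Row(Vec<u8>);
struct BatchDescription; // stand-in for persist batch metadata

/// Stand-in for wherever rows are written on their way to the client.
trait RowSink {
    fn send(&mut self, row: Row);
}

/// Placeholder for reading one batch's rows back from persist blob storage.
fn fetch_batch_rows(_shard: &str, _batch: &BatchDescription) -> Vec<Row> {
    Vec::new()
}

/// On the `environmentd` side, a stashed response becomes an incremental
/// stream towards the client: only one batch is resident at a time, so the
/// full result is never materialized in `environmentd` memory.
fn forward_stashed(response: PeekResponse, client: &mut impl RowSink) {
    if let PeekResponse::Stashed { shard_id, batches } = response {
        for batch in batches {
            for row in fetch_batch_rows(&shard_id, &batch) {
                client.send(row);
            }
        }
    }
}
```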
A selection of the non-goals from above: lifting the result size limitation
for non-streamable queries.
These will require changing where and how we apply post-processing. We would
want to apply it on the cluster side, but that requires changing how we
extract results. That is, we shouldn't extract results from an index anymore
but instead install a dataflow fragment that does the extraction and applies
the post-processing, for example by shipping all results to one worker before
shipping results back to `environmentd`, possibly also via the persist blob
store.
**Don't use the persist blob store for these results.** Arguably, the above
idea slightly abuses the persist blob store for other purposes, but it's a
thing that we have, and it works. We could add another protocol that allows
sending back larger responses out-of-band, or add a separate blob store that
is tailor-made for handling results. Using the persist blob store and the
existing code around `Batch` has the benefit that we're re-using a lot of
existing infrastructure/code.
**Send results from `clusterd` out via `balancerd` to the client.** We would
need to decide where/how we eventually apply `ORDER BY`, `LIMIT`, and friends.
And it would be a lot more work, and it's uncertain how much work it would be.
Moritz pointed out this idea: instead of implementing new code on the
compute/`clusterd` side for stashing results as persist batches, we would do
more work on the adapter side. When we expect a large query result, we don't
use the normal `SELECT` paths but instead render a temporary materialized view
(which writes results directly to persist) and then stream results from that
out to the client.
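A sketch of that alternative flow, expressed against an entirely hypothetical
trait; none of these names exist in the codebase, they only illustrate the
shape of the adapter-side work.

```rust
/// Hypothetical adapter-side operations the alternative would need.
trait AdapterSide {
    type View;
    type Error;

    /// Render a temporary materialized view that writes the query's results
    /// directly to persist.
    fn create_temp_materialized_view(&mut self, sql: &str)
        -> Result<Self::View, Self::Error>;
    /// Stream the view's persist shard out to the client incrementally.
    fn stream_view_to_client(&mut self, view: &Self::View)
        -> Result<(), Self::Error>;
    /// Drop the temporary view once the result has been delivered.
    fn drop_view(&mut self, view: Self::View) -> Result<(), Self::Error>;
}

/// The alternative's `SELECT` path: no stashing code on the cluster side,
/// everything goes through a temporary materialized view instead.
fn run_large_select<A: AdapterSide>(adapter: &mut A, sql: &str)
    -> Result<(), A::Error>
{
    let view = adapter.create_temp_materialized_view(sql)?;
    adapter.stream_view_to_client(&view)?;
    adapter.drop_view(view)
}
```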
I'm open to being convinced, but the challenges I see with this are: