Optimization is a slow process that is on the critical path for environmentd startup times. Some environments spend 30 seconds just on optimization. All optimization time spent in startup is experienced as downtime for users when environmentd restarts.

The success criterion is that startup spends less than 1 second optimizing expressions.
The solution being proposed in this document is a cache of optimized expressions. During startup,
environmentd
will first look in the cache for optimized expressions and only compute a new
expression if it isn't present in the cache. If enough expressions are cached and the cache is fast
enough, then the time spent on this part of startup should be small.
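The lookup-or-compute flow can be sketched as follows. This is a minimal illustration, not the real implementation; `GlobalId`, the `String` expression type, and `expr_for_startup` are placeholder names:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the real types.
type GlobalId = u64;
type Expr = String; // placeholder for an optimized expression

/// Return the expression for `id`, using the cache when possible and
/// falling back to (slow) optimization only on a cache miss.
fn expr_for_startup(
    cache: &BTreeMap<GlobalId, Expr>,
    id: GlobalId,
    optimize: impl Fn(GlobalId) -> Expr,
) -> Expr {
    match cache.get(&id) {
        Some(expr) => expr.clone(), // fast path: reuse the cached expression
        None => optimize(id),       // slow path: optimize from scratch
    }
}
```

If most objects hit the fast path, startup pays the optimization cost only for the handful of expressions that are missing from the cache.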
The cache will present similarly to a key-value store where the key is a composite of the build version and the object's global ID. The value will be a serialized version of the optimized expression. An environmentd process with build version n will never be expected to look at a serialized expression with a build version m s.t. n != m. Therefore, no forwards or backwards compatibility is needed for the serialized representation of expressions. The cache will also be made durable so that it's available after a restart, at least within the same build version.
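A minimal sketch of this versioning contract, using illustrative placeholder types (`BuildVersion`, `VersionedCache`, and the byte payload are assumptions, not the real types):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the real types.
type BuildVersion = u64;
type GlobalId = u64;
type SerializedExpr = Vec<u8>;

/// The cache is logically keyed by (build_version, global_id). A process
/// running build version `v` only ever reads entries whose version
/// component equals `v`, so serialized values never need to be readable
/// across build versions.
struct VersionedCache {
    entries: BTreeMap<(BuildVersion, GlobalId), SerializedExpr>,
}

impl VersionedCache {
    fn get(&self, build_version: BuildVersion, id: GlobalId) -> Option<&SerializedExpr> {
        self.entries.get(&(build_version, id))
    }
}
```

Because lookups always include the reader's own build version, entries written by a different build version are simply invisible rather than a compatibility hazard.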
Upgrading an environment will look something like this:

1. Start version n in read-only mode.
2. Populate the cache with expressions for build version n.
3. Start version n in read-write mode.

Restarting an environment will look something like this:

1. Start version n in read-write mode.

The catalog currently has an in-memory expression cache.
This cache is used to serve EXPLAIN
queries to ensure accurate and consistent responses. When an
index is dropped, it may change how an object would be optimized, but it does not change how the
object is currently deployed in a cluster. This cache contains the expressions that are deployed in a cluster, but not necessarily the expressions that would result from optimizing the current catalog contents.
Below is the API that the cache will present. It may be further wrapped with typed methods that take care of serializing and deserializing bytes. Additionally, we probably don't need a trait when implementing.
```rust
/// All the cached expressions for a single `GlobalId`.
///
/// Note: This is just a placeholder for now, don't index too hard on the exact fields. I haven't
/// done the necessary research to figure out what they are.
struct Expressions {
    local_mir: OptimizedMirRelationExpr,
    global_mir: DataflowDescription<OptimizedMirRelationExpr>,
    physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
    dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
    notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
    optimizer_features: OptimizerFeatures,
}

struct ExpressionCache {
    build_version: Version,
    information_needed_to_connect_to_durable_store: _,
}

impl ExpressionCache {
    /// Creates a new [`ExpressionCache`] for `build_version`.
    fn new(build_version: Version, information_needed_to_connect_to_durable_store: _) -> Self;

    /// Reconciles all entries in the current build version with the current objects, `current_ids`,
    /// and current optimizer features, `optimizer_features`.
    ///
    /// If `remove_prior_versions` is `true`, all previous versions are durably removed from the
    /// cache.
    ///
    /// Returns all cached expressions in the current build version, after reconciliation.
    fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_versions: bool) -> Vec<(GlobalId, Expressions)>;

    /// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into the
    /// current build version.
    ///
    /// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
    /// will be taken from `new_entries`.
    fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet<GlobalId>);

    /// Durably removes and returns all entries in the current build version that depend on an ID in
    /// `dropped_ids`.
    ///
    /// Optional for v1.
    fn invalidate_entries(&mut self, dropped_ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, Expressions)>;
}
```
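To make the intended semantics concrete, here is a self-contained in-memory mock of the API above. The types are simple placeholders, there is no durability, and dependency-based invalidation (`invalidate_entries`) is omitted; it only illustrates the reconciliation in `open` and the rule that `new_entries` wins over `invalidate_ids` for duplicate IDs:

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for the real types.
type Version = u64;
type GlobalId = u64;
type Expressions = String;

struct MockExpressionCache {
    build_version: Version,
    // (build_version, global_id) -> expressions, mimicking the durable store.
    store: BTreeMap<(Version, GlobalId), Expressions>,
}

impl MockExpressionCache {
    /// Reconcile entries of the current build version with `current_ids`,
    /// optionally dropping all prior versions, and return what remains.
    fn open(
        &mut self,
        current_ids: &BTreeSet<GlobalId>,
        remove_prior_versions: bool,
    ) -> Vec<(GlobalId, Expressions)> {
        let version = self.build_version;
        self.store.retain(|&(v, id), _| {
            if v == version {
                // Drop entries for objects that no longer exist.
                current_ids.contains(&id)
            } else {
                !remove_prior_versions
            }
        });
        self.store
            .iter()
            .filter(|((v, _), _)| *v == version)
            .map(|(&(_, id), e)| (id, e.clone()))
            .collect()
    }

    /// Remove `invalidate_ids`, then insert `new_entries`; for duplicate
    /// IDs the value from `new_entries` wins because inserts run last.
    fn insert_expressions(
        &mut self,
        new_entries: Vec<(GlobalId, Expressions)>,
        invalidate_ids: BTreeSet<GlobalId>,
    ) {
        let version = self.build_version;
        for id in invalidate_ids {
            self.store.remove(&(version, id));
        }
        for (id, e) in new_entries {
            self.store.insert((version, id), e);
        }
    }
}
```

Performing removals before inserts is the simplest way to satisfy the documented duplicate-ID rule without inspecting the two collections for overlap.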
Below is a detailed set of steps that will happen in startup.

1. Call `ExpressionCache::open` to read the cache into memory and perform reconciliation (see Startup Reconciliation below). When passing in the arguments, `remove_prior_versions == !read_only_mode`.
2. While opening the catalog, for each object: if the object is present in the cache, use the cached expressions; otherwise, optimize the object and insert the new expressions via `ExpressionCache::insert_expressions`. This will also perform any necessary invalidations if the new expression is an index (see Create Invalidations below).

When opening the cache for the first time, we need to perform the following reconciliation tasks:
- Remove all entries that do not correspond to an object in `current_ids`.
- If `remove_prior_versions` is true, then remove all prior versions.

Any expressions that are re-optimized as a result are inserted via `ExpressionCache::insert_expressions`.

When creating and inserting a new index, we need to invalidate some entries that may optimize to new expressions. When creating index `i` on object `o`, we need to invalidate the following objects:

- Any object that directly depends on `o`.
- Any object that transitively depends on `o`.
- Any object that would depend on `o`, if all views were inlined.

When dropping an object, we need to invalidate the cached expressions of its dependents. This is optional for v1; on startup, `ExpressionCache::open` will update the cache to the correct state.

1. Durably remove the invalidated entries via `ExpressionCache::invalidate_entries`.
2. Re-optimize and re-insert the returned entries via `ExpressionCache::insert_expressions`.

The implementation will use persist for durability. The cache will be a single dedicated shard.
Each cache entry will be keyed by `(build_version, global_id)` and the value will be a serialized version of the expression.
It is possible and expected that multiple environments will be writing to the cache at the same time. This would manifest as an upper mismatch error during an insert or invalidation. In case of this error, the cache should read in all new updates, apply each update as described below, and retry the operation from the beginning.
If the update is in a different build version as the current cache, then ignore it. It is in a different logical namespace and won't conflict with the operation.
If the update is in the same build version, then we must be in a split-brain scenario where both the current process and another process think they are the leader. We should still update any in-memory state as if the current cache had made that change. This relies on the following invariants:

- Global IDs are never reused across different objects.
- Given the same catalog contents and optimizer features, optimization is deterministic.

Therefore, we can be sure that any new global IDs refer to the same objects that the current cache thinks they refer to. Also, the optimized expressions that the other process produced are identical to the optimized expressions that the current process would have produced. Eventually, one of the processes will be fenced out on some other operation. The reason that we don't panic immediately is that the current process may actually be the leader and enter a live-lock scenario like the following:
1. Process A starts up and becomes the leader.
2. Process B starts up and becomes the leader.
3. Process A writes to the cache.
4. Process B observes the conflicting write and panics.
5. Process B restarts, and the cycle repeats until process A is fenced.

One potential implementation is via the filesystem of an attached durable storage to environmentd. Each cache entry would be saved as a file of the format /path/to/cache/<build_version>/<global_id>. A con of this approach is that multiple environmentds will be communicating with the cache with no coordination between them. An alternative is to use persist's FileBlob for durability. It's extremely well tested (most of CI uses it for persist) and solves at least some of the file system cons. Yet another alternative is a full persist shard with FileBlob as the blob store and some local consensus implementation.
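The retry-on-conflict behavior described earlier can be sketched with a toy compare-and-append loop. This mocks the upper-mismatch semantics with a plain counter; it is not the real persist API, and the names `Shard` and `write_with_retry` are illustrative:

```rust
// Toy model of a shard whose writes are guarded by an expected "upper".
struct Shard {
    upper: u64,
    updates: Vec<(u64, String)>, // (upper at write time, payload)
}

impl Shard {
    /// Append `payload` only if the caller's expected upper matches,
    /// mirroring a compare-and-append style write.
    fn compare_and_append(&mut self, expected_upper: u64, payload: String) -> Result<(), u64> {
        if self.upper != expected_upper {
            return Err(self.upper); // upper mismatch: someone else wrote first
        }
        self.updates.push((self.upper, payload));
        self.upper += 1;
        Ok(())
    }
}

/// Keep retrying a write: on a mismatch, observe the new upper (a real
/// implementation would also read and apply the missed updates, as
/// described in the concurrency section) and retry from the beginning.
fn write_with_retry(shard: &mut Shard, mut expected_upper: u64, payload: String) -> u64 {
    loop {
        match shard.compare_and_append(expected_upper, payload.clone()) {
            Ok(()) => return shard.upper,
            Err(current_upper) => expected_upper = current_upper,
        }
    }
}
```

The loop terminates because each mismatch advances the caller's view of the upper to the latest value, so a retry can only fail if yet another writer got in first.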