# Handling of Invalid Accumulations in Reductions

## Summary

Invalid accumulations may happen when, at some point in a dataflow computation, we know that a collection must be a multiset, but we observe negative multiplicities. Upon seeing invalid accumulations in aggregation queries / dataflows, Materialize may log errors or even crash. In some conditions, errors that are logged will not be reported as failures in the corresponding dataflow, making it difficult for users to take corrective actions themselves and/or notify us of erroneous sources.

This design makes our strategy for handling invalid accumulation errors, when detected, uniform. This design work is pursued as part of epic [#17178](https://github.com/MaterializeInc/database-issues/issues/4967).

### Problem Description

Differential Dataflow (DD) operates on `(data, time, diff)` tuples representing changes to `data` at `time` for a given multiplicity `diff`. The multiplicity `diff` is an integer, signaling that changes can lead to either creation or removal of `data` over time. Tuples with negative multiplicities are thus a fact of life and Materialize should be able to process them correctly.

At some points during a computation, however, we should only observe multiplicities that are non-negative, since Materialize transforms inputs that are multisets into outputs that are also multisets. For example, every time output is produced out of a compute replica, we ensure that negative multiplicities result in a query-level error and, otherwise, that only tuples with positive multiplicities are emitted (see [relevant code](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/compute_state.rs#L903-L929)). Additionally, in reduction operators, we roll up the changes up to each advanced frontier, applying a reduction function to a snapshot of the values for each key to produce a new aggregate for the key.
Since we compute a snapshot for each key, negative multiplicities should not be observed at that point. However, improper input data fed to a dataflow could contain too many retractions, resulting in negative multiplicities in these key points of a computation. One could argue that inputs to a computation should always be checked and respect the multiset property. This is, unfortunately, hard to achieve. Ultimately, input data processed by a compute replica originates either from introspection sources or from the STORAGE layer via `persist`. The data in the STORAGE layer is ingested from external sources or inserted as rows into tables. Since these data are not consolidated, but rather represented in terms of changes, checking for whether the data is a multiset is equivalent to performing full consolidation, which can be costly and unnecessary to do as part of a dataflow for a range of use cases. As a consequence, sources that incorrectly report too many or non-matching retractions for rows end up recording incorrect data in `persist`, which in turn may trigger errors at the COMPUTE layer. These errors are typically caught during reductions, which do not handle them uniformly. In particular, it is not always the case that a query-level error is produced: For some categories of errors, crashes can occur, while for others, errors are only logged. These errors lie at the root of at least two incidents; additionally, they have been [observed in Sentry](https://sentry.io/organizations/materializeinc/issues/3869058341/events/66dbb5ab35874580b814c1f9c17281f1/?project=6780145) after these incidents took place. At the time of writing, it is in principle possible, e.g., for a source to cause these errors by issuing invalid retractions to Materialize using `ENVELOPE DEBEZIUM`. 
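
To make the multiset check discussed above concrete, the following is a minimal sketch, not Materialize code, of what consolidating `(data, time, diff)` updates and looking for negative accumulations amounts to. The names `Update`, `consolidate_as_of`, and `invalid_accumulations` are hypothetical and introduced only for illustration; times are simplified to totally ordered integers.

```rust
use std::collections::BTreeMap;

/// A differential-style update `(data, time, diff)`.
/// Hypothetical alias; times are simplified to `u64` for this sketch.
type Update<D> = (D, u64, i64);

/// Sums the multiplicity of each `data` value over all updates with
/// `time <= as_of`, dropping values whose multiplicity nets out to zero.
fn consolidate_as_of<D: Ord + Clone>(updates: &[Update<D>], as_of: u64) -> BTreeMap<D, i64> {
    let mut acc: BTreeMap<D, i64> = BTreeMap::new();
    for (data, time, diff) in updates {
        if *time <= as_of {
            *acc.entry(data.clone()).or_insert(0) += *diff;
        }
    }
    acc.retain(|_, diff| *diff != 0);
    acc
}

/// Returns the values whose accumulated multiplicity is negative at `as_of`,
/// i.e., the witnesses that the collection is not a multiset at that time.
fn invalid_accumulations<D: Ord + Clone>(updates: &[Update<D>], as_of: u64) -> Vec<(D, i64)> {
    consolidate_as_of(updates, as_of)
        .into_iter()
        .filter(|(_, diff)| *diff < 0)
        .collect()
}
```

Note that the check needs the full accumulated state per value, which is why performing it on unconsolidated source data is as expensive as consolidating that data.
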
If the COMPUTE layer of Materialize is able to detect that incorrect source data has been given to it, it is a reasonable [product-level expectation](https://materializeinc.slack.com/archives/CM7ATT65S/p1675358046875919?thread_ts=1675355385.487949&cid=CM7ATT65S) that an error will be reported to the user. Perceived and observed product stability increases if errors are cleanly reported to users without the need to internally trigger an incident as well as if errors are produced in lieu of replica crashes.

## Goals

Our goal is to design a consistent error reporting strategy for invalid accumulations in reductions, whenever such errors are detected. It is desirable that the errors produced be visible to the user of the system, i.e., reported as SQL-level errors. It is undesirable that these errors become visible as system crashes (e.g., due to panics), as the latter would unnecessarily reduce system availability. We also see as desirable that these errors be additionally logged to Sentry so that proactive remedial actions can be taken.

## Non-Goals

We do not aim to tackle here errors other than invalid accumulation errors in reductions. Other errors that can theoretically occur in reductions may lead to crashes, e.g., when incorrect data types are presented for some reduction operations (see [this example](https://github.com/MaterializeInc/materialize/blob/d28272444db09053e89eab1d568ba3a81f3da19a/src/compute/src/render/reduce.rs#L1138)).

Additionally, it is not an aim of the present design to improve the general error reporting visibility in Materialize, e.g., ensuring that the user is alerted when materialized views contain errors.

Finally, we only focus on invalid accumulation errors that can be detected during reductions. Note that even after this design is implemented, it will still be possible that reductions can be constructed over source data with incorrect multiplicities where we do not detect that the source data was, in fact, invalid.
## Description

### Error Categories

We identify a few important error categories to be considered:

1. When we implement introspection sources, tables, and regular sources, we ensure that only multisets are given as input to the COMPUTE layer, despite their representation being in terms of changes that may include retractions. However, sometimes this assumption can be violated when subtle bugs occur (see, e.g., issue [#15930](https://github.com/MaterializeInc/database-issues/issues/4587)). Additionally, users may [directly introduce invalid retractions in source data](https://materializeinc.slack.com/archives/CM7ATT65S/p1675428135441319?thread_ts=1675355385.487949&cid=CM7ATT65S), e.g., exploiting `ENVELOPE DEBEZIUM`. Therefore, a number of sanity checks are performed during reductions rendered by the COMPUTE layer to ensure that we are operating on multisets. For example, when we compute a min/max aggregation, the closure given to specialize a Differential Dataflow reduction operator cannot observe negative multiplicities if we are operating on a multiset. So checks and error reporting are introduced, e.g., in [intermediate stages](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L634-L642) and [final reduction](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L590-L596) of hierarchical aggregates. The strategy for error reporting is to employ a soft assertion: during development, an assertion failure will trigger a crash; in production, an error will be logged and captured in Sentry.
2. Another reason for error due to negative multiplicities is that a negative accumulated result cannot be coerced to an [unsigned type](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L1344-L1346).
   This error is now treated as unrecoverable, leading to a `panic!` instead of the above strategy of employing a soft assertion.
3. Following a similar error reporting strategy as in category 1 above, we have a soft assertion regarding net-zero records with [non-zero accumulation in accumulable reductions](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L1283-L1290). This category of error can emerge if we determine that an aggregate row has been removed, due to the corresponding tuple multiplicity accumulating to zero, but there is an inconsistency with the accumulation state, namely the aggregation function on the raw changes ends up with a non-zero result.
4. As observed in an incident, shutdown of a `persist_source` operator could trigger errors due to the sanity checks above in the COMPUTE layer. This behavior occurs because `persist_source` would not emit the entirety of a batch nor guarantee that a partially emitted batch would consolidate to a multiset. We ignore this problem in the remainder of this document, as a solution was introduced by PR [#17147](https://github.com/MaterializeInc/materialize/pull/17147).

The errors in categories 1 and 3 above are now reported in Sentry only and remain invisible to users, requiring that we proactively take steps to intervene. At the limit, these intervention steps may require a full-scale incident creation. Importantly, these errors are concerning as they indicate that Materialize might silently compute incorrect aggregation results. The errors in category 2 cause Materialize to crash in production.

### Approach: Report SQL-level Errors, Log to Sentry, and Eschew Crashes

We discuss below a path for avoiding system crashes and some classes of silently erroneous computation by producing query-level errors whenever invalid accumulation errors are detected in reductions.
Given that bugs in other Materialize components, and not only erroneous user-provided source data, could generate invalid accumulations in reductions, we still advocate keeping error reporting to Sentry in addition to producing query-level errors. Additionally, it is [important for our support team](https://materializeinc.slack.com/archives/CM7ATT65S/p1675358436801089) to have these errors in Sentry so that they can proactively notify users of invalid data in sources.

#### Details of Current Handling of Invalid Accumulations in Reductions

To discuss our solution approach, we first review the current handling of invalid accumulations by analyzing a few examples of category 1 and 3 errors. In general, our error reporting strategy based on soft asserts does not ensure that a query-level error will be generated when an invalid accumulation is seen.

Consider, for example, a reduce collation. First, we check if [an error needs to be reported](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L329-L335). However, subsequently, processing continues normally. Then, the row is [output with a hard-coded multiplicity of one record](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L365).

In the case of multiple basic aggregates, if [an error is detected](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L507-L516), then no output is produced for aggregates in the query. No failure is reported at query level either. A similar handling is performed for bucketed hierarchical aggregates.
For Category 3 errors, the accumulator logic, which is data-type dependent, is executed after a [soft assertion](https://github.com/MaterializeInc/materialize/blob/9933639ea5393d1aaa40c1079d07ffc1cc622516/src/compute/src/render/reduce.rs#L1292-L1448); the logic ends up outputting a row containing the aggregate so computed, with a hard-coded multiplicity of one record.

Despite generating incorrect output data, not wrapping a reduction operator with other operators for error checking has the significant advantage that the output arrangement of the reduction can be reused by downstream operators, e.g., joins. Arrangement reuse is a serious concern due to both the overhead in re-arranging the data downstream as well as the increase in memory footprint for a query. So rendering-level solutions that would introduce error checking operators after the reduction are not applicable, given that arrangement reuse would be hindered.

Additionally, the closure given to Differential Dataflow's reduction operator does not expect a `Result` return type, but rather takes, for each key, a snapshot of the values at a given time, based on which an aggregated multiset is produced in the output. As such, errors cannot be easily exposed in the output of reduction operators without changing the type of the returned aggregates. The latter, however, would again hinder downstream arrangement reuse.

#### Reduction Hierarchies: Use `Result` then Demux

A key concern in reporting invalid accumulation errors out of reduction operators is that the error reporting strategy must not hinder downstream arrangement reuse. To dive into this concern further, we first observe a few key points about Differential Dataflow's reduction operator. Firstly, the operator expects arranged data in the input. The input arrangement facilitates reasoning in the operator about times and construction of per-key, per-time value snapshots as frontiers move forward.
In addition to an input arrangement, the operator creates an output arrangement, which records the aggregated value multisets that are produced for each key at the advanced times. This output arrangement has the important function in the operator to enable retraction of old aggregated values when new ones are produced. This function is a requirement as the user-provided closure only sees value snapshots per-key and per-time; thus, it does not get an opportunity to produce retractions given retractions in the input. The output arrangement produced by a reduction can be reused by a downstream operator, e.g., a join, thus avoiding the maintenance of an additional arrangement. This reuse is a key property of externalizing state as arrangements and can have a significant impact in the memory footprint of a query. In some situations, however, these output arrangements are not reused during rendering. In particular, we render reduction hierarchies for certain kinds of aggregates, such as bucketed aggregates (e.g., streaming `MIN`/`MAX`) or non-monotonic top-k aggregates. In these hierarchies, a pattern is applied to produce the input edits that lead to the output instead of the output directly. In this pattern, the output of a reduction in an intermediary level of the hierarchy is negated as a collection, and the output arrangement is thus not reused downstream. We can exploit this pattern to introduce error reporting for invalid accumulations at these intermediary levels of the hierarchy, since arrangement reuse is not of concern. To this end, one further observation is useful: Once we ensure that the input snapshot at the bottom-most level of the hierarchy is free of invalid accumulations, we know that the input should not contain invalid accumulations at higher levels of the hierarchy. As a consequence, when a hierarchy is built with more than one reduction, the very first reduction can, instead of simply producing aggregated values, produce `Result<_, DataflowError>`. 
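
As a rough illustration of a first-level reduction that yields `Result` values which are subsequently split into an output and an error collection, consider the sketch below. It is not the Materialize implementation: `InvalidAccumulation` is a hypothetical stand-in for `DataflowError`, the per-key snapshot is modeled as a plain slice of `(value, multiplicity)` pairs, and `min_or_error` and `demux` are illustrative names.

```rust
/// Hypothetical stand-in for an invalid-accumulation `DataflowError`.
#[derive(Debug, Clone, PartialEq)]
struct InvalidAccumulation {
    key: String,
    value: i64,
    diff: i64,
}

/// Sketch of a first-level `MIN` reduction closure. The snapshot holds the
/// consolidated `(value, multiplicity)` pairs for one key at one time; any
/// negative multiplicity is reported as an error instead of being aggregated.
fn min_or_error(key: &str, snapshot: &[(i64, i64)]) -> Result<Option<i64>, InvalidAccumulation> {
    for &(value, diff) in snapshot {
        if diff < 0 {
            return Err(InvalidAccumulation { key: key.to_string(), value, diff });
        }
    }
    Ok(snapshot.iter().map(|&(value, _)| value).min())
}

/// Splits a collection of results into an output part and an error part,
/// mirroring the demux step that feeds the dataflow's error stream.
fn demux<T, E>(results: Vec<Result<T, E>>) -> (Vec<T>, Vec<E>) {
    let mut oks = Vec::new();
    let mut errs = Vec::new();
    for result in results {
        match result {
            Ok(value) => oks.push(value),
            Err(error) => errs.push(error),
        }
    }
    (oks, errs)
}
```

In this sketch only the first level validates multiplicities; higher levels would consume the `Ok` part and are assumed not to introduce invalid accumulations themselves.
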
Then, we can demux the errors into an error stream, finally exposing the invalid accumulation errors detected. The choice to check for invalid accumulations only at the first level of the hierarchy supposes correct implementation of the operators in the hierarchy itself, i.e., these operators should not introduce invalid accumulations on otherwise correct input. This trade-off in error checking is believed to be acceptable since such incorrect operator behavior is likely to be caught by testing, unlike corner cases in interactions with external systems or data wrongly provided by users.

The overall approach in this section is illustrated for bucketed aggregates in PR [#17918](https://github.com/MaterializeInc/materialize/pull/17918). At some point, there were performance concerns regarding the use of `DataflowError` due to the size of the `enum`. However, these concerns were mitigated by PR [#17222](https://github.com/MaterializeInc/materialize/pull/17222), which boxed all of its variants.

#### Single-Level Reductions: Employ an Operator Pair

The approach introduced in the previous section takes advantage of reduction hierarchies with more than one layer, where arrangement reuse for the bottom-most layer is not of concern. However, there are several instances where a single-level reduction is rendered, e.g., for accumulable, basic, or monotonic aggregates. In such cases, it is important to ensure that the output arrangement of the reduction can be reused by downstream operators. However, the closure provided to a Differential Dataflow reduction operator does not produce `Result`, but rather the multiset of aggregated values for a key directly. As such, exposing that an error was detected in the processing logic of the closure is a significant challenge.

To tackle this challenge, we employ a redundant computation approach based on the determinism of Differential Dataflow computations in Materialize's COMPUTE layer.
At a high level, we build operator pairs working in tandem over the same input arrangement so that we can expose different output interfaces. For detecting and exposing invalid accumulation errors, the notion would be that the operator pair implements the effective outcome of a fallible reduction operator. In the case of a reduction pair, one of the reductions computes the output, while the other computes the errors. The one computing the output maintains the output arrangement, while the one computing errors maintains (a typically very small) error arrangement. The two reduction operators consume the same input arrangement. So the memory footprint is similar to that of what a fallible reduction operator would need. The per-key, per-time snapshots of the data must be, however, produced twice. This is a potentially non-trivial overhead, but it is of a constant factor compared to a single non-fallible reduction. So we would not expect that any blow-ups in time or memory footprint would occur, making the solution comparatively attractive. Another important detail for this solution to work relates to the semantics that the COMPUTE layer adopts for output in the presence of errors. Currently, operators that may produce errors expose both an output and an error stream (see, e.g., [`flat_map_fallible`](https://github.com/MaterializeInc/materialize/blob/1558b5b68734c05219a0d4a925185643d43992fd/src/timely-util/src/operator.rs#L132-L141)). During rendering, we concatenate error streams from all fallible operators and sink the resulting errors, if any, into an arrangement. When evaluating output for a dataflow graph for a given time, we only consider the results in the final output stream if no errors were produced for that time. Coming back to the reduce pair solution above, an error stream can be obtained from the reduction operator computing errors, while the output arrangement from the other reduction operator in the pair can be reused by downstream computations. 
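
The operator-pair idea described above can be sketched as two functions reading the same input, one producing only outputs and one producing only errors. This is a simplified model under stated assumptions, not the rendering code: `Arrangement` is a hypothetical in-memory stand-in for a shared input arrangement at a single time, the aggregate is a plain `SUM`, and `reduce_output`, `reduce_errors`, and `read_result` are illustrative names.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the shared input arrangement: for each key, the
/// consolidated `(value, multiplicity)` snapshot at some time.
type Arrangement = BTreeMap<String, Vec<(i64, i64)>>;

/// Output half of the pair: computes `SUM(value)` per key. In this sketch it
/// emits nothing for keys whose snapshot contains a negative multiplicity.
fn reduce_output(input: &Arrangement) -> BTreeMap<String, i64> {
    input
        .iter()
        .filter(|(_, snapshot)| snapshot.iter().all(|&(_, diff)| diff >= 0))
        .map(|(key, snapshot)| {
            let sum = snapshot.iter().map(|&(value, diff)| value * diff).sum::<i64>();
            (key.clone(), sum)
        })
        .collect()
}

/// Error half of the pair: re-reads the same snapshots and emits only errors.
/// The resulting collection is expected to be small or empty.
fn reduce_errors(input: &Arrangement) -> BTreeMap<String, String> {
    input
        .iter()
        .filter(|(_, snapshot)| snapshot.iter().any(|&(_, diff)| diff < 0))
        .map(|(key, _)| (key.clone(), format!("invalid accumulation for key {}", key)))
        .collect()
}

/// Output is only considered at a given time if no errors exist for that time.
fn read_result(input: &Arrangement) -> Result<BTreeMap<String, i64>, Vec<String>> {
    let errors = reduce_errors(input);
    if errors.is_empty() {
        Ok(reduce_output(input))
    } else {
        Err(errors.into_values().collect())
    }
}
```

Both halves traverse every snapshot, which corresponds to the constant-factor overhead noted above, while only the output half maintains state proportional to the number of keys.
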
This strategy implies that the reduction operator producing output can limit itself to produce only correct output, ignoring errors, while the other operator in the pair focuses only on checking for and, if necessary, producing errors. Subsequently, the errors in the arrangement produced by the error-check reduction operator in the pair are exposed as an error stream. This way, the solution interoperates with the approach for managing errors that is already in place. The result is that we can then provide support for error retractions: Once an error is resolved at a later time, the revised output can also be exposed, while at the same time maintaining the incremental nature of the computation.

The solution discussed in this section is illustrated for accumulable aggregates in PR [#17990](https://github.com/MaterializeInc/materialize/pull/17990). Due to the concerns regarding the performance impact of running a pair of reductions redundantly, a query processing evaluation was performed in the context of that PR in a local development environment with variations of CH-benCHmark Q1 over indexed TPC-H relations at scale factor 1. In the setting evaluated, the impact on query processing time of running reduction pairs was negligible. So while this evidence does not characterize impact on the latency of incremental result computation, it suggests that the overall impact can be expected to be acceptable.

## Alternatives

Some alternatives were discussed and ultimately discarded due to concerns regarding code maintainability or performance.

### Post-Processing Operator after Reduction

An alternative that was significantly discussed and conceptually evaluated was to add a post-processing operator immediately after each reduction operator that evaluates if an error occurred during reduction and then produces an error in the appropriate error stream if so.
This design avoids changing Differential Dataflow reduction operators (or clones thereof) to potentially create error streams, thus reducing the complexity of the change. Two important aspects need to be tackled for this solution approach to work: (1) We need to find a method to concretely implement the post-processing operator; (2) We need to devise a strategy to externalize the information that an error occurred in the output of the reduction operator.

Regarding point (1), the most promising option was to implement the post-processing as a map-project chain with the relevant error processing code encoded as `MirScalarExpr`. This option embraces error handling from invalid accumulations in reductions as a semantic, query-level construct, but it may suffer from lower expression evaluation performance than rendering-level alternatives. Importantly, since MIR transformations eventually reason about error-freeness of expressions, it is necessary to introduce the map-project chain at a higher level of abstraction. It was argued, in particular, that this enriched reduce translation should be introduced at the level of HIR-to-MIR lowering. At this point in the query optimization pipeline, all aggregate functions have already been appropriately detected and collected into `Reduce` nodes. Thus, we do not need to reason about query nesting or how to rewrite multiple interrelated aggregates with different placements in a query (e.g., consider different aggregates put in a `SELECT` and a `HAVING` clause).

Regarding point (2), we note that: (a) The error detection strategy for invalid accumulations is specific to the reduction type; and (b) the necessary information for error detection, e.g., row multiplicities, is available only at the input to a reduce closure.
Because of (a), ideally each reduction operator should output, along with the computed aggregates, an additional field that indicates their validity, i.e., we represent whether an invalid accumulation error was detected as an additional field per aggregate row. Due to (b), we need to represent this additional output also in arrangements.

One could argue for the use of a traditional `Result` type as an encoding method. However, this option would specialize the arrangements produced by reductions to be on `Result` types, hindering their reuse by downstream operators assuming arrangements on `Row` types (e.g., joins). As mentioned above, arrangement reuse is critical for lowering overhead and memory footprint. This problem could be solved by employing an extra `Row` column per aggregate row at the output of a reduction closure that contains either `Datum::True` or `Datum::False` indicating whether the aggregate is valid or invalid, respectively. The post-processing map-project chain can then produce errors upon seeing `Datum::False` in this validity column and then project out the column.

In more detail, this approach comprises first redefining the meaning of MIR `Reduce`, denoted `Reduce^{MIR}`, to include an additional aggregate validity column. That is, the output schema of `Reduce^{MIR}` is now defined as:

```
Output_Schema(Reduce^{MIR} {key, aggrs, monotonic, expected_group_size} (input^{MIR})) =
    (typeof(key_1), ..., typeof(key_k), typeof(aggr_1), ..., typeof(aggr_l), valid)
```

where `key_1, ..., key_k` $\in$ `key`, `aggr_1, ..., aggr_l` $\in$ `aggrs`, and `valid` is either `Datum::True` or `Datum::False`.
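
To illustrate how a trailing validity column could be consumed, here is a minimal sketch of a post-processing step over one output row. It does not use Materialize's `Row` or `Datum` types: the `Datum` enum below is a hypothetical, heavily simplified stand-in, and `check_and_project` is an illustrative name rather than an existing function.

```rust
/// Hypothetical, simplified stand-in for the datums stored in a row.
#[derive(Debug, Clone, PartialEq)]
enum Datum {
    Int(i64),
    True,
    False,
}

/// Post-processes one reduction output row laid out as
/// `(key_1, ..., key_k, aggr_1, ..., aggr_l, valid)`: returns an error if the
/// validity column is `False`; otherwise projects the validity column away.
fn check_and_project(row: &[Datum]) -> Result<Vec<Datum>, String> {
    match row.split_last() {
        Some((Datum::True, rest)) => Ok(rest.to_vec()),
        Some((Datum::False, _)) => Err("invalid accumulation in reduction".to_string()),
        _ => Err("malformed row: missing validity column".to_string()),
    }
}
```

Because the validity flag travels as an ordinary column, the arrangement produced by the reduction stays keyed on plain rows; the error only materializes where this check is evaluated.
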
Then, we translate HIR `Reduce`, denoted `Reduce^{HIR}`, as a map-project chain applied over `Reduce^{MIR}`, namely:

```
[[Reduce^{HIR} {key, aggrs, expected_group_size} (input^{HIR})]]^{MIR} =
    Project^{MIR} {0, ..., k-1, k+l+1, ..., k+2l} (
        Map^{MIR} {output_or_error(k, k+l), ..., output_or_error(k+l-1, k+l)} (
            Reduce^{MIR} {[[key]]^{MIR}, [[aggrs]]^{MIR}, false, expected_group_size} ([[input^{HIR}]]^{MIR})
        )
    )
```

Since the translation is performed during HIR-to-MIR lowering, the query optimizer will have the opportunity to apply all relevant MIR transformations to the expression produced. In particular, the map and projection can be hoisted so that the arrangement produced by the reduction can be reused by other operators in the query, e.g., a join. With such an optimization, we would delay producing errors in favor of minimizing memory footprint. Importantly, since this optimization already exists, it would be applied without requiring any specific knowledge that the error expression was introduced for checking invalid accumulations in reductions, fitting naturally into the query optimization pipeline. However, other optimizations might need to recognize this pattern, a risk that was deemed substantial.

To map out the latter risk, two initiatives were pursued. Firstly, we implemented a fix to the Category 2 panics for `uint2` and `uint4` sum aggregates using this approach, but without adding an additional column out of the reduction operator. In particular, the introduction of a post-processing step after aggregation that performs an appropriate [cast operation](https://github.com/MaterializeInc/materialize/blob/d28272444db09053e89eab1d568ba3a81f3da19a/src/expr/src/scalar/func.rs#L3868-L3870) was pursued.
This more specialized version of the HIR-to-MIR translation approach was illustrated in PR [#17709](https://github.com/MaterializeInc/materialize/pull/17709), which got merged, but eventually undone by the reduction pair approach argued for in the main part of this document.

Secondly, we tried to estimate the impact on optimizer transforms of potentially extending MIR reductions to validating reductions by the translation approach. This impact assessment was documented as PR [#17872](https://github.com/MaterializeInc/materialize/pull/17872), which was not merged. There, we find that several transforms would need to be reworked to account for the extra column produced as part of the proposed translation. While some changes are simple, some others carry more complexity, e.g., in `reduction_pushdown`. Some transformations, such as `reduce_fusion` and `threshold_elision`, might become ineffective with the addition of the map-project chains to validating reductions and would need to be significantly reworked.

One could argue for restricting validating reductions to only be turned on for certain cases. For example, `reduce_fusion` currently operates only on reductions without any aggregates. We could make these reductions non-validating. However, this risk containment approach introduces a tension between optimizer evolution and our ability to detect invalid accumulation errors. Due to these challenges, the translation approach was eventually abandoned.

### Rework Differential Dataflow Reductions to Allow for Closures Producing Errors

An alternative is to introduce error streams into Differential Dataflow reduction operators directly, by allowing the closure provided to return errors. That way, we could derive new fallible reduction operators and merge the error streams produced by these with the input error streams to produce errors whenever invalid accumulations are detected by a reduction closure.
The latter option changes the requirements on Differential Dataflow reduction operators. These new requirements could be satisfied by either changes to Differential Dataflow itself or by copying the Differential Dataflow code and specializing it inside Materialize with Timely Dataflow's operator builder. In either case, the generalization of reductions into fallible reductions would require non-trivial design and implementation effort. This is because now we would need to keep track of when errors are produced by a closure and when they can be retracted. These errors would need to be represented inside the altered Differential Dataflow reduction operator in a data structure providing access by key and time, i.e., an arrangement. We cannot simply reuse the arrangement that is already kept there for output retraction, since the types would not line up. Moreover, representing the output arrangement with a `Result` type would introduce issues with downstream arrangement reuse. So a separate arrangement would need to be maintained. We would need to then ensure that retractions are produced correctly either from the output arrangement or from the error arrangement as times move forward.

As opposed to smaller-scale and low-risk copying of Differential Dataflow's code into the Materialize repository, a potential path of copying the Differential Dataflow reduction operator into Materialize to make it fallible would imply significant code duplication and corresponding issues with maintainability. The Differential Dataflow reduction operator is implemented in a module that is >1,000 lines of code with moderately complex algorithmic implementation to manage arrangements and roll changes forward depending on the advancement of time. Additionally, it has dependencies on other module-level data structures (e.g., `ValueHistory`) that would have to be either exposed or also copied. It is a workhorse module for all of Materialize's reduction and top-k implementations.
Bugs introduced while ingesting and changing the Differential Dataflow reduction operator in Materialize could have a large blast radius.

### Introduce New `Datum` Variants or Reinterpret Existing Ones

One could employ a `Datum::Dummy` variant in place of the aggregated data that flags that an error has occurred. It is, however, unclear whether this would result in a panic or a more gentle query-level error. From a preliminary analysis, the result appears to be a panic, which would lead us to consider either special handling of `Datum::Dummy` or introduction of another `Datum` variant. The advantage of this approach would be higher confidence in query-level detection, as opposed to the first suggestion, but the engineering effort is less clear and the representation cleanliness is questionable.

Alternatively, one could introduce a `Datum::Err` variant to signal error. The impact of this change would, however, be non-trivial. Several other parts of the codebase may now have to consider `Datum::Err` as a valid `Datum` variant. Significantly, scalar expression evaluation would potentially need to be changed to match also on `Datum::Err`. Additionally, we would need to revisit the semantics of operators such as joins when they are fed `Datum::Err` instances. For example, one could treat joins in the presence of errors always with outer-join semantics: forward errors without a match on the other join side, or otherwise, compute join results normally. Despite being a viable alternative, the risk and magnitude of the change would be excessively large.

### Introduce Negative Multiplicities to Encode Errors

One could argue that we could encode the violation of the multiset property by generating negative multiplicities as part of processing. That way, upper processing layers will have a chance to observe the violation of a multiset output property and report a query-level error.
However, it is not guaranteed that all query structures will result in a query-level error, since these negative multiplicities could be added up on another query path with positive multiplicities prior to output. The latter would lead to potential error reports to Sentry that are not visible to the user as SQL-level errors. Another disadvantage of this alternative is that it is not a solution uniformly applicable to all unsigned integer types, since as discussed above not all of their sums result in `numeric`. Changing this behavior would have unfortunate implications for backwards compatibility.

### Employing Side Channels

It would be possible to employ a side channel, e.g., similar to the internal dataflow commands used in the STORAGE layer, that directly informs the worker about the error. The worker could then take measures to mark the dataflow as failed, terminate it, and report errors. However, employing such a side channel implies careful synchronization between the dataflow runtime and the external mechanism. The latter can be a source for subtle bugs. Additionally, the use of a side channel in this way may compromise our ability to retract errors.

### Source-level Multiset Checks

An alternative approach would be to include checks for the multiset property at the sources, with appropriate measures to surface errors to users when source data violates this constraint. This could be done, e.g., by a design where source data is consolidated to incrementally update a full snapshot of the data at every frontier advancement, followed by an (incremental) test of whether the multiset property has been violated. Only data passing such a test would be fed into a subsequent dataflow computation. Such a design would need to be applied redundantly to introspection sources as well as to persist. While attractive from the correctness perspective, consolidation prior to processing has unknown processing latency implications.
Additionally, it might require either large memory requirements or a possibility to store the incrementally maintained data snapshot out-of-core. A concrete design that fulfills all of these goals is not yet available at the time of writing and would be a significant undertaking.

## Final Remarks

This design does not address improving reduction performance by "atomization", as proposed in issue [#8086](https://github.com/MaterializeInc/database-issues/issues/2470). We are leaving this additional scope to be tackled as part of a separate work stream.