
Persisting introspection data

This document discusses design choices and an implementation overview regarding persisting introspection information.

Introspection and what it does

The introspection mechanism surfaces the timely logging dataflows to users. For example, mz_catalog.mz_scheduling_histogram allows users to inspect how long certain timely operators were running. This can be useful for understanding and debugging Materialize’s performance.
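
For illustration, such a query might look as follows (a minimal sketch; the column names follow the schema sketched later in this document rather than the exact catalog definition):

-- Inspect per-worker scheduling data, largest buckets first
-- (columns as sketched later in this document).
SELECT worker, slept_for, requested, count
FROM mz_catalog.mz_scheduling_histogram
ORDER BY slept_for DESC;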

The contents of introspection tables are inherently different per timely instance and thus per replica. Unlike with other dataflows, it is not expected that different replicas produce the same data.

The usual logic of ActiveReplication, which forwards Peeks to all replicas and uses the first response, does not make sense here, as there is no way for the user to tell which replica the obtained numbers came from.

Thus we have to offer some other mechanism.

Scenarios to consider

Debugging the performance of Materialize is especially important if a replica does not behave as expected. It is important that users can query and interpret the introspection data of replicas in the following scenarios:

  • Replica that has been removed

In this case the user actively deleted the replica (for example via DROP CLUSTER REPLICA). The user might still be interested in the introspection data, for example as a reference for comparison with another (faster/slower) replica. On the other hand, since the user actively requested the replica to be dropped, it would also have been possible for the user to save relevant data before dropping the replica. (See the design question “Behavior after replica drop”.)

  • Slow replica

If a replica gets saturated and does not perform timely updates anymore, the user is probably interested in which dataflows cause a lot of computation. We should ensure that 1) the logging works reliably even in the case of an otherwise saturated replica and 2) it does not influence the logging of other replicas.

  • Crashing replica

If a replica crashes (for example due to being out-of-memory), the introspection data can’t reliably be saved by the user before the crash happens, as it happened unexpectedly. It is important for the user to see the introspection data for a postmortem analysis.

  • Restarting replica

Related to crashed replicas, we have to consider what happens when a replica restarts, since Kubernetes (potentially with some back-off) does this automatically. If we want to analyze the fault scenario, it is important that the user can distinguish from which lifetime the data stems. On the other hand, keeping around a lot of data from a long-defunct replica might distract the user.

  • In future versions: Using introspection for autoscaling

At some point in the distant future, we might use this data to automatically detect an overloaded replica and migrate it to a bigger instance.

Problems

Difficulties in offering an aggregated source

From a pure SQL perspective, the obvious schema to present the cross-replica introspection information is to extend the introspection tables with a replica id column.

For example, mz_scheduling_histogram would get an additional replica_id column.

worker    slept_for    requested    count
-----------------------------------------
...

Would become

replica_id     worker    slept_for    requested    count
---------------------------------------------------------
...
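
For illustration, such a schema would allow ordinary cross-replica aggregates (a sketch; the particular aggregation is purely illustrative):

-- Total time slept per replica, summed over workers and buckets,
-- using the hypothetical replica_id column.
SELECT replica_id, sum(slept_for * count) AS total_slept
FROM mz_scheduling_histogram
GROUP BY replica_id;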

However, if done naively, this runs into the way queries work in Materialize: results are reported as of a fixed timestamp. If one replica is lagging or otherwise unable to provide logging data, the query would either report out-of-date data (when settling for the least timestamp at which all replicas have provided data) or block for a long time (when choosing a timestamp closer to the actual time). See the design question “Expose an aggregate source or a per-replica source” as well as the discussion in the GitHub PR for more technical details.

UX gap with per replica sources

To replace the arranged introspection with the persisted sources, we should consider the following scenario: a user has two replicas. First, the user tries to debug a performance issue by writing a complex SQL statement targeting the first replica (using _1 as the table postfix). Now the user wants to send the same query to the other replica. The user has to rewrite the whole query to use the new postfix (_2), as sketched below.
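
As a minimal sketch (the table names follow the postfix convention described here; the query itself is illustrative):

-- Debugging query against the first replica:
SELECT worker, sum(slept_for * count) AS total_slept
FROM mz_scheduling_histogram_1
GROUP BY worker;

-- Asking the same question about the second replica requires
-- rewriting every table reference to use the new postfix:
SELECT worker, sum(slept_for * count) AS total_slept
FROM mz_scheduling_histogram_2
GROUP BY worker;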

We could offer an aggregate view over all replicas, but this view would have to be changed in the catalog on each replica creation or drop, so any view defined on top of it would need to be removed, defeating the purpose of having the view in the first place. Also, a TAIL running against such an aggregated source would not contain newly appearing replicas.

In addition, this aggregated view would also suffer from the timestamp selection problem described in "Difficulties in offering an aggregated source".

Alternatively, to stay backwards compatible with the replica-targeted queries, we could offer a view that refers to a postfixed variant and recreate this view with a SET CLUSTER_REPLICA command. (I.e. the view would be defined as CREATE VIEW mz_arrangement_sharing AS SELECT * FROM mz_arrangement_sharing_1, and the postfix changes with each SET cluster_replica.) This approach suffers from the same limitations as above, because the view has to be recreated: it is not possible for the user to define views on top of that view and then change the cluster_replica variable.
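
As a sketch of this approach (reusing the view from the example above):

-- Backwards-compatible alias view, pointing at the currently
-- targeted replica (postfix _1):
CREATE VIEW mz_arrangement_sharing AS
  SELECT * FROM mz_arrangement_sharing_1;

-- After SET cluster_replica selects a different replica, the view
-- has to be dropped and recreated to point at the new postfix:
DROP VIEW mz_arrangement_sharing;
CREATE VIEW mz_arrangement_sharing AS
  SELECT * FROM mz_arrangement_sharing_2;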

The zombie replica problem

We have to consider the possibility of a half-alive computed that has lost all connections to other workers and environmentd but still continues writing its introspection sources. Because it has lost all connections, it is indistinguishable from dead to the rest of the system. At some point, Kubernetes will restart that instance, but this might not happen immediately, and we should consider the option of having two computeds with the same replica ID running at the same time.

If this goes wrong, the user could see duplicate data in the introspection sources.

Depending on which design we settle on, this problem has different solutions. It boils down to either delegating this to the storage layer (using renditions or similar) or actively checking for this situation (for example by reading back the introspection data and checking that no one else wrote with that ID).

Current and new proposed solution

M1: Replica directed queries

In M1, we implemented replica-targeted queries. They use the special variables cluster and cluster_replica to direct a subsequent SELECT to a specific replica. Using this technique, it is possible to query the introspection information of a given replica.

CREATE CLUSTER test
  REPLICA replica_a (SIZE '1'),
  REPLICA replica_b (SIZE '2');
SET cluster = test;
SET cluster_replica = replica_a;
SELECT * FROM mz_materializations;

Setting the cluster_replica variable affects all queries, not only those that target introspection sources.

M2: Using persist for introspection

A better alternative is having the replicas write their logging information into persist and making these persist shards available in the catalog. This way it is possible to query and analyze introspection data from another (non-production) cluster, even in the case of a replica that died or became otherwise unresponsive.

Design decisions

Because of the problems outlined before, we offer per-replica introspection sources and views. But since these are not a complete replacement (see the UX gap above) we keep the arranged logs and the replica directed queries around.

Replica crash/restart

On replica restart, we reuse the persist shard. When a replica starts, it obtains a snapshot from persist and removes the existing entries. This works fine if the previous replica has terminated completely, but it does not solve the zombie replica problem. However, the precise mechanism of how we write a timely dataflow to persist is currently under revision.

Views defined on introspection sources

We do have some views defined on top of the introspection sources to present the data in a more user-friendly way. These views are duplicated for the persisted introspection sources, also with a postfix.

Namespace

For now we keep the persisted introspection data as well as the introspection views in mz_catalog. Also for now we keep the catalog item type as “source”, so they show up, for example, in SELECT * FROM mz_catalog.mz_sources.

Expose an aggregate source or a per-replica source

For the reasons described above, it is not trivial to realize a unionized view over all replicas. In the future, source reclocking might be a mechanism that is applicable to this problem, but it is fairly low priority. There are a couple of ideas for how to get the benefits with mechanisms available now:

  • Construct one source per replica

The idea is to expose to the user one source per replica and introspection type pair. For example, after CREATE CLUSTER c1 REPLICAS (r (SIZE '1')), the following tables can be found in mz_catalog:

mz_catalog.dataflow_operators_1 (for the default cluster)

mz_catalog.dataflow_operators_2 (for the cluster c1)

Each table name has a cluster ID suffix; users can determine the cluster ID using the table mz_catalog.mz_cluster_replicas.

Benefits: It allows the user to query up-to-date data from a single replica. There is nothing that blocks a computed from writing into the logging sources as fast as persist allows, so the queried data should be fresh. This approach synergizes well with storage renditions: if a computed restarts, it could start a new rendition, elegantly solving the problem of a “zombie” computed writing to the same shard.

Drawbacks: It is not idiomatic SQL, which makes it hard to construct aggregates over the replicas (”what is the average response time over all replicas?” types of queries). Also, the user can still create an aggregate view over these sources (as shown below), which would then expose the same lag problem as a naively unionized view provided by us. The user somehow has to understand the implications of doing this.

CREATE VIEW mz_replica_cpu_usage AS
  SELECT * FROM mz_catalog.mz_replica_cpu_usage_1
  UNION ALL
  SELECT * FROM mz_catalog.mz_replica_cpu_usage_2
  UNION ALL
  SELECT * FROM mz_catalog.mz_replica_cpu_usage_3

  • Let all computeds write into one aggregated shard

Using an atomic operation on the persist shard, multiple writers can write into one persist shard, which could then directly contain the data in the format with the replica column added (see Frank’s comment).

Benefits: It exposes a single source per introspection type, so we can match the ideal schema presented above. Updates from fast replicas appear together with older updates from lagging replicas.

Drawbacks: Multiple writers have to contend for the persist shard, which limits the update frequency of the data and requires some compromise in data freshness. There is another reason for limiting the update frequency: we have to determine an artificial timestamp (because we can only append data with an increasing timestamp) and have to ensure that we are not running arbitrarily far into the future.

To determine an update frequency, we need to know how many replicas we expect to exist at one time and how fast persist’s compare_and_append is in a scenario with contention. As an educated guess: assuming a maximum of 100 replicas and an append latency of 10 ms, one fully serialized round of appends takes about 1 s, so I’d suggest an update frequency of 10 s or more (accounting for potential retries).

  • Pipe updates through environmentd

Benefits: Using environmentd as a serialization point can potentially lead to better freshness than having every writer contend on the persist shard itself.

Drawbacks: High load on environmentd. A pretty big break with our dataplane separation in platform.

Decision: We implement the per-replica sources for now.

Behavior after replica drop

Should the persisted introspection data remain after a replica is dropped? The user actively requested the DROP CLUSTER REPLICA, so the user could always save the data before dropping.

  • If the sources remain, what should their contents be? If we do one source per replica, we also have to consider what those contents are. We could, for example, have the sources stay around but provide empty data.
  • If not, do we need cascade (as in DROP CLUSTER ... CASCADE) to clean up the introspection shards? Or is introspection data “volatile” data that should be dropped even without cascade?
  • Even if we do remove the entries without cascade, we need to handle the case where there are dependencies on those entries, such as a view created on top of the introspection sources.

Right now, DROP CLUSTER REPLICA does not have a cascade option (because there was nothing to cascade into).

Decision: We drop the introspection sources and views on replica drop. We add a CASCADE option that will also drop any dependents of the views.

The same logic propagates to DROP CLUSTER: if a cluster is dropped and there is a (transitive) dependency on one of the cluster’s replica introspection sources, the command will fail unless CASCADE is specified, in which case the cluster, the replicas, the introspection sources, and any views using these introspection sources are all dropped.
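
As an illustrative sketch (the user-defined object name is hypothetical, and CASCADE on DROP CLUSTER REPLICA is the newly proposed option):

-- A user-defined view on top of a per-replica introspection view:
CREATE VIEW my_debug_view AS
  SELECT * FROM mz_catalog.mz_dataflow_operator_reachability_1;

-- Fails: my_debug_view (transitively) depends on the replica's
-- introspection sources:
DROP CLUSTER REPLICA test.replica_a;

-- Drops the replica, its introspection sources and views, and
-- my_debug_view:
DROP CLUSTER REPLICA test.replica_a CASCADE;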

Implementation details

This section describes some of the implementation choices of introspection sources.

Right now we have two different introspection locations: arranged and persisted. Both contain the same data, but it is stored either in an in-memory arrangement or in a persist shard.

The computeds write compact introspection data. To present this data in a more user-friendly way, we define views on top of the introspection sources.

Note that each replica (even within the same cluster) generates different introspection data. This is unlike production data, where we expect each replica to produce exactly the same data (and thus it does not matter which replica writes to persist first or replies to a Peek).

                                        Arranged                                             Persisted
-----------------------------------------------------------------------------------------------------------------------------------------
Source (compact representation)         Often postfixed with _internal (for example          mz_dataflow_operator_reachability_internal_1
                                        mz_dataflow_operator_reachability_internal)
Views (human-readable representation)   Normal, builtin view (for example                    mz_dataflow_operator_reachability_1
                                        mz_dataflow_operator_reachability)

The user can request, on a per-cluster basis, whether to enable introspection and with which frequency the introspection data is updated. This option applies to both arranged and persisted introspection.
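
For illustration, this could be requested at creation time roughly as follows (a sketch only; the exact option names and whether they live on the cluster or the replica are assumptions, not part of this document):

-- Hypothetical syntax: enable introspection with a 1-second update interval.
CREATE CLUSTER test
  REPLICA replica_a (SIZE '1', INTROSPECTION INTERVAL '1s');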

The types LogVariant and LogView enumerate the persisted introspection sources and views.

Catalog perspective

Arranged views are global, independent of the number of clusters or replicas. Arranged sources get a different GlobalId per cluster. Persisted sources and views on the other hand are per-replica. Thus environmentd creates or removes catalog entries related to introspection sources in roughly the following cases:

  • The system is started/bootstrapped
    • This is when the arranged IDs are allocated and the views are built (since the views are static, using the BUILTINS mechanism).
    • For persisted introspection sources, nothing happens apart from a potential catalog migration (See Catalog Migration).
  • A cluster is created
    • GlobalIds for the arranged sources are allocated. They are not visible in the introspection tables (i.e. SELECT * FROM mz_catalog.mz_sources; will always display the GlobalId of the default cluster). But queries are rewritten correctly: if the non-default cluster is selected, selecting from an arranged source will resolve the name to the per-cluster ID.
  • A replica is created
    • Nothing happens for arranged introspection
    • A GlobalId and an introspection shard are allocated for each introspection source. After inserting the corresponding catalog entries (e.g. mz_catalog.mz_dataflow_operator_reachability_internal_1), the introspection views that use these postfixed sources are parsed from a SQL string and inserted into the catalog (see the sketch after this list). The LogView enum can produce a SQL CREATE VIEW ... string that refers to the postfixed sources. Thus it’s important that the sources are inserted into the catalog before the views are parsed and inserted as well.
  • A replica is dropped
    • Nothing happens for arranged introspection.
    • Persisted introspection views and sources are removed. CASCADE is necessary if any other catalog item depends upon the sources or views. However, at runtime a persisted view is a normal catalog item that has a dependency on its source, so whenever we determine whether there are dependents, we have to ensure we don’t accidentally include the persisted views themselves.
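
For illustration, the generated per-replica view definitions follow roughly this shape (the actual view bodies produced by LogView are more involved; this only shows the naming and reference pattern):

CREATE VIEW mz_catalog.mz_dataflow_operator_reachability_1 AS
  SELECT * FROM mz_catalog.mz_dataflow_operator_reachability_internal_1;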

Note that the replica created case can be triggered by a CREATE CLUSTER REPLICA as well as by an inline replica specification with CREATE CLUSTER. Analogously, a replica drop can happen via DROP CLUSTER REPLICA as well as DROP CLUSTER ... CASCADE.

All of these mechanisms become a no-op if a cluster is created with introspection turned off.

The GlobalId to CollectionMetadata mapping is stored in the METADATA_COLLECTION stash, as for any other ID backed by a storage collection.

Catalog Storage

At runtime, the catalog is populated with the normal entries for sources and views. However, when the catalog is stored, all the persisted sources and views are stored alongside the replica object. This ensures we can retrieve the connection between a replica and its introspection IDs.

Catalog Migration

The serialized SerializedComputeInstanceReplicaLogging has three variants:

  • Default: No introspection sources created, but create the default ones when possible.
  • Concrete: Introspection sources are allocated, but not views. Create default views when possible.
  • ConcreteViews: Introspection sources and views have been allocated.

In catalog::open, the cases Default and Concrete are replaced with ConcreteViews. If the cluster has logging enabled, it will become ConcreteViews with non-empty vectors. If the cluster has logging disabled, it will be replaced with ConcreteViews([],[]).

Compute controller

The compute controller that manages the replica has to ensure that the CreateInstance command is specialized for each replica. It does so using the data stored in the replica struct.

Computed perspective

The computeds do not know about the views at all; they know and worry only about the compact representation. When a user queries an introspection view, it is handled like a normal view.

The CreateInstance command is the first command a computed receives and contains a LoggingConfig. This struct describes which introspection sources to maintain in memory as arrangements and which to write to persist.

If there are multiple replicas, the arranged logs use the same GlobalId for the same introspection source across replicas. This means that queries that use arranged introspection sources need to be directed to a specific replica; otherwise the user can’t tell which replica replied.
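
For example, reusing the names from the M1 example above:

-- Arranged introspection sources share a GlobalId across replicas,
-- so the query is directed at a specific replica first:
SET cluster = test;
SET cluster_replica = replica_a;
SELECT * FROM mz_catalog.mz_scheduling_histogram;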