The existing statistics we expose to users for sources answer some important questions, like:
but fail to answer two critical questions users regularly ask about their sources:
This design document proposes metrics that answer these questions, and goes into detail about their implementation.
This design has succeeded when we are able to present these metrics to users in the console, and they can use them to answer questions about their source's snapshotting and sizing.
Sources implementations expose their progress in various incompatible ways. This progress information can be partially ordered and have various shapes. In the context of metrics, we want to be able to calculate rates and compare progress against a total in a way consistent across all sources.
This means sources will need to implement a way to aggregate their notions of progress into a single 64-bit unsigned integer. Clients processing this data can then consistently calculate rates and percentages from these numbers. Note that the unit of the single integer does not need to be consistent across sources.
For example, consider the following expression of progress that can be exposed by the Kafka source implementation.
```
# Partition Id: Offset
{0: 100, 1: 150}
```
In this case, the Kafka source can be said to have processed 250 offsets.
The Postgres source has one way to expose progress: the lsn of the replication slot. However, during snapshotting it can also sum the number of rows it has read across all tables. For example:
```
{table1: 110 rows, table2: 15 rows} = 125 rows processed
```
During replication, the source can report the lsn as the number of bytes processed.
Similar to Postgres, the MySQL source tracks rows per table during snapshotting.
However, during replication, MySQL's source timestamp is partially ordered, instead of just a single lsn number:
```
# source id: transaction id
{0: 1000th txid, 1: 10th txid} = 1010 transactions processed
```
The first set of metrics this design document proposes concerns snapshot progress: a lower-bound estimate of the percentage of the source's snapshot Materialize has read. These metrics are designed to answer question #1 in the problem statement.
We will introduce 2 new columns in `mz_source_statistics_raw`:

| Column | Type | Meaning |
|--------|------|---------|
| `snapshot_records_known` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_progress` | [`uint8`] | The number of upstream values Materialize has read so far. |
The unit of these values depends on the source type: rows for MySQL and Postgres, and offsets for Kafka.
These values can be summed across workers and compared (`snapshot_progress / snapshot_records_known`) to produce a lower-bound estimate on the percentage of the source's snapshot we have read.
The `SourceReader` implementation for each source will be required to produce a `snapshot_records_known` value, as well as a continuously updated `snapshot_progress` frontier on each worker.
For Kafka, `snapshot_records_known` can be trivially derived from the snapshot frontier already tracked within its source operator, summed across partitions. Similarly, `snapshot_progress` can be derived from the operator's output frontier.
For Postgres and MySQL, `snapshot_records_known` will need to be calculated, in the unit of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
Both the Postgres and MySQL implementations will be required to perform this query, per-table, during snapshotting. Note that `count(*)` is *not* guaranteed to be cheap on Postgres and MySQL. To avoid blocking the snapshot on it, we will perform this query concurrently with the beginning of snapshotting, allowing the user to see their source's progress before a percentage can be calculated.
`snapshot_progress` will need to be exposed by the source operators themselves, as during snapshotting all of the data has the same frontier. This means the operators will need to track and periodically report a frontier describing the progress they have made reading the snapshot. In practice, this is the number of rows they have read, per-table.
Below is a set of charts showing what these new metrics and the derived percentage would look like in 2 different scenarios.
These charts make it clear that if we can read from upstream faster than we can process that data, we may present a `snapshot_progress` that is artificially inflated. This is considered reasonable for this design because:
The second set of metrics this design document proposes describe the rate at which upstream and Materialize are processing data. These metrics are designed to answer #2 in the problem statement.
| Column | Type | Meaning |
|--------|------|---------|
| `outstanding_values` | [`uint8`] | The total number of outstanding values in the upstream source. The unit is source-specific. |
| `processed_values` | [`uint8`] | The total number of upstream values Materialize has processed. Same unit as `outstanding_values`. |
These values are primarily designed to be turned into rates (which will happen client-side), and users should not be expected to interpret them as anything else. These rates have units that depend on the source type (replication bytes for Postgres, offsets for Kafka, transactions for MySQL). Note that we may add Prometheus metrics for these values that are broken down by partition/mysql-source-id/etc.
These rates are also designed to be compared with each other. The following charts show what these metrics will look like, depending on whether Materialize is keeping up or falling behind.
Note that when our process rate is consistently below the outstanding rate, the user knows their source isn't keeping up with upstream.
Each source's `SourceReader` implementation will be required to expose a continuous stream of `outstanding_values` values. This means:
- For MySQL: periodically `SELECT @@gtid_executed`, which returns the set of gtids representing the latest transactions committed, aggregated as defined above.
- For Postgres: periodically `SELECT pg_current_wal_lsn()`, which returns the lsn of the latest transaction.

Additionally, the source pipeline will periodically invert the latest frontier we have committed for the source, from the Materialize timestamp domain to the source-timestamp domain. For MySQL and Postgres sources, this frontier will be the meet of the subsource frontiers (as in, their minimum). These will be aggregated just like the above.
Existing metrics such as `messages_received` and `bytes_received` continue to be useful for understanding the general performance of a given source.
If users find some of them confusing or not useful, we can remove them in the future.
N/A for now. This design document is primarily designed to capture the two core metrics we should add to sources, and how to implement them in a feasible way. The attached example charts capture the desired output.
Will `count(*)` be prohibitively expensive, or do we need to use estimates?