Merge pull request #25218 from guswynn/maintained-statistics
storage: maintain source and sink statistics through restarts
guswynn authored Feb 20, 2024
2 parents 0504390 + bc34e32 commit e1e595a
Showing 37 changed files with 1,345 additions and 541 deletions.
14 changes: 7 additions & 7 deletions doc/developer/design/20240108_source_metrics_2.md
@@ -5,7 +5,7 @@

## The Problem

The [existing source statistics](https://materialize.com/docs/sql/system-catalog/mz_internal/#mz_source_statistics_per_worker)
The [existing source statistics](https://materialize.com/docs/sql/system-catalog/mz_internal/#mz_source_statistics_raw)
we expose to users for sources answer _some important questions_, like:
- _Generally_, how fast we are reading data from upstream?
- Have we committed the source's snapshot?
@@ -84,31 +84,31 @@ The first set of metrics this design document proposes involve _snapshot progres
on the _percentage_ of the source's snapshot Materialize has _read_. These metrics are designed to
answer #1 in [the problem statement](#the-problem).

We will introduce 2 new columns in `mz_source_statistics_per_worker`:
We will introduce 2 new columns in `mz_source_statistics_raw`:

```
| `snapshot_total` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_records_known` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_progress` | [`uint8`] | The number of upstream values Materialize has read so far. |
```

The unit of _values_ depends on the source type, and will be _rows_ for MySQL and Postgres, and _offsets_ for Kafka.

These values can be summed across workers and compared (`snapshot_progress / snapshot_total`) to produce
These values can be summed across workers and compared (`snapshot_progress / snapshot_records_known`) to produce
a _lower-bound_ estimate on the % progress we have made reading the source's snapshot.
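
As a rough illustration of the calculation described above, the following Python sketch sums per-worker values and derives the percentage. The function name and the `(snapshot_records_known, snapshot_progress)` tuple layout are assumptions made for this example, not part of Materialize; returning `None` when no total is known is likewise an illustrative choice.

```python
from typing import Iterable, Optional, Tuple


def snapshot_percent(per_worker: Iterable[Tuple[int, int]]) -> Optional[float]:
    """Sum (snapshot_records_known, snapshot_progress) pairs across workers
    and return a lower-bound percentage, or None if no total is known yet."""
    known_total = 0
    progress_total = 0
    for known, progress in per_worker:
        known_total += known
        progress_total += progress
    if known_total == 0:
        # No worker has reported a total yet, so no percentage can be computed.
        return None
    # Clamp to 100 in case progress briefly overshoots the known total.
    return min(100.0, 100.0 * progress_total / known_total)


# Hypothetical per-worker rows: (snapshot_records_known, snapshot_progress).
workers = [(100, 40), (300, 60)]
percent = snapshot_percent(workers)
```

With the hypothetical rows above, the workers have read 100 of 400 known values in total, so `percent` is `25.0`.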

### Source specifics

The `SourceReader` implementation for each source will be required to produce a `snapshot_total`, as well as
The `SourceReader` implementation for each source will be required to produce a `snapshot_records_known`, as well as
a continuously updated `snapshot_progress` frontier on each worker.

#### Kafka

`snapshot_total` can be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
`snapshot_records_known` can be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
and summing across partitions. Similarly, `snapshot_progress` can be derived from the operator's output frontier.

#### Postgres and MySQL

`snapshot_total` will need to be calculated, in units of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
`snapshot_records_known` will need to be calculated, in units of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
Both the Postgres and MySQL implementations will be required to perform this query, per-table, during snapshotting. Note that `count(*)` is not
guaranteed to be cheap on Postgres and MySQL. To avoid this, we will perform this query _concurrently_ with the beginning of
snapshotting, allowing the user to see their source's progress before a percentage can be calculated.
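
The concurrency described above can be sketched in Python as follows. This is only an illustration under stated assumptions: `SnapshotStats`, `count_rows`, and the `threading.Event` that stands in for a slow `SELECT count(*)` are all hypothetical and do not reflect the actual source implementations. The point is that progress is visible immediately, while the percentage stays unknown until the count finishes.

```python
import threading
from typing import List, Optional


class SnapshotStats:
    """Hypothetical holder for the two snapshot statistics of one table."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.snapshot_records_known: Optional[int] = None  # unknown until counted
        self.snapshot_progress = 0

    def set_known(self, count: int) -> None:
        with self._lock:
            self.snapshot_records_known = count

    def record_read(self, rows: int = 1) -> None:
        with self._lock:
            self.snapshot_progress += rows

    def percent(self) -> Optional[float]:
        with self._lock:
            if not self.snapshot_records_known:
                return None  # count not finished: progress only, no percentage
            return 100.0 * self.snapshot_progress / self.snapshot_records_known


def count_rows(table: List[int], stats: SnapshotStats, done: threading.Event) -> None:
    # Stand-in for a potentially slow `SELECT count(*)` on the upstream table.
    done.wait()
    stats.set_known(len(table))


table = list(range(1000))  # pretend upstream table
stats = SnapshotStats()
count_finished = threading.Event()
counter = threading.Thread(target=count_rows, args=(table, stats, count_finished))
counter.start()  # counting runs concurrently with snapshotting

for _ in table[:250]:  # snapshotting starts without waiting for the count
    stats.record_read()
percent_before_count = stats.percent()

count_finished.set()  # the simulated count completes
counter.join()
percent_after_count = stats.percent()
```

Here `percent_before_count` is `None` while `stats.snapshot_progress` already reads 250, and `percent_after_count` becomes `25.0` once the simulated count reports 1000 rows.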
48 changes: 25 additions & 23 deletions doc/user/content/sql/system-catalog/mz_internal.md
@@ -796,28 +796,27 @@ only includes rows where the current role is a direct or indirect member of `gra
| `grantee` | [`text`] | The role that the privilege was granted to. |
| `privilege_type` | [`text`] | The type of privilege granted. |

### `mz_sink_statistics_per_worker`
### `mz_sink_statistics_raw`

The `mz_sink_statistics_per_worker` table contains statistics for each worker thread of
The `mz_sink_statistics_raw` table contains statistics for each worker thread of
each sink in the system.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
the system are restarted.

<!-- RELATION_SPEC mz_internal.mz_sink_statistics_per_worker -->
<!-- RELATION_SPEC mz_internal.mz_sink_statistics_raw -->
| Field | Type | Meaning |
|----------------------|-----------| -------- |
| `id` | [`text`] | The ID of the sink. Corresponds to [`mz_catalog.mz_sinks.id`](../mz_catalog#mz_sinks). |
| `worker_id` | [`uint8`] | The ID of the worker thread. |
| `messages_staged` | [`uint8`] | The number of messages staged but possibly not committed to the sink. |
| `messages_committed` | [`uint8`] | The number of messages committed to the sink. |
| `bytes_staged` | [`uint8`] | The number of bytes staged but possibly not committed to the sink. This counts both keys and values, if applicable. |
| `bytes_committed` | [`uint8`] | The number of bytes committed to the sink. This counts both keys and values, if applicable. |

### `mz_sink_statistics`

The `mz_sink_statistics` view contains statistics about each sink. It is an aggregated form of `mz_sink_statistics_per_worker`.
The `mz_sink_statistics` view contains statistics about each sink. It is an aggregated form of `mz_sink_statistics_raw`.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
@@ -864,32 +863,35 @@ messages and additional metadata helpful for debugging.
| `error` | [`text`] | If the sink is in an error state, the error message. |
| `details` | [`jsonb`] | Additional metadata provided by the sink. In case of error, may contain a `hint` field with helpful suggestions. |

### `mz_source_statistics_per_worker`
### `mz_source_statistics_raw`

The `mz_source_statistics_per_worker` table contains statistics for each worker thread of
The `mz_source_statistics_raw` table contains statistics for each worker thread of
each source in the system.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
the system are restarted.

<!-- RELATION_SPEC mz_internal.mz_source_statistics_per_worker -->
| Field | Type | Meaning |
| -------------------------|-------------| -------- |
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `worker_id` | [`uint8`] | The ID of the worker thread. |
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
<!-- RELATION_SPEC mz_internal.mz_source_statistics_raw -->
| Field | Type | Meaning |
| --------------------------|------------- | -------- |
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
| `records_indexed` | [`uint8`] | The number of individual records indexed in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes indexed in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
| `snapshot_records_known` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `snapshot_records_staged` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
| `offset_known` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `offset_committed` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |

### `mz_source_statistics`

The `mz_source_statistics` view contains statistics about each source. It is an aggregated form of `mz_source_statistics_per_worker`.
The `mz_source_statistics` view contains statistics about each source. It is an aggregated form of `mz_source_statistics_raw`.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
@@ -910,8 +912,8 @@ Note that:
| `bytes_received` | [`uint8`] | The number of bytes the source has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the source has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the source has committed to the storage layer. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |

### `mz_source_statuses`