storage: maintain source and sink statistics through restarts #25218

Merged
merged 11 commits into from
Feb 20, 2024
14 changes: 7 additions & 7 deletions doc/developer/design/20240108_source_metrics_2.md
@@ -5,7 +5,7 @@

## The Problem

The [existing source statistics](https://materialize.com/docs/sql/system-catalog/mz_internal/#mz_source_statistics_per_worker)
The [existing source statistics](https://materialize.com/docs/sql/system-catalog/mz_internal/#mz_source_statistics_raw)
we expose to users for sources answer _some important questions_, like:
- _Generally_, how fast are we reading data from upstream?
- Have we committed the source's snapshot?
@@ -84,31 +84,31 @@ The first set of metrics this design document proposes involve _snapshot progres
on the _percentage_ of the source's snapshot Materialize has _read_. These metrics are designed to
answer #1 in [the problem statement](#the-problem).

We will introduce 2 new columns in `mz_source_statistics_per_worker`:
We will introduce 2 new columns in `mz_source_statistics_raw`:

```
| `snapshot_total` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_records_known` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_progress` | [`uint8`] | The number of upstream values Materialize has read so far. |
```

The unit of _values_ depends on the source type, and will be _rows_ for MySQL and Postgres, and _offsets_ for Kafka.

These values can be summed across workers and compared (`snapshot_progress / snapshot_total`) to produce
These values can be summed across workers and compared (`snapshot_progress / snapshot_records_known`) to produce
a _lower-bound_ estimate on the % progress we have made reading the source's snapshot.
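
As a rough illustration of the summation described above, the following sketch combines per-worker values into a single fraction. The function name and the input shape (one `(snapshot_records_known, snapshot_progress)` pair per worker) are assumptions made for this example, not part of the design.

```python
from typing import List, Optional, Tuple


def snapshot_progress_fraction(per_worker: List[Tuple[int, int]]) -> Optional[float]:
    """Estimate snapshot progress from per-worker statistics.

    Each entry is a hypothetical (snapshot_records_known, snapshot_progress)
    pair reported by one worker.
    """
    # Sum both columns across workers before dividing, as the text describes.
    known = sum(k for k, _ in per_worker)
    progress = sum(p for _, p in per_worker)
    # Until a total is known, no percentage can be reported.
    if known == 0:
        return None
    return progress / known


fraction = snapshot_progress_fraction([(100, 25), (300, 75)])
```

Returning `None` when no total is known yet is a choice made for this sketch; it keeps "no estimate available" distinct from "0% complete".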

### Source specifics

The `SourceReader` implementation for each source will be required to produce a `snapshot_total`, as well as
The `SourceReader` implementation for each source will be required to produce a `snapshot_records_known`, as well as
a continuously updated `snapshot_progress` frontier on each worker.

#### Kafka

`snapshot_total` can be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
`snapshot_records_known` can be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
and summing across partitions. Similarly, `snapshot_progress` can be derived from the operator's output frontier.
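
A minimal sketch of the per-partition summation, assuming both frontiers can be represented as a mapping from partition ID to offset. The function name, the dictionary representation, and the capping of each partition's progress at its snapshot offset are all assumptions made for this example; the design text does not specify them.

```python
from typing import Dict, Tuple


def kafka_snapshot_stats(
    snapshot_frontier: Dict[int, int],
    output_frontier: Dict[int, int],
) -> Tuple[int, int]:
    """Return (known, progress) summed across partitions.

    Both arguments map a partition ID to an offset (hypothetical shape).
    """
    # Total snapshot size: sum of the snapshot offsets of every partition.
    known = sum(snapshot_frontier.values())
    # Progress: how far the output frontier has advanced in each partition,
    # capped at that partition's snapshot offset (an assumption, so that data
    # read past the snapshot is not counted as snapshot progress).
    progress = sum(
        min(output_frontier.get(partition, 0), upper)
        for partition, upper in snapshot_frontier.items()
    )
    return known, progress


known, progress = kafka_snapshot_stats({0: 100, 1: 50}, {0: 40, 1: 70})
```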

#### Postgres and MySQL

`snapshot_total` will need to be calculated, in units of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
`snapshot_records_known` will need to be calculated, in units of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
Both the Postgres and MySQL implementations will be required to perform this query, per-table, during snapshotting. Note that `count(*)` is not
guaranteed to be cheap on Postgres and MySQL. To avoid delaying the snapshot, we will perform this query _concurrently_ with the beginning of
snapshotting, allowing the user to see their source's progress before a percentage can be calculated.
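
The concurrent count could look roughly like the sketch below, which runs a stand-in for the `count(*)` query on a background thread while rows are read. `count_rows` and `read_rows` are hypothetical placeholders for the per-table query and the snapshot row stream; this is an illustration of the idea, not the source implementation.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Optional, Tuple


def snapshot_with_concurrent_count(
    count_rows: Callable[[], int],
    read_rows: Iterable[object],
) -> Iterator[Tuple[int, Optional[int]]]:
    """Yield (rows_read, rows_known) updates while a snapshot is read.

    rows_known stays None until the (possibly slow) count completes.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Start the count in the background; do not wait for it.
        count_future = pool.submit(count_rows)
        rows_read = 0
        for _row in read_rows:
            rows_read += 1
            # Report progress right away, with or without a known total.
            rows_known = count_future.result() if count_future.done() else None
            yield rows_read, rows_known
        # Once reading finishes, wait for the count so the final update
        # carries a total.
        yield rows_read, count_future.result()


updates = list(snapshot_with_concurrent_count(lambda: 3, ["a", "b", "c"]))
```

Reporting `None` for the total until the count returns mirrors the behavior described above: progress is visible before a percentage can be calculated.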
48 changes: 25 additions & 23 deletions doc/user/content/sql/system-catalog/mz_internal.md
@@ -780,28 +780,27 @@ only includes rows where the current role is a direct or indirect member of `gra
| `grantee` | [`text`] | The role that the privilege was granted to. |
| `privilege_type` | [`text`] | The type of privilege granted. |

### `mz_sink_statistics_per_worker`
### `mz_sink_statistics_raw`

The `mz_sink_statistics_per_worker` table contains statistics for each worker thread of
The `mz_sink_statistics_raw` table contains statistics for each worker thread of
each sink in the system.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
the system are restarted.

<!-- RELATION_SPEC mz_internal.mz_sink_statistics_per_worker -->
<!-- RELATION_SPEC mz_internal.mz_sink_statistics_raw -->
| Field | Type | Meaning |
|----------------------|-----------| -------- |
| `id` | [`text`] | The ID of the sink. Corresponds to [`mz_catalog.mz_sinks.id`](../mz_catalog#mz_sinks). |
| `worker_id` | [`uint8`] | The ID of the worker thread. |
| `messages_staged` | [`uint8`] | The number of messages staged but possibly not committed to the sink. |
| `messages_committed` | [`uint8`] | The number of messages committed to the sink. |
| `bytes_staged` | [`uint8`] | The number of bytes staged but possibly not committed to the sink. This counts both keys and values, if applicable. |
| `bytes_committed` | [`uint8`] | The number of bytes committed to the sink. This counts both keys and values, if applicable. |

### `mz_sink_statistics`

The `mz_sink_statistics` view contains statistics about each sink. It is an aggregated form of `mz_sink_statistics_per_worker`.
The `mz_sink_statistics` view contains statistics about each sink. It is an aggregated form of `mz_sink_statistics_raw`.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
@@ -848,32 +847,35 @@ messages and additional metadata helpful for debugging.
| `error` | [`text`] | If the sink is in an error state, the error message. |
| `details` | [`jsonb`] | Additional metadata provided by the sink. In case of error, may contain a `hint` field with helpful suggestions. |

### `mz_source_statistics_per_worker`
### `mz_source_statistics_raw`

The `mz_source_statistics_per_worker` table contains statistics for each worker thread of
The `mz_source_statistics_raw` table contains statistics for each worker thread of
each source in the system.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
the system are restarted.

<!-- RELATION_SPEC mz_internal.mz_source_statistics_per_worker -->
| Field | Type | Meaning |
| -------------------------|-------------| -------- |
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `worker_id` | [`uint8`] | The ID of the worker thread. |
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
<!-- RELATION_SPEC mz_internal.mz_source_statistics_raw -->
| Field | Type | Meaning |
| --------------------------|------------- | -------- |
| `id` | [`text`] | The ID of the source. Corresponds to [`mz_catalog.mz_sources.id`](../mz_catalog#mz_sources). |
| `messages_received` | [`uint8`] | The number of messages the worker has received from the external system. Messages are counted in a source type-specific manner. Messages do not correspond directly to updates: some messages produce multiple updates, while other messages may be coalesced into a single update. |
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
| `records_indexed` | [`uint8`] | The number of individual records indexed in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes indexed in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
| `snapshot_records_known` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `snapshot_records_staged` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
| `offset_known` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |
| `offset_committed` | [`uint8`] | Not yet populated. {{< warn-if-unreleased v0.87 >}} |

### `mz_source_statistics`

The `mz_source_statistics` view contains statistics about each source. It is an aggregated form of `mz_source_statistics_per_worker`.
The `mz_source_statistics` view contains statistics about each source. It is an aggregated form of `mz_source_statistics_raw`.

Materialize does not make any guarantees about the exactness or freshness of
these statistics. They are occasionally reset to zero as internal components of
Expand All @@ -894,8 +896,8 @@ Note that:
| `bytes_received` | [`uint8`] | The number of bytes the source has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the source has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the source has committed to the storage layer. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |

### `mz_source_statuses`