Skip to content

Commit

Permalink
storage: codemode statistics names to new names
Browse files Browse the repository at this point in the history
  • Loading branch information
guswynn committed Feb 15, 2024
1 parent b692d3f commit 9d11519
Show file tree
Hide file tree
Showing 21 changed files with 269 additions and 272 deletions.
10 changes: 5 additions & 5 deletions doc/developer/design/20240108_source_metrics_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,28 @@ answer #1 in [the problem statement](#the-problem).
We will introduce 2 new columns in `mz_source_statistics_raw`:

```
| `snapshot_total` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_records_known` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_progress` | [`uint8`] | The number of upstream values Materialize has read so far. |
```

The unit of _values_ depends on the source type, and will be _rows_ for MySQL and Postgres, and _offsets_ for kafka.

These values can be summed across workers and compared (`snapshot_progress / snapshot_total`) to produce
These values can be summed across workers and compared (`snapshot_progress / snapshot_records_known`) to produce
a _lower-bound_ estimate on the % progress we have made reading the source's snapshot.

### Source specifics

The `SourceReader` implementation for each source will be required to produce a `snapshot_total`, as well as
The `SourceReader` implementation for each source will be required to produce a `snapshot_records_known`, as well as
a continuously updated `snapshot_progress` frontier on each worker.

#### Kafka

`snapshot_total` can be be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
`snapshot_records_known` can be be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
and summing across partitions. Similarly, `snapshot_progress` can be derived from the operator's output frontier.

#### Postgres and MySQL

`snapshot_total` will need to be calculated, in the unit of rows by performing `SELECT count(*)` on the tables that participate in the snapshot.
`snapshot_records_known` will need to be calculated, in the unit of rows by performing `SELECT count(*)` on the tables that participate in the snapshot.
Both the Postgres and MySQL implementations will be required to perform this query, per-table, during snapshotting. Note that `count(*)` is not
guaranteed to be cheap on Postgres and MySQL. To avoid this, we will perform this query _concurrently_ with the beginning of
snapshotting, allowing the user to see their source's progress before a percentage can be calculated.
Expand Down
16 changes: 8 additions & 8 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -864,14 +864,14 @@ the system are restarted.
| `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |
| `snapshot_total` | [`uint8`] | Not yet populated. |
| `snapshot_read` | [`uint8`] | Not yet populated. |
| `snapshot_records_known` | [`uint8`] | Not yet populated. |
| `snapshot_records_staged` | [`uint8`] | Not yet populated. |
| `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. |
| `upstream_values` | [`uint8`] | Not yet populated. |
| `committed_values` | [`uint8`] | Not yet populated. |
| `offset_known` | [`uint8`] | Not yet populated. |
| `offset_committed` | [`uint8`] | Not yet populated. |

### `mz_source_statistics`

Expand All @@ -896,8 +896,8 @@ Note that:
| `bytes_received` | [`uint8`] | The number of bytes the source has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. |
| `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the source has written but not yet committed to the storage layer. |
| `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the source has committed to the storage layer. |
| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. |
| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. |
| `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. |

### `mz_source_statuses`
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6154,8 +6154,8 @@ SELECT
SUM(bytes_received)::uint8 AS bytes_received,
SUM(updates_staged)::uint8 AS updates_staged,
SUM(updates_committed)::uint8 AS updates_committed,
SUM(envelope_state_bytes)::uint8 AS envelope_state_bytes,
SUM(envelope_state_records)::uint8 AS envelope_state_records,
SUM(bytes_indexed)::uint8 AS bytes_indexed,
SUM(records_indexed)::uint8 AS records_indexed,
-- Ensure we aggregate to NULL when not all workers are done rehydrating.
CASE
WHEN bool_or(rehydration_latency IS NULL) THEN NULL
Expand Down
12 changes: 6 additions & 6 deletions src/storage-client/src/statistics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ message ProtoSourceStatisticsUpdate {
uint64 updates_committed = 4;
uint64 bytes_received = 5;

uint64 envelope_state_records = 7;
uint64 envelope_state_bytes = 6;
uint64 records_indexed = 7;
uint64 bytes_indexed = 6;
optional int64 rehydration_latency_ms = 8;
optional uint64 snapshot_total = 9;
optional uint64 snapshot_read = 10;
optional uint64 snapshot_records_known = 9;
optional uint64 snapshot_records_staged = 10;

bool snapshot_committed = 11;
optional uint64 upstream_values = 12;
optional uint64 committed_values = 13;
optional uint64 offset_known = 12;
optional uint64 offset_committed = 13;
}

message ProtoSinkStatisticsUpdate {
Expand Down
Loading

0 comments on commit 9d11519

Please sign in to comment.