From 9d11519e23ff3be3d08ec811a9868f7711ba7c28 Mon Sep 17 00:00:00 2001
From: Gus Wynn
Date: Thu, 15 Feb 2024 13:45:01 -0800
Subject: [PATCH] storage: codemod statistics names to new names

---
 .../design/20240108_source_metrics_2.md | 10 +-
 .../content/sql/system-catalog/mz_internal.md | 16 +-
 src/catalog/src/builtin.rs | 4 +-
 src/storage-client/src/statistics.proto | 12 +-
 src/storage-client/src/statistics.rs | 149 +++++++-------
 src/storage/src/render/upsert/types.rs | 10 +-
 src/storage/src/source/generator.rs | 12 +-
 src/storage/src/source/kafka.rs | 24 +--
 .../src/source/postgres/replication.rs | 4 +-
 src/storage/src/source/statistics.rs | 4 +-
 src/storage/src/source/types.rs | 6 +-
 src/storage/src/statistics.rs | 194 +++++++++---------
 .../autogenerated/mz_internal.slt | 16 +-
 .../mz_introspection_index_accounting.slt | 16 +-
 test/testdrive/source-statistics.td | 16 +-
 test/testdrive/statistics-maintenance.td | 6 +-
 test/upsert/autospill/02-memory.td | 2 +-
 test/upsert/autospill/03-rocksdb.td | 16 +-
 test/upsert/mzcompose.py | 2 +-
 test/upsert/rehydration/02-source-setup.td | 4 +-
 .../rehydration/03-after-rehydration.td | 18 +-
 21 files changed, 269 insertions(+), 272 deletions(-)

diff --git a/doc/developer/design/20240108_source_metrics_2.md b/doc/developer/design/20240108_source_metrics_2.md
index a3834d3b026ba..bcef028f1d712 100644
--- a/doc/developer/design/20240108_source_metrics_2.md
+++ b/doc/developer/design/20240108_source_metrics_2.md
@@ -87,28 +87,28 @@ answer #1 in [the problem statement](#the-problem).

We will introduce 2 new columns in `mz_source_statistics_raw`:

```
-| `snapshot_total` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
+| `snapshot_records_known` | [`uint8`] | The total number of upstream values that are part of the snapshot. |
| `snapshot_progress` | [`uint8`] | The number of upstream values Materialize has read so far. |
```

The unit of _values_ depends on the source type, and will be _rows_ for MySQL and Postgres, and _offsets_ for kafka.

-These values can be summed across workers and compared (`snapshot_progress / snapshot_total`) to produce
+These values can be summed across workers and compared (`snapshot_progress / snapshot_records_known`) to produce
a _lower-bound_ estimate on the % progress we have made reading the source's snapshot.

### Source specifics

-The `SourceReader` implementation for each source will be required to produce a `snapshot_total`, as well as
+The `SourceReader` implementation for each source will be required to produce a `snapshot_records_known`, as well as
a continuously updated `snapshot_progress` frontier on each worker.

#### Kafka

-`snapshot_total` can be be trivially exposed by exposing the snapshot frontier already tracked within its source operator,
+`snapshot_records_known` can be trivially derived from the snapshot frontier already tracked within its source operator,
and summing across partitions.

Similarly, `snapshot_progress` can be derived from the operator's output frontier.

#### Postgres and MySQL

-`snapshot_total` will need to be calculated, in the unit of rows by performing `SELECT count(*)` on the tables that participate in the snapshot.
+`snapshot_records_known` will need to be calculated, in the unit of rows, by performing `SELECT count(*)` on the tables that participate in the snapshot.
Both the Postgres and MySQL implementations will be required to perform this query, per-table, during snapshotting.
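To make the estimate concrete, here is a minimal sketch of both halves, assuming a hypothetical upstream table `t1` that participates in the snapshot (the probe itself is issued internally by the source; the estimate is the kind of query a dashboard could run). Note that elsewhere in this patch, the raw column backing `snapshot_progress` is introduced as `snapshot_records_staged`.

```sql
-- The per-table snapshot-size probe, run once per participating table;
-- the per-table counts are summed into `snapshot_records_known`.
SELECT count(*) FROM t1;

-- The lower-bound progress estimate: both gauges are summed across workers
-- before being compared, and are NULL until the snapshot size is known.
SELECT SUM(snapshot_progress)::float8
    / NULLIF(SUM(snapshot_records_known), 0) AS snapshot_progress_fraction
FROM mz_internal.mz_source_statistics_raw
WHERE id = 'u42'; -- hypothetical source id
```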
Note that `count(*)` is not guaranteed to be cheap on Postgres and MySQL. To avoid this, we will perform this query _concurrently_ with the beginning of snapshotting, allowing the user to see their source's progress before a percentage can be calculated. diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index b72d11d367785..7ddd31b0a8e73 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -864,14 +864,14 @@ the system are restarted. | `bytes_received` | [`uint8`] | The number of bytes the worker has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. | | `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the worker has written but not yet committed to the storage layer. | | `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the worker has committed to the storage layer. | -| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. | -| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. | +| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. | +| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. | | `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. | -| `snapshot_total` | [`uint8`] | Not yet populated. | -| `snapshot_read` | [`uint8`] | Not yet populated. | +| `snapshot_records_known` | [`uint8`] | Not yet populated. | +| `snapshot_records_staged` | [`uint8`] | Not yet populated. | | `snapshot_committed` | [`boolean`] | Whether the worker has committed the initial snapshot for a source. | -| `upstream_values` | [`uint8`] | Not yet populated. | -| `committed_values` | [`uint8`] | Not yet populated. | +| `offset_known` | [`uint8`] | Not yet populated. | +| `offset_committed` | [`uint8`] | Not yet populated. | ### `mz_source_statistics` @@ -896,8 +896,8 @@ Note that: | `bytes_received` | [`uint8`] | The number of bytes the source has read from the external system. Bytes are counted in a source type-specific manner and may or may not include protocol overhead. | | `updates_staged` | [`uint8`] | The number of updates (insertions plus deletions) the source has written but not yet committed to the storage layer. | | `updates_committed` | [`uint8`] | The number of updates (insertions plus deletions) the source has committed to the storage layer. | -| `envelope_state_bytes` | [`uint8`] | The number of bytes stored in the source envelope state. | -| `envelope_state_records` | [`uint8`] | The number of individual records stored in the source envelope state. | +| `bytes_indexed` | [`uint8`] | The number of bytes stored in the source envelope state. | +| `records_indexed` | [`uint8`] | The number of individual records stored in the source envelope state. | | `rehydration_latency` | [`interval`] | The amount of time it took for the worker to rehydrate the source envelope state. 
| ### `mz_source_statuses` diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index b2e684dcff5a0..6dacb63bc7f4c 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -6154,8 +6154,8 @@ SELECT SUM(bytes_received)::uint8 AS bytes_received, SUM(updates_staged)::uint8 AS updates_staged, SUM(updates_committed)::uint8 AS updates_committed, - SUM(envelope_state_bytes)::uint8 AS envelope_state_bytes, - SUM(envelope_state_records)::uint8 AS envelope_state_records, + SUM(bytes_indexed)::uint8 AS bytes_indexed, + SUM(records_indexed)::uint8 AS records_indexed, -- Ensure we aggregate to NULL when not all workers are done rehydrating. CASE WHEN bool_or(rehydration_latency IS NULL) THEN NULL diff --git a/src/storage-client/src/statistics.proto b/src/storage-client/src/statistics.proto index 0496ce6f01817..5a3aab3a05828 100644 --- a/src/storage-client/src/statistics.proto +++ b/src/storage-client/src/statistics.proto @@ -25,15 +25,15 @@ message ProtoSourceStatisticsUpdate { uint64 updates_committed = 4; uint64 bytes_received = 5; - uint64 envelope_state_records = 7; - uint64 envelope_state_bytes = 6; + uint64 records_indexed = 7; + uint64 bytes_indexed = 6; optional int64 rehydration_latency_ms = 8; - optional uint64 snapshot_total = 9; - optional uint64 snapshot_read = 10; + optional uint64 snapshot_records_known = 9; + optional uint64 snapshot_records_staged = 10; bool snapshot_committed = 11; - optional uint64 upstream_values = 12; - optional uint64 committed_values = 13; + optional uint64 offset_known = 12; + optional uint64 offset_committed = 13; } message ProtoSinkStatisticsUpdate { diff --git a/src/storage-client/src/statistics.rs b/src/storage-client/src/statistics.rs index 5a36cf5456465..094b856176770 100644 --- a/src/storage-client/src/statistics.rs +++ b/src/storage-client/src/statistics.rs @@ -48,10 +48,10 @@ pub static MZ_SOURCE_STATISTICS_RAW_DESC: Lazy = Lazy::new(|| { // // A gauge of the number of records in the envelope state. 0 for sources // Resetted when the source is restarted, for any reason. - .with_column("envelope_state_records", ScalarType::UInt64.nullable(false)) + .with_column("records_indexed", ScalarType::UInt64.nullable(false)) // A gauge of the number of bytes in the envelope state. 0 for sources // Resetted when the source is restarted, for any reason. - .with_column("envelope_state_bytes", ScalarType::UInt64.nullable(false)) + .with_column("bytes_indexed", ScalarType::UInt64.nullable(false)) // A gauge that shows the duration of rehydration. `NULL` before rehydration // is done. // Resetted when the source is restarted, for any reason. @@ -63,14 +63,14 @@ pub static MZ_SOURCE_STATISTICS_RAW_DESC: Lazy = Lazy::new(|| { // (like pg and mysql) may repopulate this column when tables are added. // // `NULL` while we discover the snapshot size. - .with_column("snapshot_total", ScalarType::UInt64.nullable(true)) + .with_column("snapshot_records_known", ScalarType::UInt64.nullable(true)) // A gauge of the number of _values_ (source defined unit) we have read of the _snapshot_ // of this source. // Sometimes resetted when the source can snapshot new pieces of upstream (like Postgres and // MySql). // // `NULL` while we discover the snapshot size. 
- .with_column("snapshot_read", ScalarType::UInt64.nullable(true)) + .with_column("snapshot_records_staged", ScalarType::UInt64.nullable(true)) // // Non-resetting gauges // @@ -81,10 +81,10 @@ pub static MZ_SOURCE_STATISTICS_RAW_DESC: Lazy = Lazy::new(|| { // // A gauge of the number of _values_ (source defined unit) available to be read from upstream. // Never resets. Not to be confused with any of the counters above. - .with_column("upstream_values", ScalarType::UInt64.nullable(false)) + .with_column("offset_known", ScalarType::UInt64.nullable(false)) // A gauge of the number of _values_ (source defined unit) we have committed. // Never resets. Not to be confused with any of the counters above. - .with_column("committed_values", ScalarType::UInt64.nullable(false)) + .with_column("offset_committed", ScalarType::UInt64.nullable(false)) }); pub static MZ_SINK_STATISTICS_RAW_DESC: Lazy = Lazy::new(|| { @@ -375,15 +375,15 @@ pub struct SourceStatisticsUpdate { pub updates_staged: Counter, pub updates_committed: Counter, - pub envelope_state_records: Gauge, - pub envelope_state_bytes: Gauge, + pub records_indexed: Gauge, + pub bytes_indexed: Gauge, pub rehydration_latency_ms: Gauge, - pub snapshot_total: Gauge, - pub snapshot_read: Gauge, + pub snapshot_records_known: Gauge, + pub snapshot_records_staged: Gauge, pub snapshot_committed: Gauge, - pub upstream_values: SkippableGauge, - pub committed_values: SkippableGauge, + pub offset_known: SkippableGauge, + pub offset_committed: SkippableGauge, } impl SourceStatisticsUpdate { @@ -397,18 +397,18 @@ impl SourceStatisticsUpdate { updates_staged: 0.into(), updates_committed: 0.into(), - envelope_state_records: Gauge::gauge(0), - envelope_state_bytes: Gauge::gauge(0), + records_indexed: Gauge::gauge(0), + bytes_indexed: Gauge::gauge(0), snapshot_committed: Gauge::gauge(true), // These are `Some(0)` and not `None` so `merge`-ing them // does not produce `None` if all workers produced values. rehydration_latency_ms: Gauge::gauge(Some(0)), - snapshot_total: Gauge::gauge(Some(0)), - snapshot_read: Gauge::gauge(Some(0)), + snapshot_records_known: Gauge::gauge(Some(0)), + snapshot_records_staged: Gauge::gauge(Some(0)), - upstream_values: SkippableGauge::gauge(Some(0)), - committed_values: SkippableGauge::gauge(Some(0)), + offset_known: SkippableGauge::gauge(Some(0)), + offset_committed: SkippableGauge::gauge(Some(0)), } } @@ -418,14 +418,14 @@ impl SourceStatisticsUpdate { bytes_received, updates_staged, updates_committed, - envelope_state_records, - envelope_state_bytes, + records_indexed, + bytes_indexed, rehydration_latency_ms, - snapshot_total, - snapshot_read, + snapshot_records_known, + snapshot_records_staged, snapshot_committed, - upstream_values, - committed_values, + offset_known, + offset_committed, .. 
} = self; @@ -433,14 +433,14 @@ impl SourceStatisticsUpdate { bytes_received.merge(other.bytes_received); updates_staged.merge(other.updates_staged); updates_committed.merge(other.updates_committed); - envelope_state_records.merge(other.envelope_state_records); - envelope_state_bytes.merge(other.envelope_state_bytes); + records_indexed.merge(other.records_indexed); + bytes_indexed.merge(other.bytes_indexed); rehydration_latency_ms.merge(other.rehydration_latency_ms); - snapshot_total.merge(other.snapshot_total); - snapshot_read.merge(other.snapshot_read); + snapshot_records_known.merge(other.snapshot_records_known); + snapshot_records_staged.merge(other.snapshot_records_staged); snapshot_committed.merge(other.snapshot_committed); - upstream_values.merge(other.upstream_values); - committed_values.merge(other.committed_values); + offset_known.merge(other.offset_known); + offset_committed.merge(other.offset_committed); } pub fn incorporate(&mut self, other: SourceStatisticsUpdate) { @@ -449,14 +449,14 @@ impl SourceStatisticsUpdate { bytes_received, updates_staged, updates_committed, - envelope_state_records, - envelope_state_bytes, + records_indexed, + bytes_indexed, rehydration_latency_ms, - snapshot_total, - snapshot_read, + snapshot_records_known, + snapshot_records_staged, snapshot_committed, - upstream_values, - committed_values, + offset_known, + offset_committed, .. } = self; @@ -465,14 +465,14 @@ impl SourceStatisticsUpdate { bytes_received: other_bytes_received, updates_staged: other_updates_staged, updates_committed: other_updates_committed, - envelope_state_records: other_envelope_state_records, - envelope_state_bytes: other_envelope_state_bytes, + records_indexed: other_records_indexed, + bytes_indexed: other_bytes_indexed, rehydration_latency_ms: other_rehydration_latency_ms, - snapshot_total: other_snapshot_total, - snapshot_read: other_snapshot_read, + snapshot_records_known: other_snapshot_records_known, + snapshot_records_staged: other_snapshot_records_staged, snapshot_committed: other_snapshot_committed, - upstream_values: other_upstream_values, - committed_values: other_committed_values, + offset_known: other_offset_known, + offset_committed: other_offset_committed, .. 
} = other; @@ -480,14 +480,15 @@ impl SourceStatisticsUpdate { bytes_received.incorporate(other_bytes_received, "bytes_received"); updates_staged.incorporate(other_updates_staged, "updates_staged"); updates_committed.incorporate(other_updates_committed, "updates_committed"); - envelope_state_records.incorporate(other_envelope_state_records, "envelope_state_records"); - envelope_state_bytes.incorporate(other_envelope_state_bytes, "envelope_state_bytes"); + records_indexed.incorporate(other_records_indexed, "records_indexed"); + bytes_indexed.incorporate(other_bytes_indexed, "bytes_indexed"); rehydration_latency_ms.incorporate(other_rehydration_latency_ms, "rehydration_latency_ms"); - snapshot_total.incorporate(other_snapshot_total, "snapshot_total"); - snapshot_read.incorporate(other_snapshot_read, "snapshot_read"); + snapshot_records_known.incorporate(other_snapshot_records_known, "snapshot_records_known"); + snapshot_records_staged + .incorporate(other_snapshot_records_staged, "snapshot_records_staged"); snapshot_committed.incorporate(other_snapshot_committed, "snapshot_committed"); - upstream_values.incorporate(other_upstream_values, "upstream_values"); - committed_values.incorporate(other_committed_values, "committed_values"); + offset_known.incorporate(other_offset_known, "offset_known"); + offset_committed.incorporate(other_offset_committed, "offset_committed"); } } @@ -502,20 +503,20 @@ impl PackableStats for SourceStatisticsUpdate { packer.push(Datum::from(self.updates_staged.0)); packer.push(Datum::from(self.updates_committed.0)); // Resetting gauges. - packer.push(Datum::from(self.envelope_state_records.0 .0)); - packer.push(Datum::from(self.envelope_state_bytes.0 .0)); + packer.push(Datum::from(self.records_indexed.0 .0)); + packer.push(Datum::from(self.bytes_indexed.0 .0)); let rehydration_latency = self .rehydration_latency_ms .0 .0 .map(|ms| mz_repr::adt::interval::Interval::new(0, 0, ms * 1000)); packer.push(Datum::from(rehydration_latency)); - packer.push(Datum::from(self.snapshot_total.0 .0)); - packer.push(Datum::from(self.snapshot_read.0 .0)); + packer.push(Datum::from(self.snapshot_records_known.0 .0)); + packer.push(Datum::from(self.snapshot_records_staged.0 .0)); // Gauges packer.push(Datum::from(self.snapshot_committed.0 .0)); - packer.push(Datum::from(self.upstream_values.pack().0)); - packer.push(Datum::from(self.committed_values.pack().0)); + packer.push(Datum::from(self.offset_known.pack().0)); + packer.push(Datum::from(self.offset_committed.pack().0)); } fn unpack(row: Row) -> (GlobalId, Self) { @@ -528,19 +529,23 @@ impl PackableStats for SourceStatisticsUpdate { updates_staged: iter.next().unwrap().unwrap_uint64().into(), updates_committed: iter.next().unwrap().unwrap_uint64().into(), - envelope_state_records: Gauge::gauge(iter.next().unwrap().unwrap_uint64()), - envelope_state_bytes: Gauge::gauge(iter.next().unwrap().unwrap_uint64()), + records_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()), + bytes_indexed: Gauge::gauge(iter.next().unwrap().unwrap_uint64()), rehydration_latency_ms: Gauge::gauge( >::try_from(iter.next().unwrap()) .unwrap() .map(|int| int.micros), ), - snapshot_total: Gauge::gauge(>::try_from(iter.next().unwrap()).unwrap()), - snapshot_read: Gauge::gauge(>::try_from(iter.next().unwrap()).unwrap()), + snapshot_records_known: Gauge::gauge( + >::try_from(iter.next().unwrap()).unwrap(), + ), + snapshot_records_staged: Gauge::gauge( + >::try_from(iter.next().unwrap()).unwrap(), + ), snapshot_committed: 
Gauge::gauge(iter.next().unwrap().unwrap_bool()), - upstream_values: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())), - committed_values: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())), + offset_known: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())), + offset_committed: SkippableGauge::gauge(Some(iter.next().unwrap().unwrap_uint64())), }; (s.id, s) @@ -557,15 +562,15 @@ impl RustType for SourceStatisticsUpdate { updates_staged: self.updates_staged.0, updates_committed: self.updates_committed.0, - envelope_state_records: self.envelope_state_records.0 .0, - envelope_state_bytes: self.envelope_state_bytes.0 .0, + records_indexed: self.records_indexed.0 .0, + bytes_indexed: self.bytes_indexed.0 .0, rehydration_latency_ms: self.rehydration_latency_ms.0 .0, - snapshot_total: self.snapshot_total.0 .0, - snapshot_read: self.snapshot_read.0 .0, + snapshot_records_known: self.snapshot_records_known.0 .0, + snapshot_records_staged: self.snapshot_records_staged.0 .0, snapshot_committed: self.snapshot_committed.0 .0, - upstream_values: self.upstream_values.0.clone().map(|i| i.0), - committed_values: self.committed_values.0.clone().map(|i| i.0), + offset_known: self.offset_known.0.clone().map(|i| i.0), + offset_committed: self.offset_committed.0.clone().map(|i| i.0), } } @@ -580,15 +585,15 @@ impl RustType for SourceStatisticsUpdate { updates_staged: Counter(proto.updates_staged), updates_committed: Counter(proto.updates_committed), - envelope_state_records: Gauge::gauge(proto.envelope_state_records), - envelope_state_bytes: Gauge::gauge(proto.envelope_state_bytes), + records_indexed: Gauge::gauge(proto.records_indexed), + bytes_indexed: Gauge::gauge(proto.bytes_indexed), rehydration_latency_ms: Gauge::gauge(proto.rehydration_latency_ms), - snapshot_total: Gauge::gauge(proto.snapshot_total), - snapshot_read: Gauge::gauge(proto.snapshot_read), + snapshot_records_known: Gauge::gauge(proto.snapshot_records_known), + snapshot_records_staged: Gauge::gauge(proto.snapshot_records_staged), snapshot_committed: Gauge::gauge(proto.snapshot_committed), - upstream_values: SkippableGauge::gauge(proto.upstream_values), - committed_values: SkippableGauge::gauge(proto.committed_values), + offset_known: SkippableGauge::gauge(proto.offset_known), + offset_committed: SkippableGauge::gauge(proto.offset_committed), }) } } diff --git a/src/storage/src/render/upsert/types.rs b/src/storage/src/render/upsert/types.rs index 5750e606200a3..0930586a6f6e8 100644 --- a/src/storage/src/render/upsert/types.rs +++ b/src/storage/src/render/upsert/types.rs @@ -863,10 +863,9 @@ where .rehydration_updates .set(self.snapshot_stats.updates); // These `set_` functions also ensure that these values are non-negative. 
+ self.stats.set_bytes_indexed(self.snapshot_stats.size_diff); self.stats - .set_envelope_state_bytes(self.snapshot_stats.size_diff); - self.stats - .set_envelope_state_records(self.snapshot_stats.values_diff); + .set_records_indexed(self.snapshot_stats.values_diff); if completed { if self.shrink_upsert_unused_buffers_by_ratio > 0 { @@ -919,9 +918,8 @@ where self.worker_metrics.upsert_updates.inc_by(stats.updates); self.worker_metrics.upsert_deletes.inc_by(stats.deletes); - self.stats.update_envelope_state_bytes_by(stats.size_diff); - self.stats - .update_envelope_state_records_by(stats.values_diff); + self.stats.update_bytes_indexed_by(stats.size_diff); + self.stats.update_records_indexed_by(stats.values_diff); Ok(()) } diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 816f3fab79823..f4b3283f7b633 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -117,8 +117,8 @@ impl SourceRender for LoadGeneratorSourceConnection { let mut resume_uppers = std::pin::pin!(resume_uppers); let mut statistics = ProgressStatisticsUpdate { - upstream_values: 0, - committed_values: 0, + offset_known: 0, + offset_committed: 0, }; while let Some((output, event)) = rows.next() { @@ -140,8 +140,8 @@ impl SourceRender for LoadGeneratorSourceConnection { data_output.give(&cap, (message, offset, diff)).await; } - if offset.offset > statistics.upstream_values { - statistics.upstream_values = offset.offset; + if offset.offset > statistics.offset_known { + statistics.offset_known = offset.offset; } } Event::Progress(Some(offset)) => { @@ -163,10 +163,10 @@ impl SourceRender for LoadGeneratorSourceConnection { Some(frontier) = resume_uppers.next() => { if let Some(offset) = frontier.as_option() { let total = offset.offset.saturating_sub(1); - if total > statistics.committed_values{ + if total > statistics.offset_committed{ // Note we don't subtract from the upper, as we // want to report total number of offsets we have processed. - statistics.committed_values = total; + statistics.offset_committed = total; } } } diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index c60a364c22b6e..b63a8d29b8506 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -102,11 +102,11 @@ pub struct KafkaSourceReader { } /// A partially-filled version of `ProgressStatisticsUpdate`. This allows us to -/// only emit updates when `upstream_values` is updated by the metadata thread. +/// only emit updates when `offset_known` is updated by the metadata thread. #[derive(Default)] struct PartialProgressStatistics { - upstream_values: Option, - committed_values: Option, + offset_known: Option, + offset_committed: Option, } struct PartitionCapability { @@ -564,7 +564,7 @@ impl SourceRender for KafkaSourceConnection { .progress_statistics .lock() .expect("poisoned") - .upstream_values = Some(upstream_stat); + .offset_known = Some(upstream_stat); data_cap.downgrade(&future_ts); progress_cap.downgrade(&future_ts); prev_pid_info = Some(partitions); @@ -712,28 +712,28 @@ impl SourceRender for KafkaSourceConnection { .await; } - // If we have a new `upstream_values` from the partition metadata thread, and + // If we have a new `offset_known` from the partition metadata thread, and // `committed` from reading the `resume_uppers` stream, we can emit a // progress stats update. 
let progress_statistics = { let mut stats = reader.progress_statistics.lock().expect("poisoned"); - if stats.committed_values.is_some() && stats.upstream_values.is_some() { + if stats.offset_committed.is_some() && stats.offset_known.is_some() { Some(( - stats.upstream_values.take().unwrap(), - stats.committed_values.take().unwrap(), + stats.offset_known.take().unwrap(), + stats.offset_committed.take().unwrap(), )) } else { None } }; - if let Some((upstream_values, committed_values)) = progress_statistics { + if let Some((offset_known, offset_committed)) = progress_statistics { stats_output .give( &stats_cap, ProgressStatisticsUpdate { - committed_values, - upstream_values, + offset_committed, + offset_known, }, ) .await; @@ -789,7 +789,7 @@ impl KafkaResumeUpperProcessor { self.progress_statistics .lock() .expect("poisoned") - .committed_values = Some(progress_stat); + .offset_committed = Some(progress_stat); if !offsets.is_empty() { let mut tpl = TopicPartitionList::new(); diff --git a/src/storage/src/source/postgres/replication.rs b/src/storage/src/source/postgres/replication.rs index 442bde1908a6b..577a229f21ca3 100644 --- a/src/storage/src/source/postgres/replication.rs +++ b/src/storage/src/source/postgres/replication.rs @@ -580,8 +580,8 @@ async fn raw_stream<'a>( ProgressStatisticsUpdate { // Similar to the kafka source, we don't subtract 1 from the upper as we want to report the // _number of bytes_ we have processed/in upstream. - upstream_values: upstream_stat.offset, - committed_values: last_committed_upper.offset, + offset_known: upstream_stat.offset, + offset_committed: last_committed_upper.offset, }, ) .await; diff --git a/src/storage/src/source/statistics.rs b/src/storage/src/source/statistics.rs index fc2737655665c..10cc957a95795 100644 --- a/src/storage/src/source/statistics.rs +++ b/src/storage/src/source/statistics.rs @@ -47,8 +47,8 @@ pub fn process_statistics( ); for d in data { - source_statistics.set_upstream_values(d.upstream_values); - source_statistics.set_committed_values(d.committed_values); + source_statistics.set_offset_known(d.offset_known); + source_statistics.set_offset_committed(d.offset_committed); } } }); diff --git a/src/storage/src/source/types.rs b/src/storage/src/source/types.rs index 65587644987fb..563ecbc2e8f2b 100644 --- a/src/storage/src/source/types.rs +++ b/src/storage/src/source/types.rs @@ -28,15 +28,15 @@ use crate::healthcheck::{HealthStatusMessage, StatusNamespace}; use crate::source::RawSourceCreationConfig; /// An update produced by implementors of `SourceRender` that presents an _aggregated_ -/// description of the number of _committed_values_ and _upstream_values_ for the given +/// description of the number of _offset_committed_ and _offset_known_ for the given /// source. /// /// The aggregate is required to be a 64 bit unsigned integer, whose units are /// implementation-defined. #[derive(Clone, Debug)] pub struct ProgressStatisticsUpdate { - pub upstream_values: u64, - pub committed_values: u64, + pub offset_known: u64, + pub offset_committed: u64, } /// Describes a source that can render itself in a timely scope. 
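Since the renamed `offset_known` and `offset_committed` flow into `mz_internal.mz_source_statistics_raw` (via the handler in `source/statistics.rs` above), here is a rough sketch of how the pair could be read together once populated (the user docs above still mark both columns "Not yet populated"); `my_source` is a hypothetical source name. Because the units are implementation-defined, the difference is only meaningful within a single source:

```sql
-- Sketch: the gap between what is known to exist upstream and what has been
-- committed, summed across workers. Units are source-specific (offsets for
-- Kafka, bytes for the Postgres source per the comment above), so compare
-- these values only within one source.
SELECT
    SUM(u.offset_known) AS offset_known,
    SUM(u.offset_committed) AS offset_committed,
    SUM(u.offset_known) - SUM(u.offset_committed) AS approximate_backlog
FROM mz_sources s
JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
WHERE s.name = 'my_source';
```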
diff --git a/src/storage/src/statistics.rs b/src/storage/src/statistics.rs index 00e292c30af78..f46950e26d42f 100644 --- a/src/storage/src/statistics.rs +++ b/src/storage/src/statistics.rs @@ -43,13 +43,13 @@ pub(crate) struct SourceStatisticsMetricDefs { // Gauges pub(crate) snapshot_committed: UIntGaugeVec, - pub(crate) envelope_state_bytes: UIntGaugeVec, - pub(crate) envelope_state_records: UIntGaugeVec, + pub(crate) bytes_indexed: UIntGaugeVec, + pub(crate) records_indexed: UIntGaugeVec, pub(crate) rehydration_latency_ms: IntGaugeVec, // statistics that are not yet exposed to users. - pub(crate) upstream_values: UIntGaugeVec, - pub(crate) committed_values: UIntGaugeVec, + pub(crate) offset_known: UIntGaugeVec, + pub(crate) offset_committed: UIntGaugeVec, } impl SourceStatisticsMetricDefs { @@ -80,13 +80,13 @@ impl SourceStatisticsMetricDefs { help: "The number of bytes worth of messages the worker has received from upstream. The way the bytes are counted is source-specific.", var_labels: ["source_id", "worker_id", "parent_source_id"], )), - envelope_state_bytes: registry.register(metric!( - name: "mz_source_envelope_state_bytes", + bytes_indexed: registry.register(metric!( + name: "mz_source_bytes_indexed", help: "The number of bytes of the source envelope state kept. This will be specific to the envelope in use.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], )), - envelope_state_records: registry.register(metric!( - name: "mz_source_envelope_state_records", + records_indexed: registry.register(metric!( + name: "mz_source_records_indexed", help: "The number of records in the source envelope state. This will be specific to the envelope in use", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id"], )), @@ -95,13 +95,13 @@ impl SourceStatisticsMetricDefs { help: "The amount of time in milliseconds it took for the worker to rehydrate the source envelope state. This will be specific to the envelope in use.", var_labels: ["source_id", "worker_id", "parent_source_id", "shard_id", "envelope"], )), - upstream_values: registry.register(metric!( - name: "mz_source_upstream_values", + offset_known: registry.register(metric!( + name: "mz_source_offset_known", help: "The total number of _values_ (source-defined unit) present in upstream.", var_labels: ["source_id", "worker_id", "shard_id"], )), - committed_values: registry.register(metric!( - name: "mz_source_committed_values", + offset_committed: registry.register(metric!( + name: "mz_source_offset_committed", help: "The total number of _values_ (source-defined unit) we have fully processed, and storage and committed.", var_labels: ["source_id", "worker_id", "shard_id"], )), @@ -120,13 +120,13 @@ pub struct SourceStatisticsMetrics { // Gauges pub(crate) snapshot_committed: DeleteOnDropGauge<'static, AtomicU64, Vec>, - pub(crate) envelope_state_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec>, - pub(crate) envelope_state_records: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub(crate) bytes_indexed: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub(crate) records_indexed: DeleteOnDropGauge<'static, AtomicU64, Vec>, pub(crate) rehydration_latency_ms: DeleteOnDropGauge<'static, AtomicI64, Vec>, // statistics that are not yet exposed to users. 
- pub(crate) upstream_values: DeleteOnDropGauge<'static, AtomicU64, Vec>, - pub(crate) committed_values: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub(crate) offset_known: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub(crate) offset_committed: DeleteOnDropGauge<'static, AtomicU64, Vec>, } impl SourceStatisticsMetrics { @@ -174,13 +174,13 @@ impl SourceStatisticsMetrics { worker_id.to_string(), parent_source_id.to_string(), ]), - envelope_state_bytes: defs.envelope_state_bytes.get_delete_on_drop_gauge(vec![ + bytes_indexed: defs.bytes_indexed.get_delete_on_drop_gauge(vec![ id.to_string(), worker_id.to_string(), parent_source_id.to_string(), shard.clone(), ]), - envelope_state_records: defs.envelope_state_records.get_delete_on_drop_gauge(vec![ + records_indexed: defs.records_indexed.get_delete_on_drop_gauge(vec![ id.to_string(), worker_id.to_string(), parent_source_id.to_string(), @@ -193,12 +193,12 @@ impl SourceStatisticsMetrics { shard.clone(), envelope.to_string(), ]), - upstream_values: defs.upstream_values.get_delete_on_drop_gauge(vec![ + offset_known: defs.offset_known.get_delete_on_drop_gauge(vec![ id.to_string(), worker_id.to_string(), parent_source_id.to_string(), ]), - committed_values: defs.committed_values.get_delete_on_drop_gauge(vec![ + offset_committed: defs.offset_committed.get_delete_on_drop_gauge(vec![ id.to_string(), worker_id.to_string(), parent_source_id.to_string(), @@ -344,19 +344,19 @@ pub struct SourceStatisticsRecord { // Gauges are always wrapped in an `Option` that represents if that gauge has been // initialized by that worker. - envelope_state_records: Option, - envelope_state_bytes: Option, + records_indexed: Option, + bytes_indexed: Option, // This field is nullable, so its value is an `Option`. rehydration_latency_ms: Option>, // The following fields are able to be unset when shipped to the controller, so their // values are `Option`'s - snapshot_total: Option>, - snapshot_read: Option>, + snapshot_records_known: Option>, + snapshot_records_staged: Option>, snapshot_committed: Option, - upstream_values: Option>, - committed_values: Option>, + offset_known: Option>, + offset_committed: Option>, } impl SourceStatisticsRecord { @@ -372,14 +372,14 @@ impl SourceStatisticsRecord { self.snapshot_committed = None; // We consider these gauges always initialized - self.envelope_state_bytes = Some(0); - self.envelope_state_records = Some(0); + self.bytes_indexed = Some(0); + self.records_indexed = Some(0); // We don't yet populate these, so we consider the initialized (with an empty value). - self.snapshot_total = Some(None); - self.snapshot_read = Some(None); - self.upstream_values = Some(None); - self.committed_values = Some(None); + self.snapshot_records_known = Some(None); + self.snapshot_records_staged = Some(None); + self.offset_known = Some(None); + self.offset_committed = Some(None); } /// Reset counters so that we continue to ship diffs to the controller. 
@@ -410,14 +410,14 @@ impl SourceStatisticsRecord { bytes_received, updates_staged, updates_committed, - envelope_state_records, - envelope_state_bytes, + records_indexed, + bytes_indexed, rehydration_latency_ms, - snapshot_total, - snapshot_read, + snapshot_records_known, + snapshot_records_staged, snapshot_committed, - upstream_values, - committed_values, + offset_known, + offset_committed, } = self.clone(); SourceStatisticsUpdate { @@ -426,14 +426,14 @@ impl SourceStatisticsRecord { bytes_received: bytes_received.into(), updates_staged: updates_staged.into(), updates_committed: updates_committed.into(), - envelope_state_records: Gauge::gauge(envelope_state_records.unwrap()), - envelope_state_bytes: Gauge::gauge(envelope_state_bytes.unwrap()), + records_indexed: Gauge::gauge(records_indexed.unwrap()), + bytes_indexed: Gauge::gauge(bytes_indexed.unwrap()), rehydration_latency_ms: Gauge::gauge(rehydration_latency_ms.unwrap()), - snapshot_total: Gauge::gauge(snapshot_total.unwrap()), - snapshot_read: Gauge::gauge(snapshot_read.unwrap()), + snapshot_records_known: Gauge::gauge(snapshot_records_known.unwrap()), + snapshot_records_staged: Gauge::gauge(snapshot_records_staged.unwrap()), snapshot_committed: Gauge::gauge(snapshot_committed.unwrap()), - upstream_values: SkippableGauge::gauge(upstream_values.unwrap()), - committed_values: SkippableGauge::gauge(committed_values.unwrap()), + offset_known: SkippableGauge::gauge(offset_known.unwrap()), + offset_committed: SkippableGauge::gauge(offset_committed.unwrap()), } } } @@ -523,14 +523,14 @@ impl SourceStatistics { updates_staged: 0, updates_committed: 0, bytes_received: 0, - envelope_state_records: Some(0), - envelope_state_bytes: Some(0), + records_indexed: Some(0), + bytes_indexed: Some(0), rehydration_latency_ms: None, - snapshot_read: Some(None), - snapshot_total: Some(None), + snapshot_records_staged: Some(None), + snapshot_records_known: Some(None), snapshot_committed: None, - upstream_values: Some(None), - committed_values: Some(None), + offset_known: Some(None), + offset_committed: Some(None), }, prom: SourceStatisticsMetrics::new( metrics, @@ -560,14 +560,14 @@ impl SourceStatistics { match &cur.stats { SourceStatisticsRecord { - envelope_state_records: Some(_), - envelope_state_bytes: Some(_), + records_indexed: Some(_), + bytes_indexed: Some(_), rehydration_latency_ms: Some(_), - snapshot_total: Some(_), - snapshot_read: Some(_), + snapshot_records_known: Some(_), + snapshot_records_staged: Some(_), snapshot_committed: Some(_), - upstream_values: Some(_), - committed_values: Some(_), + offset_known: Some(_), + offset_committed: Some(_), .. } => { let ret = Some(cur.stats.clone()); @@ -624,84 +624,78 @@ impl SourceStatistics { cur.prom.bytes_received.inc_by(value); } - /// Update the `envelope_state_bytes` stat. + /// Update the `bytes_indexed` stat. /// A positive value will add and a negative value will subtract. 
- pub fn update_envelope_state_bytes_by(&self, value: i64) { + pub fn update_bytes_indexed_by(&self, value: i64) { let mut cur = self.stats.borrow_mut(); if let Some(updated) = cur .stats - .envelope_state_bytes + .bytes_indexed .unwrap_or(0) .checked_add_signed(value) { - cur.stats.envelope_state_bytes = Some(updated); - cur.prom.envelope_state_bytes.set(updated); + cur.stats.bytes_indexed = Some(updated); + cur.prom.bytes_indexed.set(updated); } else { - let envelope_state_bytes = cur.stats.envelope_state_bytes.unwrap_or(0); + let bytes_indexed = cur.stats.bytes_indexed.unwrap_or(0); tracing::warn!( - "Unexpected u64 overflow while updating envelope_state_bytes value {} with {}", - envelope_state_bytes, + "Unexpected u64 overflow while updating bytes_indexed value {} with {}", + bytes_indexed, value ); - cur.stats.envelope_state_bytes = Some(0); - cur.prom.envelope_state_bytes.set(0); + cur.stats.bytes_indexed = Some(0); + cur.prom.bytes_indexed.set(0); } } - /// Set the `envelope_state_bytes` to the given value - pub fn set_envelope_state_bytes(&self, value: i64) { + /// Set the `bytes_indexed` to the given value + pub fn set_bytes_indexed(&self, value: i64) { let mut cur = self.stats.borrow_mut(); let value = if value < 0 { - tracing::warn!( - "Unexpected negative value for envelope_state_bytes {}", - value - ); + tracing::warn!("Unexpected negative value for bytes_indexed {}", value); 0 } else { value.unsigned_abs() }; - cur.stats.envelope_state_bytes = Some(value); - cur.prom.envelope_state_bytes.set(value); + cur.stats.bytes_indexed = Some(value); + cur.prom.bytes_indexed.set(value); } - /// Update the `envelope_state_records` stat. + /// Update the `records_indexed` stat. /// A positive value will add and a negative value will subtract. - pub fn update_envelope_state_records_by(&self, value: i64) { + pub fn update_records_indexed_by(&self, value: i64) { let mut cur = self.stats.borrow_mut(); if let Some(updated) = cur .stats - .envelope_state_records + .records_indexed .unwrap_or(0) .checked_add_signed(value) { - cur.stats.envelope_state_records = Some(updated); - cur.prom.envelope_state_records.set(updated); + cur.stats.records_indexed = Some(updated); + cur.prom.records_indexed.set(updated); } else { - let envelope_state_records = cur.stats.envelope_state_records.unwrap_or(0); + let records_indexed = cur.stats.records_indexed.unwrap_or(0); tracing::warn!( - "Unexpected u64 overflow while updating envelope_state_records value {} with {}", - envelope_state_records, + "Unexpected u64 overflow while updating records_indexed value {} with {}", + records_indexed, value ); - cur.stats.envelope_state_records = Some(0); - cur.prom.envelope_state_records.set(0); + cur.stats.records_indexed = Some(0); + cur.prom.records_indexed.set(0); } } - /// Set the `envelope_state_records` to the given value - pub fn set_envelope_state_records(&self, value: i64) { + /// Set the `records_indexed` to the given value + pub fn set_records_indexed(&self, value: i64) { let mut cur = self.stats.borrow_mut(); let value = if value < 0 { - tracing::warn!( - "Unexpected negative value for envelope_state_records {}", - value - ); + tracing::warn!("Unexpected negative value for records_indexed {}", value); 0 } else { value.unsigned_abs() }; - cur.stats.envelope_state_records = Some(value); - cur.prom.envelope_state_records.set(value); + cur.stats.records_indexed = Some(value); + cur.prom.records_indexed.set(value); } /// Initialize the `rehydration_latency_ms` stat as `NULL`. 
@@ -730,20 +724,20 @@ impl SourceStatistics { cur.prom.rehydration_latency_ms.set(value); } - /// Set the `upstream_values` stat to the given value. - pub fn set_upstream_values(&self, value: u64) { + /// Set the `offset_known` stat to the given value. + pub fn set_offset_known(&self, value: u64) { let cur = self.stats.borrow_mut(); // Not yet exposed to users. - // cur.prom.upstream_values = value; - cur.prom.upstream_values.set(value); + // cur.prom.offset_known = value; + cur.prom.offset_known.set(value); } - /// Set the `committed_values` stat to the given value. - pub fn set_committed_values(&self, value: u64) { + /// Set the `offset_committed` stat to the given value. + pub fn set_offset_committed(&self, value: u64) { let cur = self.stats.borrow_mut(); // Not yet exposed to users. - // cur.prom.committed_values = value; - cur.prom.committed_values.set(value); + // cur.prom.offset_committed = value; + cur.prom.offset_committed.set(value); } } diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index 22d80d8cff924..3c9c68c1b6a55 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -492,14 +492,14 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 3 bytes_received uint8 4 updates_staged uint8 5 updates_committed uint8 -6 envelope_state_records uint8 -7 envelope_state_bytes uint8 +6 records_indexed uint8 +7 bytes_indexed uint8 8 rehydration_latency interval -9 snapshot_total uint8 -10 snapshot_read uint8 +9 snapshot_records_known uint8 +10 snapshot_records_staged uint8 11 snapshot_committed boolean -12 upstream_values uint8 -13 committed_values uint8 +12 offset_known uint8 +13 offset_committed uint8 query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_source_statistics' ORDER BY position @@ -510,8 +510,8 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 4 bytes_received uint8 5 updates_staged uint8 6 updates_committed uint8 -7 envelope_state_bytes uint8 -8 envelope_state_records uint8 +7 bytes_indexed uint8 +8 records_indexed uint8 9 rehydration_latency interval query ITT diff --git a/test/sqllogictest/mz_introspection_index_accounting.slt b/test/sqllogictest/mz_introspection_index_accounting.slt index 52981c29d6c3a..742b059dfba8f 100644 --- a/test/sqllogictest/mz_introspection_index_accounting.slt +++ b/test/sqllogictest/mz_introspection_index_accounting.slt @@ -478,8 +478,8 @@ mz_sinks schema_id mz_sinks size mz_sinks type mz_source_statistics bytes_received -mz_source_statistics envelope_state_bytes -mz_source_statistics envelope_state_records +mz_source_statistics bytes_indexed +mz_source_statistics records_indexed mz_source_statistics id mz_source_statistics messages_received mz_source_statistics rehydration_latency @@ -487,18 +487,18 @@ mz_source_statistics snapshot_committed mz_source_statistics updates_committed mz_source_statistics updates_staged mz_source_statistics_raw bytes_received -mz_source_statistics_raw committed_values -mz_source_statistics_raw envelope_state_bytes -mz_source_statistics_raw envelope_state_records +mz_source_statistics_raw offset_committed +mz_source_statistics_raw bytes_indexed +mz_source_statistics_raw records_indexed mz_source_statistics_raw id mz_source_statistics_raw messages_received mz_source_statistics_raw rehydration_latency mz_source_statistics_raw snapshot_committed -mz_source_statistics_raw snapshot_read 
-mz_source_statistics_raw snapshot_total +mz_source_statistics_raw snapshot_records_staged +mz_source_statistics_raw snapshot_records_known mz_source_statistics_raw updates_committed mz_source_statistics_raw updates_staged -mz_source_statistics_raw upstream_values +mz_source_statistics_raw offset_known mz_source_status_history details mz_source_status_history error mz_source_status_history occurred_at diff --git a/test/testdrive/source-statistics.td b/test/testdrive/source-statistics.td index 955abe5519ff6..dd7fb9076763e 100644 --- a/test/testdrive/source-statistics.td +++ b/test/testdrive/source-statistics.td @@ -129,8 +129,8 @@ users true false true true false true SUM(u.updates_staged) BETWEEN 3 AND 11, SUM(u.updates_committed) BETWEEN 3 AND 11, SUM(u.bytes_received) > 0, - SUM(u.envelope_state_bytes) > 0, - SUM(u.envelope_state_records), + SUM(u.bytes_indexed) > 0, + SUM(u.records_indexed), bool_and(u.rehydration_latency IS NOT NULL) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id @@ -142,7 +142,7 @@ upsert true 9 true true true true 3 true # While we can't control how batching works above, we can ensure that this new, later update # causes 1 more messages to be received, which is 1 update, a delete. # We use `set-from-sql` to assert this. We will also use this to validate that the -# `envelope_state_bytes` value goes down because of the delete. +# `bytes_indexed` value goes down because of the delete. $ set-from-sql var=updates-committed SELECT (SUM(u.updates_committed) + 1)::text @@ -152,7 +152,7 @@ SELECT $ set-from-sql var=state-bytes SELECT - (SUM(u.envelope_state_bytes))::text + (SUM(u.bytes_indexed))::text FROM mz_sources s JOIN mz_internal.mz_source_statistics u ON s.id = u.id WHERE s.name IN ('upsert') @@ -166,8 +166,8 @@ $ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} SUM(u.updates_staged), SUM(u.updates_committed), SUM(u.bytes_received) > 0, - SUM(u.envelope_state_bytes) < ${state-bytes}, - SUM(u.envelope_state_records) + SUM(u.bytes_indexed) < ${state-bytes}, + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert') @@ -182,8 +182,8 @@ upsert true 10 "${updates-committed}" "${updates-committed}" true true 2 u.updates_staged, u.updates_committed, u.bytes_received > 0, - u.envelope_state_bytes < ${state-bytes}, - u.envelope_state_records, + u.bytes_indexed < ${state-bytes}, + u.records_indexed, u.rehydration_latency IS NOT NULL FROM mz_sources s JOIN mz_internal.mz_source_statistics u ON s.id = u.id diff --git a/test/testdrive/statistics-maintenance.td b/test/testdrive/statistics-maintenance.td index ae32f9e019e97..6a7b513741b1a 100644 --- a/test/testdrive/statistics-maintenance.td +++ b/test/testdrive/statistics-maintenance.td @@ -55,7 +55,7 @@ sink1 1 1 true true > SELECT s.name, SUM(u.updates_committed) > 0, SUM(u.messages_received), - SUM(snapshot_read) IS NULL + SUM(snapshot_records_staged) IS NULL FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert1') @@ -76,7 +76,7 @@ sink1 1 1 true true > SELECT s.name, SUM(u.updates_committed) > 0, SUM(u.messages_received), - SUM(snapshot_read) IS NULL + SUM(snapshot_records_staged) IS NULL FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert1') @@ -101,7 +101,7 @@ sink1 2 2 true true > SELECT s.name, SUM(u.updates_committed) > 0, SUM(u.messages_received), - SUM(snapshot_read) IS NULL + 
SUM(snapshot_records_staged) IS NULL FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert1') diff --git a/test/upsert/autospill/02-memory.td b/test/upsert/autospill/02-memory.td index b44dce7783b93..6b81b044d5381 100644 --- a/test/upsert/autospill/02-memory.td +++ b/test/upsert/autospill/02-memory.td @@ -29,7 +29,7 @@ animal:whale 2 > SELECT - SUM(u.envelope_state_bytes) > 0 + SUM(u.bytes_indexed) > 0 FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name = 'autospill' diff --git a/test/upsert/autospill/03-rocksdb.td b/test/upsert/autospill/03-rocksdb.td index 758de0298a3ac..c27b368ed3486 100644 --- a/test/upsert/autospill/03-rocksdb.td +++ b/test/upsert/autospill/03-rocksdb.td @@ -9,7 +9,7 @@ $ set-from-sql var=previous-bytes SELECT - (SUM(u.envelope_state_bytes))::text + (SUM(u.bytes_indexed))::text FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name = 'autospill' @@ -22,13 +22,13 @@ fish:AREALLYBIGFISHAREALLYBIGFISHAREALLYBIGFISHAREALLYBIGFISH 3 > SELECT - SUM(u.envelope_state_bytes) > 0, + SUM(u.bytes_indexed) > 0, -- This + the assertion below that ensures the byte count goes to 0 tests - -- that we correctly transition the `envelope_state_bytes` count to the + -- that we correctly transition the `bytes_indexed` count to the -- in-rocksdb. In the past, we would accidentally SUM the previous and -- new size here. - SUM(u.envelope_state_bytes) < (${previous-bytes} * 2), - SUM(u.envelope_state_records) + SUM(u.bytes_indexed) < (${previous-bytes} * 2), + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name = 'autospill' @@ -45,10 +45,10 @@ animal: > SELECT count(*) from autospill; 0 -# Both envelope_state_bytes and envelope_state_records should be zero +# Both bytes_indexed and records_indexed should be zero > SELECT - SUM(u.envelope_state_bytes), - SUM(u.envelope_state_records) + SUM(u.bytes_indexed), + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name = 'autospill' diff --git a/test/upsert/mzcompose.py b/test/upsert/mzcompose.py index 88db581585c3f..db2c396010eaa 100644 --- a/test/upsert/mzcompose.py +++ b/test/upsert/mzcompose.py @@ -599,7 +599,7 @@ def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None: c.testdrive( dedent( f""" - > select sum(envelope_state_records) + > select sum(records_indexed) from mz_internal.mz_source_statistics_raw st join mz_sources s on s.id = st.id where name = 's1'; diff --git a/test/upsert/rehydration/02-source-setup.td b/test/upsert/rehydration/02-source-setup.td index 42298522eb8ce..6c5357125484a 100644 --- a/test/upsert/rehydration/02-source-setup.td +++ b/test/upsert/rehydration/02-source-setup.td @@ -63,8 +63,8 @@ mammalmore moose 2 # # Ensure that statistics are correctly updated > SELECT - SUM(u.envelope_state_bytes) > 0, - SUM(u.envelope_state_records), + SUM(u.bytes_indexed) > 0, + SUM(u.records_indexed), bool_and(u.rehydration_latency IS NOT NULL) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id diff --git a/test/upsert/rehydration/03-after-rehydration.td b/test/upsert/rehydration/03-after-rehydration.td index b3ab39ab8bdfa..969d6ecff7f09 100644 --- a/test/upsert/rehydration/03-after-rehydration.td +++ b/test/upsert/rehydration/03-after-rehydration.td @@ -36,8 +36,8 @@ mammalmore moose 2 # byte count could be lower or higher than before restarting, # as 
rehydration has to store values differently. > SELECT - SUM(u.envelope_state_bytes) > 0, - SUM(u.envelope_state_records), + SUM(u.bytes_indexed) > 0, + SUM(u.records_indexed), bool_and(u.rehydration_latency IS NOT NULL) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id @@ -49,7 +49,7 @@ true 3 true # Save the size of the rehydrated state. $ set-from-sql var=rehydrated-state-bytes SELECT - (SUM(u.envelope_state_bytes))::text + (SUM(u.bytes_indexed))::text FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert') @@ -73,8 +73,8 @@ mammalmore moose 2 # This is also != because different implementations use # space differently during rehydration and normal operation. > SELECT - SUM(u.envelope_state_bytes) != ${rehydrated-state-bytes}, - SUM(u.envelope_state_records) + SUM(u.bytes_indexed) != ${rehydrated-state-bytes}, + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert') @@ -84,7 +84,7 @@ true 3 $ set-from-sql var=state-bytes SELECT - (SUM(u.envelope_state_bytes))::text + (SUM(u.bytes_indexed))::text FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert') @@ -93,8 +93,8 @@ $ kafka-ingest format=avro topic=upsert key-format=avro key-schema=${keyschema} {"key": "fish"} {"f1": "MUCHMUCHMUCHLONGERVALUE", "f2": 9000} > SELECT - SUM(u.envelope_state_bytes) > ${state-bytes}, - SUM(u.envelope_state_records) + SUM(u.bytes_indexed) > ${state-bytes}, + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert') @@ -114,7 +114,7 @@ birdmore geese 56 mammalmore moose 2 > SELECT - SUM(u.envelope_state_records) + SUM(u.records_indexed) FROM mz_sources s JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id WHERE s.name IN ('upsert')
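The tests above mostly exercise the raw per-worker relation. For completeness, a minimal sketch of reading the same renamed statistics through the aggregated, user-facing `mz_internal.mz_source_statistics` view (whose `SUM`-based definition is updated in `builtin.rs` above), again using the tests' source name `upsert`:

```sql
-- Sketch: the renamed columns as a user would see them, already summed
-- across workers by the view definition in builtin.rs.
SELECT
    s.name,
    u.records_indexed,
    u.bytes_indexed,
    u.rehydration_latency
FROM mz_sources s
JOIN mz_internal.mz_source_statistics u ON s.id = u.id
WHERE s.name = 'upsert';
```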