
compute: wire up sink metrics with CorrectionV2
teskje committed Jan 6, 2025
1 parent c8f449a commit e01a7dc
Showing 2 changed files with 96 additions and 15 deletions.
src/compute/src/sink/correction.rs (15 changes: 11 additions & 4 deletions)
@@ -356,10 +356,17 @@ impl<D> Drop for CorrectionV1<D> {
 }

 /// Helper type for convenient tracking of length and capacity together.
-#[derive(Clone, Copy, Default)]
-struct LengthAndCapacity {
-    length: usize,
-    capacity: usize,
+#[derive(Clone, Copy, Debug, Default)]
+pub(super) struct LengthAndCapacity {
+    pub length: usize,
+    pub capacity: usize,
 }
+
+impl AddAssign<Self> for LengthAndCapacity {
+    fn add_assign(&mut self, size: Self) {
+        self.length += size.length;
+        self.capacity += size.capacity;
+    }
+}
 
 impl AddAssign<(usize, usize)> for LengthAndCapacity {
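
For illustration (not part of the commit): the new `AddAssign<Self>` impl lets per-chunk sizes accumulate with `+=`. A minimal standalone sketch, with a `main` added for demonstration:

use std::ops::AddAssign;

#[derive(Clone, Copy, Debug, Default)]
struct LengthAndCapacity {
    length: usize,
    capacity: usize,
}

impl AddAssign<Self> for LengthAndCapacity {
    fn add_assign(&mut self, size: Self) {
        self.length += size.length;
        self.capacity += size.capacity;
    }
}

fn main() {
    // Accumulate per-chunk sizes, as CorrectionV2::update_metrics does per chain.
    let mut total = LengthAndCapacity::default();
    for size in [
        LengthAndCapacity { length: 3, capacity: 8 },
        LengthAndCapacity { length: 5, capacity: 8 },
    ] {
        total += size;
    }
    assert_eq!((total.length, total.capacity), (8, 16));
}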
src/compute/src/sink/correction_v2.rs (96 changes: 85 additions & 11 deletions)
@@ -52,13 +52,15 @@ use std::fmt;
use std::rc::Rc;

use differential_dataflow::trace::implementations::BatchContainer;
-use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics};
+use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::containers::stack::StackWrapper;
use timely::container::columnation::Columnation;
use timely::progress::Antichain;
use timely::{Container, PartialOrder};

+use crate::sink::correction::LengthAndCapacity;

const CHAIN_PROPORTIONALITY: usize = 3;
const CHUNK_SIZE_BYTES: usize = 8 << 10;

@@ -71,10 +73,15 @@ pub(super) struct CorrectionV2<D: Data> {
     chains: Vec<Chain<D>>,
     /// The frontier by which all contained times are advanced.
     since: Antichain<Timestamp>,
+
+    /// Total length and capacity of chunks in `chains`.
+    ///
+    /// Tracked to maintain metrics.
+    total_size: LengthAndCapacity,
     /// Global persist sink metrics.
-    _metrics: SinkMetrics,
+    metrics: SinkMetrics,
     /// Per-worker persist sink metrics.
-    _worker_metrics: SinkWorkerMetrics,
+    worker_metrics: SinkWorkerMetrics,
 }

impl<D: Data> CorrectionV2<D> {
@@ -83,8 +90,9 @@ impl<D: Data> CorrectionV2<D> {
         Self {
             chains: Default::default(),
             since: Antichain::from_elem(Timestamp::MIN),
-            _metrics: metrics,
-            _worker_metrics: worker_metrics,
+            total_size: Default::default(),
+            metrics,
+            worker_metrics,
         }
     }

@@ -154,6 +162,8 @@ impl<D: Data> CorrectionV2<D> {
             let merged = merge_chains(vec![a, b], &self.since);
             self.chains.push(merged);
         }
+
+        self.update_metrics();
     }

/// Return consolidated updates before the given `upper`.
@@ -210,6 +220,8 @@ impl<D: Data> CorrectionV2<D> {
                 self.chains[i - 1] = merged;
             }
         }
+
+        self.update_metrics();
     }

/// Return the current since frontier.
@@ -235,17 +247,37 @@ impl<D: Data> CorrectionV2<D> {
             self.consolidate_before(&upper);
         }
     }
+
+    /// Update the persist sink metrics to reflect the current total length and capacity.
+    fn update_metrics(&mut self) {
+        let mut new_size = LengthAndCapacity::default();
+        for chain in &mut self.chains {
+            new_size += chain.get_size();
+        }
+
+        let old_size = self.total_size;
+        let len_delta = UpdateDelta::new(new_size.length, old_size.length);
+        let cap_delta = UpdateDelta::new(new_size.capacity, old_size.capacity);
+        self.metrics
+            .report_correction_update_deltas(len_delta, cap_delta);
+        self.worker_metrics
+            .report_correction_update_totals(new_size.length, new_size.capacity);
+
+        self.total_size = new_size;
+    }
}
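
The split between the two reports is worth noting: `report_correction_update_deltas` feeds the global `SinkMetrics`, presumably because those aggregate across workers and so can only be adjusted incrementally, while `report_correction_update_totals` sets per-worker gauges to absolute values. A standalone sketch of that pattern, using a hypothetical atomic counter and a signed-difference stand-in for `UpdateDelta` (the real type's shape is not shown in this diff):

use std::sync::atomic::{AtomicI64, Ordering};

// Hypothetical stand-in for a shared, cross-worker metric: no single
// worker knows the global total, so each applies only its own delta.
static GLOBAL_CORRECTION_CAPACITY: AtomicI64 = AtomicI64::new(0);

// Hypothetical stand-in for UpdateDelta: the signed change from old to new.
fn update_delta(new: usize, old: usize) -> i64 {
    new as i64 - old as i64
}

fn report(old_capacity: usize, new_capacity: usize) {
    // Shared metric: adjust by the delta...
    let delta = update_delta(new_capacity, old_capacity);
    GLOBAL_CORRECTION_CAPACITY.fetch_add(delta, Ordering::Relaxed);
    // ...while a per-worker gauge would simply be set to `new_capacity`.
}

fn main() {
    report(0, 4096); // first update_metrics call after an insert
    report(4096, 1024); // consolidation shrank the correction buffer
    assert_eq!(GLOBAL_CORRECTION_CAPACITY.load(Ordering::Relaxed), 1024);
}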

 #[derive(Debug)]
 struct Chain<D: Data> {
     chunks: Vec<Chunk<D>>,
+    cached_size: Option<LengthAndCapacity>,
 }
 
 impl<D: Data> Default for Chain<D> {
     fn default() -> Self {
         Self {
             chunks: Default::default(),
+            cached_size: None,
         }
     }
 }
@@ -273,10 +305,13 @@ impl<D: Data> Chain<D> {
                 self.push_chunk(chunk);
             }
         }
+
+        self.invalidate_cached_size();
     }
 
     fn push_chunk(&mut self, chunk: Chunk<D>) {
         self.chunks.push(chunk);
+        self.invalidate_cached_size();
     }
 
     fn push_cursor(&mut self, cursor: Cursor<D>) {
@@ -311,7 +346,23 @@ impl<D: Data> Chain<D> {
     fn iter(&self) -> impl Iterator<Item = (D, Timestamp, Diff)> + '_ {
         self.chunks
             .iter()
-            .flat_map(|c| c.0.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
+            .flat_map(|c| c.data.iter().map(|(d, t, r)| (d.clone(), *t, *r)))
     }
+
+    fn get_size(&mut self) -> LengthAndCapacity {
+        if self.cached_size.is_none() {
+            let mut size = LengthAndCapacity::default();
+            for chunk in &mut self.chunks {
+                size += chunk.get_size();
+            }
+            self.cached_size = Some(size);
+        }
+
+        self.cached_size.unwrap()
+    }
+
+    fn invalidate_cached_size(&mut self) {
+        self.cached_size = None;
+    }
 }
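
Both `Chain` and `Chunk` now use the same cache-and-invalidate scheme: `get_size` computes lazily and memoizes the result in `cached_size`, and every mutating method calls `invalidate_cached_size`. A minimal standalone sketch of the pattern (names here are illustrative, not from the commit):

// Minimal sketch of the Option-based size cache used by Chain and Chunk.
struct CachedSum {
    items: Vec<u64>,
    cached: Option<u64>,
}

impl CachedSum {
    fn push(&mut self, x: u64) {
        self.items.push(x);
        self.cached = None; // every mutation invalidates the cache
    }

    fn sum(&mut self) -> u64 {
        if self.cached.is_none() {
            // Recompute only when the cache is cold.
            self.cached = Some(self.items.iter().sum());
        }
        self.cached.unwrap()
    }
}

fn main() {
    let mut s = CachedSum { items: Vec::new(), cached: None };
    s.push(3);
    s.push(5);
    assert_eq!(s.sum(), 8); // computed once...
    assert_eq!(s.sum(), 8); // ...then served from the cache
}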

@@ -535,12 +586,18 @@ impl<D: Data> From<Cursor<D>> for Chain<D> {
     }
 }
 
-struct Chunk<D: Data>(StackWrapper<(D, Timestamp, Diff)>);
+struct Chunk<D: Data> {
+    data: StackWrapper<(D, Timestamp, Diff)>,
+    cached_size: Option<LengthAndCapacity>,
+}
 
 impl<D: Data> Default for Chunk<D> {
     fn default() -> Self {
         let capacity = Self::capacity();
-        Self(StackWrapper::with_capacity(capacity))
+        Self {
+            data: StackWrapper::with_capacity(capacity),
+            cached_size: None,
+        }
     }
 }

@@ -557,15 +614,15 @@ impl<D: Data> Chunk<D> {
     }
 
     fn len(&self) -> usize {
-        Container::len(&self.0)
+        Container::len(&self.data)
     }
 
     fn capacity_left(&self) -> bool {
         self.len() < Self::capacity()
     }
 
     fn index(&self, idx: usize) -> (&D, Timestamp, Diff) {
-        let (d, t, r) = self.0.index(idx);
+        let (d, t, r) = self.data.index(idx);
         (d, *t, *r)
     }

@@ -579,7 +636,9 @@ impl<D: Data> Chunk<D> {
 
     fn push<DT: Borrow<D>>(&mut self, update: (DT, Timestamp, Diff)) {
         let (d, t, r) = update;
-        self.0.copy_destructured(d.borrow(), &t, &r);
+        self.data.copy_destructured(d.borrow(), &t, &r);
+
+        self.invalidate_cached_size();
     }

fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
@@ -600,6 +659,21 @@ impl<D: Data> Chunk<D> {
 
         Some(lower)
     }
+
+    fn get_size(&mut self) -> LengthAndCapacity {
+        if self.cached_size.is_none() {
+            let length = Container::len(&self.data);
+            let mut capacity = 0;
+            self.data.heap_size(|_, cap| capacity += cap);
+            self.cached_size = Some(LengthAndCapacity { length, capacity });
+        }
+
+        self.cached_size.unwrap()
+    }
+
+    fn invalidate_cached_size(&mut self) {
+        self.cached_size = None;
+    }
}
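
`Chunk::get_size` separates the two components of `LengthAndCapacity`: `length` counts stored updates, while `capacity` sums the byte counts reported by the container's `heap_size` callback. The distinction is the familiar one between a collection's logical length and its reserved allocation, as a plain `Vec` analogy shows (illustrative only, not from the commit):

use std::mem::size_of;

fn main() {
    let mut v: Vec<u64> = Vec::with_capacity(1024);
    v.extend(0..10);
    let length = v.len(); // 10 updates stored
    let capacity_bytes = v.capacity() * size_of::<u64>(); // at least 8 KiB reserved
    println!("length={length}, capacity={capacity_bytes}B");
}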

fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
