Use StackWrapper's preferred capacity instead of inventing our own
teskje committed Jan 28, 2025
1 parent 1000f7e commit dee1f3f
Showing 1 changed file with 16 additions and 26 deletions.
42 changes: 16 additions & 26 deletions src/compute/src/sink/correction_v2.rs
@@ -127,6 +127,7 @@ use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
 use mz_repr::{Diff, Timestamp};
 use mz_timely_util::containers::stack::StackWrapper;
 use timely::container::columnation::Columnation;
+use timely::container::SizableContainer;
 use timely::progress::Antichain;
 use timely::{Container, PartialOrder};

@@ -135,9 +136,6 @@ use crate::sink::correction::LengthAndCapacity;
 /// Determines the size factor of subsequent chains required by the chain invariant.
 const CHAIN_PROPORTIONALITY: usize = 3;

-/// The capacity of a [`Chunk`] in bytes.
-const CHUNK_SIZE_BYTES: usize = 8 << 10;
-
 /// Convenient alias for use in data trait bounds.
 pub trait Data: differential_dataflow::Data + Columnation {}
 impl<D: differential_dataflow::Data + Columnation> Data for D {}
@@ -405,7 +403,7 @@ impl<D: Data> Chain<D> {
         debug_assert!(self.can_accept(update));

         match self.chunks.last_mut() {
-            Some(c) if c.capacity_left() => c.push(update),
+            Some(c) if !c.at_capacity() => c.push(update),
             Some(_) | None => {
                 let chunk = Chunk::from_update(update);
                 self.push_chunk(chunk);
Expand Down Expand Up @@ -782,10 +780,10 @@ impl<D: Data> From<Cursor<D>> for Chain<D> {
///
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have size [`CHUNK_SIZE_BYTES`], to make it easy for the allocator
/// to re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API
/// doesn't provide a convenient way to pre-size regions, so chunks are currently only fixed-size
/// in spirit.
/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
/// re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API doesn't
/// provide a convenient way to pre-size regions, so chunks are currently only fixed-size in
/// spirit.
struct Chunk<D: Data> {
/// The contained updates.
data: StackWrapper<(D, Timestamp, Diff)>,
@@ -795,9 +793,11 @@ struct Chunk<D: Data> {

 impl<D: Data> Default for Chunk<D> {
     fn default() -> Self {
-        let capacity = Self::capacity();
+        let mut data = StackWrapper::default();
+        data.ensure_capacity(&mut None);
+
         Self {
-            data: StackWrapper::with_capacity(capacity),
+            data,
             cached_size: None,
         }
     }
@@ -812,32 +812,22 @@ impl<D: Data> fmt::Debug for Chunk<D> {
 impl<D: Data> Chunk<D> {
     /// Create a new chunk containing a single update.
     fn from_update<DT: Borrow<D>>(update: (DT, Timestamp, Diff)) -> Self {
-        let capacity = Self::capacity();
-        let mut data = StackWrapper::with_capacity(capacity);
-
         let (d, t, r) = update;
-        data.copy_destructured(d.borrow(), &t, &r);

-        Self {
-            data,
-            cached_size: None,
-        }
-    }
+        let mut chunk = Self::default();
+        chunk.data.copy_destructured(d.borrow(), &t, &r);

-    /// Return the chunk capacity implied by [`CHUNK_SIZE_BYTES`] and the update size.
-    fn capacity() -> usize {
-        let size = std::mem::size_of::<(D, Timestamp, Diff)>();
-        std::cmp::max(CHUNK_SIZE_BYTES / size, 1)
+        chunk
     }

     /// Return the number of updates in the chunk.
     fn len(&self) -> usize {
         Container::len(&self.data)
     }

-    /// Return the number of updates left before the chunk capacity is reached.
-    fn capacity_left(&self) -> bool {
-        self.len() < Self::capacity()
+    /// Return whether the chunk is at capacity.
+    fn at_capacity(&self) -> bool {
+        self.data.at_capacity()
     }

     /// Return the update at the given index.
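The pattern the commit switches to, sketched outside the diff: instead of deriving a chunk capacity from a hard-coded byte budget (`CHUNK_SIZE_BYTES` divided by the update size), a `Chunk` now asks its backing container to pre-size itself (`ensure_capacity(&mut None)`) and to report fullness (`at_capacity()`), both pulled in through the newly imported `SizableContainer` trait. The sketch below mirrors that division of responsibility with a plain `Vec`-backed stand-in; `FixedStack`, its `PREFERRED_CAPACITY`, and its method signatures are hypothetical illustrations, not the actual `StackWrapper`/`SizableContainer` API.

// Minimal sketch (hypothetical `FixedStack`, not the real `StackWrapper`):
// the container owns its preferred capacity; callers only ask it to pre-size
// itself and whether it is full, mirroring `ensure_capacity`/`at_capacity`.
struct FixedStack<T> {
    items: Vec<T>,
}

impl<T> FixedStack<T> {
    /// The container, not its callers, decides the preferred capacity.
    const PREFERRED_CAPACITY: usize = 1024;

    fn new() -> Self {
        FixedStack { items: Vec::new() }
    }

    /// Analogue of the `ensure_capacity` call in the diff: pre-size the
    /// backing allocation to the container's own preferred capacity.
    fn ensure_capacity(&mut self) {
        let want = Self::PREFERRED_CAPACITY;
        if self.items.capacity() < want {
            self.items.reserve(want - self.items.len());
        }
    }

    /// Analogue of the `at_capacity` call in the diff: report whether the
    /// container has reached its preferred capacity.
    fn at_capacity(&self) -> bool {
        self.items.len() >= Self::PREFERRED_CAPACITY
    }

    fn push(&mut self, item: T) {
        self.items.push(item);
    }
}

fn main() {
    let mut chunk = FixedStack::new();
    chunk.ensure_capacity();

    // Callers check `at_capacity` before pushing, as `Chain::push` now does
    // with `Chunk::at_capacity`; they never compute a capacity themselves.
    for update in 0..2048u32 {
        if chunk.at_capacity() {
            // This is where the real code would start a fresh `Chunk`.
            break;
        }
        chunk.push(update);
    }

    assert_eq!(chunk.items.len(), FixedStack::<u32>::PREFERRED_CAPACITY);
}

With the container owning its preferred capacity, chunk allocations can stay uniform without correction_v2.rs guessing a byte budget, which is the point of dropping `CHUNK_SIZE_BYTES`, `Chunk::capacity`, and `Chunk::capacity_left`.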
