Skip to content

Commit

Permalink
Address easy review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Jan 23, 2025
1 parent 70d0355 commit 8a95782
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 37 deletions.
19 changes: 10 additions & 9 deletions src/compute/src/sink/correction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ impl<D: Data> Correction<D> {
}

/// Insert a batch of updates.
pub fn insert(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
match self {
Self::V1(c) => c.insert(updates),
Self::V2(c) => c.insert(updates),
}
}

/// Insert a batch of updates, after negating their diffs.
pub fn insert_negated(&mut self, updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
match self {
Self::V1(c) => c.insert_negated(updates),
Self::V2(c) => c.insert_negated(updates),
Expand Down Expand Up @@ -147,26 +147,27 @@ impl<D> CorrectionV1<D> {

impl<D: Data> CorrectionV1<D> {
/// Insert a batch of updates.
pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since frontier is empty, discard all updates.
return;
};

for (_, time, _) in &mut updates {
for (_, time, _) in &mut *updates {
*time = std::cmp::max(*time, *since_ts);
}
self.insert_inner(updates);
}

/// Insert a batch of updates, after negating their diffs.
pub fn insert_negated(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since frontier is empty, discard all updates.
updates.clear();
return;
};

for (_, time, diff) in &mut updates {
for (_, time, diff) in &mut *updates {
*time = std::cmp::max(*time, *since_ts);
*diff = -*diff;
}
Expand All @@ -176,12 +177,12 @@ impl<D: Data> CorrectionV1<D> {
/// Insert a batch of updates.
///
/// The given `updates` must all have been advanced by `self.since`.
fn insert_inner(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
consolidate_updates(&mut updates);
fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
consolidate_updates(updates);
updates.sort_unstable_by_key(|(_, time, _)| *time);

let mut new_size = self.total_size;
let mut updates = updates.into_iter().peekable();
let mut updates = updates.drain(..).peekable();
while let Some(&(_, time, _)) = updates.peek() {
debug_assert!(
self.since.less_equal(&time),
Expand Down
34 changes: 22 additions & 12 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
//! ## Merging Chains
//!
//! Merging multiple chains into a single chain is done using a k-way merge. As the input chains
//! are sorted by (time, data) and consolidated, the some properties hold for the output chain. The
//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The
//! complexity of a merge of K chains containing N updates is O(N log K).
//!
//! There is a twist though: Merging also has to respect the `since` frontier, which determines how
Expand Down Expand Up @@ -176,27 +176,29 @@ impl<D: Data> CorrectionV2<D> {
}

/// Insert a batch of updates.
pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since is the empty frontier, discard all updates.
updates.clear();
return;
};

for (_, time, _) in &mut updates {
for (_, time, _) in &mut *updates {
*time = std::cmp::max(*time, *since_ts);
}

self.insert_inner(updates);
}

/// Insert a batch of updates, after negating their diffs.
pub fn insert_negated(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
let Some(since_ts) = self.since.as_option() else {
// If the since is the empty frontier, discard all updates.
updates.clear();
return;
};

for (_, time, diff) in &mut updates {
for (_, time, diff) in &mut *updates {
*time = std::cmp::max(*time, *since_ts);
*diff = -*diff;
}
Expand All @@ -207,10 +209,10 @@ impl<D: Data> CorrectionV2<D> {
/// Insert a batch of updates.
///
/// All times are expected to be >= the `since`.
fn insert_inner(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) {
fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) {
debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t)));

consolidate(&mut updates);
consolidate(updates);

let first_update = match updates.first() {
Some((d, t, r)) => (d, *t, *r),
Expand All @@ -227,7 +229,7 @@ impl<D: Data> CorrectionV2<D> {
}
};

chain.extend(updates);
chain.extend(updates.drain(..));

// Restore the chain invariant.
let merge_needed = |chains: &[Chain<_>]| match chains {
Expand All @@ -238,7 +240,7 @@ impl<D: Data> CorrectionV2<D> {
while merge_needed(&self.chains) {
let a = self.chains.pop().unwrap();
let b = self.chains.pop().unwrap();
let merged = merge_chains(vec![a, b], &self.since);
let merged = merge_chains([a, b], &self.since);
self.chains.push(merged);
}

Expand Down Expand Up @@ -307,9 +309,12 @@ impl<D: Data> CorrectionV2<D> {
if needs_merge {
let a = self.chains.remove(i);
let b = std::mem::take(&mut self.chains[i - 1]);
let merged = merge_chains(vec![a, b], &self.since);
let merged = merge_chains([a, b], &self.since);
self.chains[i - 1] = merged;
} else {
// Only advance the index if we didn't merge. A merge can reduce the size of the
// chain at `i - 1`, causing a violation of the chain invariant with the next
// chain, so we might need to merge the two before proceeding to lower indexes.
i -= 1;
}
}
Expand Down Expand Up @@ -720,7 +725,9 @@ impl<D: Data> Cursor<D> {
/// where possible.
///
/// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and
/// the cursor has unique references to its chunks.
/// the cursor has unique references to its chunks. If the unwrap fails, this method returns an
/// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a
/// chain by copying chunks rather than reusing them.
fn try_unwrap(self) -> Result<Chain<D>, (&'static str, Self)> {
if self.limit.is_some() {
return Err(("cursor with limit", self));
Expand Down Expand Up @@ -944,7 +951,10 @@ fn consolidate<D: Data>(updates: &mut Vec<(D, Timestamp, Diff)>) {
}

/// Merge the given chains, advancing times by the given `since` in the process.
fn merge_chains<D: Data>(chains: Vec<Chain<D>>, since: &Antichain<Timestamp>) -> Chain<D> {
fn merge_chains<D: Data>(
chains: impl IntoIterator<Item = Chain<D>>,
since: &Antichain<Timestamp>,
) -> Chain<D> {
let Some(&since_ts) = since.as_option() else {
return Chain::default();
};
Expand Down
16 changes: 8 additions & 8 deletions src/compute/src/sink/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ where
}
Some(event) = desired_oks_input.next() => {
match event {
Event::Data(_cap, data) => {
Event::Data(_cap, mut data) => {
// Extract desired rows as positive contributions to `correction_oks`.
if sink_id.is_user() && !data.is_empty() {
trace!(
Expand All @@ -765,7 +765,7 @@ where
);
}

correction_oks.insert(data);
correction_oks.insert(&mut data);

continue;
}
Expand All @@ -776,7 +776,7 @@ where
}
Some(event) = desired_errs_input.next() => {
match event {
Event::Data(_cap, data) => {
Event::Data(_cap, mut data) => {
// Extract desired rows as positive contributions to `correction_errs`.
if sink_id.is_user() && !data.is_empty() {
trace!(
Expand All @@ -794,7 +794,7 @@ where
);
}

correction_errs.insert(data);
correction_errs.insert(&mut data);

continue;
}
Expand All @@ -805,9 +805,9 @@ where
}
Some(event) = persist_oks_input.next() => {
match event {
Event::Data(_cap, data) => {
Event::Data(_cap, mut data) => {
// Extract persist rows as negative contributions to `correction_oks`.
correction_oks.insert_negated(data);
correction_oks.insert_negated(&mut data);

continue;
}
Expand All @@ -818,9 +818,9 @@ where
}
Some(event) = persist_errs_input.next() => {
match event {
Event::Data(_cap, data) => {
Event::Data(_cap, mut data) => {
// Extract persist rows as negative contributions to `correction_errs`.
correction_errs.insert_negated(data);
correction_errs.insert_negated(&mut data);

continue;
}
Expand Down
16 changes: 8 additions & 8 deletions src/compute/src/sink/materialized_view_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,8 @@ mod write {
let maybe_batch = tokio::select! {
Some(event) = desired_inputs.ok.next() => {
match event {
Event::Data(_cap, data) => {
state.corrections.ok.insert(data);
Event::Data(_cap, mut data) => {
state.corrections.ok.insert(&mut data);
None
}
Event::Progress(frontier) => {
Expand All @@ -734,8 +734,8 @@ mod write {
}
Some(event) = desired_inputs.err.next() => {
match event {
Event::Data(_cap, data) => {
state.corrections.err.insert(data);
Event::Data(_cap, mut data) => {
state.corrections.err.insert(&mut data);
None
}
Event::Progress(frontier) => {
Expand All @@ -746,8 +746,8 @@ mod write {
}
Some(event) = persist_inputs.ok.next() => {
match event {
Event::Data(_cap, data) => {
state.corrections.ok.insert_negated(data);
Event::Data(_cap, mut data) => {
state.corrections.ok.insert_negated(&mut data);
None
}
Event::Progress(frontier) => {
Expand All @@ -758,8 +758,8 @@ mod write {
}
Some(event) = persist_inputs.err.next() => {
match event {
Event::Data(_cap, data) => {
state.corrections.err.insert_negated(data);
Event::Data(_cap, mut data) => {
state.corrections.err.insert_negated(&mut data);
None
}
Event::Progress(frontier) => {
Expand Down

0 comments on commit 8a95782

Please sign in to comment.