From 8a95782ddac1c4bc0f29ba03663a91fb3c2498cf Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Thu, 23 Jan 2025 17:57:28 +0100 Subject: [PATCH] Address easy review comments --- src/compute/src/sink/correction.rs | 19 +++++------ src/compute/src/sink/correction_v2.rs | 34 +++++++++++++------- src/compute/src/sink/materialized_view.rs | 16 ++++----- src/compute/src/sink/materialized_view_v2.rs | 16 ++++----- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs index 3f270454765fa..06d5c8fc335ad 100644 --- a/src/compute/src/sink/correction.rs +++ b/src/compute/src/sink/correction.rs @@ -45,7 +45,7 @@ impl Correction { } /// Insert a batch of updates. - pub fn insert(&mut self, updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { match self { Self::V1(c) => c.insert(updates), Self::V2(c) => c.insert(updates), @@ -53,7 +53,7 @@ impl Correction { } /// Insert a batch of updates, after negating their diffs. - pub fn insert_negated(&mut self, updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { match self { Self::V1(c) => c.insert_negated(updates), Self::V2(c) => c.insert_negated(updates), @@ -147,26 +147,27 @@ impl CorrectionV1 { impl CorrectionV1 { /// Insert a batch of updates. - pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { let Some(since_ts) = self.since.as_option() else { // If the since frontier is empty, discard all updates. return; }; - for (_, time, _) in &mut updates { + for (_, time, _) in &mut *updates { *time = std::cmp::max(*time, *since_ts); } self.insert_inner(updates); } /// Insert a batch of updates, after negating their diffs. 
- pub fn insert_negated(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { let Some(since_ts) = self.since.as_option() else { // If the since frontier is empty, discard all updates. + updates.clear(); return; }; - for (_, time, diff) in &mut updates { + for (_, time, diff) in &mut *updates { *time = std::cmp::max(*time, *since_ts); *diff = -*diff; } @@ -176,12 +177,12 @@ impl CorrectionV1 { /// Insert a batch of updates. /// /// The given `updates` must all have been advanced by `self.since`. - fn insert_inner(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { - consolidate_updates(&mut updates); + fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { + consolidate_updates(updates); updates.sort_unstable_by_key(|(_, time, _)| *time); let mut new_size = self.total_size; - let mut updates = updates.into_iter().peekable(); + let mut updates = updates.drain(..).peekable(); while let Some(&(_, time, _)) = updates.peek() { debug_assert!( self.since.less_equal(&time), diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs index 7007aa3a30765..9fd1e121f26d3 100644 --- a/src/compute/src/sink/correction_v2.rs +++ b/src/compute/src/sink/correction_v2.rs @@ -90,7 +90,7 @@ //! ## Merging Chains //! //! Merging multiple chains into a single chain is done using a k-way merge. As the input chains -//! are sorted by (time, data) and consolidated, the some properties hold for the output chain. The +//! are sorted by (time, data) and consolidated, the same properties hold for the output chain. The //! complexity of a merge of K chains containing N updates is O(N log K). //! //! There is a twist though: Merging also has to respect the `since` frontier, which determines how @@ -176,13 +176,14 @@ impl CorrectionV2 { } /// Insert a batch of updates. 
- pub fn insert(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { let Some(since_ts) = self.since.as_option() else { // If the since is the empty frontier, discard all updates. + updates.clear(); return; }; - for (_, time, _) in &mut updates { + for (_, time, _) in &mut *updates { *time = std::cmp::max(*time, *since_ts); } @@ -190,13 +191,14 @@ impl CorrectionV2 { } /// Insert a batch of updates, after negating their diffs. - pub fn insert_negated(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { + pub fn insert_negated(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { let Some(since_ts) = self.since.as_option() else { // If the since is the empty frontier, discard all updates. + updates.clear(); return; }; - for (_, time, diff) in &mut updates { + for (_, time, diff) in &mut *updates { *time = std::cmp::max(*time, *since_ts); *diff = -*diff; } @@ -207,10 +209,10 @@ impl CorrectionV2 { /// Insert a batch of updates. /// /// All times are expected to be >= the `since`. - fn insert_inner(&mut self, mut updates: Vec<(D, Timestamp, Diff)>) { + fn insert_inner(&mut self, updates: &mut Vec<(D, Timestamp, Diff)>) { debug_assert!(updates.iter().all(|(_, t, _)| self.since.less_equal(t))); - consolidate(&mut updates); + consolidate(updates); let first_update = match updates.first() { Some((d, t, r)) => (d, *t, *r), @@ -227,7 +229,7 @@ impl CorrectionV2 { } }; - chain.extend(updates); + chain.extend(updates.drain(..)); // Restore the chain invariant. 
let merge_needed = |chains: &[Chain<_>]| match chains { @@ -238,7 +240,7 @@ impl CorrectionV2 { while merge_needed(&self.chains) { let a = self.chains.pop().unwrap(); let b = self.chains.pop().unwrap(); - let merged = merge_chains(vec![a, b], &self.since); + let merged = merge_chains([a, b], &self.since); self.chains.push(merged); } @@ -307,9 +309,12 @@ impl CorrectionV2 { if needs_merge { let a = self.chains.remove(i); let b = std::mem::take(&mut self.chains[i - 1]); - let merged = merge_chains(vec![a, b], &self.since); + let merged = merge_chains([a, b], &self.since); self.chains[i - 1] = merged; } else { + // Only advance the index if we didn't merge. A merge can reduce the size of the + // chain at `i - 1`, causing a violation of the chain invariant with the next + // chain, so we might need to merge the two before proceeding to lower indexes. i -= 1; } } @@ -720,7 +725,9 @@ impl Cursor { /// where possible. /// /// An unwrap is only successful if the cursor's `limit` and `overwrite_ts` are both `None` and - /// the cursor has unique references to its chunks. + /// the cursor has unique references to its chunks. If the unwrap fails, this method returns an + /// `Err` containing the cursor in an unchanged state, allowing the caller to convert it into a + /// chain by copying chunks rather than reusing them. fn try_unwrap(self) -> Result, (&'static str, Self)> { if self.limit.is_some() { return Err(("cursor with limit", self)); @@ -944,7 +951,10 @@ fn consolidate(updates: &mut Vec<(D, Timestamp, Diff)>) { } /// Merge the given chains, advancing times by the given `since` in the process. 
-fn merge_chains(chains: Vec>, since: &Antichain) -> Chain { +fn merge_chains( + chains: impl IntoIterator>, + since: &Antichain, +) -> Chain { let Some(&since_ts) = since.as_option() else { return Chain::default(); }; diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs index c6b7af2ac8479..7855882600b73 100644 --- a/src/compute/src/sink/materialized_view.rs +++ b/src/compute/src/sink/materialized_view.rs @@ -747,7 +747,7 @@ where } Some(event) = desired_oks_input.next() => { match event { - Event::Data(_cap, data) => { + Event::Data(_cap, mut data) => { // Extract desired rows as positive contributions to `correction_oks`. if sink_id.is_user() && !data.is_empty() { trace!( @@ -765,7 +765,7 @@ where ); } - correction_oks.insert(data); + correction_oks.insert(&mut data); continue; } @@ -776,7 +776,7 @@ where } Some(event) = desired_errs_input.next() => { match event { - Event::Data(_cap, data) => { + Event::Data(_cap, mut data) => { // Extract desired rows as positive contributions to `correction_errs`. if sink_id.is_user() && !data.is_empty() { trace!( @@ -794,7 +794,7 @@ where ); } - correction_errs.insert(data); + correction_errs.insert(&mut data); continue; } @@ -805,9 +805,9 @@ where } Some(event) = persist_oks_input.next() => { match event { - Event::Data(_cap, data) => { + Event::Data(_cap, mut data) => { // Extract persist rows as negative contributions to `correction_oks`. - correction_oks.insert_negated(data); + correction_oks.insert_negated(&mut data); continue; } @@ -818,9 +818,9 @@ where } Some(event) = persist_errs_input.next() => { match event { - Event::Data(_cap, data) => { + Event::Data(_cap, mut data) => { // Extract persist rows as negative contributions to `correction_errs`. 
- correction_errs.insert_negated(data); + correction_errs.insert_negated(&mut data); continue; } diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs index 91aeb0886a779..370721802f7dd 100644 --- a/src/compute/src/sink/materialized_view_v2.rs +++ b/src/compute/src/sink/materialized_view_v2.rs @@ -722,8 +722,8 @@ mod write { let maybe_batch = tokio::select! { Some(event) = desired_inputs.ok.next() => { match event { - Event::Data(_cap, data) => { - state.corrections.ok.insert(data); + Event::Data(_cap, mut data) => { + state.corrections.ok.insert(&mut data); None } Event::Progress(frontier) => { @@ -734,8 +734,8 @@ mod write { } Some(event) = desired_inputs.err.next() => { match event { - Event::Data(_cap, data) => { - state.corrections.err.insert(data); + Event::Data(_cap, mut data) => { + state.corrections.err.insert(&mut data); None } Event::Progress(frontier) => { @@ -746,8 +746,8 @@ mod write { } Some(event) = persist_inputs.ok.next() => { match event { - Event::Data(_cap, data) => { - state.corrections.ok.insert_negated(data); + Event::Data(_cap, mut data) => { + state.corrections.ok.insert_negated(&mut data); None } Event::Progress(frontier) => { @@ -758,8 +758,8 @@ mod write { } Some(event) = persist_inputs.err.next() => { match event { - Event::Data(_cap, data) => { - state.corrections.err.insert_negated(data); + Event::Data(_cap, mut data) => { + state.corrections.err.insert_negated(&mut data); None } Event::Progress(frontier) => {