compute: spillable MV correction buffer #30083
Conversation
force-pushed from ffdf64d to 77afd64
force-pushed from 25b0987 to 0b2e359
force-pushed from ffb5488 to 4697f9c
force-pushed from 4356350 to 185c99c
force-pushed from e95c32f to 49c0569
The feature benchmarks report a bunch of regressions. Some increased CPU and memory usage is expected, but the memory regressions here are worryingly large.
I have a suspicion that this isn't an actual regression but a result of the usage of lgalloc, which holds onto allocated memory and only releases it slowly over time.
force-pushed from 49c0569 to e01a7dc
I retried the feature benchmarks with lgalloc disabled and things do look better.
There are still higher-than-expected memory regressions for some of them; I want to look into those.
force-pushed from 9d51f8f to c8846d6
```rust
let mut heap = MergeHeap::from_iter(cursors);
let mut merged = Chain::default();
while let Some(cursor1) = heap.pop() {
    let (data, time, mut diff) = cursor1.get();
```
Here we'd have the option to reuse whole chunks, if all updates in the current chunk are less than the first update in the next cursor on the heap. However, if we did this naively we could end up with a lot of empty space in our chunks, and therefore chains that have many more chunks than they'd need if their updates were tightly packed.
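A minimal sketch of the reuse condition described above; the helper and its arguments are hypothetical, not part of this PR:

```rust
// Hypothetical sketch: an entire chunk can be moved wholesale during a
// merge if all of its updates sort before the first update of the next
// cursor on the heap. `chunk_last` is the chunk's largest update,
// `next_cursor_first` the smallest update of the next cursor (if any).
fn can_reuse_chunk<D: Ord, T: Ord, R>(
    chunk_last: &(D, T, R),
    next_cursor_first: Option<&(D, T, R)>,
) -> bool {
    match next_cursor_first {
        // No other cursor left: the chunk can be moved as-is.
        None => true,
        // Compare by the (time, data) order the correction buffer uses.
        Some(next) => (&chunk_last.1, &chunk_last.0) < (&next.1, &next.0),
    }
}
```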
```rust
//! Unfortunately, performing consolidation as described above can break the chain invariant and we
//! might need to restore it by merging chains, including ones containing future updates. This is
//! something that would be great to fix! In the meantime the hope is that in steady state it
//! doesn't matter too much because either there are no future retractions and U is approximately
//! equal to N, or the amount of future retractions is much larger than the amount of current
//! changes, in which case removing the current changes has a good chance of leaving the chain
//! invariant intact.
```
This is, I think, the main wrinkle in the current implementation. It'd be nice to never have to look at future updates when consolidating the current ones. All alternatives I've come up with so far are not great (a sketch of the invariant follows this list):
- Don't consolidate the correction updates at all, just provide a read-only merging iterator. That's sufficient for the MV sink to work, but it leaves unspecified when the correction contents get consolidated. Inserts will trigger merges, but if few inserts happen, we can't rely on those. We'd need some form of idle merging, but that adds significant complexity.
- Skip restoring the chain invariant and rely on subsequent inserts to do so. It's not clear to me whether that actually improves anything or just moves work from one operation to another. It definitely makes the data structure harder to reason about, since the chain invariant isn't an invariant anymore and we can't make crisp statements about the complexity of inserts.
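For context, a minimal sketch of the chain invariant under discussion, consistent with the `CHAIN_PROPORTIONALITY` merge condition that appears in a snippet further down; the constant's value here is an assumption:

```rust
// Illustrative sketch: every chain must be "proportionally" shorter than
// its left neighbor, mirroring the `a.len() * CHAIN_PROPORTIONALITY >
// b.len()` merge condition. The value 2 is assumed for illustration.
const CHAIN_PROPORTIONALITY: usize = 2;

fn invariant_holds(chain_lens: &[usize]) -> bool {
    chain_lens
        .windows(2)
        .all(|w| w[1] * CHAIN_PROPORTIONALITY <= w[0])
}
```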
force-pushed from 0993a98 to 6607ce2
force-pushed from c825ca6 to f0c6e77
force-pushed from 2262dee to 70d0355
Some comments, but I'm still reading.
I think this change looks good on a high level, and I think we can soon merge it and iterate on it from there. I left some comments, but what stands out for me are two things:
- It seems better to use the `Array` type instead of `StackWrapper`, since we want constant-capacity chunks. `ChunkedStack` will give us strange behavior, I fear: it has space for more elements than we'd ever want in a chunk. Note that `Array` doesn't know about columnation, so some glue code would need to be written.
- The cursor API is expected to copy data element-by-element, which can be inefficient. Is there an alternative where we can move whole chunks, or partial chunks?
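To make the constant-capacity point concrete, here is a rough sketch using a plain `Vec` with fixed capacity; the names and details are illustrative stand-ins, not the actual `Array`/`StackWrapper` APIs:

```rust
// Illustrative only: a chunk with a fixed capacity, so a chain's memory
// usage is proportional to its chunk count. The real implementation would
// back this with a columnated region rather than a plain Vec.
struct Chunk<T> {
    data: Vec<T>,
}

impl<T> Chunk<T> {
    const CAPACITY: usize = 1024;

    fn new() -> Self {
        // Allocate the full capacity up front and never grow beyond it.
        Self { data: Vec::with_capacity(Self::CAPACITY) }
    }

    /// Push an update, returning it back to the caller if the chunk is full.
    fn push(&mut self, item: T) -> Result<(), T> {
        if self.data.len() < Self::CAPACITY {
            self.data.push(item);
            Ok(())
        } else {
            Err(item)
        }
    }
}
```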
```rust
}

/// Return consolidated updates before the given `upper`.
pub fn updates_before<'a>(
```
This function could return an iterator over `(&D, _, _)` to avoid cloning all contents. This is important when computing the initial snapshot, since cloning would roughly 2x the memory requirements.
It might not be possible to do this right now, and I see that your implementation follows existing APIs, so please record this as an issue so we can change it later.
Maybe it's hard as long as we're using the `SourceData` type. Well, something to note as future work, I guess.
Indeed. Though I think this doesn't 2x the memory requirements, since the cloning only happens during iteration and there is no place where the entire iterator is collected at the same time. It is given to a persist `BatchBuilder`, which uses bounded memory by uploading parts to S3 as it goes.
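For reference, a self-contained sketch of the borrowed-iterator shape discussed above, with simplified stand-in types (`String` for the data, plain integers for time and diff); this is not what the PR implements:

```rust
// Hypothetical sketch: yield borrowed data instead of owned clones. The
// real code deals with SourceData and persist's timestamp/diff types.
struct Correction {
    updates: Vec<(String, u64, i64)>,
}

impl Correction {
    /// Yield updates before `upper` by reference, avoiding clones of the
    /// data column. The iterator borrows `self`, so the caller must finish
    /// consuming it before mutating the correction buffer again.
    fn updates_before(&self, upper: u64) -> impl Iterator<Item = (&String, u64, i64)> + '_ {
        self.updates
            .iter()
            .filter(move |(_, time, _)| *time < upper)
            .map(|(data, time, diff)| (data, *time, *diff))
    }
}
```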
```rust
// Chains are ordered by decreasing length; the invariant requires each
// chain to be proportionally shorter than its left neighbor.
let needs_merge = self.chains.get(i).is_some_and(|a| {
    let b = &self.chains[i - 1];
    a.len() * CHAIN_PROPORTIONALITY > b.len()
});
if needs_merge {
    let a = self.chains.remove(i);
    let b = std::mem::take(&mut self.chains[i - 1]);
    let merged = merge_chains(vec![a, b], &self.since);
    self.chains[i - 1] = merged;
} else {
    // Only advance when no merge happened: a merge can shrink the result
    // enough that the chain shifted into position `i` violates the
    // invariant again, so it must be re-checked.
    i -= 1;
}
```
Trying to understand the `self.chains.get(i).is_some_and` part: if we unconditionally decremented `i` by 1 in every iteration, the chain at `i` should always exist, no? That would simplify the logic a bit.
Yes, but we cannot unconditionally decrement `i`, unfortunately. Merging two chains might result in a smaller chain that breaks the invariant again for chains you have already looked at, so you have to go back and merge those too.
For example, assume you have these three chains:
```
[0] xxxx
[1] xxxx
[2] x    <- i
--- no merge required ---
[0] xxxx
[1] xxxx <- i
[2] x
--- merge ---
[0] x    <- i
[1] x
--- done ---
```
You end up not having restored the invariant.
Whereas if you only advance `i` when you don't merge, you merge chains at `i` until the invariant is restored up to `i` again, before you continue. When you get to `i = 0` the invariant holds for all chains.
```rust
// Copy updates one at a time until the cursor reaches a chunk boundary.
while let Some(cursor) = remaining.take() {
    if cursor.chunk_offset == 0 {
        remaining = Some(cursor);
        break;
    }
    let update = cursor.get();
    chain.push(update);
    remaining = cursor.step();
}
```
This loop here (and elsewhere) performs element-by-element copies. It seems to me that we could trade off how compactly we store data against how much we need to copy around. For example, when we extract some prefix of a chunk, we could just leave the remaining data in the chunk and remember the offset. Then we'd need to add a check that chunks in a chain are always at least half full, to avoid sitting on too much memory.
Interesting! This PR takes the conservative approach of always packing updates as tightly as possible, in exchange for more work that has to be done. There are definitely ways to optimize things, but I'd like to leave these as follow-ups given that they make the code more complicated and this PR is already large enough.
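A sketch of the offset-tracking alternative suggested above (all names here are hypothetical; not part of this PR):

```rust
// Illustrative sketch: instead of copying the suffix of a partially
// consumed chunk into a new chunk, keep the chunk and remember how much
// of it has already been consumed.
struct ChunkRef<T> {
    data: Vec<T>,
    // Index of the first update that has not been consumed yet.
    offset: usize,
}

impl<T> ChunkRef<T> {
    fn remaining(&self) -> &[T] {
        &self.data[self.offset..]
    }

    /// Consume the first `n` remaining updates without copying the rest.
    fn advance(&mut self, n: usize) {
        self.offset = (self.offset + n).min(self.data.len());
    }

    /// The suggested guard: a chunk should stay at least half full,
    /// otherwise its suffix should be repacked to reclaim memory.
    fn should_repack(&self) -> bool {
        self.remaining().len() * 2 < self.data.len()
    }
}
```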
```rust
impl<D: Data> From<Cursor<D>> for Chain<D> {
    fn from(cursor: Cursor<D>) -> Self {
        match cursor.try_unwrap() {
            Ok(chain) => chain,
            Err((_, cursor)) => {
                let mut chain = Chain::default();
                chain.push_cursor(cursor);
                chain
            }
        }
    }
}
```
I'm not sure I can follow when the `try_unwrap` call in this `From` implementation can fail.
We know nothing about the input cursor here, so it can fail for all the reasons described in the docs of `try_unwrap`: there might be a `limit` or `overwrite_ts` set, or some of the referenced chunks might be shared with other cursors.
force-pushed from 8a95782 to 4336fb6
I addressed most comments and created follow-up issues for the rest. RFAL!
Looks good! Let's merge and then iterate based on that!
```diff
 let Some(since_ts) = self.since.as_option() else {
     // If the since frontier is empty, discard all updates.
     return;
 };

-for (_, time, _) in &mut updates {
+for (_, time, _) in &mut *updates {
```
Isn't this equivalent to just using `updates` directly?
For some reason it's not. If you try that you get an error with the helpful hint that you should use a reborrow instead 🤷
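For the curious, a standalone illustration of why the reborrow is needed, assuming `updates` is a `&mut Vec` as in the surrounding code (types simplified):

```rust
// Standalone illustration of the reborrow above (simplified types).
fn advance_times(updates: &mut Vec<(String, u64, i64)>, since_ts: u64) {
    // `for _ in updates` would move the `&mut Vec` into the loop, so
    // `updates` could not be used afterwards. `&mut updates` is a
    // `&mut &mut Vec`, which is not an iterator. The reborrow
    // `&mut *updates` creates a fresh mutable borrow that ends with the
    // loop, which is what the compiler's hint suggests.
    for (_, time, _) in &mut *updates {
        *time = std::cmp::max(*time, since_ts);
    }
    // Still usable here thanks to the reborrow.
    updates.retain(|(_, _, diff)| *diff != 0);
}
```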
This commit introduces a "correction v2" buffer that differs from the existing one in that it stores data in columnated regions that can be spilled to disk. It follows the general design of the arrangement merge batcher, with the major difference that it sorts updates by time first, in an attempt to more efficiently deal with the presence of updates in the future (commonly introduced by temporal filters).
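A small sketch of the time-first ordering mentioned in the commit message; the comparator shape is illustrative, and the PR's actual ordering may differ in detail:

```rust
// Illustrative: sort updates by time first, so updates at future times
// (e.g. retractions introduced by temporal filters) cluster at the end
// and consolidating the current updates doesn't need to touch them.
fn sort_time_first<D: Ord, T: Ord, R>(updates: &mut [(D, T, R)]) {
    updates.sort_by(|(d1, t1, _), (d2, t2, _)| (t1, d1).cmp(&(t2, d2)));
}
```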
force-pushed from d6a275b to dee1f3f
TFTR!
This commit adds a new dyncfg, `enable_compute_correction_v2`, that controls whether the MV sink v2 should use the old or the new implementation of the correction buffer. It is disabled by default for now, even in CI, to give us more time to improve performance.
force-pushed from dee1f3f to 1c3d58b
This PR introduces a "correction v2" buffer that differs from the existing one in that it stores data in columnated regions that can be spilled to disk. It follows the general design of the arrangement merge batcher, with the major difference that it sorts updates by time first, in an attempt to more efficiently deal with the presence of updates in the future (commonly introduced by temporal filters).
The new correction buffer can be switched on through a feature flag, `enable_compute_correction_v2`, and is switched off by default. The plan is to keep it disabled in production but have it available for emergencies where replicas fail to hydrate due to the MV memory spike. Eventually we'll want to make the new correction buffer the default, but we should do more performance testing before that.
Motivation
Part of https://github.com/MaterializeInc/database-issues/issues/8464
Tips for reviewer
Checklist
- If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.