Merge pull request #26429 from danhhz/persist_inline_prep
persist: preparation for inline writes
danhhz authored Apr 4, 2024
2 parents c02dfbd + 2e3f4b1 commit fe25ab8
Showing 37 changed files with 845 additions and 434 deletions.
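Most of the churn below follows one refactor: batch parts now flow through a BatchPart<T> enum (defined in internal/state.rs, whose diff is not shown here) with a single Hollow variant wrapping the old HollowBatchPart<T>, and call sites go through accessor methods such as printable_name(), ts_rewrite(), and encoded_size_bytes(). That leaves room for a later inline-write variant without touching every caller. A minimal sketch of the shape, using simplified stand-in types; anything beyond what this diff shows (field sets, exact signatures) is an assumption:

use timely::progress::Antichain;

// Simplified stand-in for the real HollowBatchPart<T>; the actual struct
// also carries key_lower, stats, and a structured key type.
pub struct HollowBatchPart<T> {
    pub key: String,
    pub encoded_size_bytes: usize,
    pub ts_rewrite: Option<Antichain<T>>,
}

// The enum this PR threads through batch.rs. Only Hollow exists in this
// commit; an inline variant is what the single-variant matches leave room for.
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
}

impl<T> BatchPart<T> {
    // Accessors like these keep callers agnostic to where the bytes live.
    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
        }
    }
}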
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -51,6 +51,7 @@ members = [
"src/persist",
"src/persist-cli",
"src/persist-client",
"src/persist-proc",
"src/persist-txn",
"src/persist-types",
"src/pgcopy",
4 changes: 2 additions & 2 deletions src/buf.yaml
@@ -49,8 +49,8 @@ breaking:
- expr/src/relation.proto
# reason: not yet released
- mysql-util/src/desc.proto
# reason: still under active development
- persist-types/src/stats.proto
# reason: working around a false positive in buf's breaking change lint
- persist-client/src/internal/state.proto
# reason: does not currently require backward-compatibility
- storage-client/src/client.proto
# reason: does not currently require backward-compatibility
2 changes: 1 addition & 1 deletion src/dyncfg/src/lib.rs
@@ -359,7 +359,7 @@ impl ConfigUpdates {
///
/// If a value of the same config has previously been added to these
/// updates, replaces it.
pub fn add_dynamic(&mut self, name: &'static str, val: ConfigVal) {
pub fn add_dynamic(&mut self, name: &str, val: ConfigVal) {
self.updates.insert(
name.to_owned(),
ProtoConfigVal {
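The only change to dyncfg is relaxing add_dynamic's name parameter from &'static str to &str, which works because the name is immediately copied with to_owned(). A hedged sketch of what the relaxed bound newly permits; the config name here is made up for illustration, and ConfigUpdates::default() plus the ConfigVal::Bool variant are assumed to exist with these shapes:

use mz_dyncfg::{ConfigUpdates, ConfigVal};

fn main() {
    let mut updates = ConfigUpdates::default();
    // A name assembled at runtime could not satisfy the old &'static str
    // bound. This particular config name is hypothetical.
    let name = format!("persist_{}", "inline_writes_enabled");
    updates.add_dynamic(&name, ConfigVal::Bool(true));
}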
1 change: 1 addition & 0 deletions src/persist-client/Cargo.toml
@@ -42,6 +42,7 @@ mz-build-info = { path = "../build-info" }
mz-dyncfg = { path = "../dyncfg" }
mz-ore = { path = "../ore", features = ["bytes_", "test", "tracing_"] }
mz-persist = { path = "../persist" }
mz-persist-proc = { path = "../persist-proc" }
mz-persist-types = { path = "../persist-types" }
mz-proto = { path = "../proto" }
mz-timely-util = { path = "../timely-util" }
100 changes: 68 additions & 32 deletions src/persist-client/src/batch.rs
@@ -10,10 +10,10 @@
//! A handle to a batch of updates
use std::borrow::Cow;
use std::collections::VecDeque;
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Range;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::Instant;

@@ -45,9 +45,9 @@ use crate::cfg::MiB;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyPartStats, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{HollowBatch, HollowBatchPart};
use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart};
use crate::stats::{
part_stats_for_legacy_part, untrimmable_columns, STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED,
};
@@ -68,7 +68,7 @@
{
pub(crate) batch_delete_enabled: bool,
pub(crate) metrics: Arc<Metrics>,
pub(crate) shard_id: ShardId,
pub(crate) shard_metrics: Arc<ShardMetrics>,

/// The version of Materialize which wrote this batch.
pub(crate) version: Version,
@@ -91,12 +91,12 @@
fn drop(&mut self) {
if self.batch.parts.len() > 0 {
warn!(
"un-consumed Batch, with {} dangling blob keys: {:?}",
"un-consumed Batch, with {} parts and dangling blob keys: {:?}",
self.batch.parts.len(),
self.batch
.parts
.iter()
.map(|x| &x.key.0)
.map(|x| x.printable_name())
.collect::<Vec<_>>(),
);
}
@@ -114,14 +114,14 @@
batch_delete_enabled: bool,
metrics: Arc<Metrics>,
blob: Arc<dyn Blob + Send + Sync>,
shard_id: ShardId,
shard_metrics: Arc<ShardMetrics>,
version: Version,
batch: HollowBatch<T>,
) -> Self {
Self {
batch_delete_enabled,
metrics,
shard_id,
shard_metrics,
version,
batch,
blob,
@@ -131,7 +131,7 @@

/// The `shard_id` of this [Batch].
pub fn shard_id(&self) -> ShardId {
self.shard_id
self.shard_metrics.shard_id
}

/// The `upper` of this [Batch].
@@ -186,24 +186,19 @@

/// Deletes the blobs that make up this batch from the given blob store and
/// marks them as deleted.
#[instrument(level = "debug", fields(shard = %self.shard_id))]
#[instrument(level = "debug", fields(shard = %self.shard_id()))]
pub async fn delete(mut self) {
self.mark_consumed();
if !self.batch_delete_enabled {
return;
}
let deletes = FuturesUnordered::new();
let mut deletes = PartDeletes::default();
for part in self.batch.parts.iter() {
let metrics = Arc::clone(&self.metrics);
let blob = Arc::clone(&self.blob);
deletes.push(async move {
retry_external(&metrics.retries.external.batch_delete, || async {
blob.delete(&part.key).await
})
.await;
});
deletes.add(part);
}
let () = deletes.collect().await;
let () = deletes
.delete(&self.blob, &self.metrics.retries.external.batch_delete)
.await;
}

/// Turns this [`Batch`] into a `HollowBatch`.
@@ -226,7 +221,7 @@
/// [`WriteHandle::batch_from_transmittable_batch`](crate::write::WriteHandle::batch_from_transmittable_batch).
pub fn into_transmittable_batch(mut self) -> ProtoBatch {
let ret = ProtoBatch {
shard_id: self.shard_id.into_proto(),
shard_id: self.shard_metrics.shard_id.into_proto(),
version: self.version.to_string(),
batch: Some(self.batch.into_proto()),
};
@@ -528,14 +523,15 @@
self.flush_part(stats_schemas, key_lower, remainder).await;

let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
let shard_metrics = Arc::clone(&self.parts.shard_metrics);
let parts = self.parts.finish().await;

let desc = Description::new(self.lower, registered_upper, self.since);
let batch = Batch::new(
batch_delete_enabled,
Arc::clone(&self.metrics),
self.blob,
self.shard_id.clone(),
shard_metrics,
self.version,
HollowBatch {
desc,
@@ -802,8 +798,8 @@ pub(crate) struct BatchParts<T> {
lower: Antichain<T>,
blob: Arc<dyn Blob + Send + Sync>,
isolated_runtime: Arc<IsolatedRuntime>,
writing_parts: VecDeque<JoinHandle<HollowBatchPart<T>>>,
finished_parts: Vec<HollowBatchPart<T>>,
writing_parts: VecDeque<JoinHandle<BatchPart<T>>>,
finished_parts: Vec<BatchPart<T>>,
batch_metrics: BatchWriteMetrics,
}

@@ -932,13 +928,13 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
stats
});

HollowBatchPart {
BatchPart::Hollow(HollowBatchPart {
key: partial_key,
encoded_size_bytes: payload_len,
key_lower,
stats,
ts_rewrite: None,
}
})
}
.instrument(write_span),
);
@@ -959,7 +955,7 @@
}

#[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
pub(crate) async fn finish(self) -> Vec<HollowBatchPart<T>> {
pub(crate) async fn finish(self) -> Vec<BatchPart<T>> {
let mut parts = self.finished_parts;
for handle in self.writing_parts {
let part = handle.wait_and_assert_finished().await;
@@ -991,7 +987,7 @@ pub(crate) fn validate_truncate_batch<T: Timestamp>(
// To prove that there is no data to truncate below the lower, require
// that the lower is <= the rewrite ts.
for part in batch.parts.iter() {
let part_lower_bound = part.ts_rewrite.as_ref().unwrap_or(batch.desc.lower());
let part_lower_bound = part.ts_rewrite().unwrap_or(batch.desc.lower());
if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
return Err(InvalidUsage::InvalidRewrite(format!(
"rewritten batch might have data below {:?} at {:?}",
@@ -1016,8 +1012,42 @@
Ok(())
}

#[derive(Debug, Default)]
pub(crate) struct PartDeletes(BTreeSet<PartialBatchKey>);

impl PartDeletes {
// Adds the part to the set to be deleted and returns true if the key was
// not already present (i.e., if this call actually added it).
pub fn add<T>(&mut self, part: &BatchPart<T>) -> bool {
match part {
BatchPart::Hollow(x) => self.0.insert(x.key.clone()),
}
}

pub async fn delete(self, blob: &Arc<dyn Blob + Send + Sync>, metrics: &Arc<RetryMetrics>) {
let deletes = FuturesUnordered::new();
for key in self.0 {
let metrics = Arc::clone(metrics);
let blob = Arc::clone(blob);
deletes.push(async move {
retry_external(&metrics, || blob.delete(&key)).await;
});
}
let () = deletes.collect().await;
}
}

impl Deref for PartDeletes {
type Target = BTreeSet<PartialBatchKey>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;

use crate::cache::PersistClientCache;
use crate::internal::paths::{BlobKey, PartialBlobKey};
use crate::tests::{all_ok, new_test_client, CodecProduct};
@@ -1144,6 +1174,9 @@ mod tests {

assert_eq!(batch.batch.parts.len(), 3);
for part in &batch.batch.parts {
let part = match part {
BatchPart::Hollow(x) => x,
};
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
@@ -1191,6 +1224,9 @@

assert_eq!(batch.batch.parts.len(), 2);
for part in &batch.batch.parts {
let part = match part {
BatchPart::Hollow(x) => x,
};
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
@@ -1231,10 +1267,10 @@
}

// NB: Most edge cases are exercised in datadriven tests.
#[mz_ore::test(tokio::test)]
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // too slow
async fn rewrite_ts_example() {
let client = new_test_client().await;
async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let (mut write, read) = client
.expect_open::<String, (), u64, i64>(ShardId::new())
.await;
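The new PartDeletes helper above splits deletion into two phases: deduplicate keys in a BTreeSet, then fan every delete out concurrently through FuturesUnordered, collecting into () to drive them all to completion. Stripped of persist's Blob and retry machinery, the pattern reduces to this self-contained sketch (assuming the futures and tokio crates; the println! stands in for the retried blob delete):

use std::collections::BTreeSet;

use futures::stream::{FuturesUnordered, StreamExt};

// Each key is deleted at most once because the BTreeSet already deduplicated
// them; FuturesUnordered runs the deletes concurrently rather than serially.
async fn delete_all(keys: BTreeSet<String>) {
    let deletes = FuturesUnordered::new();
    for key in keys {
        deletes.push(async move {
            // Stand-in for retry_external(&metrics, || blob.delete(&key)).
            println!("deleting {key}");
        });
    }
    let () = deletes.collect().await;
}

#[tokio::main]
async fn main() {
    // The duplicate "a" collapses in the set, so only two deletes are issued.
    let keys: BTreeSet<String> = ["a", "b", "a"].map(String::from).into_iter().collect();
    delete_all(keys).await;
}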
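Note also that the test at the end of this file switches from #[mz_ore::test] to #[mz_persist_proc::test], provided by the new src/persist-proc crate this PR adds. The macro supplies the ConfigUpdates argument so the same test body can run under different dynamic-config permutations, which new_test_client(&dyncfgs) presumably applies to the opened client. Roughly, one can picture an expansion like the following; this shape is an assumption for illustration, not the macro's actual output:

// Hypothetical expansion sketch: the declared dyncfgs parameter is stripped
// from the generated test and supplied by the harness, once per permutation.
#[tokio::test]
async fn rewrite_ts_example_default_configs() {
    let dyncfgs = ConfigUpdates::default();
    rewrite_ts_example(dyncfgs).await; // the original test body
}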
4 changes: 2 additions & 2 deletions src/persist-client/src/cli/admin.rs
@@ -314,7 +314,7 @@ where
let bytes = req
.inputs
.iter()
.flat_map(|x| x.parts.iter().map(|x| x.encoded_size_bytes))
.flat_map(|x| x.parts.iter().map(|x| x.encoded_size_bytes()))
.sum::<usize>();
let start = Instant::now();
info!(
@@ -355,7 +355,7 @@
res.output
.parts
.iter()
.map(|x| x.encoded_size_bytes)
.map(|x| x.encoded_size_bytes())
.sum::<usize>(),
start.elapsed(),
);
(The remaining 30 changed files in this commit are not shown here.)
