diff --git a/doc/developer/design/20240401_persist_inline_writes.md b/doc/developer/design/20240401_persist_inline_writes.md
new file mode 100644
index 0000000000000..2ab10bc0ad5b0
--- /dev/null
+++ b/doc/developer/design/20240401_persist_inline_writes.md
@@ -0,0 +1,81 @@
+# Persist Inline Writes
+
+- Associated: [#24832](https://github.com/MaterializeInc/materialize/pull/24832)
+
+## The Problem
+
+A non-empty persist write currently always requires a write to S3 and a write to
+CockroachDB. S3 writes are much slower than CRDB writes and incur a per-PUT
+charge, so in the case of very small writes, this is wasteful of both latency
+and cost.
+
+Persist latencies directly impact the user experience of Materialize in a number
+of ways. The above waste is particularly egregious in DDL, which may serially
+perform a number of small persist writes.
+
+## Success Criteria
+
+Eligible persist writes have latency of `O(crdb write)` and not `O(s3 write)`.
+
+## Out of Scope
+
+- Latency guarantees: In particular, to keep persist metadata small, inline
+  writes are an optimization; they are not guaranteed.
+- Read latencies.
+
+## Solution Proposal
+
+Persist internally has the concept of a _part_, which corresponds 1:1:1 with a
+persist _blob_ and an _object_ in S3. Currently, all data in a persist shard is
+stored in S3 and then a _hollow_ reference to it is stored in CRDB. This
+reference also includes metadata, such as pushdown statistics and the encoded
+size.
+
+We make this reference instead a two-variant enum: `Hollow` and `Inline`. The
+`Inline` variant stores the same data as would be written to s3, but in an
+encoded protobuf. This protobuf is only decoded in data fetch paths, and is
+otherwise passed around as opaque bytes to save allocations and cpu cycles.
+Pushdown statistics and the encoded size are both unnecessary for inline parts.
+
+The persist state stored in CRDB is a control plane concept, so there is both a
+performance and a stability risk from mixing the data plane into it. We reduce
+the inline parts over time by making compaction flush them out to S3, never
+inlining them. However, nothing prevents new writes from arriving faster than
+compaction can pull them out. We protect the control plane with the following
+two limits to create a hard upper bound on how much data can be inline:
+
+- `persist_inline_writes_single_max_bytes`: An (exclusive) maximum size of a
+  write that persist will inline in metadata.
+- `persist_inline_writes_total_max_bytes`: An (inclusive) maximum total size of
+  inline writes in metadata. Any attempted writes beyond this threshold will
+  instead fall through to the s3 path.
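As a rough illustration (not the actual code from this PR), the sketch below models the two-variant reference and the two limits in a self-contained form. The names `PartRef`, `InlineLimits`, and `plan_write` are hypothetical, and the 4 KiB / 1 MiB values mirror the defaults wired into mzcompose by this PR; in the PR itself the reference is the `BatchPart` enum, the per-write size check happens while parts are written, and the total-size limit is enforced at `compare_and_append` time, where exceeding it returns an `InlineBackpressure` break that makes the writer flush the batch to S3 and retry.

```rust
// Hypothetical, simplified sketch (not the PR's actual types): a part
// reference is either a pointer to a blob in S3 ("hollow") or the encoded
// bytes themselves, stored directly in CRDB-backed state ("inline").
enum PartRef {
    /// Data lives in S3; state keeps only the key and some metadata.
    Hollow {
        key: String,
        encoded_size_bytes: usize,
    },
    /// Data lives inline in state as opaque encoded bytes.
    Inline { updates: Vec<u8> },
}

/// Stand-ins for the two dyncfg limits described above.
struct InlineLimits {
    /// persist_inline_writes_single_max_bytes
    single_max_bytes: usize,
    /// persist_inline_writes_total_max_bytes
    total_max_bytes: usize,
}

/// Decide how to store one write of `encoded` bytes, given how many bytes are
/// already inline in state. Anything that doesn't qualify takes the S3 path.
fn plan_write(encoded: Vec<u8>, inline_bytes_in_state: usize, limits: &InlineLimits) -> PartRef {
    let fits_single = encoded.len() < limits.single_max_bytes;
    let fits_total = inline_bytes_in_state + encoded.len() < limits.total_max_bytes;
    if fits_single && fits_total {
        PartRef::Inline { updates: encoded }
    } else {
        // Fall through to the S3 path: upload the blob and keep only a hollow
        // reference in state. The key here is a placeholder.
        PartRef::Hollow {
            key: "shard/part-0000".to_string(),
            encoded_size_bytes: encoded.len(),
        }
    }
}

fn main() {
    let limits = InlineLimits {
        single_max_bytes: 4096,
        total_max_bytes: 1024 * 1024,
    };
    // A tiny write is inlined; a large one (or any write once the total inline
    // budget is exhausted) takes the S3 path instead.
    for (size, already_inline) in [(128, 0), (8192, 0), (128, 1 << 20)] {
        match plan_write(vec![0u8; size], already_inline, &limits) {
            PartRef::Inline { updates } => println!("inlined {} bytes", updates.len()),
            PartRef::Hollow {
                key,
                encoded_size_bytes,
            } => println!("wrote {encoded_size_bytes} bytes to S3 at {key}"),
        }
    }
}
```

In the PR the hollow variant additionally carries `key_lower`, pushdown `stats`, and `ts_rewrite`, and compaction never re-inlines parts, so the amount of inline data in state stays bounded over time.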
+
+## Alternatives
+
+- In addition to S3, also store _blobs_ in CRDB (or a third technology).
+
+  CRDB [self-documents as not a good fit][crdb-large-blob] for workloads
+  involving large binary blobs: "it's recommended to keep values under 1 MB to
+  ensure adequate performance". That said, given that inline writes are small
+  and directly translate to a CRDB write anyway, this would likely be a totally
+  workable alternative.
+
+  On the other hand, CRDB is already a dominant cost of persist, and a large
+  part of that cost is a function of the rate of SQL values changed, regardless
+  of the size or batching of those writes.
+
+  Additionally, putting the writes inline in persist state allows pubsub to push
+  them around for us, meaning that a reader that is subscribed/listening to the
+  shard doesn't hit the network when it gets the new seqno.
+
+  A third technology would not be worth the additional operational burden.
+
+- Make S3 faster. This is not actionable in the short term.
+
+[crdb-large-blob]: https://www.cockroachlabs.com/docs/stable/bytes#size
+
+## Open Questions
+
+- How do we tune the two thresholds?
+- Should every inline write result in a compaction request for the new batch
+  immediately flushing it out to s3?
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index a4caec8f4b746..2b70339768c00 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -84,6 +84,8 @@
     "enable_worker_core_affinity": "true",
     "persist_batch_delete_enabled": "true",
     "persist_fast_path_limit": "1000",
+    "persist_inline_writes_single_max_bytes": "4096",
+    "persist_inline_writes_total_max_bytes": "1048576",
     "persist_pubsub_client_enabled": "true",
     "persist_pubsub_push_diff_enabled": "true",
     "persist_sink_minimum_batch_updates": "128",
diff --git a/src/buf.yaml b/src/buf.yaml
index e81b6c375c093..96f7f925bb6bc 100644
--- a/src/buf.yaml
+++ b/src/buf.yaml
@@ -51,8 +51,6 @@ breaking:
     - expr/src/relation.proto
     # reason: not yet released
     - mysql-util/src/desc.proto
-    # reason: working around a false positive in buf's breaking change lint
-    - persist-client/src/internal/state.proto
     # reason: does currently not require backward-compatibility
     - storage-client/src/client.proto
     # reason: does currently not require backward-compatibility
diff --git a/src/ore/src/lgbytes.rs b/src/ore/src/lgbytes.rs
index 58ab9855c97d2..5e0cf2410bd1a 100644
--- a/src/ore/src/lgbytes.rs
+++ b/src/ore/src/lgbytes.rs
@@ -80,6 +80,12 @@ impl AsRef<[T]> for MetricsRegion {
     }
 }
 
+impl From<Arc<MetricsRegion<u8>>> for LgBytes {
+    fn from(region: Arc<MetricsRegion<u8>>) -> Self {
+        LgBytes { offset: 0, region }
+    }
+}
+
 impl AsRef<[u8]> for LgBytes {
     fn as_ref(&self) -> &[u8] {
         // This implementation of [bytes::Buf] chooses to panic instead of
@@ -267,10 +273,7 @@ impl LgBytesOpMetrics {
     /// region, falling back to a heap allocation.
     pub fn try_mmap<T: AsRef<[u8]>>(&self, buf: T) -> LgBytes {
         let region = self.try_mmap_region(buf);
-        LgBytes {
-            offset: 0,
-            region: Arc::new(region),
-        }
+        LgBytes::from(Arc::new(region))
     }
 
     /// Attempts to copy the given buf into an lgalloc managed file-based mapped
diff --git a/src/persist-client/build.rs b/src/persist-client/build.rs
index 2971d76adb37c..73ec5e4ceeb10 100644
--- a/src/persist-client/build.rs
+++ b/src/persist-client/build.rs
@@ -66,6 +66,7 @@ fn main() {
         // is to re-run if any file in the crate changes; that's still a bit too
         // broad, but it's better.
.emit_rerun_if_changed(false) + .extern_path(".mz_persist", "::mz_persist") .extern_path(".mz_persist_types", "::mz_persist_types") .extern_path(".mz_proto", "::mz_proto") .compile_with_config( diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index d9d0ab6abb2b8..9c44cbaa699fb 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -43,11 +43,11 @@ use tracing::{debug_span, error, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; use crate::cfg::MiB; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; -use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart}; +use crate::internal::state::{BatchPart, HollowBatch, HollowBatchPart, ProtoInlineBatchPart}; use crate::stats::{ part_stats_for_legacy_part, untrimmable_columns, STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, }; @@ -232,6 +232,59 @@ where self.mark_consumed(); ret } + + pub(crate) async fn flush_to_blob( + &mut self, + cfg: &BatchBuilderConfig, + batch_metrics: &BatchWriteMetrics, + isolated_runtime: &Arc, + stats_schemas: &Schemas, + ) { + // It's necessary for correctness to keep the parts in the same order. + // We could introduce concurrency here with FuturesOrdered, but it would + // be pretty unexpected to have inline writes in more than one part, so + // don't bother. + let mut parts = Vec::new(); + for part in self.batch.parts.drain(..) { + let (updates, ts_rewrite) = match part { + BatchPart::Hollow(x) => { + parts.push(BatchPart::Hollow(x)); + continue; + } + BatchPart::Inline { + updates, + ts_rewrite, + } => (updates, ts_rewrite), + }; + let updates = updates + .decode::(&self.metrics.columnar) + .expect("valid inline part"); + let key_lower = updates.key_lower().to_vec(); + + let write_span = + debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id) + .or_current(); + let handle = mz_ore::task::spawn( + || "batch::flush_to_blob", + BatchParts::write_hollow_part( + cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(isolated_runtime), + updates, + key_lower, + ts_rewrite, + stats_schemas.clone(), + ) + .instrument(write_span), + ); + let part = handle.await.expect("part write task failed"); + parts.push(part); + } + self.batch.parts = parts; + } } /// Indicates what work was done in a call to [BatchBuilder::add] @@ -252,6 +305,7 @@ pub struct BatchBuilderConfig { pub(crate) blob_target_size: usize, pub(crate) batch_delete_enabled: bool, pub(crate) batch_builder_max_outstanding_parts: usize, + pub(crate) inline_writes_single_max_bytes: usize, pub(crate) stats_collection_enabled: bool, pub(crate) stats_budget: usize, pub(crate) stats_untrimmable_columns: Arc, @@ -276,6 +330,20 @@ pub(crate) const BLOB_TARGET_SIZE: Config = Config::new( "A target maximum size of persist blob payloads in bytes (Materialize).", ); +pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config = Config::new( + "persist_inline_writes_single_max_bytes", + 0, + "The (exclusive) maximum size of a write that persist will inline in metadata.", +); + +pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config = Config::new( + 
"persist_inline_writes_total_max_bytes", + 0, + "\ + The (exclusive) maximum total size of inline writes in metadata before \ + persist will backpressure them by flushing out to s3.", +); + impl BatchBuilderConfig { /// Initialize a batch builder config based on a snapshot of the Persist config. pub fn new(value: &PersistConfig, _writer_id: &WriterId) -> Self { @@ -287,6 +355,7 @@ impl BatchBuilderConfig { batch_builder_max_outstanding_parts: value .dynamic .batch_builder_max_outstanding_parts(), + inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value), stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value), stats_budget: STATS_BUDGET_BYTES.get(value), stats_untrimmable_columns: Arc::new(untrimmable_columns(value)), @@ -841,107 +910,56 @@ impl BatchParts { since: Antichain, ) { let desc = Description::new(self.lower.clone(), upper, since); - let metrics = Arc::clone(&self.metrics); - let shard_metrics = Arc::clone(&self.shard_metrics); - let blob = Arc::clone(&self.blob); - let isolated_runtime = Arc::clone(&self.isolated_runtime); let batch_metrics = self.batch_metrics.clone(); - let partial_key = PartialBatchKey::new(&self.cfg.writer_key, &PartId::new()); - let key = partial_key.complete(&self.shard_id); let index = u64::cast_from(self.finished_parts.len() + self.writing_parts.len()); - let stats_collection_enabled = self.cfg.stats_collection_enabled; - let stats_budget = self.cfg.stats_budget; - let schemas = schemas.clone(); - let untrimmable_columns = Arc::clone(&self.cfg.stats_untrimmable_columns); - - let write_span = debug_span!("batch::write_part", shard = %self.shard_id).or_current(); - let handle = mz_ore::task::spawn( - || "batch::write_part", - async move { - let goodbytes = updates.goodbytes(); - let batch = BlobTraceBatchPart { - desc, - updates: vec![updates], - index, - }; - - let (stats, (buf, encode_time)) = isolated_runtime - .spawn_named(|| "batch::encode_part", async move { - let stats = if stats_collection_enabled { - let stats_start = Instant::now(); - match part_stats_for_legacy_part(&schemas, &batch.updates) { - Ok(x) => { - let mut trimmed_bytes = 0; - let x = LazyPartStats::encode(&x, |s| { - trimmed_bytes = trim_to_budget(s, stats_budget, |s| { - untrimmable_columns.should_retain(s) - }); - }); - Some((x, stats_start.elapsed(), trimmed_bytes)) - } - Err(err) => { - error!("failed to construct part stats: {}", err); - None - } - } - } else { - None - }; - - let encode_start = Instant::now(); - let mut buf = Vec::new(); - batch.encode(&mut buf); - - // Drop batch as soon as we can to reclaim its memory. - drop(batch); - (stats, (Bytes::from(buf), encode_start.elapsed())) - }) - .instrument(debug_span!("batch::encode_part")) - .await - .expect("part encode task failed"); - // Can't use the `CodecMetrics::encode` helper because of async. 
- metrics.codecs.batch.encode_count.inc(); - metrics - .codecs - .batch - .encode_seconds - .inc_by(encode_time.as_secs_f64()); - - let start = Instant::now(); - let payload_len = buf.len(); - let () = retry_external(&metrics.retries.external.batch_set, || async { - shard_metrics.blob_sets.inc(); - blob.set(&key, Bytes::clone(&buf)).await - }) - .instrument(trace_span!("batch::set", payload_len)) - .await; - batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); - batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); - batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); - let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { + let ts_rewrite = None; + + let handle = if updates.goodbytes() < self.cfg.inline_writes_single_max_bytes { + let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::inline_part", + async move { + let start = Instant::now(); + let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart { + desc: Some(desc.into_proto()), + index: index.into_proto(), + updates: Some(updates.into_proto()), + }); batch_metrics - .step_stats - .inc_by(stats_step_timing.as_secs_f64()); - if trimmed_bytes > 0 { - metrics.pushdown.parts_stats_trimmed_count.inc(); - metrics - .pushdown - .parts_stats_trimmed_bytes - .inc_by(u64::cast_from(trimmed_bytes)); + .step_inline + .inc_by(start.elapsed().as_secs_f64()); + BatchPart::Inline { + updates, + ts_rewrite, } - stats - }); - - BatchPart::Hollow(HollowBatchPart { - key: partial_key, - encoded_size_bytes: payload_len, + } + .instrument(span), + ) + } else { + let part = BlobTraceBatchPart { + desc, + updates: vec![updates], + index, + }; + let write_span = + debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current(); + mz_ore::task::spawn( + || "batch::write_part", + BatchParts::write_hollow_part( + self.cfg.clone(), + Arc::clone(&self.blob), + Arc::clone(&self.metrics), + Arc::clone(&self.shard_metrics), + batch_metrics.clone(), + Arc::clone(&self.isolated_runtime), + part, key_lower, - stats, - ts_rewrite: None, - }) - } - .instrument(write_span), - ); + ts_rewrite, + schemas.clone(), + ) + .instrument(write_span), + ) + }; self.writing_parts.push_back(handle); while self.writing_parts.len() > self.cfg.batch_builder_max_outstanding_parts { @@ -958,6 +976,98 @@ impl BatchParts { } } + async fn write_hollow_part( + cfg: BatchBuilderConfig, + blob: Arc, + metrics: Arc, + shard_metrics: Arc, + batch_metrics: BatchWriteMetrics, + isolated_runtime: Arc, + updates: BlobTraceBatchPart, + key_lower: Vec, + ts_rewrite: Option>, + schemas: Schemas, + ) -> BatchPart { + let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new()); + let key = partial_key.complete(&shard_metrics.shard_id); + let goodbytes = updates.updates.iter().map(|x| x.goodbytes()).sum::(); + + let (stats, (buf, encode_time)) = isolated_runtime + .spawn_named(|| "batch::encode_part", async move { + let stats = if cfg.stats_collection_enabled { + let stats_start = Instant::now(); + match part_stats_for_legacy_part(&schemas, &updates.updates) { + Ok(x) => { + let mut trimmed_bytes = 0; + let x = LazyPartStats::encode(&x, |s| { + trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| { + cfg.stats_untrimmable_columns.should_retain(s) + }); + }); + Some((x, stats_start.elapsed(), trimmed_bytes)) + } + Err(err) => { + error!("failed to construct part stats: {}", err); + None + } + } + } else { + None + }; + + let encode_start = Instant::now(); + let mut buf = 
Vec::new(); + updates.encode(&mut buf); + + // Drop batch as soon as we can to reclaim its memory. + drop(updates); + (stats, (Bytes::from(buf), encode_start.elapsed())) + }) + .instrument(debug_span!("batch::encode_part")) + .await + .expect("part encode task failed"); + // Can't use the `CodecMetrics::encode` helper because of async. + metrics.codecs.batch.encode_count.inc(); + metrics + .codecs + .batch + .encode_seconds + .inc_by(encode_time.as_secs_f64()); + + let start = Instant::now(); + let payload_len = buf.len(); + let () = retry_external(&metrics.retries.external.batch_set, || async { + shard_metrics.blob_sets.inc(); + blob.set(&key, Bytes::clone(&buf)).await + }) + .instrument(trace_span!("batch::set", payload_len)) + .await; + batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64()); + batch_metrics.bytes.inc_by(u64::cast_from(payload_len)); + batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes)); + let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| { + batch_metrics + .step_stats + .inc_by(stats_step_timing.as_secs_f64()); + if trimmed_bytes > 0 { + metrics.pushdown.parts_stats_trimmed_count.inc(); + metrics + .pushdown + .parts_stats_trimmed_bytes + .inc_by(u64::cast_from(trimmed_bytes)); + } + stats + }); + + BatchPart::Hollow(HollowBatchPart { + key: partial_key, + encoded_size_bytes: payload_len, + key_lower, + stats, + ts_rewrite, + }) + } + #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))] pub(crate) async fn finish(self) -> Vec> { let mut parts = self.finished_parts; @@ -1020,11 +1130,15 @@ pub(crate) fn validate_truncate_batch( pub(crate) struct PartDeletes(BTreeSet); impl PartDeletes { - // Adds the part to the set to be deleted and returns true if it was already - // present. + // Adds the part to the set to be deleted and returns true if it was newly + // inserted. pub fn add(&mut self, part: &BatchPart) -> bool { match part { BatchPart::Hollow(x) => self.0.insert(x.key.clone()), + BatchPart::Inline { .. } => { + // Nothing to delete. + true + } } } @@ -1186,6 +1300,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { @@ -1236,6 +1351,7 @@ mod tests { for part in &batch.batch.parts { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => panic!("batch unexpectedly used inline part"), }; match BlobKey::parse_ids(&part.key.complete(&shard_id)) { Ok((shard, PartialBlobKey::Batch(writer, _))) => { diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index 0cd3aa6301f8b..a9495fd034cbd 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -312,6 +312,8 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { mz_persist::cfg::all_dyn_configs(configs) .add(&crate::batch::BATCH_DELETE_ENABLED) .add(&crate::batch::BLOB_TARGET_SIZE) + .add(&crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES) + .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL) .add(&crate::cfg::CRDB_CONNECT_TIMEOUT) diff --git a/src/persist-client/src/cli/inspect.rs b/src/persist-client/src/cli/inspect.rs index 551740ae86028..4e518e40d8631 100644 --- a/src/persist-client/src/cli/inspect.rs +++ b/src/persist-client/src/cli/inspect.rs @@ -39,7 +39,7 @@ use crate::internal::encoding::{Rollup, UntypedState}; use crate::internal::paths::{ BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey, }; -use crate::internal::state::{BatchPart, HollowBatchPart, ProtoRollup, ProtoStateDiff, State}; +use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State}; use crate::rpc::NoopPubSubSender; use crate::usage::{HumanBytes, StorageUsageClient}; use crate::{Metrics, PersistClient, PersistConfig, ShardId}; @@ -347,17 +347,11 @@ pub async fn blob_batch_part( let parsed = BlobTraceBatchPart::::decode(&buf, &metrics.columnar).expect("decodable"); let desc = parsed.desc.clone(); - let part = HollowBatchPart { - key, - encoded_size_bytes: 0, - key_lower: vec![], - stats: None, - ts_rewrite: None, - }; let encoded_part = EncodedPart::new( metrics.read.snapshot.clone(), parsed.desc.clone(), - &part, + &key.0, + None, parsed, ); let mut out = BatchPartOutput { @@ -605,6 +599,7 @@ pub async fn unreferenced_blobs(args: &StateArgs) -> Result known_parts.insert(x.key.clone()), + BatchPart::Inline { .. 
} => continue, }; } } diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 6948c86fd2528..92c1b28216e9a 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -36,7 +36,7 @@ use crate::batch::{ ProtoLeasedBatchPart, }; use crate::error::InvalidUsage; -use crate::internal::encoding::{LazyPartStats, LazyProto, Schemas}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; use crate::internal::machine::retry_external; use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics}; use crate::internal::paths::BlobKey; @@ -120,6 +120,14 @@ where part: x.clone(), } } + BatchPart::Inline { + updates, + ts_rewrite, + } => FetchedBlobBuf::Inline { + desc: part.desc.clone(), + updates: updates.clone(), + ts_rewrite: ts_rewrite.clone(), + }, }; let fetched_blob = FetchedBlob { metrics: Arc::clone(&self.metrics), @@ -324,7 +332,7 @@ where read_metrics.part_goodbytes.inc_by(u64::cast_from( parsed.updates.iter().map(|x| x.goodbytes()).sum::(), )); - EncodedPart::new(read_metrics.clone(), registered_desc, part, parsed) + EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed) }) } @@ -475,6 +483,11 @@ enum FetchedBlobBuf { buf: SegmentedBytes, part: HollowBatchPart, }, + Inline { + desc: Description, + updates: LazyInlineBatchPart, + ts_rewrite: Option>, + }, } impl Clone for FetchedBlob { @@ -506,6 +519,20 @@ impl FetchedBlob { + let parsed = EncodedPart::from_inline( + &self.metrics, + self.read_metrics.clone(), + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + (parsed, None) + } }; FetchedPart::new( Arc::clone(&self.metrics), @@ -711,14 +738,51 @@ where ) .await } + BatchPart::Inline { + updates, + ts_rewrite, + } => Ok(EncodedPart::from_inline( + metrics, + read_metrics.clone(), + registered_desc.clone(), + updates, + ts_rewrite.as_ref(), + )), } } - pub(crate) fn new( + pub(crate) fn from_inline( + metrics: &Metrics, + read_metrics: ReadMetrics, + desc: Description, + x: &LazyInlineBatchPart, + ts_rewrite: Option<&Antichain>, + ) -> Self { + let parsed = x.decode(&metrics.columnar).expect("valid inline part"); + Self::new(read_metrics, desc, "inline", ts_rewrite, parsed) + } + + pub(crate) fn from_hollow( metrics: ReadMetrics, registered_desc: Description, part: &HollowBatchPart, parsed: BlobTraceBatchPart, + ) -> Self { + Self::new( + metrics, + registered_desc, + &part.key.0, + part.ts_rewrite.as_ref(), + parsed, + ) + } + + pub(crate) fn new( + metrics: ReadMetrics, + registered_desc: Description, + printable_name: &str, + ts_rewrite: Option<&Antichain>, + parsed: BlobTraceBatchPart, ) -> Self { // There are two types of batches in persist: // - Batches written by a persist user (either directly or indirectly @@ -738,11 +802,11 @@ where assert!( PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()), "key={} inline={:?} registered={:?}", - part.key, + printable_name, inline_desc, registered_desc ); - if part.ts_rewrite.is_none() { + if ts_rewrite.is_none() { // The ts rewrite feature allows us to advance the registered // upper of a batch that's already been staged (the inline // upper), so if it's been used, then there's no useful @@ -750,7 +814,7 @@ where assert!( PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()), "key={} inline={:?} registered={:?}", - part.key, + printable_name, inline_desc, registered_desc ); @@ -763,7 +827,7 @@ where inline_desc.since(), &Antichain::from_elem(T::minimum()), "key={} inline={:?} 
registered={:?}", - part.key, + printable_name, inline_desc, registered_desc ); @@ -771,7 +835,7 @@ where assert_eq!( inline_desc, ®istered_desc, "key={} inline={:?} registered={:?}", - part.key, inline_desc, registered_desc + printable_name, inline_desc, registered_desc ); } @@ -780,7 +844,7 @@ where registered_desc, part: Arc::new(parsed), needs_truncation, - ts_rewrite: part.ts_rewrite.clone(), + ts_rewrite: ts_rewrite.cloned(), } } diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index c478a1d2ec3c4..40b9613f61446 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -28,7 +28,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::PartialOrder; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot, TryAcquireError}; -use tracing::{debug, debug_span, trace, warn, Instrument, Span}; +use tracing::{debug, debug_span, error, trace, warn, Instrument, Span}; use crate::async_runtime::IsolatedRuntime; use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, PartDeletes}; @@ -79,12 +79,18 @@ pub struct CompactConfig { impl CompactConfig { /// Initialize the compaction config from Persist configuration. pub fn new(value: &PersistConfig, writer_id: &WriterId) -> Self { - CompactConfig { + let mut ret = CompactConfig { compaction_memory_bound_bytes: value.dynamic.compaction_memory_bound_bytes(), compaction_yield_after_n_updates: value.compaction_yield_after_n_updates, version: value.build_version.clone(), batch: BatchBuilderConfig::new(value, writer_id), - } + }; + // Use compaction as a method of getting inline writes out of state, to + // make room for more inline writes. We could instead do this at the end + // of compaction by flushing out the batch, but doing it here based on + // the config allows BatchBuilder to do its normal pipelining of writes. + ret.batch.inline_writes_single_max_bytes = 0; + ret } } @@ -689,7 +695,7 @@ where metrics.compaction.batch.clone(), desc.lower().clone(), Arc::clone(&blob), - isolated_runtime, + Arc::clone(&isolated_runtime), shard_id.clone(), cfg.version.clone(), desc.since().clone(), @@ -740,12 +746,31 @@ where } tokio::task::yield_now().await; } - let batch = batch.finish(&real_schemas, desc.upper().clone()).await?; - let hollow_batch = batch.into_hollow_batch(); + let mut batch = batch.finish(&real_schemas, desc.upper().clone()).await?; + + // We use compaction as a method of getting inline writes out of state, + // to make room for more inline writes. This happens in + // `CompactConfig::new` by overriding the inline writes threshold + // config. This is a bit action-at-a-distance, so defensively detect if + // this breaks here and log and correct it if so. + let has_inline_parts = batch.batch.parts.iter().any(|x| match x { + BatchPart::Hollow(_) => false, + BatchPart::Inline { .. } => true, + }); + if has_inline_parts { + error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes"); + let () = batch + .flush_to_blob( + &cfg.batch, + &metrics.compaction.batch, + &isolated_runtime, + &real_schemas, + ) + .await; + } timings.record(&metrics); - - Ok(hollow_batch) + Ok(batch.into_hollow_batch()) } fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> { @@ -877,6 +902,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), @@ -962,6 +988,7 @@ mod tests { assert_eq!(res.output.parts.len(), 1); let part = match &res.output.parts[0] { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => panic!("test outputs a hollow part"), }; let (part, updates) = expect_fetch_part( write.blob.as_ref(), diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b5538fe41a1a8..9bd7699c1ed6b 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -10,6 +10,7 @@ use std::cmp::Ordering; use std::collections::BTreeMap; use std::fmt::Debug; +use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; @@ -17,7 +18,10 @@ use bytes::{Buf, Bytes}; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; use mz_ore::halt; +use mz_persist::indexed::columnar::ColumnarRecords; +use mz_persist::indexed::encoding::BlobTraceBatchPart; use mz_persist::location::{SeqNo, VersionedData}; +use mz_persist::metrics::ColumnarMetrics; use mz_persist_types::stats::{PartStats, ProtoStructStats}; use mz_persist_types::{Codec, Codec64}; use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError}; @@ -25,6 +29,7 @@ use proptest::prelude::Arbitrary; use proptest::strategy::Strategy; use prost::Message; use semver::Version; +use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; use timely::progress::{Antichain, Timestamp}; use uuid::Uuid; @@ -38,10 +43,11 @@ use crate::internal::state::{ HollowBatchPart, HollowRollup, IdempotencyToken, LeasedReaderState, OpaqueState, ProtoCriticalReaderState, ProtoFuelingMerge, ProtoHandleDebugState, ProtoHollowBatch, ProtoHollowBatchPart, ProtoHollowRollup, ProtoIdFuelingMerge, ProtoIdHollowBatch, - ProtoIdSpineBatch, ProtoInlinedDiffs, ProtoLeasedReaderState, ProtoRollup, ProtoSpineBatch, - ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, - ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState, - State, StateCollections, TypedState, WriterState, + ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs, ProtoLeasedReaderState, + ProtoRollup, ProtoSpineBatch, ProtoSpineId, ProtoStateDiff, ProtoStateField, + ProtoStateFieldDiffType, ProtoStateFieldDiffs, ProtoTrace, ProtoU64Antichain, + ProtoU64Description, ProtoVersionedData, ProtoWriterState, State, StateCollections, TypedState, + WriterState, }; use crate::internal::state_diff::{ ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff, @@ -133,6 +139,13 @@ impl Ord for LazyProto { } } +impl Hash for LazyProto { + fn hash(&self, state: &mut H) { + let LazyProto { buf, _phantom } = self; + buf.hash(state); + } +} + impl From<&T> for LazyProto { fn from(value: &T) -> Self { let buf = Bytes::from(value.encode_to_vec()); @@ -1240,6 +1253,16 @@ impl RustType for BatchPart { key_stats: x.stats.into_proto(), ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()), }, + BatchPart::Inline { + updates, + ts_rewrite, + } => ProtoHollowBatchPart { + kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())), + encoded_size_bytes: 0, + key_lower: Bytes::new(), + key_stats: None, + ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()), + }, } } @@ -1248,17 +1271,26 @@ impl RustType for BatchPart { Some(ts_rewrite) => 
Some(ts_rewrite.into_rust()?), None => None, }; - let encoded_size_bytes = proto.encoded_size_bytes.into_rust()?; match proto.kind { Some(proto_hollow_batch_part::Kind::Key(key)) => { Ok(BatchPart::Hollow(HollowBatchPart { key: key.into_rust()?, - encoded_size_bytes, + encoded_size_bytes: proto.encoded_size_bytes.into_rust()?, key_lower: proto.key_lower.into(), stats: proto.key_stats.into_rust()?, ts_rewrite, })) } + Some(proto_hollow_batch_part::Kind::Inline(x)) => { + assert_eq!(proto.encoded_size_bytes, 0); + assert_eq!(proto.key_lower.len(), 0); + assert!(proto.key_stats.is_none()); + let updates = LazyInlineBatchPart(x.into_rust()?); + Ok(BatchPart::Inline { + updates, + ts_rewrite, + }) + } None => Err(TryFromProtoError::unknown_enum_variant( "ProtoHollowBatchPart::kind", )), @@ -1327,6 +1359,84 @@ impl Arbitrary for LazyPartStats { } } +impl ProtoInlineBatchPart { + pub(crate) fn into_rust( + lgbytes: &ColumnarMetrics, + proto: Self, + ) -> Result, TryFromProtoError> { + // BlobTraceBatchPart has a Vec. Inline writes only + // needs one and it's nice to only have to model one at the + // ProtoInlineBatchPart level. I'm _pretty_ sure that the actual + // BlobTraceBatchPart we've serialized into parquet always have exactly + // one (_maaaaaybe_ zero or one), but that's a scary thing to start + // enforcing, so separate it out (so we can e.g. use sentry errors! to + // confirm before rolling anything out). In the meantime, just construct + // the ProtoInlineBatchPart directly in BatchParts where it still knows + // that it has exactly one ColumnarRecords. + let updates = proto + .updates + .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?; + let updates = ColumnarRecords::from_proto(lgbytes, updates)?; + Ok(BlobTraceBatchPart { + desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?, + index: proto.index.into_rust()?, + updates: vec![updates], + }) + } +} + +/// A batch part stored inlined in State. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct LazyInlineBatchPart(LazyProto); + +impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart { + fn from(value: &ProtoInlineBatchPart) -> Self { + LazyInlineBatchPart(value.into()) + } +} + +impl Serialize for LazyInlineBatchPart { + fn serialize(&self, s: S) -> Result { + // NB: This serialize impl is only used for QA and debugging, so emit a + // truncated version. + let proto = self.0.decode().expect("valid proto"); + let mut s = s.serialize_struct("InlineBatchPart", 3)?; + let () = s.serialize_field("desc", &proto.desc)?; + let () = s.serialize_field("index", &proto.index)?; + let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?; + s.end() + } +} + +impl LazyInlineBatchPart { + pub(crate) fn encoded_size_bytes(&self) -> usize { + self.0.buf.len() + } + + /// Decodes and returns a BlobTraceBatchPart from the encoded + /// representation. + /// + /// This does not cache the returned value, it decodes each time it's + /// called. 
+ pub fn decode( + &self, + lgbytes: &ColumnarMetrics, + ) -> Result, TryFromProtoError> { + let proto = self.0.decode().expect("valid proto"); + ProtoInlineBatchPart::into_rust(lgbytes, proto) + } +} + +impl RustType for LazyInlineBatchPart { + fn into_proto(&self) -> Bytes { + self.0.into_proto() + } + + fn from_proto(proto: Bytes) -> Result { + Ok(LazyInlineBatchPart(proto.into_rust()?)) + } +} + impl RustType for HollowRollup { fn into_proto(&self) -> ProtoHollowRollup { ProtoHollowRollup { @@ -1392,7 +1502,7 @@ mod tests { use crate::internal::paths::PartialRollupKey; use crate::internal::state::tests::any_state; - use crate::internal::state::HandleDebugState; + use crate::internal::state::{BatchPart, HandleDebugState}; use crate::internal::state_diff::StateDiff; use crate::tests::new_test_client_cache; use crate::ShardId; diff --git a/src/persist-client/src/internal/gc.rs b/src/persist-client/src/internal/gc.rs index d6b24bb624e31..dd62371ff6f7c 100644 --- a/src/persist-client/src/internal/gc.rs +++ b/src/persist-client/src/internal/gc.rs @@ -471,6 +471,7 @@ where BatchPart::Hollow(x) => { assert_eq!(batch_parts_to_delete.get(&x.key), None) } + BatchPart::Inline { .. } => {} } } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 9c0d1cc7229b4..f418c98e30cee 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -20,6 +20,7 @@ use differential_dataflow::lattice::Lattice; use futures::stream::{FuturesUnordered, StreamExt}; use futures::FutureExt; use mz_dyncfg::{Config, ConfigSet}; +use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; #[allow(unused_imports)] // False positive. use mz_ore::fmt::FormatBuffer; @@ -32,6 +33,7 @@ use timely::PartialOrder; use tracing::{debug, info, trace_span, warn, Instrument}; use crate::async_runtime::IsolatedRuntime; +use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES; use crate::cache::StateCache; use crate::cfg::RetryParameters; use crate::critical::CriticalReaderId; @@ -43,9 +45,9 @@ use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance}; use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics}; use crate::internal::paths::PartialRollupKey; use crate::internal::state::{ - CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup, - IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections, - Upper, + BatchPart, CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, + HollowRollup, IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, + StateCollections, Upper, }; use crate::internal::state_versions::StateVersions; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; @@ -240,6 +242,9 @@ where CompareAndAppendRes::InvalidUsage(x) => { return CompareAndAppendRes::InvalidUsage(x) } + CompareAndAppendRes::InlineBackpressure => { + return CompareAndAppendRes::InlineBackpressure + } CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => { // If the state machine thinks that the shard upper is not // far enough along, it could be because the caller of this @@ -376,7 +381,7 @@ where loop { let cmd_res = self .applier - .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, _, state| { + .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| { writer_was_present = state.writers.contains_key(writer_id); state.compare_and_append( batch, @@ -385,6 +390,7 @@ 
where lease_duration_ms, idempotency_token, debug_info, + INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg), ) }) .await; @@ -430,6 +436,16 @@ where if !writer_was_present { metrics.state.writer_added.inc(); } + for part in &batch.parts { + match part { + BatchPart::Inline { updates, .. } => { + let bytes = u64::cast_from(updates.encoded_size_bytes()); + metrics.inline.part_commit_count.inc(); + metrics.inline.part_commit_bytes.inc_by(bytes); + } + BatchPart::Hollow(_) => {} + } + } return CompareAndAppendRes::Success(seqno, writer_maintenance); } Err(CompareAndAppendBreak::AlreadyCommitted) => { @@ -451,6 +467,11 @@ where assert!(indeterminate.is_none()); return CompareAndAppendRes::InvalidUsage(err); } + Err(CompareAndAppendBreak::InlineBackpressure) => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. + return CompareAndAppendRes::InlineBackpressure; + } Err(CompareAndAppendBreak::Upper { shard_upper, writer_upper, @@ -1008,10 +1029,12 @@ pub(crate) enum CompareAndAppendRes { Success(SeqNo, WriterMaintenance), InvalidUsage(InvalidUsage), UpperMismatch(SeqNo, Antichain), + InlineBackpressure, } #[cfg(test)] impl CompareAndAppendRes { + #[track_caller] fn unwrap(self) -> (SeqNo, WriterMaintenance) { match self { CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance), @@ -1260,6 +1283,7 @@ pub mod datadriven { use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::trace::Description; use mz_dyncfg::{ConfigUpdates, ConfigVal}; + use mz_persist::indexed::encoding::BlobTraceBatchPart; use mz_persist_types::codec_impls::{StringSchema, UnitSchema}; use crate::batch::{ @@ -1335,6 +1359,17 @@ pub mod datadriven { routine: Vec::new(), } } + + fn to_batch(&self, hollow: HollowBatch) -> Batch { + Batch::new( + true, + Arc::clone(&self.client.metrics), + Arc::clone(&self.client.blob), + self.client.metrics.shards.shard(&self.shard_id, "test"), + self.client.cfg.build_version.clone(), + hollow, + ) + } } /// Scans consensus and returns all states with their SeqNos @@ -1583,7 +1618,7 @@ pub mod datadriven { val: Arc::new(UnitSchema), }; let builder = BatchBuilderInternal::new( - cfg, + cfg.clone(), Arc::clone(&datadriven.client.metrics), Arc::clone(&datadriven.machine.applier.shard_metrics), schemas.clone(), @@ -1599,18 +1634,32 @@ pub mod datadriven { ); let mut builder = BatchBuilder { builder, - stats_schemas: schemas, + stats_schemas: schemas.clone(), }; for ((k, ()), t, d) in updates { builder.add(&k, &(), &t, &d).await.expect("invalid batch"); } - let batch = builder.finish(upper).await?.into_hollow_batch(); + let mut batch = builder.finish(upper).await?; + // We can only reasonably use parts_size_override with hollow batches, + // so if it's set, flush any inline batches out. + if parts_size_override.is_some() { + batch + .flush_to_blob( + &cfg, + &datadriven.client.metrics.user, + &datadriven.client.isolated_runtime, + &schemas, + ) + .await; + } + let batch = batch.into_hollow_batch(); if let Some(size) = parts_size_override { let mut batch = batch.clone(); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Inline { .. 
} => unreachable!("flushed out above"), } } datadriven.batches.insert(output.to_owned(), batch); @@ -1631,8 +1680,16 @@ pub mod datadriven { let mut s = String::new(); for (idx, part) in batch.parts.iter().enumerate() { write!(s, "\n"); - if stats == Some("lower") && !part.key_lower().is_empty() { - writeln!(s, "", std::str::from_utf8(part.key_lower())?) + let key_lower = match part { + BatchPart::Hollow(x) => x.key_lower.clone(), + BatchPart::Inline { updates, .. } => { + let updates: BlobTraceBatchPart = + updates.decode(&datadriven.client.metrics.columnar).unwrap(); + updates.key_lower().to_vec() + } + }; + if stats == Some("lower") && !key_lower.is_empty() { + writeln!(s, "", std::str::from_utf8(&key_lower)?) } match part { BatchPart::Hollow(part) => { @@ -1651,6 +1708,7 @@ pub mod datadriven { } }; } + BatchPart::Inline { .. } => {} }; let part = EncodedPart::fetch( &datadriven.shard_id, @@ -1717,7 +1775,10 @@ pub mod datadriven { let batch = datadriven.batches.get_mut(input).expect("unknown batch"); for part in batch.parts.iter_mut() { match part { - BatchPart::Hollow(part) => part.encoded_size_bytes = size, + BatchPart::Hollow(x) => x.encoded_size_bytes = size, + BatchPart::Inline { .. } => { + panic!("set_batch_parts_size only supports hollow parts") + } } } Ok("ok\n".to_string()) @@ -2065,18 +2126,7 @@ pub mod datadriven { .get(batch) .expect("unknown batch") .clone(); - Batch::new( - true, - Arc::clone(&datadriven.client.metrics), - Arc::clone(&datadriven.client.blob), - datadriven - .client - .metrics - .shards - .shard(&datadriven.shard_id, "test"), - datadriven.client.cfg.build_version.clone(), - hollow, - ) + datadriven.to_batch(hollow) }) .collect(); @@ -2136,33 +2186,52 @@ pub mod datadriven { ) -> Result { let input = args.expect_str("input"); let writer_id = args.expect("writer_id"); - let batch = datadriven + let mut batch = datadriven .batches .get(input) .expect("unknown batch") .clone(); let token = args.optional("token").unwrap_or_else(IdempotencyToken::new); - let indeterminate = args - .optional::("prev_indeterminate") - .map(|x| Indeterminate::new(anyhow::Error::msg(x))); let now = (datadriven.client.cfg.now)(); - let res = datadriven - .machine - .compare_and_append_idempotent( - &batch, - &writer_id, - now, - &token, - &HandleDebugState::default(), - indeterminate, - ) - .await; - let maintenance = match res { - CompareAndAppendRes::Success(_, x) => x, - CompareAndAppendRes::UpperMismatch(_seqno, upper) => { - return Err(anyhow!("{:?}", Upper(upper))) - } - _ => panic!("{:?}", res), + let maintenance = loop { + let indeterminate = args + .optional::("prev_indeterminate") + .map(|x| Indeterminate::new(anyhow::Error::msg(x))); + let res = datadriven + .machine + .compare_and_append_idempotent( + &batch, + &writer_id, + now, + &token, + &HandleDebugState::default(), + indeterminate, + ) + .await; + match res { + CompareAndAppendRes::Success(_, x) => break x, + CompareAndAppendRes::UpperMismatch(_seqno, upper) => { + return Err(anyhow!("{:?}", Upper(upper))) + } + CompareAndAppendRes::InlineBackpressure => { + let mut b = datadriven.to_batch(batch.clone()); + let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, &writer_id); + let schemas = Schemas:: { + key: Arc::new(StringSchema), + val: Arc::new(UnitSchema), + }; + b.flush_to_blob( + &cfg, + &datadriven.client.metrics.user, + &datadriven.client.isolated_runtime, + &schemas, + ) + .await; + batch = b.into_hollow_batch(); + continue; + } + _ => panic!("{:?}", res), + }; }; // TODO: Don't throw away writer 
maintenance. It's slightly tricky // because we need a WriterId for Compactor. @@ -2227,6 +2296,7 @@ pub mod tests { use mz_persist::location::SeqNo; use timely::progress::Antichain; + use crate::batch::BatchBuilderConfig; use crate::cache::StateCache; use crate::internal::gc::{GarbageCollector, GcReq}; use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD}; @@ -2249,9 +2319,20 @@ pub mod tests { // live entries in consensus. const NUM_BATCHES: u64 = 100; for idx in 0..NUM_BATCHES { - let batch = write + let mut batch = write .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1) .await; + // Flush this batch out so the CaA doesn't get inline writes + // backpressure. + let cfg = BatchBuilderConfig::new(&client.cfg, &write.writer_id); + batch + .flush_to_blob( + &cfg, + &client.metrics.user, + &client.isolated_runtime, + &write.schemas, + ) + .await; let (_, writer_maintenance) = write .machine .compare_and_append( diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 18d36e46dd8c9..bd5408a64ae88 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -96,6 +96,8 @@ pub struct Metrics { pub tasks: TasksMetrics, /// Metrics for columnar data encoding and decoding. pub columnar: ColumnarMetrics, + /// Metrics for inline writes. + pub inline: InlineMetrics, /// Metrics for the persist sink. pub sink: SinkMetrics, @@ -153,6 +155,7 @@ impl Metrics { blob_cache_mem: BlobMemCache::new(registry), tasks: TasksMetrics::new(registry), columnar, + inline: InlineMetrics::new(registry), sink: SinkMetrics::new(registry), s3_blob, postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"), @@ -692,6 +695,7 @@ pub struct BatchWriteMetrics { pub(crate) step_columnar_encoding: Counter, pub(crate) step_stats: Counter, pub(crate) step_part_writing: Counter, + pub(crate) step_inline: Counter, } impl BatchWriteMetrics { @@ -732,6 +736,10 @@ impl BatchWriteMetrics { name: format!("mz_persist_{}_step_part_writing", name), help: format!("blocking time spent writing parts for {} updates", name), )), + step_inline: registry.register(metric!( + name: format!("mz_persist_{}_step_inline", name), + help: format!("time spent encoding {} inline batches", name) + )), } } } @@ -1209,7 +1217,10 @@ pub struct ShardsMetrics { backpressure_emitted_bytes: IntCounterVec, backpressure_last_backpressured_bytes: UIntGaugeVec, backpressure_retired_bytes: IntCounterVec, - rewrite_part_count: mz_ore::metrics::UIntGaugeVec, + rewrite_part_count: UIntGaugeVec, + inline_part_count: UIntGaugeVec, + inline_part_bytes: UIntGaugeVec, + inline_backpressure_count: IntCounterVec, // We hand out `Arc` to read and write handles, but store it // here as `Weak`. This allows us to discover if it's no longer in use and // so we can remove it from the map. 
@@ -1399,6 +1410,21 @@ impl ShardsMetrics { help: "count of batch parts with rewrites by shard", var_labels: ["shard", "name"], )), + inline_part_count: registry.register(metric!( + name: "mz_persist_shard_inline_part_count", + help: "count of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), + inline_part_bytes: registry.register(metric!( + name: "mz_persist_shard_inline_part_bytes", + help: "total size of parts inline in shard metadata", + var_labels: ["shard", "name"], + )), + inline_backpressure_count: registry.register(metric!( + name: "mz_persist_shard_inline_backpressure_count", + help: "count of CaA attempts retried because of inline backpressure", + var_labels: ["shard", "name"], + )), shards, } } @@ -1477,6 +1503,9 @@ pub struct ShardMetrics { Arc>>, pub backpressure_retired_bytes: Arc>>, pub rewrite_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_count: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_part_bytes: DeleteOnDropGauge<'static, AtomicU64, Vec>, + pub inline_backpressure_count: DeleteOnDropCounter<'static, AtomicU64, Vec>, } impl ShardMetrics { @@ -1588,7 +1617,16 @@ impl ShardMetrics { ), rewrite_part_count: shards_metrics .rewrite_part_count - .get_delete_on_drop_gauge(vec![shard, name.to_string()]), + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_count: shards_metrics + .inline_part_count + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_part_bytes: shards_metrics + .inline_part_bytes + .get_delete_on_drop_gauge(vec![shard.clone(), name.to_string()]), + inline_backpressure_count: shards_metrics + .inline_backpressure_count + .get_delete_on_drop_counter(vec![shard, name.to_string()]), } } @@ -2216,6 +2254,8 @@ pub struct PushdownMetrics { pub(crate) parts_fetched_bytes: IntCounter, pub(crate) parts_audited_count: IntCounter, pub(crate) parts_audited_bytes: IntCounter, + pub(crate) parts_inline_count: IntCounter, + pub(crate) parts_inline_bytes: IntCounter, pub(crate) parts_stats_trimmed_count: IntCounter, pub(crate) parts_stats_trimmed_bytes: IntCounter, pub part_stats: PartStatsMetrics, @@ -2248,6 +2288,14 @@ impl PushdownMetrics { name: "mz_persist_pushdown_parts_audited_bytes", help: "total size of parts fetched only for pushdown audit", )), + parts_inline_count: registry.register(metric!( + name: "mz_persist_pushdown_parts_inline_count", + help: "count of parts not fetched because they were inline", + )), + parts_inline_bytes: registry.register(metric!( + name: "mz_persist_pushdown_parts_inline_bytes", + help: "total size of parts not fetched because they were inline", + )), parts_stats_trimmed_count: registry.register(metric!( name: "mz_persist_pushdown_parts_stats_trimmed_count", help: "count of trimmed part stats", @@ -2763,6 +2811,29 @@ impl TasksMetrics { } } +#[derive(Debug)] +pub struct InlineMetrics { + pub(crate) part_commit_count: IntCounter, + pub(crate) part_commit_bytes: IntCounter, + pub(crate) backpressure: BatchWriteMetrics, +} + +impl InlineMetrics { + fn new(registry: &MetricsRegistry) -> Self { + InlineMetrics { + part_commit_count: registry.register(metric!( + name: "mz_persist_inline_part_commit_count", + help: "count of inline parts committed to state", + )), + part_commit_bytes: registry.register(metric!( + name: "mz_persist_inline_part_commit_bytes", + help: "total size of of inline parts committed to state", + )), + backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"), + } + } +} + fn 
blob_key_shard_id(key: &str) -> Option { let (shard_id, _) = BlobKey::parse_ids(key).ok()?; Some(shard_id.to_string()) diff --git a/src/persist-client/src/internal/restore.rs b/src/persist-client/src/internal/restore.rs index bbe6c694f0c5d..0b58ec37f870f 100644 --- a/src/persist-client/src/internal/restore.rs +++ b/src/persist-client/src/internal/restore.rs @@ -88,6 +88,7 @@ pub(crate) async fn restore_blob( for part in &batch.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. } => continue, }; check_restored(&key, blob.restore(&key).await); } @@ -98,6 +99,7 @@ pub(crate) async fn restore_blob( for part in &after.parts { let key = match part { BatchPart::Hollow(x) => x.key.complete(&shard_id), + BatchPart::Inline { .. } => continue, }; check_restored(&key, blob.restore(&key).await); } diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto index 4f32742f48d72..2123d743bd41e 100644 --- a/src/persist-client/src/internal/state.proto +++ b/src/persist-client/src/internal/state.proto @@ -7,12 +7,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -// buf breaking: ignore (working around a false positive in buf's breaking change lint) - syntax = "proto3"; package mz_persist_client.internal.state; +import "persist/src/persist.proto"; + message ProtoU64Antichain { repeated int64 elements = 1; } @@ -26,16 +26,24 @@ message ProtoU64Description { message ProtoHollowBatchPart { oneof kind { string key = 1; + bytes inline = 5; } - uint64 encoded_size_bytes = 2; - - bytes key_lower = 3; ProtoU64Antichain ts_rewrite = 4; + // Only set when Kind is Key + uint64 encoded_size_bytes = 2; + bytes key_lower = 3; optional bytes key_stats = 536870906; + reserved 536870907 to 536870911; } +message ProtoInlineBatchPart { + ProtoU64Description desc = 1; + uint64 index = 2; + mz_persist.gen.persist.ProtoColumnarRecords updates = 3; +} + message ProtoHollowBatch { ProtoU64Description desc = 1; repeated ProtoHollowBatchPart parts = 4; diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index de2684bd6bf80..fa9c382ec3266 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -35,7 +35,7 @@ use uuid::Uuid; use crate::critical::CriticalReaderId; use crate::error::InvalidUsage; -use crate::internal::encoding::{parse_id, LazyPartStats}; +use crate::internal::encoding::{parse_id, LazyInlineBatchPart, LazyPartStats}; use crate::internal::gc::GcReq; use crate::internal::paths::{PartialBatchKey, PartialRollupKey}; use crate::internal::trace::{ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace}; @@ -176,12 +176,17 @@ pub struct HandleDebugState { #[serde(tag = "type")] pub enum BatchPart { Hollow(HollowBatchPart), + Inline { + updates: LazyInlineBatchPart, + ts_rewrite: Option>, + }, } impl BatchPart { pub fn encoded_size_bytes(&self) -> usize { match self { BatchPart::Hollow(x) => x.encoded_size_bytes, + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), } } @@ -190,24 +195,34 @@ impl BatchPart { pub fn printable_name(&self) -> &str { match self { BatchPart::Hollow(x) => x.key.0.as_str(), + BatchPart::Inline { .. } => "", } } pub fn stats(&self) -> Option<&LazyPartStats> { match self { BatchPart::Hollow(x) => x.stats.as_ref(), + BatchPart::Inline { .. 
} => None, } } pub fn key_lower(&self) -> &[u8] { match self { BatchPart::Hollow(x) => x.key_lower.as_slice(), + // We don't duplicate the lowest key because this can be + // considerable overhead for small parts. + // + // The empty key might not be a tight lower bound, but it is a valid + // lower bound. If a caller is interested in a tighter lower bound, + // the data is inline. + BatchPart::Inline { .. } => &[], } } pub fn ts_rewrite(&self) -> Option<&Antichain> { match self { BatchPart::Hollow(x) => x.ts_rewrite.as_ref(), + BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(), } } } @@ -222,6 +237,19 @@ impl Ord for BatchPart { fn cmp(&self, other: &Self) -> Ordering { match (self, other) { (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o), + ( + BatchPart::Inline { + updates: s_updates, + ts_rewrite: s_ts_rewrite, + }, + BatchPart::Inline { + updates: o_updates, + ts_rewrite: o_ts_rewrite, + }, + ) => (s_updates, s_ts_rewrite.as_ref().map(|x| x.elements())) + .cmp(&(o_updates, o_ts_rewrite.as_ref().map(|x| x.elements()))), + (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less, + (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater, } } } @@ -363,6 +391,16 @@ impl HollowBatch { emitted_implicit: false, } } + + pub(crate) fn inline_bytes(&self) -> usize { + self.parts + .iter() + .map(|x| match x { + BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(), + BatchPart::Hollow(_) => 0, + }) + .sum() + } } pub(crate) struct HollowBatchRunIter<'a, T> { @@ -456,6 +494,7 @@ impl HollowBatch { for part in &mut self.parts { match part { BatchPart::Hollow(part) => part.ts_rewrite = Some(frontier.clone()), + BatchPart::Inline { ts_rewrite, .. } => *ts_rewrite = Some(frontier.clone()), } } Ok(()) @@ -558,6 +597,7 @@ pub enum CompareAndAppendBreak { writer_upper: Antichain, }, InvalidUsage(InvalidUsage), + InlineBackpressure, } #[derive(Debug)] @@ -699,6 +739,7 @@ where lease_duration_ms: u64, idempotency_token: &IdempotencyToken, debug_info: &HandleDebugState, + inline_writes_total_max_bytes: usize, ) -> ControlFlow, Vec>> { // We expire all writers if the upper and since both advance to the // empty antichain. Gracefully handle this. At the same time, @@ -775,6 +816,19 @@ where }); } + let new_inline_bytes = batch.inline_bytes(); + if new_inline_bytes > 0 { + let mut existing_inline_bytes = 0; + self.trace + .map_batches(|x| existing_inline_bytes += x.inline_bytes()); + // TODO: For very small batches, it may actually _increase_ the size + // of state to flush them out. Consider another threshold under + // which an inline part can be appended no matter what. + if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes { + return Break(CompareAndAppendBreak::InlineBackpressure); + } + } + let merge_reqs = if batch.desc.upper() != batch.desc.lower() { self.trace.push_batch(batch.clone()) } else { @@ -1391,6 +1445,13 @@ where if x.ts_rewrite().is_some() { ret.rewrite_part_count += 1; } + match x { + BatchPart::Hollow(_) => {} + BatchPart::Inline { updates, .. 
} => { + ret.inline_part_count += 1; + ret.inline_part_bytes += updates.encoded_size_bytes(); + } + } } ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size); ret.state_batches_bytes += batch_size; @@ -1655,6 +1716,8 @@ pub struct StateSizeMetrics { pub state_batches_bytes: usize, pub state_rollups_bytes: usize, pub state_rollup_count: usize, + pub inline_part_count: usize, + pub inline_part_bytes: usize, } #[derive(Default)] @@ -1675,9 +1738,11 @@ pub struct Upper(pub Antichain); pub(crate) mod tests { use std::ops::Range; + use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::now::SYSTEM_TIME; + use mz_proto::RustType; use proptest::prelude::*; use proptest::strategy::ValueTree; @@ -1705,7 +1770,7 @@ pub(crate) mod tests { any::(), any::(), any::(), - proptest::collection::vec(any_hollow_batch_part::(), 0..3), + proptest::collection::vec(any_batch_part::(), 0..3), any::(), any::(), ), @@ -1716,7 +1781,6 @@ pub(crate) mod tests { (Antichain::from_elem(t1), Antichain::from_elem(t0)) }; let since = Antichain::from_elem(since); - let parts = parts.into_iter().map(BatchPart::Hollow).collect::>(); let runs = if runs { vec![parts.len()] } else { vec![] }; HollowBatch { desc: Description::new(lower, upper, since), @@ -1728,6 +1792,24 @@ pub(crate) mod tests { ) } + pub fn any_batch_part() -> impl Strategy> { + Strategy::prop_map( + (any::(), any_hollow_batch_part(), any::>()), + |(is_hollow, hollow, ts_rewrite)| { + if is_hollow { + BatchPart::Hollow(hollow) + } else { + let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap(); + let ts_rewrite = ts_rewrite.map(Antichain::from_elem); + BatchPart::Inline { + updates, + ts_rewrite, + } + } + }, + ) + } + pub fn any_hollow_batch_part( ) -> impl Strategy> { Strategy::prop_map( @@ -2073,6 +2155,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::Upper { shard_upper: Antichain::from_elem(0), @@ -2089,6 +2172,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2101,6 +2185,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds { lower: Antichain::from_elem(5), @@ -2117,6 +2202,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ), Break(CompareAndAppendBreak::InvalidUsage( InvalidEmptyTimeInterval { @@ -2136,6 +2222,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); } @@ -2180,6 +2267,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2250,6 +2338,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2278,6 +2367,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2338,6 +2428,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); assert!(state @@ -2349,6 +2440,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2403,6 +2495,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ) .is_continue()); @@ -2421,6 +2514,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), 
&debug_state(), + 0, ) .is_continue()); } @@ -2453,6 +2547,7 @@ pub(crate) mod tests { LEASE_DURATION_MS, &IdempotencyToken::new(), &debug_state(), + 0, ); assert_eq!(state.maybe_gc(false), None); diff --git a/src/persist-client/src/internal/state_serde.json b/src/persist-client/src/internal/state_serde.json index 2d396b10e888f..6037a208b841a 100644 --- a/src/persist-client/src/internal/state_serde.json +++ b/src/persist-client/src/internal/state_serde.json @@ -88,7 +88,7 @@ 16259358130896200968 ], "upper": [ - 18139951554495009546 + 18440881202665513003 ], "batches": [ { @@ -96,97 +96,262 @@ 0 ], "upper": [ - 6077620655969140312 + 9048289724302440207 ], "since": [ - 11695933775649927238 + 16259358130896200968 ], - "len": 7, - "part_runs": [] + "len": 6, + "part_runs": [ + [ + { + "type": "Hollow", + "key": "Ὑ𑜈Ⱥ5:<=%N𚿱68", + "encoded_size_bytes": 5940325926225919910, + "key_lower": "71b019f49aaf03c0ca34281ee4c943fb5497062565268d24444fd890cab724176dca822cc29101aef59d92d9093e90421c4c3fb19e9d384791005f5597b14245d4c8d066b8dfcbc9f561166c536a61edebcb3ff63a85c9da3f", + "stats": { + "len": 3833243168769577315, + "cols": { + "`c,ꬠਣ¥<⃦<ⴭ𝼗&\\ⴧ/sמּVቌꢒ5庳𝔊": { + "len": 9040363185380571339, + "/93\"𐛝*𫞳$ÿ'^`#ÿ🀨-𝔉¥4ﴼ.": { + "len": 18433316110337886671, + "ቊkzCK𑲩z#ൖ": { + "lower": -6764418802614375466, + "upper": 6044295631498089172 + } + }, + "ࠛ𞸧𞹗ോ~🕴}": { + "lower": -1330813832404039491, + "upper": 3486409623257797125 + } + }, + "🕴g𞀣%iPᬥ?/ov𑵯4H K$&:.¥:🕴\\'?ä'": { + "len": 10033782515976642802, + "9ᨃ=\\¥R(pು𓅓tಮP{𞸹": { + "len": 12966863957103026765, + "/`�¥𑄸ㄙ": { + "lower": "507a83593ac523e9967a7078d7250ed657e0e27b60dbf11784345cfaab1016df0c97484c8369e59ac89d11a264b32c6be27d73d4aa58bedc31a438c5aef1674c958015bbb53d1a", + "upper": "afde5a3fefdec0718497af44132b8b9468f722494ede9a6f279399cf5d6e585e9a87fde31cdb49362e8835569bcba367f3709feecc2a6234c760b3d2bd88c95f1d47ebbd4a5bed57dd0a04e36bf9048416c5c735c9d24d08bbae20de57ce10" + }, + "𐞄&Ѩᝈf𝛌<$=$XѨ=`.ዔ'\\𐣱ꟕ᪕": { + "lower": false, + "upper": false + } + } + } + } + }, + "ts_rewrite": null + } + ], + [] + ] }, { "lower": [ - 6077620655969140312 + 9048289724302440207 ], "upper": [ - 9048289724302440207 + 9114565363275948631 ], "since": [ - 16259358130896200968 + 3215901267041653792 ], - "len": 8, + "len": 9, "part_runs": [ [ { "type": "Hollow", - "key": "𝕏`𑜈Ⱥ5:<", - "encoded_size_bytes": 16872256722443379164, - "key_lower": "009abac79278769a8279207fb05f71d6fa9de6df3a3175e60606a6f4b7e471b019f49aaf03c0ca34281ee4c943fb5497062565268d24444fd890cab724176dca822cc29101aef59d92d9093e", + "key": "?𞹎🦔GὙ:Y🜄𐭉¥$秊𖭟𑍋ÞዶS𑜗$;�h<🕴?ኊ¥", + "encoded_size_bytes": 12103938025441788085, + "key_lower": "a72d74b2717e76a37467779bcb5048b6af3e9138f7adc51f496916fed0953525a34fdee47362a299cac1e911f5ff1c8c60535dc1bc53656662b19e30a914", "stats": { - "len": 7478835496344731792, + "len": 3037491846987155811, "cols": { - "NH\\𐚿'𛈏ȺѨj𧂃=൞.*{𐾈,y{1𑂏;M𞺣{G:orຐᩛ:": { - "len": 7735747598686449402 + "dষ𝔑$Ѩ": { + "len": 11107616924147484604, + "𖭕Þ𖫀੪𐲓🕴=ﹶu-\"": { + "len": 429668735319527760 + }, + "𛱵𘥫𑰔ଆn.<ⶩপ𖿣𝙺E🢱1=ꚀC᥀ໜ¦𞹝 { @@ -1118,22 +1128,33 @@ impl ReferencedBlobValidator { assert_eq!(inc_lower, full_lower); assert_eq!(inc_upper, full_upper); + fn part_unique(x: &BatchPart) -> String { + match x { + BatchPart::Hollow(x) => x.key.to_string(), + BatchPart::Inline { + updates, + ts_rewrite, + } => { + let mut h = DefaultHasher::new(); + updates.hash(&mut h); + ts_rewrite.as_ref().map(|x| x.elements()).hash(&mut h); + h.finish().to_string() + } + } + } + // Check that the overall set of parts contained in both 
representations is the same. let inc_parts: HashSet<_> = self .inc_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); let full_parts = self .full_batches .iter() .flat_map(|x| x.parts.iter()) - .map(|x| match x { - BatchPart::Hollow(x) => &x.key, - }) + .map(part_unique) .collect(); assert_eq!(inc_parts, full_parts); diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 4f3a6c33d7f40..5553de61775e7 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -85,6 +85,9 @@ impl FetchData { FetchData::Unleased { part, .. } => part.key.split().0 >= min_version, FetchData::Leased { part, .. } => match &part.part { BatchPart::Hollow(x) => x.key.split().0 >= min_version, + // Inline parts are only written directly by the user and so may + // be unconsolidated. + BatchPart::Inline { .. } => true, }, FetchData::AlreadyFetched => false, } @@ -308,6 +311,21 @@ impl Consolidator { + let read_metrics = read_metrics(&metrics.read).clone(); + let part = EncodedPart::from_inline( + metrics, + read_metrics, + desc.clone(), + updates, + ts_rewrite.as_ref(), + ); + let c_part = ConsolidationPart::from_encoded(part, &self.filter, true); + (c_part, updates.encoded_size_bytes()) + } }) .collect(); self.runs.push(run); diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index ce29eeb4a4e7c..bbc3a73bf52d3 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -383,15 +383,26 @@ where for mut part_desc in parts { // TODO: Push the filter down into the Subscribe? if STATS_FILTER_ENABLED.get(&cfg) { - let should_fetch = part_desc.part.stats().map_or(true, |stats| { - should_fetch_part(&stats.decode(), current_frontier.borrow()) - }); - let bytes = u64::cast_from(part_desc.part.encoded_size_bytes()); + let (should_fetch, is_inline) = match &part_desc.part { + BatchPart::Hollow(x) => { + let should_fetch = x.stats.as_ref().map_or(true, |stats| { + should_fetch_part(&stats.decode(), current_frontier.borrow()) + }); + (should_fetch, false) + } + BatchPart::Inline { .. } => (true, true), + }; + let bytes = u64::cast_from(part_desc.encoded_size_bytes()); if should_fetch { audit_budget_bytes = audit_budget_bytes.saturating_add(part_desc.part.encoded_size_bytes()); - metrics.pushdown.parts_fetched_count.inc(); - metrics.pushdown.parts_fetched_bytes.inc_by(bytes); + if is_inline { + metrics.pushdown.parts_inline_count.inc(); + metrics.pushdown.parts_inline_bytes.inc_by(bytes); + } else { + metrics.pushdown.parts_fetched_count.inc(); + metrics.pushdown.parts_fetched_bytes.inc_by(bytes); + } } else { metrics.pushdown.parts_filtered_count.inc(); metrics.pushdown.parts_filtered_bytes.inc_by(bytes); @@ -401,6 +412,7 @@ where x.key.hash(&mut h); usize::cast_from(h.finish()) % 100 < STATS_AUDIT_PERCENT.get(&cfg) } + BatchPart::Inline { .. } => false, }; if should_audit && part_desc.part.encoded_size_bytes() < audit_budget_bytes { diff --git a/src/persist-client/src/usage.rs b/src/persist-client/src/usage.rs index 25010c499fa1d..6ee976cfed474 100644 --- a/src/persist-client/src/usage.rs +++ b/src/persist-client/src/usage.rs @@ -454,6 +454,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. 
} => continue, }; let parsed = BlobKey::parse_ids(&part.key.complete(&shard_id)); if let Ok((_, PartialBlobKey::Batch(writer_id, _))) = parsed { @@ -481,6 +482,7 @@ impl StorageUsageClient { for part in x.parts.iter() { let part = match part { BatchPart::Hollow(x) => x, + BatchPart::Inline { .. } => continue, }; current_state_batches_bytes += u64::cast_from(part.encoded_size_bytes); } @@ -740,7 +742,10 @@ mod tests { use semver::Version; use timely::progress::Antichain; - use crate::batch::BLOB_TARGET_SIZE; + use crate::batch::{ + BatchBuilderConfig, BLOB_TARGET_SIZE, INLINE_WRITES_SINGLE_MAX_BYTES, + INLINE_WRITES_TOTAL_MAX_BYTES, + }; use crate::internal::paths::{PartialRollupKey, RollupId}; use crate::tests::new_test_client; use crate::ShardId; @@ -758,6 +763,7 @@ mod tests { ]; let client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0; let build_version = client.cfg.build_version.clone(); let shard_id_one = ShardId::new(); let shard_id_two = ShardId::new(); @@ -818,7 +824,12 @@ mod tests { assert!(shard_one_size > 0); assert!(shard_two_size > 0); - assert!(shard_one_size < shard_two_size); + if inline_writes_enabled { + // Allow equality, but only if inline writes are enabled. + assert!(shard_one_size <= shard_two_size); + } else { + assert!(shard_one_size < shard_two_size); + } assert_eq!( shard_two_size, writer_one_size + writer_two_size + versioned_size + rollups_size @@ -866,6 +877,7 @@ mod tests { let shard_id = ShardId::new(); let mut client = new_test_client(&dyncfgs).await; + let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0; let (mut write0, _) = client .expect_open::(shard_id) @@ -901,9 +913,11 @@ mod tests { let usage = StorageUsageClient::open(client); let shard_usage_audit = usage.shard_usage_audit(shard_id).await; let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await; - // We've written data. - assert!(shard_usage_audit.current_state_batches_bytes > 0); - assert!(shard_usage_referenced.batches_bytes > 0); + if !inline_writes_enabled { + // We've written data. + assert!(shard_usage_audit.current_state_batches_bytes > 0); + assert!(shard_usage_referenced.batches_bytes > 0); + } // There's always at least one rollup. assert!(shard_usage_audit.current_state_rollups_bytes > 0); assert!(shard_usage_referenced.rollup_bytes > 0); @@ -914,8 +928,10 @@ mod tests { // // write0 wrote a batch, but never linked it in, but is still active. assert!(shard_usage_audit.not_leaked_not_referenced_bytes > 0); - // write0 wrote a batch, but never linked it in, and is now expired. - assert!(shard_usage_audit.leaked_bytes > 0); + if !inline_writes_enabled { + // write0 wrote a batch, but never linked it in, and is now expired. + assert!(shard_usage_audit.leaked_bytes > 0); + } } #[mz_persist_proc::test(tokio::test)] @@ -936,6 +952,11 @@ mod tests { client.cfg.compaction_enabled = false; // make things interesting and create multiple parts per batch client.cfg.set_config(&BLOB_TARGET_SIZE, 0); + // Inline write backpressure will change the encoded size, but the CaAB + // call consumes the Batch, so we don't have any way of getting the new + // one. So, sniff out whether backpressure would flush out the part and + // do it before we get the sizes. 
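+ // With a total budget of 0, any inline part trips the backpressure check in compare_and_append and gets flushed out to s3 anyway, so flushing up front keeps the sizes measured below stable.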
+ let backpressure_would_flush = INLINE_WRITES_TOTAL_MAX_BYTES.get(&client.cfg) == 0; let (mut write, _read) = client .expect_open::(shard_id) @@ -943,6 +964,23 @@ mod tests { let mut b1 = write.expect_batch(&data[..2], 0, 3).await; let mut b2 = write.expect_batch(&data[2..], 2, 5).await; + if backpressure_would_flush { + let cfg = BatchBuilderConfig::new(&client.cfg, &write.writer_id); + b1.flush_to_blob( + &cfg, + &client.metrics.user, + &client.isolated_runtime, + &write.schemas, + ) + .await; + b2.flush_to_blob( + &cfg, + &client.metrics.user, + &client.isolated_runtime, + &write.schemas, + ) + .await; + } let batches_size = b1 .batch diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index d4a37aff20dfe..4375f72d1ed57 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use mz_ore::instrument; use mz_ore::task::RuntimeExt; use mz_persist::location::Blob; @@ -478,56 +480,89 @@ where let since = Antichain::from_elem(T::minimum()); let desc = Description::new(lower, upper, since); - let any_batch_rewrite = batches - .iter() - .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some())); - let (mut parts, mut num_updates, mut runs) = (vec![], 0, vec![]); - for batch in batches.iter() { - let () = validate_truncate_batch(&batch.batch, &desc, any_batch_rewrite)?; - for run in batch.batch.runs() { - // Mark the boundary if this is not the first run in the batch. - let start_index = parts.len(); - if start_index != 0 { - runs.push(start_index); + let mut received_inline_backpressure = false; + let maintenance = loop { + let any_batch_rewrite = batches + .iter() + .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some())); + let (mut parts, mut num_updates, mut runs) = (vec![], 0, vec![]); + for batch in batches.iter() { + let () = validate_truncate_batch(&batch.batch, &desc, any_batch_rewrite)?; + for run in batch.batch.runs() { + // Mark the boundary if this is not the first run in the batch. 
+ let start_index = parts.len(); + if start_index != 0 { + runs.push(start_index); + } + parts.extend_from_slice(run); } - parts.extend_from_slice(run); + num_updates += batch.batch.len; } - num_updates += batch.batch.len; - } - let heartbeat_timestamp = (self.cfg.now)(); - let res = self - .machine - .compare_and_append( - &HollowBatch { - desc: desc.clone(), - parts, - len: num_updates, - runs, - }, - &self.writer_id, - &self.debug_state, - heartbeat_timestamp, - ) - .await; + let heartbeat_timestamp = (self.cfg.now)(); + let res = self + .machine + .compare_and_append( + &HollowBatch { + desc: desc.clone(), + parts, + len: num_updates, + runs, + }, + &self.writer_id, + &self.debug_state, + heartbeat_timestamp, + ) + .await; - let maintenance = match res { - CompareAndAppendRes::Success(_seqno, maintenance) => { - self.upper = desc.upper().clone(); - for batch in batches.iter_mut() { - batch.mark_consumed(); + match res { + CompareAndAppendRes::Success(_seqno, maintenance) => { + self.upper = desc.upper().clone(); + for batch in batches.iter_mut() { + batch.mark_consumed(); + } + break maintenance; + } + CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), + CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { + // We tried to to a compare_and_append with the wrong expected upper, that + // won't work. Update the cached upper to the current upper. + self.upper = current_upper.clone(); + return Ok(Err(UpperMismatch { + current: current_upper, + expected: expected_upper, + })); + } + CompareAndAppendRes::InlineBackpressure => { + // We tried to write an inline part, but there was already + // too much in state. Flush it out to s3 and try again. + assert_eq!(received_inline_backpressure, false); + received_inline_backpressure = true; + + let cfg = BatchBuilderConfig::new(&self.cfg, &self.writer_id); + // We could have a large number of inline parts (imagine the + // sharded persist_sink), do this flushing concurrently. + let flush_batches = batches + .iter_mut() + .map(|batch| async { + batch + .flush_to_blob( + &cfg, + &self.metrics.inline.backpressure, + &self.isolated_runtime, + &self.schemas, + ) + .await + }) + .collect::>(); + let () = flush_batches.collect::<()>().await; + + for batch in batches.iter() { + assert_eq!(batch.batch.inline_bytes(), 0); + } + + continue; } - maintenance - } - CompareAndAppendRes::InvalidUsage(invalid_usage) => return Err(invalid_usage), - CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { - // We tried to to a compare_and_append with the wrong expected upper, that - // won't work. Update the cached upper to the current upper. 
- self.upper = current_upper.clone(); - return Ok(Err(UpperMismatch { - current: current_upper, - expected: expected_upper, - })); } }; diff --git a/src/persist-client/tests/machine/batch b/src/persist-client/tests/machine/batch index 7f67ba58f00dd..eb69611c5e3ca 100644 --- a/src/persist-client/tests/machine/batch +++ b/src/persist-client/tests/machine/batch @@ -319,20 +319,6 @@ part 2 part 3 -# raw key stats are bounded size -write-batch output=b0 lower=0 upper=2 target_size=50 -AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBB 0 1 ----- -parts=1 len=1 - -fetch-batch input=b0 stats=lower ----- - - -AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBB 0 1 - -part 0 - # parts from different batches appended together get different runs write-batch output=b0 lower=0 upper=2 target_size=10 a 0 1 diff --git a/src/persist-client/tests/machine/batch_key_lower_truncate b/src/persist-client/tests/machine/batch_key_lower_truncate new file mode 100644 index 0000000000000..f12dd0aab311f --- /dev/null +++ b/src/persist-client/tests/machine/batch_key_lower_truncate @@ -0,0 +1,29 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# We only keep real key_lower for hollow parts, so disable inline writes. +dyncfg +persist_inline_writes_single_max_bytes 0 +persist_inline_writes_total_max_bytes 0 +---- +ok + +# raw key stats are bounded size +write-batch output=b0 lower=0 upper=2 target_size=50 +AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBB 0 1 +---- +parts=1 len=1 + +fetch-batch input=b0 stats=lower +---- + + +AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBB 0 1 + +part 0 diff --git a/src/persist-client/tests/machine/gc b/src/persist-client/tests/machine/gc index b14540012bda8..139cbf9f75fb3 100644 --- a/src/persist-client/tests/machine/gc +++ b/src/persist-client/tests/machine/gc @@ -7,6 +7,13 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so interesting things happen in Blob. +dyncfg +persist_inline_writes_single_max_bytes 0 +persist_inline_writes_total_max_bytes 0 +---- +ok + # This test uses a simplifying assumption that a batch is # always made up of exactly 1 batch part. This is because # batch parts are given random UUID names, meaning we can't diff --git a/src/persist-client/tests/machine/gc_rollups b/src/persist-client/tests/machine/gc_rollups index 87fb65e6e1195..015aa34dce3c5 100644 --- a/src/persist-client/tests/machine/gc_rollups +++ b/src/persist-client/tests/machine/gc_rollups @@ -7,6 +7,12 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so we can match on batches below. 
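+# A single-write limit of 0 is enough on its own: no write is small enough to inline, so every batch part gets a real blob key.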
+dyncfg +persist_inline_writes_single_max_bytes 0 +---- +ok + write-batch output=b0 lower=0 upper=1 k1 0 1 ---- diff --git a/src/persist-client/tests/machine/regression_gc_behind b/src/persist-client/tests/machine/regression_gc_behind index 65c0ca2349834..df5ad4a829936 100644 --- a/src/persist-client/tests/machine/regression_gc_behind +++ b/src/persist-client/tests/machine/regression_gc_behind @@ -7,6 +7,12 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so we can match on batches below. +dyncfg +persist_inline_writes_single_max_bytes 0 +---- +ok + # Regression test for #14580, a bug where an over-aggressive internal # validation would fire if a GC request was behind the actual set of live # states. diff --git a/src/persist-client/tests/machine/restore_blob b/src/persist-client/tests/machine/restore_blob index 95ffb040b32e8..227f6b2cc6d16 100644 --- a/src/persist-client/tests/machine/restore_blob +++ b/src/persist-client/tests/machine/restore_blob @@ -7,6 +7,12 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. +# Disable inline writes so interesting things happen in Blob. +dyncfg +persist_inline_writes_single_max_bytes 0 +persist_inline_writes_total_max_bytes 0 +---- +ok # Pre-populate some non-trivial state in our shard. diff --git a/src/persist-proc/src/lib.rs b/src/persist-proc/src/lib.rs index 49207e446dea7..55a92906ab55f 100644 --- a/src/persist-proc/src/lib.rs +++ b/src/persist-proc/src/lib.rs @@ -68,8 +68,25 @@ fn test_impl(attr: TokenStream, item: TokenStream) -> TokenStream { let dyncfgs = [ { - // Placeholder default configuration until inline writes PR. - mz_dyncfg::ConfigUpdates::default() + // Inline writes disabled + let mut x = ::mz_dyncfg::ConfigUpdates::default(); + x.add_dynamic("persist_inline_writes_single_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0)); + x.add_dynamic("persist_inline_writes_total_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0)); + x + }, + { + // Inline writes enabled + let mut x = ::mz_dyncfg::ConfigUpdates::default(); + x.add_dynamic("persist_inline_writes_single_max_bytes", ::mz_dyncfg::ConfigVal::Usize(4 * 1024)); + x.add_dynamic("persist_inline_writes_total_max_bytes", ::mz_dyncfg::ConfigVal::Usize(1024 * 1024)); + x + }, + { + // Stress inline writes backpressure + let mut x = ::mz_dyncfg::ConfigUpdates::default(); + x.add_dynamic("persist_inline_writes_single_max_bytes", ::mz_dyncfg::ConfigVal::Usize(4 * 1024)); + x.add_dynamic("persist_inline_writes_total_max_bytes", ::mz_dyncfg::ConfigVal::Usize(0)); + x }, ]; diff --git a/src/persist/build.rs b/src/persist/build.rs index 93c1e83c7108e..56e73f0d6b28f 100644 --- a/src/persist/build.rs +++ b/src/persist/build.rs @@ -14,6 +14,11 @@ fn main() { prost_build::Config::new() .btree_map(["."]) + .type_attribute( + ".mz_persist.gen.persist.ProtoColumnarRecords", + "#[derive(serde::Serialize)]", + ) + .bytes([".mz_persist.gen.persist.ProtoColumnarRecords"]) .compile_protos(&["persist/src/persist.proto"], &[".."]) .unwrap_or_else(|e| panic!("{e}")) } diff --git a/src/persist/src/indexed/columnar.rs b/src/persist/src/indexed/columnar.rs index 033d3f56ba8f4..e9a4029a096e1 100644 --- a/src/persist/src/indexed/columnar.rs +++ b/src/persist/src/indexed/columnar.rs @@ -15,8 +15,12 @@ use std::sync::Arc; use std::{cmp, fmt}; use arrow2::types::Index; +use bytes::Bytes; +use mz_ore::bytes::MaybeLgBytes; use mz_ore::lgbytes::MetricsRegion; +use mz_proto::{ProtoType, 
RustType, TryFromProtoError}; +use crate::gen::persist::ProtoColumnarRecords; use crate::metrics::ColumnarMetrics; pub mod arrow; @@ -86,9 +90,9 @@ const BYTES_PER_KEY_VAL_OFFSET: usize = 4; #[derive(Clone, PartialEq)] pub struct ColumnarRecords { len: usize, - key_data: Arc>, + key_data: MaybeLgBytes, key_offsets: Arc>, - val_data: Arc>, + val_data: MaybeLgBytes, val_offsets: Arc>, timestamps: Arc>, diffs: Arc>, @@ -110,8 +114,8 @@ impl ColumnarRecords { /// The number of logical bytes in the represented data, excluding offsets /// and lengths. pub fn goodbytes(&self) -> usize { - (*self.key_data).as_ref().len() - + (*self.val_data).as_ref().len() + self.key_data.as_ref().len() + + self.val_data.as_ref().len() + 8 * (*self.timestamps).as_ref().len() + 8 * (*self.diffs).as_ref().len() } @@ -132,9 +136,9 @@ impl ColumnarRecords { // obvious. ColumnarRecordsRef { len: self.len, - key_data: (*self.key_data).as_ref(), + key_data: self.key_data.as_ref(), key_offsets: (*self.key_offsets).as_ref(), - val_data: (*self.val_data).as_ref(), + val_data: self.val_data.as_ref(), val_offsets: (*self.val_offsets).as_ref(), timestamps: (*self.timestamps).as_ref(), diffs: (*self.diffs).as_ref(), @@ -464,9 +468,9 @@ impl ColumnarRecordsBuilder { // `heap_region` method instead. Revisit if that changes. let ret = ColumnarRecords { len: self.len, - key_data: Arc::new(metrics.lgbytes_arrow.heap_region(self.key_data)), + key_data: MaybeLgBytes::Bytes(Bytes::from(self.key_data)), key_offsets: Arc::new(metrics.lgbytes_arrow.heap_region(self.key_offsets)), - val_data: Arc::new(metrics.lgbytes_arrow.heap_region(self.val_data)), + val_data: MaybeLgBytes::Bytes(Bytes::from(self.val_data)), val_offsets: Arc::new(metrics.lgbytes_arrow.heap_region(self.val_offsets)), timestamps: Arc::new(metrics.lgbytes_arrow.heap_region(self.timestamps)), diffs: Arc::new(metrics.lgbytes_arrow.heap_region(self.diffs)), @@ -483,6 +487,42 @@ impl ColumnarRecordsBuilder { } } +impl ColumnarRecords { + /// See [RustType::into_proto]. + pub fn into_proto(&self) -> ProtoColumnarRecords { + ProtoColumnarRecords { + len: self.len.into_proto(), + key_offsets: (*self.key_offsets).as_ref().to_vec(), + key_data: Bytes::copy_from_slice(self.key_data.as_ref()), + val_offsets: (*self.val_offsets).as_ref().to_vec(), + val_data: Bytes::copy_from_slice(self.val_data.as_ref()), + timestamps: (*self.timestamps).as_ref().to_vec(), + diffs: (*self.diffs).as_ref().to_vec(), + } + } + + /// See [RustType::from_proto]. 
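+ /// Key and val data are taken as zero-copy `Bytes` from the proto; offsets, timestamps, and diffs are copied into metrics-tracked heap regions, and the result is validated before being returned.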
+ pub fn from_proto( + lgbytes: &ColumnarMetrics, + proto: ProtoColumnarRecords, + ) -> Result { + let ret = ColumnarRecords { + len: proto.len.into_rust()?, + key_offsets: Arc::new(lgbytes.lgbytes_arrow.heap_region(proto.key_offsets)), + key_data: MaybeLgBytes::Bytes(proto.key_data), + val_offsets: Arc::new(lgbytes.lgbytes_arrow.heap_region(proto.val_offsets)), + val_data: MaybeLgBytes::Bytes(proto.val_data), + timestamps: Arc::new(lgbytes.lgbytes_arrow.heap_region(proto.timestamps)), + diffs: Arc::new(lgbytes.lgbytes_arrow.heap_region(proto.diffs)), + }; + let () = ret + .borrow() + .validate() + .map_err(TryFromProtoError::InvalidPersistState)?; + Ok(ret) + } +} + #[cfg(test)] mod tests { use mz_persist_types::Codec64; diff --git a/src/persist/src/indexed/columnar/arrow.rs b/src/persist/src/indexed/columnar/arrow.rs index 109dc1aeea4d7..aed7e52d2b970 100644 --- a/src/persist/src/indexed/columnar/arrow.rs +++ b/src/persist/src/indexed/columnar/arrow.rs @@ -17,7 +17,8 @@ use arrow2::array::{Array, BinaryArray, PrimitiveArray}; use arrow2::chunk::Chunk; use arrow2::datatypes::{DataType, Field, Schema}; use mz_dyncfg::Config; -use mz_ore::lgbytes::MetricsRegion; +use mz_ore::bytes::MaybeLgBytes; +use mz_ore::lgbytes::{LgBytes, MetricsRegion}; use once_cell::sync::Lazy; use crate::indexed::columnar::ColumnarRecords; @@ -77,7 +78,7 @@ pub fn encode_arrow_batch_kvtd(x: &ColumnarRecords) -> Chunk> { .to_vec() .try_into() .expect("valid offsets"), - (*x.key_data).as_ref().to_vec().into(), + x.key_data.as_ref().to_vec().into(), None, ))), Box::new(BinaryArray::new( @@ -87,7 +88,7 @@ pub fn encode_arrow_batch_kvtd(x: &ColumnarRecords) -> Chunk> { .to_vec() .try_into() .expect("valid offsets"), - (*x.val_data).as_ref().to_vec().into(), + x.val_data.as_ref().to_vec().into(), None, )), Box::new(PrimitiveArray::new( @@ -172,9 +173,9 @@ pub fn decode_arrow_batch_kvtd( let len = x.len(); let ret = ColumnarRecords { len, - key_data, + key_data: MaybeLgBytes::LgBytes(LgBytes::from(key_data)), key_offsets, - val_data, + val_data: MaybeLgBytes::LgBytes(LgBytes::from(val_data)), val_offsets, timestamps, diffs, diff --git a/src/persist/src/indexed/encoding.rs b/src/persist/src/indexed/encoding.rs index f7cb65c4ffdd0..28496b6b77788 100644 --- a/src/persist/src/indexed/encoding.rs +++ b/src/persist/src/indexed/encoding.rs @@ -231,6 +231,16 @@ impl BlobTraceBatchPart { pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result { decode_trace_parquet(&mut buf.clone().reader(), metrics) } + + /// Scans the part and returns a lower bound on the contained keys. + pub fn key_lower(&self) -> &[u8] { + self.updates + .iter() + .flat_map(|x| x.iter()) + .map(|((key, _), _, _)| key) + .min() + .unwrap_or(&[]) + } } #[derive(PartialOrd, Ord, PartialEq, Eq)] diff --git a/src/persist/src/persist.proto b/src/persist/src/persist.proto index 36f02a1ad4339..7bca23a75c2ec 100644 --- a/src/persist/src/persist.proto +++ b/src/persist/src/persist.proto @@ -64,3 +64,13 @@ enum ProtoBatchFormat { // with a trie-like column structure. 
ParquetKvtd = 2; } + +message ProtoColumnarRecords { + uint64 len = 1; + repeated int32 key_offsets = 2; + bytes key_data = 3; + repeated int32 val_offsets = 4; + bytes val_data = 5; + repeated int64 timestamps = 6; + repeated int64 diffs = 7; +} diff --git a/test/sqllogictest/explain-pushdown.slt b/test/sqllogictest/explain-pushdown.slt index 8472a44303d0f..fe34cf4b3756f 100644 --- a/test/sqllogictest/explain-pushdown.slt +++ b/test/sqllogictest/explain-pushdown.slt @@ -9,6 +9,12 @@ mode cockroach +# Disable persist inline writes so we get real part numbers below +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET persist_inline_writes_single_max_bytes = 0 +---- +COMPLETE 0 + # EXPLAIN FILTER PUSHDOWN statements are blocked by a feature flag statement ok CREATE TABLE numbers (