diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs
index 208124588..4642260cd 100644
--- a/mountpoint-s3/src/cli.rs
+++ b/mountpoint-s3/src/cli.rs
@@ -649,11 +649,25 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
         user_agent.key_value("mp-nw-interfaces", &interfaces.len().to_string());
     }
 
+    // This is a weird looking number! We really want our first request size to be 1MiB,
+    // which is a common IO size. But Linux's readahead will try to read an extra 128k on
+    // top of a 1MiB read, which we'd have to wait for a second request to service. Because
+    // FUSE doesn't know the difference between regular reads and readahead reads, it will
+    // send us a READ request for that 128k, so we'll have to block waiting for it even if
+    // the application doesn't want it. This is all in the noise for sequential IO, but
+    // waiting for the readahead hurts random IO. So we add 128k to the first request size
+    // to avoid the latency hit of the second request.
+    //
+    // Note that the CRT does not respect this value right now: it always returns chunks of
+    // part size. But this is the first window size we prefer.
+    let initial_read_window_size = 1024 * 1024 + 128 * 1024;
     let mut client_config = S3ClientConfig::new()
         .auth_config(auth_config)
         .throughput_target_gbps(throughput_target_gbps)
         .read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
         .write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
+        .read_backpressure(true)
+        .initial_read_window(initial_read_window_size)
         .user_agent(user_agent);
     #[cfg(feature = "multiple-nw-iface")]
     if let Some(interfaces) = &args.bind {
diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
index a39d118fa..47db7f735 100644
--- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -21,6 +21,12 @@ impl InMemoryDataCache {
             block_size,
         }
     }
+
+    /// Get the number of cached blocks for the given cache key.
+    pub fn block_count(&self, cache_key: &ObjectId) -> usize {
+        let data = self.data.read().unwrap();
+        data.get(cache_key).map_or(0, |cache| cache.len())
+    }
 }
 
 impl DataCache for InMemoryDataCache {
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index 46c3f0915..b03563cd2 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -1349,6 +1349,8 @@ mod tests {
         let bucket = "bucket";
         let client = Arc::new(MockClient::new(MockClientConfig {
             bucket: bucket.to_owned(),
+            enable_backpressure: true,
+            initial_read_window_size: 1024 * 1024,
             ..Default::default()
         }));
         // Create "dir1" in the client to avoid creating it locally
diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs
index d01c53512..477d09974 100644
--- a/mountpoint-s3/src/fs/error.rs
+++ b/mountpoint-s3/src/fs/error.rs
@@ -129,9 +129,12 @@ impl<E: std::error::Error + Send + Sync + 'static> From<PrefetchReadError<E>> fo
                 GetObjectError::PreconditionFailed,
             )) => err!(libc::ESTALE, "object was mutated remotely"),
             PrefetchReadError::Integrity(e) => err!(libc::EIO, source:e, "integrity error"),
+            PrefetchReadError::PartReadFailed(e) => err!(libc::EIO, source:e, "part read failed"),
             PrefetchReadError::GetRequestFailed(_)
             | PrefetchReadError::GetRequestTerminatedUnexpectedly
-            | PrefetchReadError::GetRequestReturnedWrongOffset { .. } => {
+            | PrefetchReadError::GetRequestReturnedWrongOffset { .. }
+            | PrefetchReadError::BackpressurePreconditionFailed
+            | PrefetchReadError::ReadWindowIncrement => {
                 err!(libc::EIO, source:err, "get request failed")
             }
         }
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 3bc5eb75d..984c48ea6 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -1,12 +1,37 @@
 //! This module implements a prefetcher for GetObject requests.
 //!
-//! It works by making increasingly larger GetObject requests to the CRT. We want the chunks to be
-//! large enough that they can make effective use of the CRT's fan-out parallelism across the S3
-//! frontend, but small enough that we don't accumulate a lot of unread object data in memory or
-//! wastefully download data we'll never read. As the reader continues to make sequential reads,
-//! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
-//! non-sequential read, we abandon the prefetching and start again with the minimum request size.
-
+//! It works by relying on the CRT's flow-control window feature. The prefetcher creates a single
+//! GetObject request for the entire remaining length of the object (starting from the first read
+//! offset) and then grows the read window incrementally. We want the chunks to be large enough
+//! that they can make effective use of the CRT's fan-out parallelism across the S3 frontend, but
+//! small enough that we don't accumulate a lot of unread object data in memory or wastefully
+//! download data we'll never read. As the reader continues to make sequential reads, we increase
+//! the size of the read window up to some maximum. If the reader ever makes a non-sequential read,
+//! we abandon the prefetching and start again with a new GetObject request with the minimum read
+//! window size.
+//!
+//! In more technical detail, the prefetcher creates a RequestTask when it receives the first read
+//! request from the file system or after it has just been reset. The RequestTask consists of two
+//! main components.
+//! 1. An ObjectPartStream, whose role is to continuously fetch data from the source, which can be
+//!    either S3 or the on-disk cache. The ObjectPartStream is spawned and runs in a separate
+//!    thread from the prefetcher.
+//! 2. A PartQueue, where we store data received from the ObjectPartStream, waiting to be read by
+//!    the prefetcher via a RequestTask method.
+//!
+//! A backpressure mechanism is needed to control how much data we store in the part queue at a
+//! time, as we don't want to download the entire object into memory. For the client part stream we
+//! may be able to rely on the CRT flow-control window to block when we don't increase the read
+//! window size, but for the caching part stream we don't have the machinery to do that yet. That's
+//! why we introduce the BackpressureController and BackpressureLimiter to help solve this.
+//!
+//! Essentially, the BackpressureController and BackpressureLimiter are the sender and receiver of
+//! a channel, created at RequestTask initialization. The sender is handed to the RequestTask; its
+//! role is to communicate with its receiver to tell it "when" it is ready to receive more data.
+//! The receiver is handed to the ObjectPartStream, where the stream should call a provided
+//! function "before" fetching more data from the source and putting it into the part queue. The
+//! BackpressureLimiter should be used as a means to block the ObjectPartStream thread from
+//! fetching more data until the prefetcher is ready for it.
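+//!
+//! As a rough sketch (simplified, hypothetical types — not the exact ones in this module), the
+//! controller/limiter pair boils down to a channel of read-window-size increments:
+//!
+//! ```ignore
+//! use async_channel::{unbounded, Receiver, Sender};
+//!
+//! /// Consumer half: grants more read window as data is read out of the part queue.
+//! struct Controller {
+//!     tx: Sender<usize>,
+//!     window_end: u64,
+//! }
+//!
+//! /// Producer half: blocks the part stream until the window covers the next offset.
+//! struct Limiter {
+//!     rx: Receiver<usize>,
+//!     window_end: u64,
+//! }
+//!
+//! impl Controller {
+//!     async fn data_read(&mut self, len: usize) {
+//!         self.window_end += len as u64;
+//!         // The channel is unbounded, so this send never blocks.
+//!         let _ = self.tx.send(len).await;
+//!     }
+//! }
+//!
+//! impl Limiter {
+//!     async fn wait_for(&mut self, offset: u64) {
+//!         while self.window_end <= offset {
+//!             match self.rx.recv().await {
+//!                 Ok(len) => self.window_end += len as u64,
+//!                 Err(_) => break, // consumer dropped, stop producing
+//!             }
+//!         }
+//!     }
+//! }
+//! ```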
+
+mod backpressure_controller;
 mod caching_stream;
 mod part;
 mod part_queue;
@@ -14,7 +39,6 @@ mod part_stream;
 mod seek_window;
 mod task;
 
-use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::time::Duration;
 
@@ -24,6 +48,7 @@ use metrics::{counter, histogram};
 use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
 use mountpoint_s3_client::types::ETag;
 use mountpoint_s3_client::ObjectClient;
+use part::PartOperationError;
 use part_stream::RequestTaskConfig;
 use thiserror::Error;
 use tracing::trace;
 
@@ -80,6 +105,15 @@ pub enum PrefetchReadError<E: std::error::Error> {
     #[error("integrity check failed")]
     Integrity(#[from] IntegrityError),
+
+    #[error("part read failed")]
+    PartReadFailed(#[from] PartOperationError),
+
+    #[error("backpressure must be enabled with non-zero initial read window")]
+    BackpressurePreconditionFailed,
+
+    #[error("read window increment failed")]
+    ReadWindowIncrement,
 }
 
 pub type DefaultPrefetcher<Runtime> = Prefetcher<ClientPartStream<Runtime>>;
 
@@ -111,10 +145,8 @@ where
 
 #[derive(Debug, Clone, Copy)]
 pub struct PrefetcherConfig {
-    /// Size of the first request in a prefetch run
-    pub first_request_size: usize,
-    /// Maximum size of a single prefetch request
-    pub max_request_size: usize,
+    /// Maximum size of the read window
+    pub max_read_window_size: usize,
     /// Factor to increase the request size by whenever the reader continues making sequential reads
     pub sequential_prefetch_multiplier: usize,
     /// Timeout to wait for a part to become available
@@ -128,19 +160,10 @@ pub struct PrefetcherConfig {
 }
 
 impl Default for PrefetcherConfig {
+    #[allow(clippy::identity_op)]
     fn default() -> Self {
-        #[allow(clippy::identity_op)]
         Self {
-            // This is a weird looking number! We really want our first request size to be 1MiB,
-            // which is a common IO size. But Linux's readahead will try to read an extra 128k on on
-            // top of a 1MiB read, which we'd have to wait for a second request to service. Because
-            // FUSE doesn't know the difference between regular reads and readahead reads, it will
-            // send us a READ request for that 128k, so we'll have to block waiting for it even if
-            // the application doesn't want it. This is all in the noise for sequential IO, but
-            // waiting for the readahead hurts random IO. So we add 128k to the first request size
-            // to avoid the latency hit of the second request.
-            first_request_size: 1 * 1024 * 1024 + 128 * 1024,
-            max_request_size: 2 * 1024 * 1024 * 1024,
+            max_read_window_size: 2 * 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             read_timeout: Duration::from_secs(60),
             // We want these large enough to tolerate a single out-of-order Linux readahead, which
@@ -207,11 +230,7 @@ pub struct PrefetchGetObject<Client: ObjectClient, Stream: ObjectPartStream> {
     client: Arc<Client>,
     part_stream: Arc<Stream>,
     config: PrefetcherConfig,
-    // Invariant: the offset of the first byte in this task's part queue is always
-    // self.next_sequential_read_offset.
-    current_task: Option<RequestTask<Client::ClientError>>,
-    // Currently we only every spawn at most one future task (see [spawn_next_request])
-    future_tasks: VecDeque<RequestTask<Client::ClientError>>,
+    backpressure_task: Option<RequestTask<Client::ClientError>>,
     // Invariant: the offset of the last byte in this window is always
     // self.next_sequential_read_offset - 1.
     backward_seek_window: SeekWindow,
@@ -222,7 +241,6 @@ pub struct PrefetchGetObject<Client: ObjectClient, Stream: ObjectPartStream> {
     /// Start offset for sequential read, used for calculating contiguous read metric
     sequential_read_start_offset: u64,
     next_sequential_read_offset: u64,
-    next_request_size: usize,
     next_request_offset: u64,
     size: u64,
 }
@@ -274,13 +292,11 @@ where
             client,
             part_stream,
             config,
-            current_task: None,
-            future_tasks: Default::default(),
+            backpressure_task: None,
             backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize),
             preferred_part_size: 128 * 1024,
             sequential_read_start_offset: 0,
             next_sequential_read_offset: 0,
-            next_request_size: config.first_request_size,
             next_request_offset: 0,
             bucket: bucket.to_owned(),
             object_id: ObjectId::new(key.to_owned(), etag),
@@ -330,12 +346,13 @@ where
         }
         assert_eq!(self.next_sequential_read_offset, offset);
 
-        self.prepare_requests();
+        if self.backpressure_task.is_none() {
+            self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
+        }
 
         let mut response = ChecksummedBytes::default();
         while to_read > 0 {
-            let Some(current_task) = self.current_task.as_mut() else {
-                // If [prepare_requests] didn't spawn a request, we've reached the end of the object.
+            let Some(current_task) = self.backpressure_task.as_mut() else {
                 trace!(offset, length, "read beyond object size");
                 break;
             };
@@ -343,13 +360,9 @@ where
             let part = current_task.read(to_read as usize).await?;
             self.backward_seek_window.push(part.clone());
-            let part_bytes = part
-                .into_bytes(&self.object_id, self.next_sequential_read_offset)
-                .unwrap();
+            let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;
 
             self.next_sequential_read_offset += part_bytes.len() as u64;
-            self.prepare_requests();
-
             // If we can complete the read with just a single buffer, early return to avoid copying
             // into a new buffer. This should be the common case as long as part size is larger than
             // read size, which it almost always is for real S3 clients and FUSE.
@@ -365,74 +378,45 @@ where
         Ok(response)
     }
 
-    /// Runs on every read to prepare and spawn any requests our prefetching logic requires
-    fn prepare_requests(&mut self) {
-        let current_task = self.current_task.as_ref();
-        if current_task.map(|task| task.remaining() == 0).unwrap_or(true) {
-            // There's no current task, or the current task is finished. Prepare the next request.
-            if let Some(next_task) = self.future_tasks.pop_front() {
-                self.current_task = Some(next_task);
-                return;
-            }
-            self.current_task = self.spawn_next_request();
-        } else if current_task
-            .map(|task| {
-                // Don't trigger prefetch if we're in a fake task created by backward streaming
-                task.is_streaming() && task.remaining() <= task.total_size() / 2
-            })
-            .unwrap_or(false)
-            && self.future_tasks.is_empty()
-        {
-            // The current task is nearing completion, so pre-spawn the next request in anticipation
-            // of it completing.
-            if let Some(task) = self.spawn_next_request() {
-                self.future_tasks.push_back(task);
+    /// Spawn a backpressure GetObject request with a range from the current offset to the end of the file.
+    /// We will be using the flow-control window to control how much data we want to download into the prefetcher.
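+    ///
+    /// Returns [PrefetchReadError::BackpressurePreconditionFailed] if the client does not report
+    /// a non-zero initial read window, since the request would otherwise never make progress.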
+    fn spawn_read_backpressure_request(
+        &mut self,
+    ) -> Result<RequestTask<Client::ClientError>, PrefetchReadError<Client::ClientError>> {
+        let start = self.next_sequential_read_offset;
+        let object_size = self.size as usize;
+        let range = RequestRange::new(object_size, start, object_size);
+
+        // The prefetcher now relies on the backpressure mechanism, so it must be enabled
+        let initial_read_window_size = match self.client.initial_read_window_size() {
+            Some(value) => {
+                // Also, make sure that we don't get blocked from the beginning
+                if value == 0 {
+                    return Err(PrefetchReadError::BackpressurePreconditionFailed);
+                }
+                value
             }
-        }
-    }
-
-    /// Spawn the next required request
-    fn spawn_next_request(&mut self) -> Option<RequestTask<Client::ClientError>> {
-        let start = self.next_request_offset;
-        if start >= self.size {
-            return None;
-        }
+            None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
+        };
 
-        let range = RequestRange::new(self.size as usize, start, self.next_request_size);
         let config = RequestTaskConfig {
             bucket: self.bucket.clone(),
             object_id: self.object_id.clone(),
             range,
             preferred_part_size: self.preferred_part_size,
+            initial_read_window_size,
+            max_read_window_size: self.config.max_read_window_size,
+            read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
         };
-        let task = self.part_stream.spawn_get_object_request(&self.client, config);
-
-        // [read] will reset these if the reader stops making sequential requests
-        self.next_request_offset += task.total_size() as u64;
-        self.next_request_size = self.get_next_request_size(task.total_size());
-
-        Some(task)
-    }
-
-    /// Suggest next request size.
-    /// The next request size is the current request size multiplied by sequential prefetch multiplier.
-    fn get_next_request_size(&self, request_size: usize) -> usize {
-        // TODO: this logic doesn't work well right now in the case where part_size <
-        // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly
-        // shrinking the request size until it reaches 1. But this isn't a configuration we
-        // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a
-        // prefetcher with multiplier 1 is not very good).
-        (request_size * self.config.sequential_prefetch_multiplier).min(self.config.max_request_size)
+        Ok(self.part_stream.spawn_get_object_request(&self.client, config))
     }
 
     /// Reset this prefetch request to a new offset, clearing any existing tasks queued.
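+    /// This also drops the in-flight backpressure task (if any), so the next read will spawn a
+    /// new GetObject request starting at the new offset.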
     fn reset_prefetch_to_offset(&mut self, offset: u64) {
-        self.current_task = None;
-        self.future_tasks.drain(..);
+        self.backpressure_task = None;
         self.backward_seek_window.clear();
         self.sequential_read_start_offset = offset;
         self.next_sequential_read_offset = offset;
-        self.next_request_size = self.config.first_request_size;
         self.next_request_offset = offset;
     }
 
@@ -445,7 +429,7 @@ where
         if offset > self.next_sequential_read_offset {
             self.try_seek_forward(offset).await
         } else {
-            self.try_seek_backward(offset)
+            self.try_seek_backward(offset).await
         }
     }
 
@@ -454,43 +438,22 @@ where
         let total_seek_distance = offset - self.next_sequential_read_offset;
         histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
 
-        let Some(current_task) = self.current_task.as_mut() else {
+        let Some(task) = self.backpressure_task.as_mut() else {
             // Can't seek if there's no requests in flight at all
             return Ok(false);
         };
 
-        // Jump ahead to the right request
-        if offset >= current_task.end_offset() {
-            self.next_sequential_read_offset = current_task.end_offset();
-            self.current_task = None;
-            while let Some(next_request) = self.future_tasks.pop_front() {
-                if next_request.end_offset() > offset {
-                    self.current_task = Some(next_request);
-                    break;
-                } else {
-                    self.next_sequential_read_offset = next_request.end_offset();
-                }
-            }
-            if self.current_task.is_none() {
-                // No inflight task containing the target offset.
-                trace!(current_offset=?self.next_sequential_read_offset, requested_offset=?offset, "seek failed: not enough inflight data");
-                return Ok(false);
-            }
-            // We could try harder to preserve the backwards seek buffer if we're near the
-            // request boundary, but it's probably not worth the trouble.
-            self.backward_seek_window.clear();
+        // Not enough data in the read window to serve the forward seek
+        if offset >= task.read_window_end_offset() {
+            return Ok(false);
         }
-        // At this point it's guaranteed by the previous if-block that `offset` is in the range of `self.current_task`
-        let current_task = self
-            .current_task
-            .as_mut()
-            .expect("a request existed that covered this seek offset");
 
         // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do
         // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where
         // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset.
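+        // For example, with available_offset at 8 MiB and max_forward_seek_wait_distance of
+        // 16 MiB, a seek to offset 20 MiB will wait for the download, while a seek to 30 MiB
+        // will give up and fall back to a new request.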
-        let available_offset = current_task.available_offset();
-        if offset >= available_offset.saturating_add(self.config.max_forward_seek_wait_distance) {
+        let available_offset = task.available_offset();
+        let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
+        if offset >= available_soon_offset {
             trace!(
                 requested_offset = offset,
                 available_offset = available_offset,
@@ -500,7 +463,7 @@ where
         }
         let mut seek_distance = offset - self.next_sequential_read_offset;
         while seek_distance > 0 {
-            let part = current_task.read(seek_distance as usize).await?;
+            let part = task.read(seek_distance as usize).await?;
             seek_distance -= part.len() as u64;
             self.next_sequential_read_offset += part.len() as u64;
             self.backward_seek_window.push(part);
@@ -508,8 +471,14 @@ where
         Ok(true)
     }
 
-    fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
+    async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
         assert!(offset < self.next_sequential_read_offset);
+
+        // When the task is None, it means we either have just started prefetching or have recently
+        // reset it; in both cases the backward seek window would be empty, so we can bail out early.
+        let Some(task) = self.backpressure_task.as_mut() else {
+            return Ok(false);
+        };
         let backwards_length_needed = self.next_sequential_read_offset - offset;
         histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);
 
@@ -517,14 +486,7 @@ where
             trace!("seek failed: not enough data in backwards seek window");
             return Ok(false);
         };
-        // We're going to create a new fake "request" that contains the parts we read out of the
-        // window. That sounds a bit hacky, but it keeps all the read logic simple rather than
-        // needing separate paths for backwards seeks vs others.
-        let request = RequestTask::from_parts(parts, offset);
-        if let Some(current_task) = self.current_task.take() {
-            self.future_tasks.push_front(current_task);
-        }
-        self.current_task = Some(request);
+        task.push_front(parts).await?;
         self.next_sequential_read_offset = offset;
         Ok(true)
     }
@@ -552,12 +514,11 @@ mod tests {
     #![allow(clippy::identity_op)]
 
     use crate::data_cache::InMemoryDataCache;
-    use crate::prefetch::part_stream::ClientPartStream;
 
     use super::caching_stream::CachingPartStream;
     use super::*;
     use futures::executor::{block_on, ThreadPool};
-    use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
+    use mountpoint_s3_client::error::GetObjectError;
     use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap};
     use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject};
     use proptest::proptest;
@@ -571,9 +532,9 @@ mod tests {
     #[derive(Debug, Arbitrary)]
     struct TestConfig {
         #[proptest(strategy = "16usize..1*1024*1024")]
-        first_request_size: usize,
+        initial_read_window_size: usize,
         #[proptest(strategy = "16usize..1*1024*1024")]
-        max_request_size: usize,
+        max_read_window_size: usize,
         #[proptest(strategy = "1usize..8usize")]
         sequential_prefetch_multiplier: usize,
         #[proptest(strategy = "16usize..2*1024*1024")]
         client_part_size: usize,
@@ -582,6 +543,8 @@
         max_forward_seek_wait_distance: u64,
         #[proptest(strategy = "1u64..4*1024*1024")]
         max_backward_seek_distance: u64,
+        #[proptest(strategy = "16usize..1*1024*1024")]
+        cache_block_size: usize,
     }
 
     fn default_stream() -> ClientPartStream<ThreadPool> {
@@ -604,6 +567,8 @@
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size: test_config.client_part_size,
+            enable_backpressure: true,
+            initial_read_window_size: test_config.initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -613,8 +578,7 @@
         client.add_object("hello", object);
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size: test_config.first_request_size,
-            max_request_size: test_config.max_request_size,
+            max_read_window_size: test_config.max_read_window_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
             read_timeout: Duration::from_secs(5),
             max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
             max_backward_seek_distance: test_config.max_backward_seek_distance,
@@ -645,12 +609,13 @@
         Stream: ObjectPartStream + Send + Sync + 'static,
     {
         let config = TestConfig {
-            first_request_size: 256 * 1024,
-            max_request_size: 1024 * 1024 * 1024,
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
        };
        run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config);
    }
@@ -662,12 +627,13 @@
         Stream: ObjectPartStream + Send + Sync + 'static,
     {
         let config = TestConfig {
-            first_request_size: 256 * 1024,
-            max_request_size: 64 * 1024 * 1024,
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
         };
         run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
@@ -679,17 +645,98 @@
         Stream: ObjectPartStream + Send + Sync + 'static,
     {
         let config = TestConfig {
-            first_request_size: 256 * 1024,
-            max_request_size: 64 * 1024 * 1024,
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 64 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
         };
         run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
 
+    fn fail_with_backpressure_precondition_test<Stream>(
+        part_stream: Stream,
+        test_config: TestConfig,
+        client_config: MockClientConfig,
+    ) where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
+        let client = MockClient::new(client_config);
+        let read_size = 1 * MB;
+        let object_size = 8 * MB;
+        let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
+        let etag = object.etag();
+
+        let prefetcher_config = PrefetcherConfig {
+            max_read_window_size: test_config.max_read_window_size,
+            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
+            ..Default::default()
+        };
+
+        let prefetcher = Prefetcher::new(part_stream, prefetcher_config);
+        let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag);
+        let result = block_on(request.read(0, read_size));
+        assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
+    }
+
+    #[test_case(default_stream())]
+    #[test_case(caching_stream(1 * MB))]
+    fn fail_with_backpressure_not_enabled<Stream>(part_stream: Stream)
+    where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
+        let test_config = TestConfig {
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 1024 * 1024 * 1024,
+            sequential_prefetch_multiplier: 8,
+            client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_wait_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
+        };
+
+        // backpressure is not enabled for the client
+        let config = MockClientConfig {
+            bucket: "test-bucket".to_string(),
+            part_size: test_config.client_part_size,
+            enable_backpressure: false,
+            ..Default::default()
+        };
+
+        fail_with_backpressure_precondition_test(part_stream, test_config, config);
+    }
+
+    #[test_case(default_stream())]
+    #[test_case(caching_stream(1 * MB))]
+    fn fail_with_backpressure_zero_read_window<Stream>(part_stream: Stream)
+    where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
+        let test_config = TestConfig {
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 1024 * 1024 * 1024,
+            sequential_prefetch_multiplier: 8,
+            client_part_size: 8 * 1024 * 1024,
+            max_forward_seek_wait_distance: 16 * 1024 * 1024,
+            max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
+        };
+
+        // backpressure is enabled but initial read window size is zero
+        let config = MockClientConfig {
+            bucket: "test-bucket".to_string(),
+            part_size: test_config.client_part_size,
+            enable_backpressure: true,
+            initial_read_window_size: 0,
+            ..Default::default()
+        };
+
+        fail_with_backpressure_precondition_test(part_stream, test_config, config);
    }
 
     fn fail_sequential_read_test<Stream>(
         part_stream: Stream,
         size: u64,
@@ -700,6 +747,8 @@
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size: test_config.client_part_size,
+            enable_backpressure: true,
+            initial_read_window_size: test_config.initial_read_window_size,
             ..Default::default()
         };
         let client = MockClient::new(config);
@@ -711,8 +760,7 @@
         let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new());
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size: test_config.first_request_size,
-            max_request_size: test_config.max_request_size,
+            max_read_window_size: test_config.max_read_window_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
             ..Default::default()
         };
@@ -748,12 +796,13 @@
         Stream: ObjectPartStream + Send + Sync + 'static,
     {
         let config = TestConfig {
-            first_request_size: 256 * 1024,
-            max_request_size: 1024 * 1024 * 1024,
+            initial_read_window_size: 256 * 1024,
+            max_read_window_size: 1024 * 1024 * 1024,
             sequential_prefetch_multiplier: 8,
             client_part_size: 8 * 1024 * 1024,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
        };
 
        let mut get_failures = HashMap::new();
@@ -788,18 +837,17 @@
         fn proptest_sequential_read_with_cache(
             size in 1u64..1 * 1024 * 1024,
             read_size in 1usize..1 * 1024 * 1024,
-            block_size in 16usize..1 * 1024 * 1024,
             config: TestConfig,
         ) {
-            run_sequential_read_test(caching_stream(block_size), size, read_size, config);
+            run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config);
         }
 
         #[test]
         fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024,
             read_factor in 1usize..10,
-            block_size in 16usize..1 * 1024 * 1024, config: TestConfig) {
+            config: TestConfig) {
             // Pick read size smaller than the object size
             let read_size = (size as usize / read_factor).max(1);
-            run_sequential_read_test(caching_stream(block_size), size, read_size, config);
+            run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config);
         }
     }
 
@@ -808,12 +856,13 @@
         let object_size = 854966;
         let read_size = 161647;
         let config = TestConfig {
-            first_request_size: 484941,
-            max_request_size: 81509,
+            initial_read_window_size: 484941,
+            max_read_window_size: 81509,
             sequential_prefetch_multiplier: 1,
             client_part_size: 181682,
             max_forward_seek_wait_distance: 1,
             max_backward_seek_distance: 18668,
+            cache_block_size: 1 * MB,
        };
        run_sequential_read_test(default_stream(), object_size, read_size, config);
    }
@@ -827,6 +876,8 @@
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size: test_config.client_part_size,
+            enable_backpressure: true,
+            initial_read_window_size: test_config.initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -836,8 +887,7 @@
         client.add_object("hello", object);
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size: test_config.first_request_size,
-            max_request_size: test_config.max_request_size,
+            max_read_window_size: test_config.max_read_window_size,
             sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
             max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
             max_backward_seek_distance: test_config.max_backward_seek_distance,
@@ -895,11 +945,10 @@
         #[test]
         fn proptest_random_read_with_cache(
             reads in random_read_strategy(1 * 1024 * 1024),
-            block_size in 16usize..1 * 1024 * 1024,
             config: TestConfig,
         ) {
             let (object_size, reads) = reads;
-            run_random_read_test(caching_stream(block_size), object_size, reads, config);
+            run_random_read_test(caching_stream(config.cache_block_size), object_size, reads, config);
        }
    }
 
@@ -908,12 +957,13 @@
         let object_size = 724314;
         let reads = vec![(0, 516883)];
         let config = TestConfig {
-            first_request_size: 3684779,
-            max_request_size: 2147621,
+            initial_read_window_size: 3684779,
+            max_read_window_size: 2147621,
             sequential_prefetch_multiplier: 4,
             client_part_size: 516882,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
        };
        run_random_read_test(default_stream(), object_size, reads, config);
    }
@@ -923,12 +973,13 @@
         let object_size = 755678;
         let reads = vec![(0, 278499), (311250, 1)];
         let config = TestConfig {
-            first_request_size: 556997,
-            max_request_size: 105938,
+            initial_read_window_size: 556997,
+            max_read_window_size: 105938,
             sequential_prefetch_multiplier: 7,
             client_part_size: 1219731,
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
+            cache_block_size: 1 * MB,
        };
        run_random_read_test(default_stream(), object_size, reads, config);
    }
@@ -938,12 +989,13 @@
         let object_size = 755678;
         let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
         let config = TestConfig {
-            first_request_size: 556997,
-            max_request_size: 105938,
+            initial_read_window_size: 556997,
+            max_read_window_size: 105938,
             sequential_prefetch_multiplier: 7,
             client_part_size: 1219731,
             max_forward_seek_wait_distance: 2260662,
             max_backward_seek_distance: 2369799,
+            cache_block_size: 1 * MB,
        };
        run_random_read_test(default_stream(), object_size, reads, config);
    }
@@ -953,12 +1005,13 @@
         let object_size = 14201;
         let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
         let config = TestConfig {
-            first_request_size: 457999,
-            max_request_size: 863511,
+            initial_read_window_size: 457999,
+            max_read_window_size: 863511,
             sequential_prefetch_multiplier: 5,
             client_part_size: 1972409,
             max_forward_seek_wait_distance: 2810651,
             max_backward_seek_distance: 3531090,
+            cache_block_size: 1 * MB,
        };
        run_random_read_test(default_stream(), object_size, reads, config);
    }
@@ -971,6 +1024,9 @@
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size: PART_SIZE,
+            enable_backpressure: true,
+            // For simplicity, prefetch the whole object in one request.
+            initial_read_window_size: OBJECT_SIZE,
             ..Default::default()
         };
 
@@ -1002,12 +1058,7 @@
             HashMap::new(),
         ));
 
-        // For simplicity, prefetch the whole object in one request.
-        let prefetcher_config = PrefetcherConfig {
-            first_request_size: OBJECT_SIZE,
-            ..Default::default()
-        };
-        let prefetcher = Prefetcher::new(default_stream(), prefetcher_config);
+        let prefetcher = Prefetcher::new(default_stream(), Default::default());
 
         block_on(async {
             let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone());
@@ -1045,6 +1096,8 @@
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size,
+            enable_backpressure: true,
+            initial_read_window_size: FIRST_REQUEST_SIZE,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -1053,12 +1106,7 @@
 
         client.add_object("hello", object);
 
-        let prefetcher_config = PrefetcherConfig {
-            first_request_size: FIRST_REQUEST_SIZE,
-            ..Default::default()
-        };
-
-        let prefetcher = Prefetcher::new(default_stream(), prefetcher_config);
+        let prefetcher = Prefetcher::new(default_stream(), Default::default());
 
         // Try every possible seek from first_read_size
         for offset in first_read_size + 1..OBJECT_SIZE {
@@ -1079,11 +1127,12 @@
     #[test_case(125, 110; "read in second request")]
     fn test_backward_seek(first_read_size: usize, part_size: usize) {
         const OBJECT_SIZE: usize = 200;
-        const FIRST_REQUEST_SIZE: usize = 100;
 
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size,
+            enable_backpressure: true,
+            initial_read_window_size: part_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -1092,11 +1141,7 @@
 
         client.add_object("hello", object);
 
-        let prefetcher_config = PrefetcherConfig {
-            first_request_size: FIRST_REQUEST_SIZE,
-            ..Default::default()
-        };
-        let prefetcher = Prefetcher::new(default_stream(), prefetcher_config);
+        let prefetcher = Prefetcher::new(default_stream(), Default::default());
 
         // Try every possible seek from first_read_size
         for offset in 0..first_read_size {
@@ -1131,16 +1176,18 @@ fn sequential_read_stress_helper() {
         let mut rng = shuttle::rand::thread_rng();
         let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
-        let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024);
-        let max_request_size = rng.gen_range(16usize..1 * 1024 * 1024);
+        let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024);
         let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
         let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024);
+        let initial_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024);
         let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
         let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
 
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size,
+            enable_backpressure: true,
+            initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -1150,8 +1197,7 @@
 
         client.add_object("hello", object);
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size,
-            max_request_size,
+            max_read_window_size,
             sequential_prefetch_multiplier,
             max_forward_seek_wait_distance,
             max_backward_seek_distance,
@@ -1184,20 +1230,22 @@ fn random_read_stress_helper() {
         let mut rng = shuttle::rand::thread_rng();
 
-        let first_request_size = rng.gen_range(16usize..32 * 1024);
-        let max_request_size = rng.gen_range(16usize..32 * 1024);
-        // Try to prevent testing very small reads of very large objects, which are easy to OOM
-        // under Shuttle (lots of concurrent tasks)
-        let max_object_size = first_request_size.min(max_request_size) * 20;
-        let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
+        let max_read_window_size = rng.gen_range(16usize..32 * 1024);
         let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
         let part_size = rng.gen_range(16usize..128 * 1024);
+        let initial_read_window_size = rng.gen_range(16usize..128 * 1024);
         let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
         let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
+        // Try to prevent testing very small reads of very large objects, which are easy to OOM
+        // under Shuttle (lots of concurrent tasks)
+        let max_object_size = initial_read_window_size.min(max_read_window_size) * 20;
+        let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
 
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size,
+            enable_backpressure: true,
+            initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
@@ -1207,8 +1255,7 @@
 
         client.add_object("hello", object);
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size,
-            max_request_size,
+            max_read_window_size,
             sequential_prefetch_multiplier,
             max_forward_seek_wait_distance,
             max_backward_seek_distance,
diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs
new file mode 100644
index 000000000..77c6eec11
--- /dev/null
+++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -0,0 +1,193 @@
+use std::ops::Range;
+
+use async_channel::{unbounded, Receiver, Sender};
+use tracing::trace;
+
+use super::PrefetchReadError;
+
+#[derive(Debug)]
+pub enum BackpressureFeedbackEvent {
+    /// An event where data with a certain length has been read out of the stream
+    DataRead(usize),
+    /// An event indicating part queue stall
+    PartQueueStall,
+}
+
+pub struct BackpressureConfig {
+    /// Backpressure's initial read window size
+    pub initial_read_window_size: usize,
+    /// Maximum read window size that the backpressure controller is allowed to scale up to
+    pub max_read_window_size: usize,
+    /// Factor to increase the read window size by when the part queue is stalled
+    pub read_window_size_multiplier: usize,
+    /// Request range to apply backpressure to
+    pub request_range: Range<u64>,
+}
+
+#[derive(Debug)]
+pub struct BackpressureController {
+    read_window_updater: Sender<usize>,
+    preferred_read_window_size: usize,
+    max_read_window_size: usize,
+    read_window_size_multiplier: usize,
+    /// Upper bound of the current read window. The request can return data up to this
+    /// offset *exclusively*. This value must be advanced to continue fetching new data.
+    read_window_end_offset: u64,
+    /// Next offset of the data to be read. It is used for tracking how many bytes of
+    /// data have been read out of the stream.
+    next_read_offset: u64,
+    /// End offset for the request we want to apply backpressure to. The request can return
+    /// data up to this offset *exclusively*.
+    request_end_offset: u64,
+}
+
+#[derive(Debug)]
+pub struct BackpressureLimiter {
+    read_window_incrementing_queue: Receiver<usize>,
+    /// Upper bound of the current read window.
+    /// Calling [BackpressureLimiter::wait_for_read_window_increment()] will block the current
+    /// thread until this value is advanced.
+    read_window_end_offset: u64,
+    /// End offset for the request we want to apply backpressure to. The request can return
+    /// data up to this offset *exclusively*.
+    request_end_offset: u64,
+}
+
+/// Creates a [BackpressureController] and its related [BackpressureLimiter].
+/// We use a pair of these for providing feedback to a backpressure stream.
+///
+/// [BackpressureLimiter] is used on the producer side of the object stream, that is, any
+/// [super::part_stream::ObjectPartStream] that supports backpressure. The producer can call
+/// `wait_for_read_window_increment` to wait for feedback from the consumer. This method
+/// could block when the producer requires the read window to be incremented.
+///
+/// [BackpressureController] will be given to the consumer side of the object stream.
+/// It can be used anywhere to set the preferred read window size for the stream and to tell
+/// the producer when its read window should be increased.
+pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureController, BackpressureLimiter) {
+    let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
+    let (read_window_updater, read_window_incrementing_queue) = unbounded();
+    let controller = BackpressureController {
+        read_window_updater,
+        preferred_read_window_size: config.initial_read_window_size,
+        max_read_window_size: config.max_read_window_size,
+        read_window_size_multiplier: config.read_window_size_multiplier,
+        read_window_end_offset,
+        next_read_offset: config.request_range.start,
+        request_end_offset: config.request_range.end,
+    };
+    let limiter = BackpressureLimiter {
+        read_window_incrementing_queue,
+        read_window_end_offset,
+        request_end_offset: config.request_range.end,
+    };
+    (controller, limiter)
+}
+
+impl BackpressureController {
+    pub fn read_window_end_offset(&self) -> u64 {
+        self.read_window_end_offset
+    }
+
+    /// Send feedback to the backpressure controller when reading data out of the stream. The backpressure controller
+    /// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`.
+    pub async fn send_feedback<E: std::error::Error>(
+        &mut self,
+        event: BackpressureFeedbackEvent,
+    ) -> Result<(), PrefetchReadError<E>> {
+        match event {
+            BackpressureFeedbackEvent::DataRead(length) => {
+                self.next_read_offset += length as u64;
+                // Increment the read window only if the remaining window reaches some threshold, i.e. when half of it is left.
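+                // For example, with a preferred read window of 1 MiB we only top the window back
+                // up once fewer than 512 KiB remain unread, rather than on every small read.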
+                if self.remaining_window() < (self.preferred_read_window_size / 2)
+                    && self.read_window_end_offset < self.request_end_offset
+                {
+                    let new_read_window_end_offset = self
+                        .next_read_offset
+                        .saturating_add(self.preferred_read_window_size as u64)
+                        .min(self.request_end_offset);
+                    debug_assert!(self.read_window_end_offset < new_read_window_end_offset);
+                    let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
+                    trace!(
+                        preferred_read_window_size = self.preferred_read_window_size,
+                        next_read_offset = self.next_read_offset,
+                        read_window_end_offset = self.read_window_end_offset,
+                        to_increase,
+                        "incrementing read window"
+                    );
+                    self.increment_read_window(to_increase).await;
+                    self.read_window_end_offset = new_read_window_end_offset;
+                }
+            }
+            BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(),
+        }
+        Ok(())
+    }
+
+    // Send an increment read window request to the stream producer
+    async fn increment_read_window(&self, len: usize) {
+        // This should not block since the channel is unbounded
+        let _ = self
+            .read_window_updater
+            .send(len)
+            .await
+            .inspect_err(|_| trace!("read window incrementing queue is already closed"));
+    }
+
+    fn remaining_window(&self) -> usize {
+        self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize
+    }
+
+    // Try scaling up the preferred read window size with a multiplier configured at initialization.
+    fn try_scaling_up(&mut self) {
+        if self.preferred_read_window_size < self.max_read_window_size {
+            let new_read_window_size =
+                (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size);
+            trace!(
+                current_size = self.preferred_read_window_size,
+                new_size = new_read_window_size,
+                "scaling up preferred read window"
+            );
+            self.preferred_read_window_size = new_read_window_size;
+        }
+    }
+}
+
+impl BackpressureLimiter {
+    pub fn read_window_end_offset(&self) -> u64 {
+        self.read_window_end_offset
+    }
+
+    /// Checks if there is enough read window to put the next item with a given offset into the stream.
+    /// It blocks until it receives enough read window increments to serve the next part.
+    ///
+    /// Returns the new read window offset.
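+    /// Returns `Ok(None)` if the window was already large enough and no new increment was
+    /// received, so callers can distinguish "no change" from an advanced window.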
+    pub async fn wait_for_read_window_increment<E: std::error::Error>(
+        &mut self,
+        offset: u64,
+    ) -> Result<Option<u64>, PrefetchReadError<E>> {
+        // There is already enough read window, so no need to block
+        if self.read_window_end_offset > offset {
+            // Check the read window incrementing queue to see if there is an early request to increase the read window
+            let new_read_window_offset = if let Ok(len) = self.read_window_incrementing_queue.try_recv() {
+                self.read_window_end_offset += len as u64;
+                Some(self.read_window_end_offset)
+            } else {
+                None
+            };
+            return Ok(new_read_window_offset);
+        }
+
+        // Reaching here means there is not enough read window, so we block until it is large enough
+        while self.read_window_end_offset <= offset && self.read_window_end_offset < self.request_end_offset {
+            trace!(
+                offset,
+                read_window_offset = self.read_window_end_offset,
+                "blocking for read window increment"
+            );
+            let recv = self.read_window_incrementing_queue.recv().await;
+            match recv {
+                Ok(len) => self.read_window_end_offset += len as u64,
+                Err(_) => return Err(PrefetchReadError::ReadWindowIncrement),
+            }
+        }
+        Ok(Some(self.read_window_end_offset))
+    }
+}
diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs
index 6d287007c..a42121417 100644
--- a/mountpoint-s3/src/prefetch/caching_stream.rs
+++ b/mountpoint-s3/src/prefetch/caching_stream.rs
@@ -10,10 +10,11 @@ use tracing::{debug_span, trace, warn, Instrument};
 use crate::checksums::ChecksummedBytes;
 use crate::data_cache::{BlockIndex, DataCache};
 use crate::object::ObjectId;
+use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig, BackpressureLimiter};
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
 use crate::prefetch::part_stream::{
-    read_from_request, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
+    read_from_client_stream, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
 };
 use crate::prefetch::task::RequestTask;
 use crate::prefetch::PrefetchReadError;
@@ -48,30 +49,35 @@ where
         Client: ObjectClient + Clone + Send + Sync + 'static,
     {
-        let range = config.range.align(self.cache.block_size(), false);
-
-        let start = range.start();
-        let size = range.len();
+        let range = config.range;
+        let backpressure_config = BackpressureConfig {
+            initial_read_window_size: config.initial_read_window_size,
+            max_read_window_size: config.max_read_window_size,
+            read_window_size_multiplier: config.read_window_size_multiplier,
+            request_range: range.into(),
+        };
+        let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config);
         let (part_queue, part_queue_producer) = unbounded_part_queue();
         trace!(?range, "spawning request");
 
         let request_task = {
-            let request = CachingRequest::new(client.clone(), self.cache.clone(), config);
+            let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config);
             let span = debug_span!("prefetch", ?range);
             request.get_from_cache(range, part_queue_producer).instrument(span)
         };
 
         let task_handle = self.runtime.spawn_with_handle(request_task).unwrap();
 
-        RequestTask::from_handle(task_handle, size, start, part_queue)
+        RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller)
     }
 }
 
 #[derive(Debug)]
-struct CachingRequest<Client, Cache> {
+struct CachingRequest<Client: ObjectClient, Cache> {
     client: Client,
     cache: Arc<Cache>,
+    backpressure_limiter: BackpressureLimiter,
     config: RequestTaskConfig,
 }
 
@@ -80,11 +86,37 @@ impl<Client, Cache> CachingRequest<Client, Cache>
 where
     Client: ObjectClient + Clone + Send + Sync + 'static,
     Cache: DataCache + Send + Sync,
 {
-    fn new(client: Client, cache: Arc<Cache>, config: RequestTaskConfig) -> Self {
-        Self { client, cache, config }
+    fn new(
+        client: Client,
+        cache: Arc<Cache>,
+        backpressure_limiter: BackpressureLimiter,
+        config: RequestTaskConfig,
+    ) -> Self {
+        Self {
+            client,
+            cache,
+            backpressure_limiter,
+            config,
+        }
     }
 
-    async fn get_from_cache(self, range: RequestRange, part_queue_producer: PartQueueProducer<Client::ClientError>) {
+    // We have changed how often this method is called now that backpressure is used.
+    // Before, every time the prefetcher asked for more data and a new RequestTask was
+    // spawned, we would first check the cache and fall back to the client at the first
+    // cache miss.
+    // Now new RequestTasks are only spawned on out-of-order reads, while sequential data
+    // is requested via backpressure. This means that a fully sequential read will switch
+    // entirely to the client after a single cache miss.
+    //
+    // In theory, that could mean more requests to S3, but in practice the previous behavior
+    // would only be better when we have cached data scattered across the ranges and the new
+    // RequestTasks happen to start somewhere in one of those ranges and so benefit from
+    // the cache. This change should only affect sequential read workloads.
+    async fn get_from_cache(
+        mut self,
+        range: RequestRange,
+        part_queue_producer: PartQueueProducer<Client::ClientError>,
+    ) {
         let cache_key = &self.config.object_id;
         let block_size = self.cache.block_size();
         let block_range = self.block_indices_for_byte_range(&range);
@@ -99,9 +131,19 @@
             match self.cache.get_block(cache_key, block_index, block_offset) {
                 Ok(Some(block)) => {
                     trace!(?cache_key, ?range, block_index, "cache hit");
-                    let part = make_part(block, block_index, block_offset, block_size, cache_key, &range);
+                    // Cache blocks always contain bytes in the request range
+                    let part = try_make_part(&block, block_offset, cache_key, &range).unwrap();
                     part_queue_producer.push(Ok(part));
                     block_offset += block_size;
+
+                    if let Err(e) = self
+                        .backpressure_limiter
+                        .wait_for_read_window_increment(block_offset)
+                        .await
+                    {
+                        part_queue_producer.push(Err(e));
+                        break;
+                    }
                     continue;
                 }
                 Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"),
@@ -129,19 +171,22 @@
     }
 
     async fn get_from_client(
-        &self,
+        &mut self,
         range: RequestRange,
         block_range: Range<BlockIndex>,
         part_queue_producer: PartQueueProducer<Client::ClientError>,
     ) {
         let bucket = &self.config.bucket;
         let cache_key = &self.config.object_id;
+        let first_read_window_end_offset = self.config.range.start() + self.config.initial_read_window_size as u64;
         let block_size = self.cache.block_size();
         assert!(block_size > 0);
 
         // Always request a range aligned with block boundaries (or to the end of the object).
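+        // For example, with a 5 MiB block size, a request for bytes [6 MiB, 12 MiB) is widened
+        // to the block-aligned range [5 MiB, 15 MiB), capped at the end of the object.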
-        let block_aligned_byte_range =
-            (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64);
+        let start_offset = block_range.start * block_size;
+        let end_offset = (block_range.end * block_size).min(range.object_size() as u64);
+        let request_len = (end_offset - start_offset) as usize;
+        let block_aligned_byte_range = RequestRange::new(range.object_size(), start_offset, request_len);
 
         trace!(
             key = cache_key.key(),
             ?block_aligned_byte_range,
             original_range = ?range,
             "fetching data from client"
         );
 
+        let request_stream = read_from_client_stream(
+            &mut self.backpressure_limiter,
+            &self.client,
+            bucket.clone(),
+            cache_key.clone(),
+            first_read_window_end_offset,
+            block_aligned_byte_range,
+        );
+
         let mut part_composer = CachingPartComposer {
             part_queue_producer,
             cache_key: cache_key.clone(),
             original_range: range,
             block_index: block_range.start,
             block_offset: block_range.start * block_size,
             buffer: ChecksummedBytes::default(),
             cache: self.cache.clone(),
         };
-
-        let request_stream = read_from_request(
-            self.client.clone(),
-            bucket.clone(),
-            cache_key.clone(),
-            block_aligned_byte_range,
-        );
         part_composer.try_compose_parts(request_stream).await;
     }
 
@@ -229,11 +276,29 @@
         // Split the body into blocks.
         let mut body: Bytes = body.into();
+        let mut offset = offset;
         while !body.is_empty() {
             let remaining = (block_size as usize).saturating_sub(self.buffer.len()).min(body.len());
-            let chunk = body.split_to(remaining);
+            let chunk: ChecksummedBytes = body.split_to(remaining).into();
+
+            // We need to return some bytes to the part queue even before we can fill an entire caching block because
+            // we want to start the feedback loop for the flow-control window.
+            //
+            // This is because the read window may not be aligned to block boundaries and therefore may not be enough
+            // to fetch the entire block, but we know it always fetches enough data for the prefetcher to start reading.
+            // For example, consider that we got a file system read request with range 2MB to 4MB and we have to start
+            // reading from block_offset=0 and block_size=5MB. The first read window might have a range up to 4MB, which
+            // is enough to serve the read request, but if the prefetcher is not able to read anything it cannot tell
+            // the stream to move its read window.
+            //
+            // A side effect of this is a delay in cache updates, which makes testing a bit more complicated because
+            // the cache is not updated synchronously.
+            if let Some(part) = try_make_part(&chunk, offset, &self.cache_key, &self.original_range) {
+                self.part_queue_producer.push(Ok(part));
+            }
+            offset += chunk.len() as u64;
             self.buffer
-                .extend(chunk.into())
+                .extend(chunk)
                 .inspect_err(|e| warn!(key, error=?e, "integrity check for body part failed"))?;
             if self.buffer.len() < block_size as usize {
                 break;
@@ -247,14 +312,6 @@
                     self.block_offset,
                     &self.cache_key,
                 );
-                self.part_queue_producer.push(Ok(make_part(
-                    self.buffer.clone(),
-                    self.block_index,
-                    self.block_offset,
-                    block_size,
-                    &self.cache_key,
-                    &self.original_range,
-                )));
                 self.block_index += 1;
                 self.block_offset += block_size;
                 self.buffer = ChecksummedBytes::default();
@@ -277,16 +334,7 @@
                 self.block_offset,
                 &self.cache_key,
             );
-            self.part_queue_producer.push(Ok(make_part(
-                self.buffer.clone(),
-                self.block_index,
-                self.block_offset,
-                block_size,
-                &self.cache_key,
-                &self.original_range,
-            )));
         }
-
         Ok(())
     }
 }
@@ -309,35 +357,21 @@ fn update_cache(
     metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
 }
 
-/// Creates a Part that can be streamed to the prefetcher from the given cache block.
-/// If required, trims the block bytes to the request range. -fn make_part( - block: ChecksummedBytes, - block_index: u64, - block_offset: u64, - block_size: u64, - cache_key: &ObjectId, - range: &RequestRange, -) -> Part { - assert_eq!(block_offset, block_index * block_size, "invalid block offset"); - - let block_size = block.len(); - let part_range = range - .trim_start(block_offset) - .trim_end(block_offset + block_size as u64); - trace!( - ?cache_key, - block_index, - ?part_range, - block_offset, - block_size, - "creating part from block data", - ); - - let trim_start = (part_range.start().saturating_sub(block_offset)) as usize; - let trim_end = (part_range.end().saturating_sub(block_offset)) as usize; - let bytes = block.slice(trim_start..trim_end); - Part::new(cache_key.clone(), part_range.start(), bytes) +/// Creates a Part that can be streamed to the prefetcher if the given bytes +/// are in the request range, otherwise return None. +fn try_make_part(bytes: &ChecksummedBytes, offset: u64, object_id: &ObjectId, range: &RequestRange) -> Option { + let part_range = range.trim_start(offset).trim_end(offset + bytes.len() as u64); + if part_range.is_empty() { + return None; + } + trace!(?part_range, "creating part trimmed to the request range"); + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + Some(Part::new( + object_id.clone(), + part_range.start(), + bytes.slice(trim_start..trim_end), + )) } #[cfg(test)] @@ -345,6 +379,8 @@ mod tests { // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry #![allow(clippy::identity_op)] + use std::{thread, time::Duration}; + use futures::executor::{block_on, ThreadPool}; use mountpoint_s3_client::{ mock_client::{MockClient, MockClientConfig, MockObject, Operation}, @@ -352,7 +388,7 @@ mod tests { }; use test_case::test_case; - use crate::data_cache::InMemoryDataCache; + use crate::{data_cache::InMemoryDataCache, object::ObjectId}; use super::*; @@ -378,11 +414,18 @@ mod tests { let object = MockObject::ramp(seed, object_size, ETag::for_tests()); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -391,6 +434,8 @@ mod tests { let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let stream = CachingPartStream::new(runtime, cache); let range = RequestRange::new(object_size, offset as u64, preferred_size); + let expected_start_block = (range.start() as usize).div_euclid(block_size); + let expected_end_block = (range.end() as usize).div_ceil(block_size); let first_read_count = { // First request (from client) @@ -399,7 +444,10 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, - preferred_part_size: 0, + preferred_part_size: 256 * KB, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, }; let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); @@ -407,6 +455,13 @@ mod tests { }; assert!(first_read_count > 0); + // Wait until all blocks are saved to the cache before 
+        let expected_block_count = expected_end_block - expected_start_block;
+        while stream.cache.block_count(&id) < expected_block_count {
+            thread::sleep(Duration::from_millis(10));
+        }
+        assert_eq!(expected_block_count, stream.cache.block_count(&id));
+
         let second_read_count = {
             // Second request (from cache)
             let get_object_counter = mock_client.new_counter(Operation::GetObject);
             let config = RequestTaskConfig {
                 bucket: bucket.to_owned(),
                 object_id: id.clone(),
                 range,
-                preferred_part_size: 0,
+                preferred_part_size: 256 * KB,
+                initial_read_window_size,
+                max_read_window_size,
+                read_window_size_multiplier,
             };
             let request_task = stream.spawn_get_object_request(&mock_client, config);
             compare_read(&id, &object, request_task);
             get_object_counter.count()
         };
@@ -434,11 +492,18 @@ mod tests {
         let object = MockObject::ramp(seed, object_size, ETag::for_tests());
         let id = ObjectId::new(key.to_owned(), object.etag());
 
+        // backpressure config
+        let initial_read_window_size = 1 * MB;
+        let max_read_window_size = 64 * MB;
+        let read_window_size_multiplier = 2;
+
         let cache = InMemoryDataCache::new(block_size as u64);
         let bucket = "test-bucket";
         let config = MockClientConfig {
             bucket: bucket.to_string(),
             part_size: client_part_size,
+            enable_backpressure: true,
+            initial_read_window_size,
             ..Default::default()
         };
         let mock_client = Arc::new(MockClient::new(config));
@@ -453,7 +518,10 @@ mod tests {
             bucket: bucket.to_owned(),
             object_id: id.clone(),
             range: RequestRange::new(object_size, offset as u64, preferred_size),
-            preferred_part_size: 0,
+            preferred_part_size: 256 * KB,
+            initial_read_window_size,
+            max_read_window_size,
+            read_window_size_multiplier,
         };
         let request_task = stream.spawn_get_object_request(&mock_client, config);
         compare_read(&id, &object, request_task);
diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs
index 8542ed173..2e9106dce 100644
--- a/mountpoint-s3/src/prefetch/part.rs
+++ b/mountpoint-s3/src/prefetch/part.rs
@@ -1,6 +1,6 @@
 use thiserror::Error;
 
-use crate::checksums::ChecksummedBytes;
+use crate::checksums::{ChecksummedBytes, IntegrityError};
 use crate::object::ObjectId;
 
 /// A self-identifying part of an S3 object. Users can only retrieve the bytes from this part if
@@ -21,7 +21,13 @@ impl Part {
         }
     }
 
-    pub fn into_bytes(self, id: &ObjectId, offset: u64) -> Result<ChecksummedBytes, PartMismatchError> {
+    pub fn extend(&mut self, other: &Part) -> Result<(), PartOperationError> {
+        let expected_offset = self.offset + self.checksummed_bytes.len() as u64;
+        other.check(&self.id, expected_offset)?;
+        Ok(self.checksummed_bytes.extend(other.clone().checksummed_bytes)?)
+    }
+
+    pub fn into_bytes(self, id: &ObjectId, offset: u64) -> Result<ChecksummedBytes, PartOperationError> {
         self.check(id, offset).map(|_| self.checksummed_bytes)
     }
 
@@ -38,6 +44,10 @@ impl Part {
         }
     }
 
+    pub(super) fn offset(&self) -> u64 {
+        self.offset
+    }
+
     pub(super) fn len(&self) -> usize {
         self.checksummed_bytes.len()
     }
@@ -46,15 +56,15 @@ impl Part {
         self.checksummed_bytes.is_empty()
     }
 
-    fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartMismatchError> {
+    fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartOperationError> {
         if self.id != *id {
-            return Err(PartMismatchError::Id {
+            return Err(PartOperationError::IdMismatch {
                 actual: self.id.clone(),
                 requested: id.to_owned(),
             });
         }
         if self.offset != offset {
-            return Err(PartMismatchError::Offset {
+            return Err(PartOperationError::OffsetMismatch {
                 actual: self.offset,
                 requested: offset,
             });
@@ -64,10 +74,117 @@ impl Part {
 }
 
 #[derive(Debug, Error)]
-pub enum PartMismatchError {
+pub enum PartOperationError {
+    #[error("part integrity check failed")]
+    Integrity(#[from] IntegrityError),
+
     #[error("wrong part id: actual={actual:?}, requested={requested:?}")]
-    Id { actual: ObjectId, requested: ObjectId },
+    IdMismatch { actual: ObjectId, requested: ObjectId },
 
     #[error("wrong part offset: actual={actual}, requested={requested}")]
-    Offset { actual: u64, requested: u64 },
+    OffsetMismatch { actual: u64, requested: u64 },
+}
+
+#[cfg(test)]
+mod tests {
+    use mountpoint_s3_client::types::ETag;
+
+    use crate::{checksums::ChecksummedBytes, object::ObjectId, prefetch::part::PartOperationError};
+
+    use super::Part;
+
+    #[test]
+    fn test_append() {
+        let object_id = ObjectId::new("key".to_owned(), ETag::for_tests());
+        let first_offset = 0;
+        let first_part_len = 1024;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(first_offset as u8 as usize)
+            .take(first_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes);
+
+        let second_part_len = 512;
+        let second_offset = first_offset + first_part_len as u64;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(second_offset as u8 as usize)
+            .take(second_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let second = Part::new(object_id.clone(), second_offset, checksummed_bytes);
+
+        first.extend(&second).expect("should be able to extend");
+        assert_eq!(first_part_len + second_part_len, first.len());
+        first.check(&object_id, first_offset).expect("the part should be valid");
+    }
+
+    #[test]
+    fn test_append_with_mismatch_object_id() {
+        let object_id = ObjectId::new("key".to_owned(), ETag::for_tests());
+        let first_offset = 0;
+        let first_part_len = 1024;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(first_offset as u8 as usize)
+            .take(first_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes);
+
+        let second_object_id = ObjectId::new("other".to_owned(), ETag::for_tests());
+        let second_part_len = 512;
+        let second_offset = first_offset;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(second_offset as u8 as usize)
+            .take(second_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let second = Part::new(second_object_id.clone(), second_offset, checksummed_bytes);
+
+        let result = first.extend(&second);
+        assert!(matches!(
+            result,
+            Err(PartOperationError::IdMismatch {
+                actual: _,
+                requested: _
+            })
+        ));
+    }
+
+    #[test]
+    fn test_append_with_mismatch_offset() {
+        let object_id = ObjectId::new("key".to_owned(), ETag::for_tests());
+        let first_offset = 0;
+        let first_part_len = 1024;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(first_offset as u8 as usize)
+            .take(first_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes);
+
+        let second_part_len = 512;
+        let second_offset = first_offset;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(second_offset as u8 as usize)
+            .take(second_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let second = Part::new(object_id.clone(), second_offset, checksummed_bytes);
+
+        let result = first.extend(&second);
+        assert!(matches!(
+            result,
+            Err(PartOperationError::OffsetMismatch {
+                actual: _,
+                requested: _
+            })
+        ));
+    }
+}
diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index e680babb1..30a2c264c 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -51,7 +51,7 @@ impl PartQueue {
     /// empty.
     ///
     /// If this method returns an Err, the PartQueue must never be accessed again.
-    pub async fn read(&self, length: usize) -> Result<Part, PrefetchReadError<E>> {
+    pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
         let mut current_part = self.current_part.lock().await;
 
         assert!(
@@ -93,6 +93,27 @@ impl PartQueue {
         Ok(part)
     }
 
+    /// Push a new [Part] onto the front of the queue,
+    /// which actually just concatenates it with the current part
+    pub async fn push_front(&self, mut part: Part) -> Result<(), PrefetchReadError<E>> {
+        let part_len = part.len();
+        let mut current_part = self.current_part.lock().await;
+
+        assert!(
+            !self.failed.load(Ordering::SeqCst),
+            "cannot use a PartQueue after failure"
+        );
+
+        if let Some(current_part) = current_part.as_mut() {
+            part.extend(current_part)?;
+            *current_part = part;
+        } else {
+            *current_part = Some(part);
+        }
+        metrics::gauge!("prefetch.bytes_in_queue").increment(part_len as f64);
+        Ok(())
+    }
+
     pub fn bytes_received(&self) -> usize {
         self.bytes_received.load(Ordering::SeqCst)
     }
@@ -160,7 +181,7 @@ mod tests {
     async fn run_test(ops: Vec<Op>) {
         let part_id = ObjectId::new("key".to_owned(), ETag::for_tests());
-        let (part_queue, part_queue_producer) = unbounded_part_queue::();
+        let (mut part_queue, part_queue_producer) = unbounded_part_queue::();
         let mut current_offset = 0;
         let mut current_length = 0;
         for op in ops {
diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs
index 3065c6e0f..f34422e3e 100644
--- a/mountpoint-s3/src/prefetch/part_stream.rs
+++ b/mountpoint-s3/src/prefetch/part_stream.rs
@@ -2,18 +2,21 @@ use async_stream::try_stream;
 use bytes::Bytes;
 use futures::task::{Spawn, SpawnExt};
 use futures::{pin_mut, Stream, StreamExt};
-use mountpoint_s3_client::ObjectClient;
+use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient};
 use std::marker::{Send, Sync};
 use std::{fmt::Debug, ops::Range};
 use tracing::{debug_span, error, trace, Instrument};
 
 use crate::checksums::ChecksummedBytes;
 use crate::object::ObjectId;
+use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig};
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
 use crate::prefetch::task::RequestTask;
 use crate::prefetch::PrefetchReadError;
 
+use super::backpressure_controller::BackpressureLimiter;
+
 /// A generic interface to retrieve data from objects in a S3-like store.
 pub trait ObjectPartStream {
     /// Spawns a request to get the content of an object. The object data will be retrieved in fixed size
@@ -35,6 +38,9 @@ pub struct RequestTaskConfig {
     pub object_id: ObjectId,
     pub range: RequestRange,
     pub preferred_part_size: usize,
+    pub initial_read_window_size: usize,
+    pub max_read_window_size: usize,
+    pub read_window_size_multiplier: usize,
 }
 
 /// The range of a [ObjectPartStream::spawn_get_object_request] request.
@@ -180,36 +186,47 @@ where
     Client: ObjectClient + Clone + Send + Sync + 'static,
 {
     assert!(config.preferred_part_size > 0);
-    let request_range = config
-        .range
-        .align(client.read_part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
-    let start = request_range.start();
-    let size = request_range.len();
+    let range = config.range;
+
+    let backpressure_config = BackpressureConfig {
+        initial_read_window_size: config.initial_read_window_size,
+        max_read_window_size: config.max_read_window_size,
+        read_window_size_multiplier: config.read_window_size_multiplier,
+        request_range: range.into(),
+    };
+    let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller(backpressure_config);
+
     let (part_queue, part_queue_producer) = unbounded_part_queue();
-    trace!(range=?request_range, "spawning request");
+    trace!(?range, "spawning request");
 
-    let span = debug_span!("prefetch", range=?request_range);
+    let span = debug_span!("prefetch", ?range);
     let client = client.clone();
-    let bucket = config.bucket.clone();
     let task_handle = self
         .runtime
         .spawn_with_handle(
             async move {
+                let first_read_window_end_offset = config.range.start() + config.initial_read_window_size as u64;
+                let request_stream = read_from_client_stream(
+                    &mut backpressure_limiter,
+                    &client,
+                    config.bucket,
+                    config.object_id.clone(),
+                    first_read_window_end_offset,
+                    config.range,
+                );
+
                 let part_composer = ClientPartComposer {
                     part_queue_producer,
-                    object_id: config.object_id.clone(),
+                    object_id: config.object_id,
                     preferred_part_size: config.preferred_part_size,
                 };
-
-                let request_stream = read_from_request(client, bucket, config.object_id, request_range.into());
                 part_composer.try_compose_parts(request_stream).await;
             }
             .instrument(span),
         )
        .unwrap();
-    RequestTask::from_handle(task_handle, size, start, part_queue)
+    RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller)
 }
@@ -239,8 +256,10 @@ impl ClientPartComposer {
         // in order to avoid validating checksum on large parts at read.
         let mut body: Bytes = body.into();
         let mut curr_offset = offset;
+        let alignment = self.preferred_part_size;
         while !body.is_empty() {
-            let chunk_size = self.preferred_part_size.min(body.len());
+            let distance_to_align = alignment - (curr_offset % alignment as u64) as usize;
+            let chunk_size = distance_to_align.min(body.len());
             let chunk = body.split_to(chunk_size);
             // S3 doesn't provide checksum for us if the request range is not aligned to
             // object part boundaries, so we're computing our own checksum here.
@@ -254,30 +273,99 @@ impl ClientPartComposer {
     }
 }
 
+/// Creates a request stream with a given range. The stream will be served from two `GetObject` requests where the first request serves
+/// data up to `first_read_window_end_offset` and the second request serves the rest of the stream.
+/// A [PrefetchReadError] is returned when the request cannot be completed.
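+///
+/// A rough usage sketch, mirroring how `spawn_get_object_request` above drives this stream
+/// (the names here are illustrative only, not a public API contract):
+/// ```ignore
+/// let stream = read_from_client_stream(&mut limiter, &client, bucket, object_id, window_end_offset, range);
+/// pin_mut!(stream);
+/// while let Some(next) = stream.next().await {
+///     let (offset, body) = next?;
+///     // wrap (offset, body) into a Part and push it onto the part queue
+/// }
+/// ```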
+///
+/// This is a workaround for a specific issue where the initial read window size could be very small (~1MB), but the CRT only returns
+/// data in chunks of part size (defaulting to 8MB) even if the initial read window is smaller than that, which makes time to first
+/// byte much higher than expected.
+pub fn read_from_client_stream<'a, Client: ObjectClient + Clone + 'a>(
+    backpressure_limiter: &'a mut BackpressureLimiter,
+    client: &'a Client,
+    bucket: String,
+    object_id: ObjectId,
+    first_read_window_end_offset: u64,
+    range: RequestRange,
+) -> impl Stream<Item = Result<(u64, Bytes), PrefetchReadError<Client::ClientError>>> + 'a {
+    try_stream! {
+        // Let's start by issuing the first request with a range trimmed to the initial read window offset
+        let first_req_range = range.trim_end(first_read_window_end_offset);
+        if !first_req_range.is_empty() {
+            let first_request_stream = read_from_request(
+                backpressure_limiter,
+                client,
+                bucket.clone(),
+                object_id.clone(),
+                first_req_range.into(),
+            );
+            pin_mut!(first_request_stream);
+            while let Some(next) = first_request_stream.next().await {
+                yield(next?);
+            }
+        }
+
+        // After the first request is completed we will create the second request for the rest of the stream,
+        // but only if there is something left to be fetched.
+        let range = range.trim_start(first_read_window_end_offset);
+        if !range.is_empty() {
+            let request_stream = read_from_request(
+                backpressure_limiter,
+                client,
+                bucket.clone(),
+                object_id.clone(),
+                range.into(),
+            );
+            pin_mut!(request_stream);
+            while let Some(next) = request_stream.next().await {
+                yield(next?);
+            }
+        }
+    }
+}
+
 /// Creates a `GetObject` request with the specified range and sends received body parts to the stream.
 /// A [PrefetchReadError] is returned when the request cannot be completed.
-pub fn read_from_request<Client: ObjectClient>(
-    client: Client,
+fn read_from_request<'a, Client: ObjectClient + 'a>(
+    backpressure_limiter: &'a mut BackpressureLimiter,
+    client: &'a Client,
     bucket: String,
     id: ObjectId,
     request_range: Range<u64>,
-) -> impl Stream<Item = Result<(u64, Bytes), PrefetchReadError<Client::ClientError>>> {
+) -> impl Stream<Item = Result<(u64, Bytes), PrefetchReadError<Client::ClientError>>> + 'a {
     try_stream! {
         let request = client
-            .get_object(&bucket, id.key(), Some(request_range), Some(id.etag().clone()))
+            .get_object(&bucket, id.key(), Some(request_range.clone()), Some(id.etag().clone()))
             .await
             .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject request failed"))
             .map_err(PrefetchReadError::GetRequestFailed)?;
 
         pin_mut!(request);
+        let read_window_size_diff = backpressure_limiter
+            .read_window_end_offset()
+            .saturating_sub(request.as_ref().read_window_end_offset()) as usize;
+        request.as_mut().increment_read_window(read_window_size_diff);
+
         while let Some(next) = request.next().await {
             let (offset, body) = next
                 .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject body part failed"))
                 .map_err(PrefetchReadError::GetRequestFailed)?;
-            trace!(offset, length = body.len(), "received GetObject part");
+            let length = body.len() as u64;
+            trace!(offset, length, "received GetObject part");
             metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
             yield(offset, body);
+
+            let next_offset = offset + length;
+            // We are reaching the end, so we don't have to wait for more read window
+            if next_offset == request_range.end {
+                break;
+            }
+            // Block if the read window increment is not enough to read the next offset
+            if let Some(next_read_window_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await? {
+                let diff = next_read_window_offset.saturating_sub(request.as_ref().read_window_end_offset()) as usize;
+                request.as_mut().increment_read_window(diff);
+            }
         }
         trace!("request finished");
     }
diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs
index c55ebf532..1fb7edc63 100644
--- a/mountpoint-s3/src/prefetch/task.rs
+++ b/mountpoint-s3/src/prefetch/task.rs
@@ -1,65 +1,82 @@
 use futures::future::RemoteHandle;
 
+use crate::prefetch::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall};
 use crate::prefetch::part::Part;
-use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue};
+use crate::prefetch::part_queue::PartQueue;
 use crate::prefetch::PrefetchReadError;
 
+use super::backpressure_controller::BackpressureController;
+use super::part_stream::RequestRange;
+
 /// A single GetObject request submitted to the S3 client
 #[derive(Debug)]
 pub struct RequestTask<E: std::error::Error> {
     /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if
     /// the request is fake (created by seeking backwards in the stream)
-    task_handle: Option<RemoteHandle<()>>,
+    _task_handle: RemoteHandle<()>,
     remaining: usize,
-    start_offset: u64,
-    total_size: usize,
+    range: RequestRange,
     part_queue: PartQueue<E>,
+    backpressure_controller: BackpressureController,
 }
 
 impl<E: std::error::Error + Send + Sync> RequestTask<E> {
-    pub fn from_handle(task_handle: RemoteHandle<()>, size: usize, offset: u64, part_queue: PartQueue<E>) -> Self {
+    pub fn from_handle(
+        task_handle: RemoteHandle<()>,
+        range: RequestRange,
+        part_queue: PartQueue<E>,
+        backpressure_controller: BackpressureController,
+    ) -> Self {
         Self {
-            task_handle: Some(task_handle),
-            remaining: size,
-            start_offset: offset,
-            total_size: size,
+            _task_handle: task_handle,
+            remaining: range.len(),
+            range,
             part_queue,
+            backpressure_controller,
         }
     }
 
-    pub fn from_parts(parts: impl IntoIterator<Item = Part>, offset: u64) -> Self {
-        let mut size = 0;
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
-        for part in parts {
-            size += part.len();
-            part_queue_producer.push(Ok(part));
-        }
-        Self {
-            task_handle: None,
-            remaining: size,
-            start_offset: offset,
-            total_size: size,
-            part_queue,
+    // Push a given list of parts in front of the part queue
+    pub async fn push_front(&mut self, parts: Vec<Part>) -> Result<(), PrefetchReadError<E>> {
+        // Merge all parts into one single part by pushing them to the front of the part queue.
+        // This could result in a really big part, but we normally use this only for backward seeks,
+        // so its size should not be bigger than the prefetcher's `max_backward_seek_distance`.
+        for part in parts.into_iter().rev() {
+            self.remaining += part.len();
+            self.part_queue.push_front(part).await?;
         }
+        Ok(())
     }
 
     pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<E>> {
         let part = self.part_queue.read(length).await?;
         debug_assert!(part.len() <= self.remaining);
         self.remaining -= part.len();
+
+        // We read some data out of the part queue so the read window should be moved
+        self.backpressure_controller.send_feedback(DataRead(part.len())).await?;
+
+        let next_offset = part.offset() + part.len() as u64;
+        let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;
+        // If the part queue is empty it means we are reading faster than the task can prefetch,
+        // so we should use a larger window for the task.
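+        // Note: this feedback only reports what the reader observed (bytes consumed, or a
+        // drained queue); how far the read window actually grows in response is decided by
+        // the BackpressureController on the other end of the feedback channel.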
+        if remaining_in_queue == 0 {
+            self.backpressure_controller.send_feedback(PartQueueStall).await?;
+        }
+
         Ok(part)
     }
 
     pub fn start_offset(&self) -> u64 {
-        self.start_offset
+        self.range.start()
     }
 
     pub fn end_offset(&self) -> u64 {
-        self.start_offset + self.total_size as u64
+        self.range.end()
     }
 
     pub fn total_size(&self) -> usize {
-        self.total_size
+        self.range.len()
     }
 
     pub fn remaining(&self) -> usize {
@@ -68,12 +85,10 @@ impl RequestTask {
 
     /// Maximum offset which data is known to be already in the `self.part_queue`
     pub fn available_offset(&self) -> u64 {
-        self.start_offset + self.part_queue.bytes_received() as u64
+        self.start_offset() + self.part_queue.bytes_received() as u64
     }
 
-    /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and
-    /// shouldn't be counted for prefetcher progress.
-    pub fn is_streaming(&self) -> bool {
-        self.task_handle.is_some()
+    pub fn read_window_end_offset(&self) -> u64 {
+        self.backpressure_controller.read_window_end_offset()
     }
 }
diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs
index aa8290fe8..87399531f 100644
--- a/mountpoint-s3/tests/common/fuse.rs
+++ b/mountpoint-s3/tests/common/fuse.rs
@@ -49,6 +49,7 @@ pub type TestClientBox = Box<dyn TestClient>;
 
 pub struct TestSessionConfig {
     pub part_size: usize,
+    pub initial_read_window_size: usize,
     pub filesystem_config: S3FilesystemConfig,
     pub prefetcher_config: PrefetcherConfig,
     pub auth_config: S3ClientAuthConfig,
 }
 
 impl Default for TestSessionConfig {
     fn default() -> Self {
+        let part_size = 8 * 1024 * 1024;
         Self {
-            part_size: 8 * 1024 * 1024,
+            part_size,
+            initial_read_window_size: part_size,
             filesystem_config: Default::default(),
             prefetcher_config: Default::default(),
             auth_config: Default::default(),
@@ -125,6 +128,8 @@ pub mod mock_session {
         let client_config = MockClientConfig {
             bucket: BUCKET_NAME.to_string(),
             part_size: test_config.part_size,
+            enable_backpressure: true,
+            initial_read_window_size: test_config.initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(client_config));
@@ -162,6 +167,8 @@ pub mod mock_session {
         let client_config = MockClientConfig {
             bucket: BUCKET_NAME.to_string(),
             part_size: test_config.part_size,
+            enable_backpressure: true,
+            initial_read_window_size: test_config.initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(client_config));
@@ -284,7 +291,9 @@ pub mod s3_session {
         let client_config = S3ClientConfig::default()
             .part_size(test_config.part_size)
             .endpoint_config(EndpointConfig::new(&region))
-            .auth_config(test_config.auth_config);
+            .auth_config(test_config.auth_config)
+            .read_backpressure(true)
+            .initial_read_window(test_config.initial_read_window_size);
         let client = S3CrtClient::new(client_config).unwrap();
         let runtime = client.event_loop_group();
         let prefetcher = default_prefetch(runtime, test_config.prefetcher_config);
@@ -316,7 +325,9 @@ pub mod s3_session {
 
         let client_config = S3ClientConfig::default()
             .part_size(test_config.part_size)
-            .endpoint_config(EndpointConfig::new(&region));
+            .endpoint_config(EndpointConfig::new(&region))
+            .read_backpressure(true)
+            .initial_read_window(test_config.initial_read_window_size);
         let client = S3CrtClient::new(client_config).unwrap();
         let runtime = client.event_loop_group();
         let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config);
diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs
index 3afec9c5e..53ee65a7c 100644
--- a/mountpoint-s3/tests/common/mod.rs
+++ b/mountpoint-s3/tests/common/mod.rs
@@ -37,6 +37,8 @@ pub fn make_test_filesystem(
     let client_config = MockClientConfig {
         bucket: bucket.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs
index a6ec2ec11..3eeb7eea0 100644
--- a/mountpoint-s3/tests/fs.rs
+++ b/mountpoint-s3/tests/fs.rs
@@ -735,6 +735,8 @@ async fn test_upload_aborted_on_write_failure() {
     let client_config = MockClientConfig {
         bucket: BUCKET_NAME.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
@@ -812,6 +814,8 @@ async fn test_upload_aborted_on_fsync_failure() {
     let client_config = MockClientConfig {
         bucket: BUCKET_NAME.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
@@ -874,6 +878,8 @@ async fn test_upload_aborted_on_release_failure() {
     let client_config = MockClientConfig {
         bucket: BUCKET_NAME.to_string(),
         part_size: 1024 * 1024,
+        enable_backpressure: true,
+        initial_read_window_size: 256 * 1024,
         ..Default::default()
     };
@@ -1489,7 +1495,9 @@ async fn test_readdir_rewind_with_local_files_only() {
 async fn test_lookup_404_not_an_error() {
     let name = "test_lookup_404_not_an_error";
     let (bucket, prefix) = get_test_bucket_and_prefix(name);
-    let client_config = S3ClientConfig::default().endpoint_config(EndpointConfig::new(&get_test_region()));
+    let client_config = S3ClientConfig::default()
+        .endpoint_config(EndpointConfig::new(&get_test_region()))
+        .read_backpressure(true);
     let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client");
     let fs = make_test_filesystem_with_client(
         client,
@@ -1523,7 +1531,8 @@ async fn test_lookup_forbidden() {
     let auth_config = get_crt_client_auth_config(get_scoped_down_credentials(&policy).await);
     let client_config = S3ClientConfig::default()
         .auth_config(auth_config)
-        .endpoint_config(EndpointConfig::new(&get_test_region()));
+        .endpoint_config(EndpointConfig::new(&get_test_region()))
+        .read_backpressure(true);
     let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client");
 
     // create an empty file
diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
index c7ece49af..4f47571d4 100644
--- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
@@ -1,6 +1,5 @@
 use fuser::BackgroundSession;
 use mountpoint_s3::data_cache::InMemoryDataCache;
-use mountpoint_s3::prefetch::PrefetcherConfig;
 use std::fs::{File, OpenOptions};
 use std::io::Read;
 use tempfile::TempDir;
@@ -65,30 +64,32 @@ fn read_test_mock_with_cache(object_size: usize) {
     );
 }
 
-/// test for checking either prefetching fails or read original object when object is mutated during read.
-/// Prefetching of next request occurs when more than half of the current request is being read.
-/// So, when we read the first block, it prefetches the requests ti require to fulfill and the next request
-/// depending on size of last request.
-/// If object is mutated, E-Tag for the new prefetch request will change and hence the request will fail giving IO error.
-fn prefetch_test_etag<F>(creator_fn: F, prefix: &str, request_size: usize, read_size: usize)
-where
+/// Test for checking that either prefetching fails or the original object is read when the object is mutated during a read.
+/// Prefetching of the next read window occurs when more than half of the current window has been read.
+/// When we read the first block, the prefetcher fetches the data with a window size big enough to fulfill the request,
+/// then increases the window size when needed.
+/// If the object is mutated, reading a part from the next read window fails with a precondition (ETag) error.
+fn prefetch_test_etag<F>(
+    creator_fn: F,
+    prefix: &str,
+    part_size: usize,
+    initial_read_window_size: usize,
+    read_size: usize,
+) where
     F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox),
 {
-    const OBJECT_SIZE: usize = 1024 * 1024;
-
-    let prefetcher_config = PrefetcherConfig {
-        first_request_size: request_size,
-        ..Default::default()
-    };
-
+    // The object needs to be larger than the part size because the CRT returns data in chunks of part size;
+    // we would not be able to see the failures if it were smaller.
+    let object_size = part_size * 2;
     let (mount_point, _session, mut test_client) = creator_fn(
         prefix,
         TestSessionConfig {
-            prefetcher_config,
+            part_size,
+            initial_read_window_size,
             ..Default::default()
         },
     );
 
-    let original_data_buf = vec![0u8; OBJECT_SIZE];
+    let original_data_buf = vec![0u8; object_size];
 
     test_client.put_object("dir/hello.txt", &original_data_buf).unwrap();
 
@@ -104,7 +105,7 @@ where
         .expect("Should be able to read file to buf");
 
     // changed the value of data buf to distinguish it from previous data of the object.
-    let final_data_buf = vec![255u8; OBJECT_SIZE];
+    let final_data_buf = vec![255u8; object_size];
     test_client.put_object("dir/hello.txt", &final_data_buf).unwrap();
 
     let mut dest_buf = vec![0u8; read_size];
@@ -149,11 +150,13 @@ where
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
 #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_mock(request_size: usize, read_size: usize) {
+fn prefetch_test_etag_mock(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 256 * 1024;
     prefetch_test_etag(
         fuse::mock_session::new,
         "prefetch_test_etag_mock",
-        request_size,
+        part_size,
+        initial_read_window_size,
         read_size,
     );
 }
@@ -162,11 +165,13 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) {
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
 #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) {
+fn prefetch_test_etag_mock_with_cache(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 256 * 1024;
     prefetch_test_etag(
         fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
         "prefetch_test_etag_mock",
-        request_size,
+        part_size,
+        initial_read_window_size,
         read_size,
     );
 }
@@ -174,22 +179,31 @@ fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) {
 #[cfg(feature = "s3_tests")]
 #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")]
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
-#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
+#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_s3(request_size: usize, read_size: usize) {
-    prefetch_test_etag(fuse::s3_session::new, "prefetch_test_etag_s3", request_size, read_size);
+fn prefetch_test_etag_s3(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 8 * 1024 * 1024;
+    prefetch_test_etag(
+        fuse::s3_session::new,
+        "prefetch_test_etag_s3",
+        part_size,
+        initial_read_window_size,
+        read_size,
+    );
 }
 
 #[cfg(feature = "s3_tests")]
 #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")]
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
-#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
+#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) {
+fn prefetch_test_etag_s3_with_cache(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 8 * 1024 * 1024;
     prefetch_test_etag(
         fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
         "prefetch_test_etag_s3",
-        request_size,
+        part_size,
+        initial_read_window_size,
         read_size,
     );
 }
diff --git a/mountpoint-s3/tests/mock_s3_tests.rs b/mountpoint-s3/tests/mock_s3_tests.rs
index cdf17bb0d..4aaa95dfa 100644
--- a/mountpoint-s3/tests/mock_s3_tests.rs
+++ b/mountpoint-s3/tests/mock_s3_tests.rs
@@ -134,6 +134,7 @@ fn create_fs_with_mock_s3(bucket: &str) -> (TestS3Filesystem, MockS
     let client_config = S3ClientConfig::default()
         .endpoint_config(endpoint_config)
         .auth_config(S3ClientAuthConfig::NoSigning)
+        .read_backpressure(true)
         .max_attempts(NonZeroUsize::new(3).unwrap()); // retry S3 request 3 times (which equals the existing default)
     let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client");
     (