Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
monthonk committed Dec 16, 2024
1 parent 9e60370 commit f472f60
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 42 deletions.
7 changes: 3 additions & 4 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ where
{
type GetObjectResponse = FailureGetResponse<Client, GetWrapperState>;
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type BackpressureHandle = Client::BackpressureHandle;
type ClientError = Client::ClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down Expand Up @@ -218,11 +217,11 @@ pub struct FailureGetResponse<Client: ObjectClient, GetWrapperState> {
impl<Client: ObjectClient + Send + Sync, FailState: Send + Sync> GetObjectResponse
for FailureGetResponse<Client, FailState>
{
type BackpressureHandle = Client::BackpressureHandle;
type BackpressureHandle = <<Client as ObjectClient>::GetObjectResponse as GetObjectResponse>::BackpressureHandle;
type ClientError = Client::ClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.request.take_backpressure_handle()
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.request.backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
Expand Down
15 changes: 10 additions & 5 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ fn validate_checksum(
}
Ok(provided_checksum)
}
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct MockBackpressureHandle {
read_window_end_offset: Arc<AtomicU64>,
}
Expand All @@ -679,6 +679,11 @@ impl ClientBackpressureHandle for MockBackpressureHandle {
self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
}

fn ensure_read_window(&mut self, desired_end_offset: u64) {
let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
self.increment_read_window(diff);
}

fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset.load(Ordering::SeqCst)
}
Expand Down Expand Up @@ -715,8 +720,8 @@ impl GetObjectResponse for MockGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.backpressure_handle.take()
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.backpressure_handle.as_mut()
}

fn get_object_metadata(&self) -> ObjectMetadata {
Expand Down Expand Up @@ -777,7 +782,6 @@ fn mock_client_error<T, E>(s: impl Into<Cow<'static, str>>) -> ObjectClientResul
impl ObjectClient for MockClient {
type GetObjectResponse = MockGetObjectResponse;
type PutObjectRequest = MockPutObjectRequest;
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down Expand Up @@ -1326,7 +1330,8 @@ mod tests {
.await
.expect("should not fail");
let mut backpressure_handle = get_request
.take_backpressure_handle()
.backpressure_handle()
.cloned()
.expect("should be able to get a backpressure handle");

let mut accum = vec![];
Expand Down
5 changes: 2 additions & 3 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl GetObjectResponse for ThroughputGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.request.take_backpressure_handle()
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.request.backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
Expand Down Expand Up @@ -106,7 +106,6 @@ impl Stream for ThroughputGetObjectResponse {
impl ObjectClient for ThroughputMockClient {
type GetObjectResponse = ThroughputGetObjectResponse;
type PutObjectRequest = MockPutObjectRequest;
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down
15 changes: 7 additions & 8 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ pub use etag::ETag;
#[cfg_attr(not(docsrs), async_trait)]
#[auto_impl(Arc)]
pub trait ObjectClient {
type GetObjectResponse: GetObjectResponse<
ClientError = Self::ClientError,
BackpressureHandle = Self::BackpressureHandle,
>;
type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type BackpressureHandle: ClientBackpressureHandle + Send + Sync;
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for GET operations to the object store. This
Expand Down Expand Up @@ -604,7 +600,10 @@ pub trait ClientBackpressureHandle {
/// Increment the flow-control read window, so that response data continues downloading.
fn increment_read_window(&mut self, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// Move the upper bound of the read window to the given offset if it's not already there.
fn ensure_read_window(&mut self, desired_end_offset: u64);

/// Get the upper bound of the read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(&self) -> u64;
}
Expand All @@ -618,14 +617,14 @@ pub trait ClientBackpressureHandle {
pub trait GetObjectResponse:
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
type BackpressureHandle: ClientBackpressureHandle;
type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
type ClientError: std::error::Error + Send + Sync + 'static;

/// Take the backpressure handle from the response.
///
/// If `enable_read_backpressure` is false this call will return `None`,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle>;
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;

/// Get the object's user defined metadata.
fn get_object_metadata(&self) -> ObjectMetadata;
Expand Down
2 changes: 0 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::time::{Duration, Instant};

use futures::future::{Fuse, FusedFuture};
use futures::FutureExt;
use get_object::S3BackpressureHandle;
use mountpoint_s3_crt::auth::credentials::{
CredentialsProvider, CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions,
};
Expand Down Expand Up @@ -1256,7 +1255,6 @@ fn emit_throughput_metric(bytes: u64, duration: Duration, op: &'static str) {
impl ObjectClient for S3CrtClient {
type GetObjectResponse = S3GetObjectResponse;
type PutObjectRequest = S3PutObjectRequest;
type BackpressureHandle = S3BackpressureHandle;
type ClientError = S3RequestError;

fn read_part_size(&self) -> Option<usize> {
Expand Down
9 changes: 7 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl ClientBackpressureHandle for S3BackpressureHandle {
self.meta_request.increment_read_window(len as u64);
}

fn ensure_read_window(&mut self, desired_end_offset: u64) {
let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
self.increment_read_window(diff);
}

fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset.load(Ordering::SeqCst)
}
Expand Down Expand Up @@ -204,8 +209,8 @@ impl GetObjectResponse for S3GetObjectResponse {
type BackpressureHandle = S3BackpressureHandle;
type ClientError = S3RequestError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.backpressure_handle.take()
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.backpressure_handle.as_mut()
}

fn get_object_metadata(&self) -> ObjectMetadata {
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ pub async fn check_backpressure_get_result(
let mut accum = vec![];
let mut next_offset = range.map(|r| r.start).unwrap_or(0);
let mut backpressure_handle = response
.take_backpressure_handle()
.backpressure_handle()
.cloned()
.expect("should be able to get a backpressure handle");
pin_mut!(response);
while let Some(r) = response.next().await {
Expand Down
6 changes: 4 additions & 2 deletions mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ async fn test_mutated_during_get_object_backpressure() {
.get_object(&bucket, &key, &GetObjectParams::new().range(Some(range.clone())))
.await
.expect("should not fail");
let mut backpressure_handle = get_request.take_backpressure_handle().unwrap();

// Verify that we can receive the first part successfully
let first_part = get_request.next().await.expect("result should not be empty");
Expand All @@ -186,7 +185,10 @@ async fn test_mutated_during_get_object_backpressure() {
.await
.unwrap();

backpressure_handle.increment_read_window(part_size);
get_request
.backpressure_handle()
.unwrap()
.increment_read_window(part_size);

// Verify that the next part is error
let next = get_request.next().await.expect("result should not be empty");
Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
}

// Ensure the flow-control window is large enough for reading a block of data if backpressure is enabled.
fn ensure_read_window(&self, backpressure_handle: &mut Option<impl ClientBackpressureHandle>) {
fn ensure_read_window(&self, backpressure_handle: Option<&mut impl ClientBackpressureHandle>) {
if let Some(backpressure_handle) = backpressure_handle {
backpressure_handle.increment_read_window(self.config.block_size as usize);
}
Expand Down Expand Up @@ -152,10 +152,10 @@ where
return Err(DataCacheError::IoFailure(e.into()));
}
};
let mut backpressure_handle = result.take_backpressure_handle();
let mut backpressure_handle = result.backpressure_handle().cloned();

// Guarantee that the request will start even in case of `initial_read_window == 0`.
self.ensure_read_window(&mut backpressure_handle);
self.ensure_read_window(backpressure_handle.as_mut());

let mut buffer: Bytes = Bytes::new();
pin_mut!(result);
Expand All @@ -177,7 +177,7 @@ where
};

// Ensure the flow-control window is large enough.
self.ensure_read_window(&mut backpressure_handle);
self.ensure_read_window(backpressure_handle.as_mut());
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
Expand Down
18 changes: 7 additions & 11 deletions mountpoint-s3/src/prefetch/part_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,10 @@ fn read_from_request<'a, Client: ObjectClient + 'a>(
.inspect_err(|e| error!(key=id.key(), error=?e, "GetObject request failed"))
.map_err(PrefetchReadError::GetRequestFailed)?;

let mut backpressure_handle = request.take_backpressure_handle();
ensure_read_window(&mut backpressure_handle, backpressure_limiter.read_window_end_offset());
let mut backpressure_handle = request.backpressure_handle().cloned();
if let Some(handle) = backpressure_handle.as_mut() {
handle.ensure_read_window(backpressure_limiter.read_window_end_offset());
}

pin_mut!(request);
while let Some(next) = request.next().await {
Expand All @@ -396,21 +398,15 @@ fn read_from_request<'a, Client: ObjectClient + 'a>(
}
// Blocks if read window increment if it's not enough to read the next offset
if let Some(next_read_window_end_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await? {
ensure_read_window(&mut backpressure_handle, next_read_window_end_offset);
if let Some(handle) = backpressure_handle.as_mut() {
handle.ensure_read_window(next_read_window_end_offset);
}
}
}
trace!("request finished");
}
}

fn ensure_read_window(backpressure_handle: &mut Option<impl ClientBackpressureHandle>, desired_end_offset: u64) {
if let Some(backpressure_handle) = backpressure_handle {
let read_window_size_diff =
desired_end_offset.saturating_sub(backpressure_handle.read_window_end_offset()) as usize;
backpressure_handle.increment_read_window(read_window_size_diff);
}
}

#[cfg(test)]
mod tests {
// It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
Expand Down

0 comments on commit f472f60

Please sign in to comment.