Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve get_object interface for backpressure #1200

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
* `get_object` method now requires a `GetObjectParams` parameter.
Two of the existing parameters, `range` and `if_match` have been moved to `GetObjectParams`.
([#1121](https://github.com/awslabs/mountpoint-s3/pull/1121))
* `increment_read_window` and `read_window_end_offset` methods have been removed from `GetObjectResponse`.
`ClientBackpressureHandle` can be used to interact with flow-control window instead, it can be retrieved from `take_backpressure_handle` method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update method name.

([#1200](https://github.com/awslabs/mountpoint-s3/pull/1200))
* `head_object` method now requires a `HeadObjectParams` parameter.
The structure itself is not required to specify anything to achieve the existing behavior.
([#1083](https://github.com/awslabs/mountpoint-s3/pull/1083))
Expand Down
16 changes: 6 additions & 10 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ where
{
type GetObjectResponse = FailureGetResponse<Client, GetWrapperState>;
type PutObjectRequest = FailurePutObjectRequest<Client, GetWrapperState>;
type BackpressureHandle = Client::BackpressureHandle;
type ClientError = Client::ClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down Expand Up @@ -217,25 +218,20 @@ pub struct FailureGetResponse<Client: ObjectClient, GetWrapperState> {
impl<Client: ObjectClient + Send + Sync, FailState: Send + Sync> GetObjectResponse
for FailureGetResponse<Client, FailState>
{
type BackpressureHandle = Client::BackpressureHandle;
type ClientError = Client::ClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.request.take_backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.request.get_object_metadata()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
self.request.get_object_checksum()
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl<Client: ObjectClient, FailState> Stream for FailureGetResponse<Client, FailState> {
Expand Down
10 changes: 5 additions & 5 deletions mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ pub mod config {
/// Types used by all object clients
pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, ChecksumMode, CopyObjectParams, CopyObjectResult, DeleteObjectResult, ETag,
GetBodyPart, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectParams, GetObjectResponse,
HeadObjectParams, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult, ObjectInfo,
ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus,
UploadChecksum, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, ChecksumMode, ClientBackpressureHandle, CopyObjectParams, CopyObjectResult,
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectParams,
GetObjectResponse, HeadObjectParams, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums,
RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};
}

Expand Down
76 changes: 51 additions & 25 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
Expand All @@ -26,13 +27,13 @@ use crate::checksums::{
};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, ChecksumMode, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError,
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectResponse, HeadObjectError, HeadObjectParams,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectChecksumError, ObjectClient,
ObjectClientError, ObjectClientResult, ObjectInfo, ObjectMetadata, ObjectPart, PutObjectError, PutObjectParams,
PutObjectRequest, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus,
UploadChecksum, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, ChecksumMode, ClientBackpressureHandle, CopyObjectError, CopyObjectParams,
CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectResponse,
HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute,
ObjectChecksumError, ObjectClient, ObjectClientError, ObjectClientResult, ObjectInfo, ObjectMetadata, ObjectPart,
PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -668,6 +669,20 @@ fn validate_checksum(
}
Ok(provided_checksum)
}
#[derive(Debug)]
pub struct MockBackpressureHandle {
read_window_end_offset: Arc<AtomicU64>,
}

impl ClientBackpressureHandle for MockBackpressureHandle {
fn increment_read_window(&mut self, len: usize) {
self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
}

fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset.load(Ordering::SeqCst)
}
}

#[derive(Debug)]
pub struct MockGetObjectResponse {
Expand All @@ -676,7 +691,8 @@ pub struct MockGetObjectResponse {
length: usize,
part_size: usize,
enable_backpressure: bool,
read_window_end_offset: u64,
read_window_end_offset: Arc<AtomicU64>,
backpressure_handle: Option<MockBackpressureHandle>,
}

impl MockGetObjectResponse {
Expand All @@ -696,23 +712,20 @@ impl MockGetObjectResponse {

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectResponse for MockGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.backpressure_handle.take()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.object.object_metadata.clone()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
Ok(self.object.checksum.clone())
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
self.read_window_end_offset
}
}

impl Stream for MockGetObjectResponse {
Expand All @@ -726,7 +739,8 @@ impl Stream for MockGetObjectResponse {
let next_read_size = self.part_size.min(self.length);

// Simulate backpressure mechanism
if self.enable_backpressure && self.next_offset >= self.read_window_end_offset {
let read_window_end_offset = self.read_window_end_offset.load(Ordering::SeqCst);
if self.enable_backpressure && self.next_offset >= read_window_end_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
Expand Down Expand Up @@ -763,6 +777,7 @@ fn mock_client_error<T, E>(s: impl Into<Cow<'static, str>>) -> ObjectClientResul
impl ObjectClient for MockClient {
type GetObjectResponse = MockGetObjectResponse;
type PutObjectRequest = MockPutObjectRequest;
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down Expand Up @@ -855,13 +870,24 @@ impl ObjectClient for MockClient {
(0, object.len())
};

let read_window_end_offset = Arc::new(AtomicU64::new(
next_offset + self.config.initial_read_window_size as u64,
));
let backpressure_handle = if self.config.enable_backpressure {
Some(MockBackpressureHandle {
read_window_end_offset: read_window_end_offset.clone(),
})
} else {
None
};
Ok(MockGetObjectResponse {
object: object.clone(),
next_offset,
length,
part_size: self.config.part_size,
enable_backpressure: self.config.enable_backpressure,
read_window_end_offset: next_offset + self.config.initial_read_window_size as u64,
read_window_end_offset,
backpressure_handle,
})
} else {
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey))
Expand Down Expand Up @@ -1199,7 +1225,7 @@ enum MockObjectParts {

#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::ops::Range;
Expand Down Expand Up @@ -1295,11 +1321,13 @@ mod tests {
rng.fill_bytes(&mut body);
client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests()));

let get_request = client
let mut get_request = client
.get_object("test_bucket", key, &GetObjectParams::new().range(range.clone()))
.await
.expect("should not fail");
pin_mut!(get_request);
let mut backpressure_handle = get_request
.take_backpressure_handle()
.expect("should be able to get a backpressure handle");

let mut accum = vec![];
let mut next_offset = range.as_ref().map(|r| r.start).unwrap_or(0);
Expand All @@ -1309,10 +1337,8 @@ mod tests {
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);

while next_offset >= get_request.as_ref().read_window_end_offset() {
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);
while next_offset >= backpressure_handle.read_window_end_offset() {
backpressure_handle.increment_read_window(backpressure_read_window_size);
}
}
let expected_range = range.unwrap_or(0..size as u64);
Expand Down
28 changes: 13 additions & 15 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::object_client::{
PutObjectResult, PutObjectSingleParams,
};

use super::MockBackpressureHandle;

/// A [MockClient] that rate limits overall download throughput to simulate a target network
/// performance without the jitter or service latency of targeting a real service. Note that while
/// the rate limit is shared by all downloading streams, there is no fairness, so some streams can
Expand Down Expand Up @@ -60,36 +62,31 @@ impl ThroughputMockClient {
}

#[pin_project]
pub struct ThroughputGetObjectRequest {
pub struct ThroughputGetObjectResponse {
#[pin]
request: MockGetObjectResponse,
rate_limiter: LeakyBucket,
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectResponse for ThroughputGetObjectRequest {
impl GetObjectResponse for ThroughputGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle> {
self.request.take_backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.request.object.object_metadata.clone()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
Ok(self.request.object.checksum.clone())
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl Stream for ThroughputGetObjectRequest {
impl Stream for ThroughputGetObjectResponse {
type Item = ObjectClientResult<GetBodyPart, GetObjectError, MockClientError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -107,8 +104,9 @@ impl Stream for ThroughputGetObjectRequest {

#[async_trait]
impl ObjectClient for ThroughputMockClient {
type GetObjectResponse = ThroughputGetObjectRequest;
type GetObjectResponse = ThroughputGetObjectResponse;
type PutObjectRequest = MockPutObjectRequest;
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn read_part_size(&self) -> Option<usize> {
Expand Down Expand Up @@ -156,7 +154,7 @@ impl ObjectClient for ThroughputMockClient {
) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
let request = self.inner.get_object(bucket, key, params).await?;
let rate_limiter = self.rate_limiter.clone();
Ok(ThroughputGetObjectRequest { request, rate_limiter })
Ok(ThroughputGetObjectResponse { request, rate_limiter })
}

async fn list_objects(
Expand Down
54 changes: 32 additions & 22 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::fmt::{self, Debug};
use std::ops::Range;
use std::pin::Pin;
use std::time::SystemTime;

use async_trait::async_trait;
Expand All @@ -27,8 +26,12 @@ pub use etag::ETag;
#[cfg_attr(not(docsrs), async_trait)]
#[auto_impl(Arc)]
pub trait ObjectClient {
type GetObjectResponse: GetObjectResponse<ClientError = Self::ClientError>;
type GetObjectResponse: GetObjectResponse<
ClientError = Self::ClientError,
BackpressureHandle = Self::BackpressureHandle,
>;
type PutObjectRequest: PutObjectRequest<ClientError = Self::ClientError>;
type BackpressureHandle: ClientBackpressureHandle + Send + Sync;
passaro marked this conversation as resolved.
Show resolved Hide resolved
type ClientError: std::error::Error + ProvideErrorMetadata + Send + Sync + 'static;

/// Query the part size this client uses for GET operations to the object store. This
Expand Down Expand Up @@ -586,6 +589,26 @@ impl UploadChecksum {
}
}

/// A handle for controlling backpressure enabled requests.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput,
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
pub trait ClientBackpressureHandle {
/// Increment the flow-control read window, so that response data continues downloading.
fn increment_read_window(&mut self, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(&self) -> u64;
}

/// A streaming response to a GetObject request.
///
/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
Expand All @@ -595,33 +618,20 @@ impl UploadChecksum {
pub trait GetObjectResponse:
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
type BackpressureHandle: ClientBackpressureHandle;
type ClientError: std::error::Error + Send + Sync + 'static;

/// Take the backpressure handle from the response.
///
/// If `enable_read_backpressure` is false this call will return `None`,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn take_backpressure_handle(&mut self) -> Option<Self::BackpressureHandle>;
passaro marked this conversation as resolved.
Show resolved Hide resolved

/// Get the object's user defined metadata.
fn get_object_metadata(&self) -> ObjectMetadata;

/// Get the object's checksum, if uploaded with one
fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;

/// Increment the flow-control window, so that response data continues downloading.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput,
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
///
/// If `enable_read_backpressure` is false this call will have no effect,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn increment_read_window(self: Pin<&mut Self>, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// Failures to return object checksum
Expand Down
Loading
Loading