Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
monthonk committed Dec 17, 2024
1 parent f472f60 commit 63cb8b4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 30 deletions.
2 changes: 1 addition & 1 deletion mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Two of the existing parameters, `range` and `if_match` have been moved to `GetObjectParams`.
([#1121](https://github.com/awslabs/mountpoint-s3/pull/1121))
* `increment_read_window` and `read_window_end_offset` methods have been removed from `GetObjectResponse`.
`ClientBackpressureHandle` can be used to interact with flow-control window instead, it can be retrieved from `take_backpressure_handle` method.
`ClientBackpressureHandle` can be used to interact with flow-control window instead, it can be retrieved from `backpressure_handle` method.
([#1200](https://github.com/awslabs/mountpoint-s3/pull/1200))
* `head_object` method now requires a `HeadObjectParams` parameter.
The structure itself is not required to specify anything to achieve the existing behavior.
Expand Down
25 changes: 10 additions & 15 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,6 @@ pub struct MockGetObjectResponse {
next_offset: u64,
length: usize,
part_size: usize,
enable_backpressure: bool,
read_window_end_offset: Arc<AtomicU64>,
backpressure_handle: Option<MockBackpressureHandle>,
}

Expand Down Expand Up @@ -744,11 +742,12 @@ impl Stream for MockGetObjectResponse {
let next_read_size = self.part_size.min(self.length);

// Simulate backpressure mechanism
let read_window_end_offset = self.read_window_end_offset.load(Ordering::SeqCst);
if self.enable_backpressure && self.next_offset >= read_window_end_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
if let Some(handle) = &self.backpressure_handle {
if self.next_offset >= handle.read_window_end_offset() {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
}
}
let next_part = self.object.read(self.next_offset, next_read_size);

Expand Down Expand Up @@ -874,13 +873,11 @@ impl ObjectClient for MockClient {
(0, object.len())
};

let read_window_end_offset = Arc::new(AtomicU64::new(
next_offset + self.config.initial_read_window_size as u64,
));
let backpressure_handle = if self.config.enable_backpressure {
Some(MockBackpressureHandle {
read_window_end_offset: read_window_end_offset.clone(),
})
let read_window_end_offset = Arc::new(AtomicU64::new(
next_offset + self.config.initial_read_window_size as u64,
));
Some(MockBackpressureHandle { read_window_end_offset })
} else {
None
};
Expand All @@ -889,8 +886,6 @@ impl ObjectClient for MockClient {
next_offset,
length,
part_size: self.config.part_size,
enable_backpressure: self.config.enable_backpressure,
read_window_end_offset,
backpressure_handle,
})
} else {
Expand Down
25 changes: 11 additions & 14 deletions mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ impl S3CrtClient {
) -> Result<S3GetObjectResponse, ObjectClientError<GetObjectError, S3RequestError>> {
let requested_checksums = params.checksum_mode.as_ref() == Some(&ChecksumMode::Enabled);
let next_offset = params.range.as_ref().map(|r| r.start).unwrap_or(0);
let read_window_end_offset = next_offset + self.inner.initial_read_window_size as u64;
let read_window_end_offset = Arc::new(AtomicU64::new(read_window_end_offset));

let (part_sender, part_receiver) = futures::channel::mpsc::unbounded();
let (headers_sender, mut headers_receiver) = futures::channel::oneshot::channel();
Expand Down Expand Up @@ -132,8 +130,10 @@ impl S3CrtClient {
assert!(!request.is_terminated());

let backpressure_handle = if self.inner.enable_backpressure {
let read_window_end_offset =
Arc::new(AtomicU64::new(next_offset + self.inner.initial_read_window_size as u64));
Some(S3BackpressureHandle {
read_window_end_offset: read_window_end_offset.clone(),
read_window_end_offset,
meta_request: request.meta_request.clone(),
})
} else {
Expand All @@ -143,11 +143,9 @@ impl S3CrtClient {
request,
part_receiver,
requested_checksums,
enable_backpressure: self.inner.enable_backpressure,
backpressure_handle,
headers,
next_offset,
read_window_end_offset,
})
}
}
Expand All @@ -161,6 +159,8 @@ enum ObjectHeadersError {

#[derive(Clone, Debug)]
pub struct S3BackpressureHandle {
/// Upper bound of the current read window. When backpressure is enabled, [S3GetObjectRequest]
/// can return data up to this offset *exclusively*.
read_window_end_offset: Arc<AtomicU64>,
meta_request: MetaRequest,
}
Expand Down Expand Up @@ -194,14 +194,10 @@ pub struct S3GetObjectResponse {
#[pin]
part_receiver: UnboundedReceiver<GetBodyPart>,
requested_checksums: bool,
enable_backpressure: bool,
backpressure_handle: Option<S3BackpressureHandle>,
headers: Headers,
/// Next offset of the data to be polled from [poll_next]
next_offset: u64,
/// Upper bound of the current read window. When backpressure is enabled, [S3GetObjectRequest]
/// can return data up to this offset *exclusively*.
read_window_end_offset: Arc<AtomicU64>,
}

#[cfg_attr(not(docsrs), async_trait)]
Expand Down Expand Up @@ -256,11 +252,12 @@ impl Stream for S3GetObjectResponse {
// the next chunk we want to return error instead of keeping the request blocked.
// This prevents a risk of deadlock from using the [S3CrtClient], users must implement
// their own logic to block the request if they really want to block a [GetObjectRequest].
let read_window_end_offset = this.read_window_end_offset.load(Ordering::SeqCst);
if *this.enable_backpressure && read_window_end_offset <= *this.next_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(
S3RequestError::EmptyReadWindow,
))));
if let Some(handle) = &this.backpressure_handle {
if *this.next_offset >= handle.read_window_end_offset() {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(
S3RequestError::EmptyReadWindow,
))));
}
}
Poll::Pending
}
Expand Down

0 comments on commit 63cb8b4

Please sign in to comment.