Skip to content

Commit

Permalink
Wait for CreateMPU
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Dec 12, 2024
1 parent eecf301 commit 7af624d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 104 deletions.
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,10 @@ impl S3RequestError {
fn construction_failure(inner: impl Into<ConstructionError>) -> Self {
S3RequestError::ConstructionFailure(inner.into())
}

fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self {
S3RequestError::InternalError(Box::new(inner))
}
}

impl ProvideErrorMetadata for S3RequestError {
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/s3_crt_client/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl S3CrtClient {
result = request => {
// If we did not received the headers first, the request must have failed.
result?;
return Err(ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(ObjectHeadersError::MissingHeaders))));
return Err(S3RequestError::internal_failure(ObjectHeadersError::MissingHeaders).into());
}
};

Expand Down
162 changes: 84 additions & 78 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use crate::object_client::{
};
use async_trait::async_trait;
use futures::channel::oneshot::{self, Receiver};
use futures::future::FusedFuture as _;
use futures::select_biased;
use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
use mountpoint_s3_crt::io::stream::InputStream;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview};
use thiserror::Error;
use tracing::error;
use xmltree::Element;

Expand All @@ -30,80 +33,84 @@ impl S3CrtClient {
key: &str,
params: &PutObjectParams,
) -> ObjectClientResult<S3PutObjectRequest, PutObjectError, S3RequestError> {
let span = request_span!(self.inner, "put_object", bucket, key);
let mut message = self.new_put_request(
bucket,
key,
params.storage_class.as_deref(),
params.server_side_encryption.as_deref(),
params.ssekms_key_id.as_deref(),
)?;

let checksum_config = match params.trailing_checksums {
PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
PutObjectTrailingChecksums::Disabled => None,
};
message.set_checksum_config(checksum_config);
let review_callback = ReviewCallbackBox::default();
let (on_headers, response_headers) = response_headers_handler();
// Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
let (mpu_created_sender, mut mpu_created) = oneshot::channel();

for (name, value) in &params.object_metadata {
message
.set_header(&Header::new(format!("x-amz-meta-{}", name), value))
.map_err(S3RequestError::construction_failure)?
}
for (name, value) in &params.custom_headers {
message
.inner
.add_header(&Header::new(name, value))
.map_err(S3RequestError::construction_failure)?;
}
let mut request = {
let span = request_span!(self.inner, "put_object", bucket, key);
let mut message = self.new_put_request(
bucket,
key,
params.storage_class.as_deref(),
params.server_side_encryption.as_deref(),
params.ssekms_key_id.as_deref(),
)?;

let review_callback = ReviewCallbackBox::default();
let callback = review_callback.clone();
let checksum_config = match params.trailing_checksums {
PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
PutObjectTrailingChecksums::Disabled => None,
};
message.set_checksum_config(checksum_config);

let (on_headers, response_headers) = response_headers_handler();
for (name, value) in &params.object_metadata {
message
.set_header(&Header::new(format!("x-amz-meta-{}", name), value))
.map_err(S3RequestError::construction_failure)?
}
for (name, value) in &params.custom_headers {
message
.inner
.add_header(&Header::new(name, value))
.map_err(S3RequestError::construction_failure)?;
}

let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
options.part_size(self.inner.write_part_size as u64);
let callback = review_callback.clone();

// Before the first write, we need to await for the multi-part upload to be created, so we can report errors.
// To do so, we need to detect one of two events (whichever comes first):
// * a CreateMultipartUpload request completes successfully (potentially after a number of retries),
// * the meta-request fails.
let (mpu_created_sender, mpu_created) = oneshot::channel();
let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender)));
let on_error_sender = on_mpu_created_sender.clone();

let body = self.inner.make_simple_http_request_from_options(
options,
span,
move |metrics| {
if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
// Signal that a CreateMultipartUpload completed successfully (unless the meta-request had already failed).
if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
_ = sender.send(Ok(()));
let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
options.part_size(self.inner.write_part_size as u64);

let on_mpu_created_sender = Mutex::new(Some(mpu_created_sender));

self.inner.make_simple_http_request_from_options(
options,
span,
move |metrics| {
if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() {
// Signal that a CreateMultipartUpload completed successfully (unless the meta-request had already failed).
if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() {
_ = sender.send(());
}
}
}
},
move |result| {
// Signal that the meta-request failed (unless a CreateMultipartUpload had already completed successfully).
if let Some(sender) = on_error_sender.lock().unwrap().take() {
_ = sender.send(Err(result.crt_error.into()));
}
None
},
on_headers,
)?;
},
|_| None,
on_headers,
)?
};

select_biased! {
mpu = mpu_created => mpu.unwrap(),
result = request => {
// If the MPU was not created, the request must have failed.
result?;
return Err(S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed).into());
}
};

// Guaranteed when select_biased! executes the CreateMPU branch.
assert!(!request.is_terminated());

Ok(S3PutObjectRequest {
body,
request,
review_callback,
start_time: Instant::now(),
total_bytes: 0,
response_headers,
state: S3PutObjectRequestState::CreatingMPU(mpu_created),
state: S3PutObjectRequestState::Idle,
})
}

Expand Down Expand Up @@ -260,7 +267,7 @@ impl ReviewCallbackBox {
/// object.
#[derive(Debug)]
pub struct S3PutObjectRequest {
body: S3HttpRequest<Vec<u8>, PutObjectError>,
request: S3HttpRequest<Vec<u8>, PutObjectError>,
review_callback: ReviewCallbackBox,
start_time: Instant,
total_bytes: u64,
Expand All @@ -273,16 +280,21 @@ pub struct S3PutObjectRequest {
/// Internal state for a [S3PutObjectRequest].
#[derive(Debug)]
enum S3PutObjectRequestState {
/// Initial state indicating that CreateMultipartUpload may still be in progress. To be awaited on first
/// write so errors can be reported early. The signal indicates that CreateMultipartUpload completed
/// successfully, or that the MPU failed.
CreatingMPU(oneshot::Receiver<Result<(), S3RequestError>>),
/// A write operation is in progress or was interrupted before completion.
PendingWrite,
/// Idle state between write calls.
Idle,
}

/// Internal errors for a [S3PutObjectRequest].
#[derive(Debug, Error)]
enum S3PutObjectRequestError {
#[error("A previous write operation did not complete successfully")]
PreviousWriteFailed,
#[error("The CreateMultiPartUpload request did not succeed")]
CreateMultipartUploadFailed,
}

fn get_etag(response_headers: &Headers) -> Result<ETag, HeadersError> {
Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into())
}
Expand Down Expand Up @@ -370,19 +382,14 @@ impl PutObjectRequest for S3PutObjectRequest {
// Writing to the meta request may require multiple calls. Set the internal
// state to `PendingWrite` until we are done.
match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
S3PutObjectRequestState::CreatingMPU(create_mpu) => {
// On first write, check the pending CreateMultipartUpload so we can report errors.
// Wait for CreateMultipartUpload to complete successfully, or the MPU to fail.
create_mpu.await.unwrap()?;
}
S3PutObjectRequestState::PendingWrite => {
// Fail if a previous write was not completed.
return Err(S3RequestError::RequestCanceled.into());
return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
}
S3PutObjectRequestState::Idle => {}
}

let meta_request = &mut self.body.meta_request;
let meta_request = &mut self.request.meta_request;
let mut slice = slice;
while !slice.is_empty() {
// Write will fail if the request has already finished (because of an error).
Expand All @@ -406,24 +413,23 @@ impl PutObjectRequest for S3PutObjectRequest {
mut self,
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
// No need to check for `CreatingMPU`: errors will be reported on completing the upload.
if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
// Fail if a previous write was not completed.
return Err(S3RequestError::RequestCanceled.into());
return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into());
}

self.review_callback.set(review_callback);

// Write will fail if the request has already finished (because of an error).
_ = self
.body
.request
.meta_request
.write(&[], true)
.await
.map_err(S3RequestError::CrtError)?;

// Now wait for the request to finish.
let _ = self.body.await?;
let _ = self.request.await?;

let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");
Expand Down
38 changes: 13 additions & 25 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ use std::collections::HashMap;
use std::time::Duration;

use common::*;

use futures::{pin_mut, FutureExt, StreamExt};
use mountpoint_s3_client::checksums::crc32c_to_base64;
use rand::Rng;
use test_case::test_case;

use mountpoint_s3_client::checksums::{crc32c, crc32c_to_base64};
use mountpoint_s3_client::config::S3ClientConfig;
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::{
ChecksumAlgorithm, GetObjectParams, HeadObjectParams, ObjectClientResult, PutObjectParams, PutObjectResult,
PutObjectTrailingChecksums,
};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::checksums::crc32c;
use rand::Rng;
use test_case::test_case;

// Simple test for PUT object. Puts a single, small object as a single part and checks that the
// contents are correct with a GET.
Expand Down Expand Up @@ -282,7 +283,7 @@ async fn test_put_object_write_cancelled() {
.expect_err("further writes should fail");
assert!(matches!(
err,
ObjectClientError::ClientError(S3RequestError::RequestCanceled)
ObjectClientError::ClientError(S3RequestError::InternalError(_))
));
}

Expand All @@ -294,26 +295,11 @@ async fn test_put_object_initiate_failure() {

let params = PutObjectParams::new().storage_class("INVALID_STORAGE_CLASS".into());

let mut request = client
// The MPU initiation should fail, so we should get an error from put_object.
let _err = client
.put_object(&bucket, &key, &params)
.await
.expect("put_object should succeed");

// The MPU initiation should fail, so we should get an error when we try to write.
let _err = request.write(&[1, 2, 3, 4]).await.expect_err("write should fail");

// Try again just to make sure the failure is fused correctly and doesn't block forever if
// someone (incorrectly) tries to write again after a failure.
let _err = request
.write(&[1, 2, 3, 4])
.await
.expect_err("second write should fail");

// Abort the request (which should already have been canceled)
drop(request);

// Wait a bit to let any requests settle.
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
.expect_err("put_object should fail");

let sdk_client = get_test_sdk_client().await;
let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key)
Expand Down Expand Up @@ -435,8 +421,10 @@ async fn test_put_user_object_metadata_bad_header(object_metadata: HashMap<Strin

let params = PutObjectParams::new().object_metadata(object_metadata.clone());

let mut request = client.put_object(&bucket, &key, &params).await.unwrap();
request.write(b"data").await.expect_err("header parsing should fail");
client
.put_object(&bucket, &key, &params)
.await
.expect_err("header parsing should fail");
}

#[test_case(true; "pass review")]
Expand Down

0 comments on commit 7af624d

Please sign in to comment.