From 7af624d3d669388d9c141005a3dfab5d961094e9 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Wed, 27 Nov 2024 14:21:28 +0000 Subject: [PATCH] Wait for CreateMPU Signed-off-by: Alessandro Passaro --- mountpoint-s3-client/src/s3_crt_client.rs | 4 + .../src/s3_crt_client/get_object.rs | 2 +- .../src/s3_crt_client/put_object.rs | 162 +++++++++--------- mountpoint-s3-client/tests/put_object.rs | 38 ++-- 4 files changed, 102 insertions(+), 104 deletions(-) diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index 1642c7895..e0bc343dc 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -1054,6 +1054,10 @@ impl S3RequestError { fn construction_failure(inner: impl Into) -> Self { S3RequestError::ConstructionFailure(inner.into()) } + + fn internal_failure(inner: impl std::error::Error + Send + Sync + 'static) -> Self { + S3RequestError::InternalError(Box::new(inner)) + } } impl ProvideErrorMetadata for S3RequestError { diff --git a/mountpoint-s3-client/src/s3_crt_client/get_object.rs b/mountpoint-s3-client/src/s3_crt_client/get_object.rs index 15d00426a..cf8e90faa 100644 --- a/mountpoint-s3-client/src/s3_crt_client/get_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/get_object.rs @@ -120,7 +120,7 @@ impl S3CrtClient { result = request => { // If we did not received the headers first, the request must have failed. 
result?; - return Err(ObjectClientError::ClientError(S3RequestError::InternalError(Box::new(ObjectHeadersError::MissingHeaders)))); + return Err(S3RequestError::internal_failure(ObjectHeadersError::MissingHeaders).into()); } }; diff --git a/mountpoint-s3-client/src/s3_crt_client/put_object.rs b/mountpoint-s3-client/src/s3_crt_client/put_object.rs index d290167ad..7db3c4dad 100644 --- a/mountpoint-s3-client/src/s3_crt_client/put_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/put_object.rs @@ -8,9 +8,12 @@ use crate::object_client::{ }; use async_trait::async_trait; use futures::channel::oneshot::{self, Receiver}; +use futures::future::FusedFuture as _; +use futures::select_biased; use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError}; use mountpoint_s3_crt::io::stream::InputStream; use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestResult, RequestType, UploadReview}; +use thiserror::Error; use tracing::error; use xmltree::Element; @@ -30,80 +33,84 @@ impl S3CrtClient { key: &str, params: &PutObjectParams, ) -> ObjectClientResult { - let span = request_span!(self.inner, "put_object", bucket, key); - let mut message = self.new_put_request( - bucket, - key, - params.storage_class.as_deref(), - params.server_side_encryption.as_deref(), - params.ssekms_key_id.as_deref(), - )?; - - let checksum_config = match params.trailing_checksums { - PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()), - PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()), - PutObjectTrailingChecksums::Disabled => None, - }; - message.set_checksum_config(checksum_config); + let review_callback = ReviewCallbackBox::default(); + let (on_headers, response_headers) = response_headers_handler(); + // Before the first write, we need to await for the multi-part upload to be created, so we can report errors. 
+ let (mpu_created_sender, mut mpu_created) = oneshot::channel(); - for (name, value) in &params.object_metadata { - message - .set_header(&Header::new(format!("x-amz-meta-{}", name), value)) - .map_err(S3RequestError::construction_failure)? - } - for (name, value) in &params.custom_headers { - message - .inner - .add_header(&Header::new(name, value)) - .map_err(S3RequestError::construction_failure)?; - } + let mut request = { + let span = request_span!(self.inner, "put_object", bucket, key); + let mut message = self.new_put_request( + bucket, + key, + params.storage_class.as_deref(), + params.server_side_encryption.as_deref(), + params.ssekms_key_id.as_deref(), + )?; - let review_callback = ReviewCallbackBox::default(); - let callback = review_callback.clone(); + let checksum_config = match params.trailing_checksums { + PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()), + PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()), + PutObjectTrailingChecksums::Disabled => None, + }; + message.set_checksum_config(checksum_config); - let (on_headers, response_headers) = response_headers_handler(); + for (name, value) in &params.object_metadata { + message + .set_header(&Header::new(format!("x-amz-meta-{}", name), value)) + .map_err(S3RequestError::construction_failure)? + } + for (name, value) in &params.custom_headers { + message + .inner + .add_header(&Header::new(name, value)) + .map_err(S3RequestError::construction_failure)?; + } - let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject); - options.send_using_async_writes(true); - options.on_upload_review(move |review| callback.invoke(review)); - options.part_size(self.inner.write_part_size as u64); + let callback = review_callback.clone(); - // Before the first write, we need to await for the multi-part upload to be created, so we can report errors. 
- // To do so, we need to detect one of two events (whichever comes first): - // * a CreateMultipartUpload request completes successfully (potentially after a number of retries), - // * the meta-request fails. - let (mpu_created_sender, mpu_created) = oneshot::channel(); - let on_mpu_created_sender = Arc::new(Mutex::new(Some(mpu_created_sender))); - let on_error_sender = on_mpu_created_sender.clone(); - - let body = self.inner.make_simple_http_request_from_options( - options, - span, - move |metrics| { - if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() { - // Signal that a CreateMultipartUpload completed successfully (unless the meta-request had already failed). - if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() { - _ = sender.send(Ok(())); + let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject); + options.send_using_async_writes(true); + options.on_upload_review(move |review| callback.invoke(review)); + options.part_size(self.inner.write_part_size as u64); + + let on_mpu_created_sender = Mutex::new(Some(mpu_created_sender)); + + self.inner.make_simple_http_request_from_options( + options, + span, + move |metrics| { + if metrics.request_type() == RequestType::CreateMultipartUpload && !metrics.error().is_err() { + // Signal that a CreateMultipartUpload completed successfully (unless the meta-request had already failed). + if let Some(sender) = on_mpu_created_sender.lock().unwrap().take() { + _ = sender.send(()); + } } - } - }, - move |result| { - // Signal that the meta-request failed (unless a CreateMultipartUpload had already completed successfully). - if let Some(sender) = on_error_sender.lock().unwrap().take() { - _ = sender.send(Err(result.crt_error.into())); - } - None - }, - on_headers, - )?; + }, + |_| None, + on_headers, + )? + }; + + select_biased! 
{ + mpu = mpu_created => mpu.unwrap(), + result = request => { + // If the MPU was not created, the request must have failed. + result?; + return Err(S3RequestError::internal_failure(S3PutObjectRequestError::CreateMultipartUploadFailed).into()); + } + }; + + // Guaranteed when select_biased! executes the CreateMPU branch. + assert!(!request.is_terminated()); Ok(S3PutObjectRequest { - body, + request, review_callback, start_time: Instant::now(), total_bytes: 0, response_headers, - state: S3PutObjectRequestState::CreatingMPU(mpu_created), + state: S3PutObjectRequestState::Idle, }) } @@ -260,7 +267,7 @@ impl ReviewCallbackBox { /// object. #[derive(Debug)] pub struct S3PutObjectRequest { - body: S3HttpRequest, PutObjectError>, + request: S3HttpRequest, PutObjectError>, review_callback: ReviewCallbackBox, start_time: Instant, total_bytes: u64, @@ -273,16 +280,21 @@ pub struct S3PutObjectRequest { /// Internal state for a [S3PutObjectRequest]. #[derive(Debug)] enum S3PutObjectRequestState { - /// Initial state indicating that CreateMultipartUpload may still be in progress. To be awaited on first - /// write so errors can be reported early. The signal indicates that CreateMultipartUpload completed - /// successfully, or that the MPU failed. - CreatingMPU(oneshot::Receiver>), /// A write operation is in progress or was interrupted before completion. PendingWrite, /// Idle state between write calls. Idle, } +/// Internal errors for a [S3PutObjectRequest]. +#[derive(Debug, Error)] +enum S3PutObjectRequestError { + #[error("A previous write operation did not complete successfully")] + PreviousWriteFailed, + #[error("The CreateMultiPartUpload request did not succeed")] + CreateMultipartUploadFailed, +} + fn get_etag(response_headers: &Headers) -> Result { Ok(response_headers.get_as_string(ETAG_HEADER_NAME)?.into()) } @@ -370,19 +382,14 @@ impl PutObjectRequest for S3PutObjectRequest { // Writing to the meta request may require multiple calls. 
Set the internal // state to `PendingWrite` until we are done. match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) { - S3PutObjectRequestState::CreatingMPU(create_mpu) => { - // On first write, check the pending CreateMultipartUpload so we can report errors. - // Wait for CreateMultipartUpload to complete successfully, or the MPU to fail. - create_mpu.await.unwrap()?; - } S3PutObjectRequestState::PendingWrite => { // Fail if a previous write was not completed. - return Err(S3RequestError::RequestCanceled.into()); + return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into()); } S3PutObjectRequestState::Idle => {} } - let meta_request = &mut self.body.meta_request; + let meta_request = &mut self.request.meta_request; let mut slice = slice; while !slice.is_empty() { // Write will fail if the request has already finished (because of an error). @@ -406,24 +413,23 @@ impl PutObjectRequest for S3PutObjectRequest { mut self, review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static, ) -> ObjectClientResult { - // No need to check for `CreatingMPU`: errors will be reported on completing the upload. if matches!(self.state, S3PutObjectRequestState::PendingWrite) { // Fail if a previous write was not completed. - return Err(S3RequestError::RequestCanceled.into()); + return Err(S3RequestError::internal_failure(S3PutObjectRequestError::PreviousWriteFailed).into()); } self.review_callback.set(review_callback); // Write will fail if the request has already finished (because of an error). _ = self - .body + .request .meta_request .write(&[], true) .await .map_err(S3RequestError::CrtError)?; // Now wait for the request to finish. 
- let _ = self.body.await?; + let _ = self.request.await?; let elapsed = self.start_time.elapsed(); emit_throughput_metric(self.total_bytes, elapsed, "put_object"); diff --git a/mountpoint-s3-client/tests/put_object.rs b/mountpoint-s3-client/tests/put_object.rs index 941158871..3a4af41f2 100644 --- a/mountpoint-s3-client/tests/put_object.rs +++ b/mountpoint-s3-client/tests/put_object.rs @@ -6,8 +6,12 @@ use std::collections::HashMap; use std::time::Duration; use common::*; + use futures::{pin_mut, FutureExt, StreamExt}; -use mountpoint_s3_client::checksums::crc32c_to_base64; +use rand::Rng; +use test_case::test_case; + +use mountpoint_s3_client::checksums::{crc32c, crc32c_to_base64}; use mountpoint_s3_client::config::S3ClientConfig; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::{ @@ -15,9 +19,6 @@ use mountpoint_s3_client::types::{ PutObjectTrailingChecksums, }; use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError}; -use mountpoint_s3_crt::checksums::crc32c; -use rand::Rng; -use test_case::test_case; // Simple test for PUT object. Puts a single, small object as a single part and checks that the // contents are correct with a GET. @@ -282,7 +283,7 @@ async fn test_put_object_write_cancelled() { .expect_err("further writes should fail"); assert!(matches!( err, - ObjectClientError::ClientError(S3RequestError::RequestCanceled) + ObjectClientError::ClientError(S3RequestError::InternalError(_)) )); } @@ -294,26 +295,11 @@ async fn test_put_object_initiate_failure() { let params = PutObjectParams::new().storage_class("INVALID_STORAGE_CLASS".into()); - let mut request = client + // The MPU initiation should fail, so we should get an error from put_object. + let _err = client .put_object(&bucket, &key, &params) .await - .expect("put_object should succeed"); - - // The MPU initiation should fail, so we should get an error when we try to write. 
- let _err = request.write(&[1, 2, 3, 4]).await.expect_err("write should fail"); - - // Try again just to make sure the failure is fused correctly and doesn't block forever if - // someone (incorrectly) tries to write again after a failure. - let _err = request - .write(&[1, 2, 3, 4]) - .await - .expect_err("second write should fail"); - - // Abort the request (which should already have been canceled) - drop(request); - - // Wait a bit to let any requests settle. - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + .expect_err("put_object should fail"); let sdk_client = get_test_sdk_client().await; let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key) @@ -435,8 +421,10 @@ async fn test_put_user_object_metadata_bad_header(object_metadata: HashMap