Skip to content

Commit

Permalink
Lazy atomic upload
Browse files (browse the repository at this point in the history)
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Dec 12, 2024
1 parent 7af624d commit 411173c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 20 deletions.
17 changes: 10 additions & 7 deletions mountpoint-s3/src/async_util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;

use futures::future::{BoxFuture, FutureExt};
use futures::task::{Spawn, SpawnError};

/// Type-erasure for a [Spawn] implementation.
Expand All @@ -27,16 +27,14 @@ impl BoxRuntime {

/// Holds a value lazily initialized when awaiting a future.
pub struct Lazy<T, E> {
future: Option<PinFuture<T, E>>,
future: Option<BoxFuture<'static, Result<T, E>>>,
value: Option<T>,
}

type PinFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;

impl<T, E> Lazy<T, E> {
pub fn new(f: impl Future<Output = Result<T, E>> + Send + 'static) -> Self {
Self {
future: Some(Box::pin(f)),
future: Some(f.boxed()),
value: None,
}
}
Expand All @@ -48,9 +46,14 @@ impl<T, E> Lazy<T, E> {
Ok(())
}

pub async fn get_mut(&mut self) -> Result<&mut T, E> {
pub async fn get_mut(&mut self) -> Result<Option<&mut T>, E> {
self.force().await?;
Ok(self.value.as_mut())
}

pub async fn into_inner(mut self) -> Result<Option<T>, E> {
self.force().await?;
Ok(self.value.as_mut().unwrap())
Ok(self.value.take())
}
}

Expand Down
42 changes: 30 additions & 12 deletions mountpoint-s3/src/upload/atomic.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::fmt::Debug;

use mountpoint_s3_client::{
checksums::{crc32c_from_base64, Crc32c},
types::{ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview},
ObjectClient, PutObjectRequest,
use futures::task::SpawnExt as _;
use mountpoint_s3_client::checksums::{crc32c, crc32c_from_base64, Crc32c};
use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{
ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview,
};
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_client::{ObjectClient, PutObjectRequest};
use tracing::error;

use crate::{checksums::combine_checksums, ServerSideEncryption};
use crate::async_util::Lazy;
use crate::checksums::combine_checksums;
use crate::ServerSideEncryption;

use super::{UploadError, Uploader};

Expand All @@ -18,16 +21,19 @@ const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000;
///
/// Wraps a PutObject request and enforces sequential writes.
pub struct UploadRequest<Client: ObjectClient> {
request: Lazy<Client::PutObjectRequest, ObjectClientError<PutObjectError, Client::ClientError>>,
bucket: String,
key: String,
next_request_offset: u64,
hasher: crc32c::Hasher,
request: Client::PutObjectRequest,
maximum_upload_size: Option<usize>,
sse: ServerSideEncryption,
}

impl<Client: ObjectClient> UploadRequest<Client> {
impl<Client> UploadRequest<Client>
where
Client: ObjectClient + Send + Clone + 'static,
{
pub async fn new(
uploader: &Uploader<Client>,
bucket: &str,
Expand Down Expand Up @@ -58,18 +64,24 @@ impl<Client: ObjectClient> UploadRequest<Client> {
params = params.server_side_encryption(sse_type);
params = params.ssekms_key_id(key_id);

let request = uploader.client.put_object(bucket, key, &params).await?;
let put_bucket = bucket.to_owned();
let put_key = key.to_owned();
let client = uploader.client.clone();
let request_handle = uploader
.runtime
.spawn_with_handle(async move { client.put_object(&put_bucket, &put_key, &params).await })
.unwrap();
let maximum_upload_size = uploader
.client
.write_part_size()
.map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));

Ok(UploadRequest {
request: Lazy::new(request_handle),
bucket: bucket.to_owned(),
key: key.to_owned(),
next_request_offset: 0,
hasher: crc32c::Hasher::new(),
request,
maximum_upload_size,
sse: uploader.server_side_encryption.clone(),
})
Expand All @@ -94,7 +106,8 @@ impl<Client: ObjectClient> UploadRequest<Client> {
}

self.hasher.update(data);
self.request.write(data).await?;
self.request.get_mut().await?.unwrap().write(data).await?;

self.next_request_offset += data.len() as u64;
Ok(data.len())
}
Expand All @@ -104,6 +117,9 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let checksum = self.hasher.finalize();
let result = self
.request
.into_inner()
.await?
.unwrap()
.review_and_complete(move |review| verify_checksums(review, size, checksum))
.await?;
if let Err(err) = self
Expand Down Expand Up @@ -225,7 +241,9 @@ mod tests {
..Default::default()
}));
let uploader = new_uploader_for_test(client.clone(), None, ServerSideEncryption::default(), true);
let request = uploader.start_atomic_upload(bucket, key).await.unwrap();
let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();

_ = request.write(0, &[]).await.unwrap();

assert!(!client.contains_key(key));
assert!(client.is_upload_in_progress(key));
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3/src/upload/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ where
&mut self,
capacity: usize,
) -> Result<UploadBuffer<Client>, UploadError<Client::ClientError>> {
let checksum_algorithm = self.checksum_algorithm.get_mut().await?.clone();
let checksum_algorithm = self.checksum_algorithm.get_mut().await?.unwrap().clone();

while self.requests_in_queue > 0 {
match UploadBuffer::try_new(capacity, &checksum_algorithm, self.mem_limiter.clone())? {
Expand Down

0 comments on commit 411173c

Please sign in to comment.