Skip to content

Commit

Permalink
Store the name of a mounted bucket in metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Vlad Volodkin <[email protected]>
  • Loading branch information
Vlad Volodkin committed Jan 3, 2025
1 parent 641f613 commit 7115790
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::Instrument;

use mountpoint_s3_client::checksums::crc32c_from_base64;

const CACHE_VERSION: &str = "V1";
const CACHE_VERSION: &str = "V2";

/// Configuration for an [ExpressDataCache].
#[derive(Debug)]
Expand All @@ -46,6 +46,8 @@ pub struct ExpressDataCache<Client: ObjectClient> {
config: ExpressDataCacheConfig,
/// Name of the S3 Express bucket to store the blocks.
bucket_name: String,
/// Name of the mounted bucket.
source_bucket_name: String,
}

impl<S, C> From<ObjectClientError<S, C>> for DataCacheError
Expand All @@ -69,6 +71,7 @@ where
prefix: build_prefix(source_bucket_name, config.block_size),
config,
bucket_name: bucket_name.to_owned(),
source_bucket_name: source_bucket_name.to_owned(),
}
}

Expand Down Expand Up @@ -185,7 +188,7 @@ where
.ok_or_else(|| DataCacheError::InvalidBlockChecksum)?;
let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.source_bucket_name, crc32c);
block_metadata.validate_object_metadata(&object_metadata)?;

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
Expand All @@ -211,7 +214,8 @@ where
let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
let block_metadata =
BlockMetadata::new(block_idx, block_offset, &cache_key, &self.source_bucket_name, checksum);

self.client
.put_object_single(
Expand Down Expand Up @@ -300,7 +304,7 @@ where
/// wanting to get (and avoid collisions with the key).
/// On miss, bypass the cache and go to the main data source.
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
struct BlockMetadata {
block_idx: BlockIndex,
block_offset: u64,
Expand Down Expand Up @@ -600,13 +604,28 @@ mod tests {
);

let (data, checksum) = data.into_inner().unwrap();
let block_metadata = BlockMetadata::new(0, 0, &cache_key, bucket, checksum);
let block_metadata = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum);
let put_params = block_metadata.to_put_object_params();

let (data_2, checksum_2) = data_2.into_inner().unwrap();
let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, bucket, checksum_2);
let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum_2);
let put_params_2 = block_metadata_2.to_put_object_params();

// Store with correct metadata and expect a successful get_block
client
.put_object_single(bucket, &object_key, &put_params, data.clone())
.in_current_span()
.await
.unwrap();
let (received_data, _) = cache
.get_block(&cache_key, 0, 0, data.len())
.await
.expect("get should succeed with intact metadata")
.expect("block should be non-empty")
.into_inner()
.expect("block should be valid");
assert_eq!(received_data, data);

// Remove the checksum when writing.
client
.put_object_single(bucket, &object_key, &put_params.clone().checksum(None), data.clone())
Expand Down Expand Up @@ -660,6 +679,25 @@ mod tests {
.expect_err("cache should return error if object metadata doesn't match data");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

// Write data with an object metadata header for an object from a different bucket
let mut corrupted_metadata = block_metadata.clone();
corrupted_metadata.source_bucket_name = bucket.to_owned();
client
.put_object_single(
bucket,
&object_key,
&corrupted_metadata.to_put_object_params(),
data.clone(),
)
.in_current_span()
.await
.unwrap();
let err = cache
.get_block(&cache_key, 0, 0, data.len())
.await
.expect_err("cache should return error if source bucket does not match");
assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));

// Get data that's not been written yet
let result = cache
.get_block(&cache_key_non_existent, 0, 0, data.len())
Expand Down

0 comments on commit 7115790

Please sign in to comment.