From 7115790df84899f51e972086e840a992513d1cf7 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 3 Jan 2025 13:12:52 +0000 Subject: [PATCH] Store the name of a mounted bucket in metadata Signed-off-by: Vlad Volodkin --- .../src/data_cache/express_data_cache.rs | 50 ++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs index b3e3c79ff..2b5573a74 100644 --- a/mountpoint-s3/src/data_cache/express_data_cache.rs +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -19,7 +19,7 @@ use tracing::Instrument; use mountpoint_s3_client::checksums::crc32c_from_base64; -const CACHE_VERSION: &str = "V1"; +const CACHE_VERSION: &str = "V2"; /// Configuration for a [ExpressDataCache]. #[derive(Debug)] @@ -46,6 +46,8 @@ pub struct ExpressDataCache { config: ExpressDataCacheConfig, /// Name of the S3 Express bucket to store the blocks. bucket_name: String, + /// Name of the mounted bucket. 
+ source_bucket_name: String, } impl<S, C> From<ObjectClientError<S, C>> for DataCacheError @@ -69,6 +71,7 @@ where prefix: build_prefix(source_bucket_name, config.block_size), config, bucket_name: bucket_name.to_owned(), + source_bucket_name: source_bucket_name.to_owned(), } } @@ -185,7 +188,7 @@ where .ok_or_else(|| DataCacheError::InvalidBlockChecksum)?; let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?; - let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c); + let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.source_bucket_name, crc32c); block_metadata.validate_object_metadata(&object_metadata)?; Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c))) @@ -211,7 +214,8 @@ where let object_key = get_s3_key(&self.prefix, &cache_key, block_idx); let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; - let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum); + let block_metadata = + BlockMetadata::new(block_idx, block_offset, &cache_key, &self.source_bucket_name, checksum); self.client .put_object_single( @@ -300,7 +304,7 @@ where /// wanting to get (and avoid collisions with the key). /// On miss, bypass the cache and go to the main data source. 
#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] struct BlockMetadata { block_idx: BlockIndex, block_offset: u64, @@ -600,13 +604,28 @@ mod tests { ); let (data, checksum) = data.into_inner().unwrap(); - let block_metadata = BlockMetadata::new(0, 0, &cache_key, bucket, checksum); + let block_metadata = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum); let put_params = block_metadata.to_put_object_params(); let (data_2, checksum_2) = data_2.into_inner().unwrap(); - let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, bucket, checksum_2); + let block_metadata_2 = BlockMetadata::new(0, 0, &cache_key, source_bucket, checksum_2); let put_params_2 = block_metadata_2.to_put_object_params(); + // Store with correct metadata and expect a successful get_block + client + .put_object_single(bucket, &object_key, &put_params, data.clone()) + .in_current_span() + .await + .unwrap(); + let (received_data, _) = cache + .get_block(&cache_key, 0, 0, data.len()) + .await + .expect("get should succeed with intact metadata") + .expect("block should be non-empty") + .into_inner() + .expect("block should be valid"); + assert_eq!(received_data, data); + // Remove the checksum when writing. 
client .put_object_single(bucket, &object_key, &put_params.clone().checksum(None), data.clone()) @@ -660,6 +679,25 @@ mod tests { .expect_err("cache should return error if object metadata doesn't match data"); assert!(matches!(err, DataCacheError::InvalidBlockHeader(_))); + // Write data with object metadata header for object from a different bucket + let mut corrupted_metadata = block_metadata.clone(); + corrupted_metadata.source_bucket_name = bucket.to_owned(); + client + .put_object_single( + bucket, + &object_key, + &corrupted_metadata.to_put_object_params(), + data.clone(), + ) + .in_current_span() + .await + .unwrap(); + let err = cache + .get_block(&cache_key, 0, 0, data.len()) + .await + .expect_err("cache should return error if source bucket does not match"); + assert!(matches!(err, DataCacheError::InvalidBlockHeader(_))); + // Get data that's not been written yet let result = cache .get_block(&cache_key_non_existent, 0, 0, data.len())