From a1096c2139ea2485c18b87a61fe042a06d88cf8f Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Thu, 14 Nov 2024 21:21:10 +0000 Subject: [PATCH 1/6] Add a test for an invalid cache block Signed-off-by: Vlad Volodkin --- mountpoint-s3/src/data_cache.rs | 2 +- .../src/data_cache/express_data_cache.rs | 4 +- mountpoint-s3/tests/common/cache.rs | 29 +++-- mountpoint-s3/tests/common/fuse.rs | 17 ++- mountpoint-s3/tests/fuse_tests/cache_test.rs | 117 +++++++++++++++--- 5 files changed, 141 insertions(+), 28 deletions(-) diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index 76f941f26..f3d031d54 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -16,7 +16,7 @@ use thiserror::Error; pub use crate::checksums::ChecksummedBytes; pub use crate::data_cache::cache_directory::ManagedCacheDir; pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig}; -pub use crate::data_cache::express_data_cache::{ExpressDataCache, ExpressDataCacheConfig}; +pub use crate::data_cache::express_data_cache::{build_prefix, get_s3_key, ExpressDataCache, ExpressDataCacheConfig}; pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache; pub use crate::data_cache::multilevel_cache::MultilevelDataCache; diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs index 3c9e83d96..3401ceb4d 100644 --- a/mountpoint-s3/src/data_cache/express_data_cache.rs +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -300,7 +300,7 @@ impl BlockMetadata { } /// Get the prefix for objects we'll be creating in S3 -fn build_prefix(source_bucket_name: &str, block_size: u64) -> String { +pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String { hex::encode( Sha256::new() .chain_update(CACHE_VERSION.as_bytes()) @@ -311,7 +311,7 @@ fn build_prefix(source_bucket_name: &str, block_size: u64) -> String { } /// Get the S3 key this block should be written to or read from. -fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String { +pub fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String { let hashed_cache_key = hex::encode( Sha256::new() .chain_update(cache_key.key()) diff --git a/mountpoint-s3/tests/common/cache.rs b/mountpoint-s3/tests/common/cache.rs index 3d86d07f3..481fc0c0f 100644 --- a/mountpoint-s3/tests/common/cache.rs +++ b/mountpoint-s3/tests/common/cache.rs @@ -8,7 +8,7 @@ use std::{ use async_trait::async_trait; use mountpoint_s3::{ - data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult}, + data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}, object::ObjectId, }; @@ -21,6 +21,8 @@ struct CacheTestWrapperInner { cache: Cache, /// Number of times the `get_block` succeded and returned data get_block_hit_count: AtomicU64, + /// Number of times the `get_block` failed because of an invalid block + get_block_invalid_count: AtomicU64, /// Number of times the `put_block` was completed put_block_count: AtomicU64, } @@ -39,6 +41,7 @@ impl CacheTestWrapper { inner: Arc::new(CacheTestWrapperInner { cache, get_block_hit_count: AtomicU64::new(0), + get_block_invalid_count: AtomicU64::new(0), put_block_count: AtomicU64::new(0), }), } @@ -62,6 +65,11 @@ impl CacheTestWrapper { self.inner.get_block_hit_count.load(Ordering::SeqCst) } + /// Number of times the `get_block` finished because of an invalid block + pub fn get_block_invalid_count(&self) -> u64 { + self.inner.get_block_invalid_count.load(Ordering::SeqCst) + } + /// Number of times the `put_block` was completed pub fn put_block_count(&self) -> u64 { self.inner.put_block_count.load(Ordering::SeqCst) @@ -77,18 +85,23 @@ impl DataCache for CacheTestWrapper DataCacheResult> { - let result: Option = self + let result = self .inner .cache .get_block(cache_key, block_idx, block_offset, object_size) - .await?; + .await; - // The cache hit happens only if the `get_block` was successful and returned data - if result.is_some() { - self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst); - } + match result.as_ref() { + Ok(Some(_)) => { + self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst); + } + Err(DataCacheError::InvalidBlockHeader(_)) => { + self.inner.get_block_invalid_count.fetch_add(1, Ordering::SeqCst); + } + _ => (), + }; - Ok(result) + result } async fn put_block( diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index 29d568fac..216961640 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -21,6 +21,8 @@ pub trait TestClient: Send { self.put_object_params(key, value, PutObjectParams::default()) } + fn get_object_etag(&self, key: &str) -> Result>; + fn put_object_params( &self, key: &str, @@ -256,6 +258,10 @@ pub mod mock_session { Ok(()) } + fn get_object_etag(&self, _key: &str) -> Result> { + panic!("not implemented"); + } + fn remove_object(&self, key: &str) -> Result<(), Box> { let full_key = format!("{}{}", self.prefix, key); self.client.remove_object(&full_key); @@ -389,7 +395,7 @@ pub mod s3_session { S3CrtClient::new(client_config).unwrap() } - fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient { + pub fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient { let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); SDKTestClient { prefix: prefix.to_owned(), @@ -428,6 +434,15 @@ pub mod s3_session { Ok(tokio_block_on(request.send()).map(|_| ())?) } + fn get_object_etag(&self, key: &str) -> Result> { + let full_key = format!("{}{}", self.prefix, key); + let request = self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send(); + Ok( + tokio_block_on(request) + .map(|result| result.e_tag.expect("head object response must return an etag"))?, + ) + } + fn remove_object(&self, key: &str) -> Result<(), Box> { let full_key = format!("{}{}", self.prefix, key); let request = self diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs index ff9eb90c5..581bac892 100644 --- a/mountpoint-s3/tests/fuse_tests/cache_test.rs +++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs @@ -1,19 +1,83 @@ use crate::common::cache::CacheTestWrapper; -use crate::common::fuse::create_fuse_session; -use crate::common::fuse::s3_session::create_crt_client; -use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_bucket_and_prefix}; -use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache}; +use crate::common::fuse::s3_session::{create_crt_client, create_test_client}; +use crate::common::fuse::{create_fuse_session, TestClient}; +use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_prefix, get_test_region}; +use fuser::BackgroundSession; +use mountpoint_s3::data_cache::{ + build_prefix, get_s3_key, BlockIndex, DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, +}; +use mountpoint_s3::object::ObjectId; use mountpoint_s3::prefetch::caching_prefetch; +use mountpoint_s3_client::types::{PutObjectParams, PutObjectTrailingChecksums}; use mountpoint_s3_client::S3CrtClient; use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaChaRng; use std::fs; use std::time::Duration; +use tempfile::TempDir; use test_case::test_case; const CACHE_BLOCK_SIZE: u64 = 1024 * 1024; const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024; +/// A test that checks that an invalid block may not be served from the cache +#[test] +fn express_invalid_block_read() { + let region = get_test_region(); + let bucket = get_standard_bucket(); + let cache_bucket = get_express_bucket(); + let prefix = get_test_prefix("express_invalid_block_read"); + + // Mount the bucket + let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE); + let cache = CacheTestWrapper::new(ExpressDataCache::new( + client.clone(), + Default::default(), + &bucket, + &cache_bucket, + )); + let (mount_point, _session) = mount_bucket(client, cache.clone(), &bucket, &prefix); + + // Put an object to the mounted bucket + let bucket_client = create_test_client(®ion, &bucket, &prefix); + let object_key = get_object_key(&prefix, "key", 100); + let object_data = "object_data"; + bucket_client.put_object(&object_key, object_data.as_bytes()).unwrap(); + let object_etag = bucket_client.get_object_etag(&object_key).unwrap(); + + // Read data twice, expect cache hits and no errors + let path = mount_point.path().join(&object_key); + + let put_block_count = cache.put_block_count(); + let read = fs::read(&path).expect("read should succeed"); + assert_eq!(read, object_data.as_bytes()); + cache.wait_for_put(Duration::from_secs(10), put_block_count); + + let read = fs::read(&path).expect("read should succeed"); + assert_eq!(read, object_data.as_bytes()); + + assert_eq!(cache.get_block_invalid_count(), 0, "no invalid blocks yet"); + assert!(cache.get_block_hit_count() > 0, "reads should result in a cache hit"); + + // Corrupt the cache block + let cache_bucket_client = create_test_client(®ion, &cache_bucket, ""); + let object_id = get_object_id(&prefix, &object_key, &object_etag); + let block_key = get_express_cache_block_key(&bucket, &object_id, 0); + let put_object_params = PutObjectParams::default().trailing_checksums(PutObjectTrailingChecksums::Enabled); + cache_bucket_client + .put_object_params(&block_key, "invalid_block".as_bytes(), put_object_params) + .unwrap(); + + // Read data after the block was corrupted, expect errors, but still the correct data + let path = mount_point.path().join(&object_key); + let read = fs::read(&path).expect("read should succeed"); + assert_eq!(read, object_data.as_bytes()); + assert!( + cache.get_block_invalid_count() > 0, + "read should result in cache errors" + ); +} + #[test_case("key", 100, 1024; "simple")] #[test_case("£", 100, 1024; "non-ascii key")] #[test_case("key", 1024, 1024; "long key")] @@ -26,6 +90,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usiz cache_write_read_base( client, + &bucket_name, key_suffix, key_size, object_size, @@ -48,8 +113,10 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE); + let bucket_name = get_standard_bucket(); cache_write_read_base( client, + &bucket_name, key_suffix, key_size, object_size, @@ -60,6 +127,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) fn cache_write_read_base( client: S3CrtClient, + bucket: &str, key_suffix: &str, key_size: usize, object_size: usize, @@ -68,21 +136,11 @@ fn cache_write_read_base( ) where Cache: DataCache + Send + Sync + 'static, { - let (bucket, prefix) = get_test_bucket_and_prefix(test_name); + let prefix = get_test_prefix(test_name); // Mount a bucket - let mount_point = tempfile::tempdir().unwrap(); - let runtime = client.event_loop_group(); let cache = CacheTestWrapper::new(cache); - let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default()); - let _session = create_fuse_session( - client, - prefetcher, - &bucket, - &prefix, - mount_point.path(), - Default::default(), - ); + let (mount_point, _session) = mount_bucket(client, cache.clone(), bucket, &prefix); // Write an object, no caching happens yet let key = get_object_key(&prefix, key_suffix, key_size); @@ -128,3 +186,30 @@ fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) let padding = "0".repeat(padding_size); format!("{last_key_part}{padding}") } + +fn mount_bucket(client: S3CrtClient, cache: Cache, bucket: &str, prefix: &str) -> (TempDir, BackgroundSession) +where + Cache: DataCache + Send + Sync + 'static, +{ + let mount_point = tempfile::tempdir().unwrap(); + let runtime = client.event_loop_group(); + let prefetcher = caching_prefetch(cache, runtime, Default::default()); + let session = create_fuse_session( + client, + prefetcher, + bucket, + prefix, + mount_point.path(), + Default::default(), + ); + (mount_point, session) +} + +fn get_object_id(prefix: &str, key: &str, etag: &str) -> ObjectId { + ObjectId::new(format!("{prefix}{key}"), etag.into()) +} + +fn get_express_cache_block_key(bucket: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String { + let block_key_prefix = build_prefix(bucket, CACHE_BLOCK_SIZE); + get_s3_key(&block_key_prefix, cache_key, block_idx) +} From dee9afa30cc8cc62b54e1979c9c9fb65b97d376c Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Thu, 14 Nov 2024 21:37:24 +0000 Subject: [PATCH 2/6] Implement MockTestClient::get_object_etag Signed-off-by: Vlad Volodkin --- mountpoint-s3-client/src/mock_client.rs | 8 ++++++++ mountpoint-s3/tests/common/fuse.rs | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index dc8c0e959..f237530a0 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -102,6 +102,14 @@ impl MockClient { add_object(&self.objects, key, value); } + /// Get object's etag + pub fn get_object_etag(&self, key: &str) -> Result { + match self.objects.read().unwrap().get(key) { + Some(object) => Ok(object.etag.clone()), + None => Err(MockClientError("object not found".into())), + } + } + /// Remove object for the mock client's bucket pub fn remove_object(&self, key: &str) { self.objects.write().unwrap().remove(key); diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index 216961640..808823512 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -258,8 +258,8 @@ pub mod mock_session { Ok(()) } - fn get_object_etag(&self, _key: &str) -> Result> { - panic!("not implemented"); + fn get_object_etag(&self, key: &str) -> Result> { + Ok(self.client.get_object_etag(key)?.into_inner()) } fn remove_object(&self, key: &str) -> Result<(), Box> { From c32be2e94dad4146ec9e7c409d7b85eeed150dcd Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 15 Nov 2024 12:17:20 +0000 Subject: [PATCH 3/6] Use S3CrtClient in the tests Signed-off-by: Vlad Volodkin --- mountpoint-s3-client/src/mock_client.rs | 8 ----- mountpoint-s3/tests/common/fuse.rs | 17 +--------- mountpoint-s3/tests/fuse_tests/cache_test.rs | 33 +++++++++++--------- 3 files changed, 19 insertions(+), 39 deletions(-) diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index f237530a0..dc8c0e959 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -102,14 +102,6 @@ impl MockClient { add_object(&self.objects, key, value); } - /// Get object's etag - pub fn get_object_etag(&self, key: &str) -> Result { - match self.objects.read().unwrap().get(key) { - Some(object) => Ok(object.etag.clone()), - None => Err(MockClientError("object not found".into())), - } - } - /// Remove object for the mock client's bucket pub fn remove_object(&self, key: &str) { self.objects.write().unwrap().remove(key); diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index 808823512..29d568fac 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -21,8 +21,6 @@ pub trait TestClient: Send { self.put_object_params(key, value, PutObjectParams::default()) } - fn get_object_etag(&self, key: &str) -> Result>; - fn put_object_params( &self, key: &str, @@ -258,10 +256,6 @@ pub mod mock_session { Ok(()) } - fn get_object_etag(&self, key: &str) -> Result> { - Ok(self.client.get_object_etag(key)?.into_inner()) - } - fn remove_object(&self, key: &str) -> Result<(), Box> { let full_key = format!("{}{}", self.prefix, key); self.client.remove_object(&full_key); @@ -395,7 +389,7 @@ pub mod s3_session { S3CrtClient::new(client_config).unwrap() } - pub fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient { + fn create_test_client(region: &str, bucket: &str, prefix: &str) -> impl TestClient { let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); SDKTestClient { prefix: prefix.to_owned(), @@ -434,15 +428,6 @@ pub mod s3_session { Ok(tokio_block_on(request.send()).map(|_| ())?) } - fn get_object_etag(&self, key: &str) -> Result> { - let full_key = format!("{}{}", self.prefix, key); - let request = self.sdk_client.head_object().bucket(&self.bucket).key(full_key).send(); - Ok( - tokio_block_on(request) - .map(|result| result.e_tag.expect("head object response must return an etag"))?, - ) - } - fn remove_object(&self, key: &str) -> Result<(), Box> { let full_key = format!("{}{}", self.prefix, key); let request = self diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs index 581bac892..ddb59cdaf 100644 --- a/mountpoint-s3/tests/fuse_tests/cache_test.rs +++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs @@ -1,15 +1,18 @@ use crate::common::cache::CacheTestWrapper; -use crate::common::fuse::s3_session::{create_crt_client, create_test_client}; -use crate::common::fuse::{create_fuse_session, TestClient}; -use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_prefix, get_test_region}; +use crate::common::fuse::create_fuse_session; +use crate::common::fuse::s3_session::create_crt_client; +use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_prefix}; +use crate::common::tokio_block_on; use fuser::BackgroundSession; use mountpoint_s3::data_cache::{ build_prefix, get_s3_key, BlockIndex, DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, }; + use mountpoint_s3::object::ObjectId; use mountpoint_s3::prefetch::caching_prefetch; -use mountpoint_s3_client::types::{PutObjectParams, PutObjectTrailingChecksums}; -use mountpoint_s3_client::S3CrtClient; +use mountpoint_s3_client::types::{PutObjectSingleParams, UploadChecksum}; +use mountpoint_s3_client::{ObjectClient, S3CrtClient}; +use mountpoint_s3_crt::checksums::crc32c; use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaChaRng; use std::fs; @@ -23,7 +26,6 @@ const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024; /// A test that checks that an invalid block may not be served from the cache #[test] fn express_invalid_block_read() { - let region = get_test_region(); let bucket = get_standard_bucket(); let cache_bucket = get_express_bucket(); let prefix = get_test_prefix("express_invalid_block_read"); @@ -36,14 +38,15 @@ fn express_invalid_block_read() { &bucket, &cache_bucket, )); - let (mount_point, _session) = mount_bucket(client, cache.clone(), &bucket, &prefix); + let (mount_point, _session) = mount_bucket(client.clone(), cache.clone(), &bucket, &prefix); // Put an object to the mounted bucket - let bucket_client = create_test_client(®ion, &bucket, &prefix); let object_key = get_object_key(&prefix, "key", 100); + let full_object_key = format!("{prefix}{object_key}"); let object_data = "object_data"; - bucket_client.put_object(&object_key, object_data.as_bytes()).unwrap(); - let object_etag = bucket_client.get_object_etag(&object_key).unwrap(); + let result = tokio_block_on(client.put_object_single(&bucket, &full_object_key, &Default::default(), object_data)) + .expect("put object must succeed"); + let object_etag = result.etag.into_inner(); // Read data twice, expect cache hits and no errors let path = mount_point.path().join(&object_key); @@ -60,13 +63,13 @@ fn express_invalid_block_read() { assert!(cache.get_block_hit_count() > 0, "reads should result in a cache hit"); // Corrupt the cache block - let cache_bucket_client = create_test_client(®ion, &cache_bucket, ""); let object_id = get_object_id(&prefix, &object_key, &object_etag); let block_key = get_express_cache_block_key(&bucket, &object_id, 0); - let put_object_params = PutObjectParams::default().trailing_checksums(PutObjectTrailingChecksums::Enabled); - cache_bucket_client - .put_object_params(&block_key, "invalid_block".as_bytes(), put_object_params) - .unwrap(); + let corrupted_block = "corrupted_block"; + let checksum = crc32c::checksum(corrupted_block.as_bytes()); + let put_object_params = PutObjectSingleParams::default().checksum(Some(UploadChecksum::Crc32c(checksum))); + tokio_block_on(client.put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block)) + .expect("put object must succeed"); // Read data after the block was corrupted, expect errors, but still the correct data let path = mount_point.path().join(&object_key); From 5624745c0ef0517832ab378f190477cc7456c7c6 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 15 Nov 2024 15:07:02 +0000 Subject: [PATCH 4/6] Run disk-cache tests against express bucket too, imrove comments Signed-off-by: Vlad Volodkin --- mountpoint-s3/tests/common/s3.rs | 13 ++++--- mountpoint-s3/tests/fuse_tests/cache_test.rs | 36 ++++++++++++++------ mountpoint-s3/tests/fuse_tests/mod.rs | 2 +- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/mountpoint-s3/tests/common/s3.rs b/mountpoint-s3/tests/common/s3.rs index cb6fc3573..b8faff14b 100644 --- a/mountpoint-s3/tests/common/s3.rs +++ b/mountpoint-s3/tests/common/s3.rs @@ -8,11 +8,7 @@ use rand_chacha::rand_core::OsRng; use crate::common::tokio_block_on; pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { - #[cfg(not(feature = "s3express_tests"))] - let bucket = get_standard_bucket(); - #[cfg(feature = "s3express_tests")] - let bucket = get_express_bucket(); - + let bucket = get_test_bucket(); let prefix = get_test_prefix(test_name); (bucket, prefix) @@ -29,6 +25,13 @@ pub fn get_test_prefix(test_name: &str) -> String { format!("{prefix}{test_name}/{nonce}/") } +pub fn get_test_bucket() -> String { + #[cfg(not(feature = "s3express_tests"))] + return get_standard_bucket(); + #[cfg(feature = "s3express_tests")] + return get_express_bucket(); +} + #[cfg(feature = "s3express_tests")] pub fn get_express_bucket() -> String { std::env::var("S3_EXPRESS_ONE_ZONE_BUCKET_NAME") diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs index ddb59cdaf..ff494ba1d 100644 --- a/mountpoint-s3/tests/fuse_tests/cache_test.rs +++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs @@ -1,17 +1,25 @@ use crate::common::cache::CacheTestWrapper; use crate::common::fuse::create_fuse_session; use crate::common::fuse::s3_session::create_crt_client; -use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_prefix}; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] +use crate::common::s3::{get_express_bucket, get_standard_bucket}; +use crate::common::s3::{get_test_bucket, get_test_prefix}; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use crate::common::tokio_block_on; use fuser::BackgroundSession; -use mountpoint_s3::data_cache::{ - build_prefix, get_s3_key, BlockIndex, DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, -}; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] +use mountpoint_s3::data_cache::{build_prefix, get_s3_key, BlockIndex, ExpressDataCache}; +use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig}; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3::object::ObjectId; use mountpoint_s3::prefetch::caching_prefetch; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3_client::types::{PutObjectSingleParams, UploadChecksum}; -use mountpoint_s3_client::{ObjectClient, S3CrtClient}; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] +use mountpoint_s3_client::ObjectClient; +use mountpoint_s3_client::S3CrtClient; +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3_crt::checksums::crc32c; use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaChaRng; @@ -25,6 +33,7 @@ const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024; /// A test that checks that an invalid block may not be served from the cache #[test] +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] fn express_invalid_block_read() { let bucket = get_standard_bucket(); let cache_bucket = get_express_bucket(); @@ -41,7 +50,7 @@ fn express_invalid_block_read() { let (mount_point, _session) = mount_bucket(client.clone(), cache.clone(), &bucket, &prefix); // Put an object to the mounted bucket - let object_key = get_object_key(&prefix, "key", 100); + let object_key = generate_unprefixed_key(&prefix, "key", 100); let full_object_key = format!("{prefix}{object_key}"); let object_data = "object_data"; let result = tokio_block_on(client.put_object_single(&bucket, &full_object_key, &Default::default(), object_data)) @@ -62,7 +71,7 @@ fn express_invalid_block_read() { assert_eq!(cache.get_block_invalid_count(), 0, "no invalid blocks yet"); assert!(cache.get_block_hit_count() > 0, "reads should result in a cache hit"); - // Corrupt the cache block + // Corrupt the cache block by replacing it with an object holding no metadata let object_id = get_object_id(&prefix, &object_key, &object_etag); let block_key = get_express_cache_block_key(&bucket, &object_id, 0); let corrupted_block = "corrupted_block"; @@ -71,7 +80,7 @@ fn express_invalid_block_read() { tokio_block_on(client.put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block)) .expect("put object must succeed"); - // Read data after the block was corrupted, expect errors, but still the correct data + // Expect a successfull read from the source bucket. We expect cache errors being recorded because of the corrupted block. let path = mount_point.path().join(&object_key); let read = fs::read(&path).expect("read should succeed"); assert_eq!(read, object_data.as_bytes()); @@ -85,6 +94,7 @@ fn express_invalid_block_read() { #[test_case("£", 100, 1024; "non-ascii key")] #[test_case("key", 1024, 1024; "long key")] #[test_case("key", 100, 1024 * 1024; "big file")] +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) { let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE); let bucket_name = get_standard_bucket(); @@ -106,6 +116,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usiz #[test_case("£", 100, 1024; "non-ascii key")] #[test_case("key", 1024, 1024; "long key")] #[test_case("key", 100, 1024 * 1024; "big file")] +#[cfg(feature = "s3_tests")] fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) { let cache_dir = tempfile::tempdir().unwrap(); let cache_config = DiskDataCacheConfig { @@ -116,7 +127,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE); - let bucket_name = get_standard_bucket(); + let bucket_name = get_test_bucket(); cache_write_read_base( client, &bucket_name, @@ -146,7 +157,7 @@ fn cache_write_read_base( let (mount_point, _session) = mount_bucket(client, cache.clone(), bucket, &prefix); // Write an object, no caching happens yet - let key = get_object_key(&prefix, key_suffix, key_size); + let key = generate_unprefixed_key(&prefix, key_suffix, key_size); let path = mount_point.path().join(&key); let written = random_binary_data(object_size); fs::write(&path, &written).expect("write should succeed"); @@ -180,7 +191,8 @@ fn random_binary_data(size_in_bytes: usize) -> Vec { } /// Creates a random key which has a size of at least `min_size_in_bytes` -fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String { +/// The `key_prefix` is not included in the return value. +fn generate_unprefixed_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String { let random_suffix: u64 = rand::thread_rng().gen(); let last_key_part = format!("{key_suffix}{random_suffix}"); // part of the key after all the "/" let full_key = format!("{key_prefix}{last_key_part}"); @@ -208,10 +220,12 @@ where (mount_point, session) } +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] fn get_object_id(prefix: &str, key: &str, etag: &str) -> ObjectId { ObjectId::new(format!("{prefix}{key}"), etag.into()) } +#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] fn get_express_cache_block_key(bucket: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String { let block_key_prefix = build_prefix(bucket, CACHE_BLOCK_SIZE); get_s3_key(&block_key_prefix, cache_key, block_idx) diff --git a/mountpoint-s3/tests/fuse_tests/mod.rs b/mountpoint-s3/tests/fuse_tests/mod.rs index f4347439e..ae22c55c6 100644 --- a/mountpoint-s3/tests/fuse_tests/mod.rs +++ b/mountpoint-s3/tests/fuse_tests/mod.rs @@ -1,4 +1,4 @@ -#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] +#[cfg(feature = "s3_tests")] mod cache_test; mod consistency_test; mod fork_test; From 1afe45bb95df532ea8198065873d4510fa4d1da0 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 15 Nov 2024 16:46:57 +0000 Subject: [PATCH 5/6] Review comments Signed-off-by: Vlad Volodkin --- mountpoint-s3/tests/fuse_tests/cache_test.rs | 37 +++++++++++--------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs index ff494ba1d..0e69871ad 100644 --- a/mountpoint-s3/tests/fuse_tests/cache_test.rs +++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs @@ -1,40 +1,41 @@ use crate::common::cache::CacheTestWrapper; use crate::common::fuse::create_fuse_session; use crate::common::fuse::s3_session::create_crt_client; +use crate::common::s3::{get_test_bucket, get_test_prefix}; + +use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig}; +use mountpoint_s3::prefetch::caching_prefetch; +use mountpoint_s3_client::S3CrtClient; + +use rand::{Rng, RngCore, SeedableRng}; +use rand_chacha::ChaChaRng; +use std::fs; +use std::time::Duration; +use tempfile::TempDir; +use test_case::test_case; + #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use crate::common::s3::{get_express_bucket, get_standard_bucket}; -use crate::common::s3::{get_test_bucket, get_test_prefix}; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] -use crate::common::tokio_block_on; use fuser::BackgroundSession; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3::data_cache::{build_prefix, get_s3_key, BlockIndex, ExpressDataCache}; -use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig}; - #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3::object::ObjectId; -use mountpoint_s3::prefetch::caching_prefetch; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3_client::types::{PutObjectSingleParams, UploadChecksum}; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3_client::ObjectClient; -use mountpoint_s3_client::S3CrtClient; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3_crt::checksums::crc32c; -use rand::{Rng, RngCore, SeedableRng}; -use rand_chacha::ChaChaRng; -use std::fs; -use std::time::Duration; -use tempfile::TempDir; -use test_case::test_case; const CACHE_BLOCK_SIZE: u64 = 1024 * 1024; const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024; /// A test that checks that an invalid block may not be served from the cache -#[test] +#[tokio::test] #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] -fn express_invalid_block_read() { +async fn express_invalid_block_read() { let bucket = get_standard_bucket(); let cache_bucket = get_express_bucket(); let prefix = get_test_prefix("express_invalid_block_read"); @@ -53,7 +54,9 @@ fn express_invalid_block_read() { let object_key = generate_unprefixed_key(&prefix, "key", 100); let full_object_key = format!("{prefix}{object_key}"); let object_data = "object_data"; - let result = tokio_block_on(client.put_object_single(&bucket, &full_object_key, &Default::default(), object_data)) + let result = client + .put_object_single(&bucket, &full_object_key, &Default::default(), object_data) + .await .expect("put object must succeed"); let object_etag = result.etag.into_inner(); @@ -77,7 +80,9 @@ fn express_invalid_block_read() { let corrupted_block = "corrupted_block"; let checksum = crc32c::checksum(corrupted_block.as_bytes()); let put_object_params = PutObjectSingleParams::default().checksum(Some(UploadChecksum::Crc32c(checksum))); - tokio_block_on(client.put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block)) + client + .put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block) + .await .expect("put object must succeed"); // Expect a successfull read from the source bucket. We expect cache errors being recorded because of the corrupted block. From fb62bbdc306cd65a5407028d4271c77ef80838c3 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 15 Nov 2024 16:55:09 +0000 Subject: [PATCH 6/6] Fix use Signed-off-by: Vlad Volodkin --- mountpoint-s3/tests/fuse_tests/cache_test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs index 0e69871ad..35e32c958 100644 --- a/mountpoint-s3/tests/fuse_tests/cache_test.rs +++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs @@ -7,6 +7,7 @@ use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig}; use mountpoint_s3::prefetch::caching_prefetch; use mountpoint_s3_client::S3CrtClient; +use fuser::BackgroundSession; use rand::{Rng, RngCore, SeedableRng}; use rand_chacha::ChaChaRng; use std::fs; @@ -17,8 +18,6 @@ use test_case::test_case; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use crate::common::s3::{get_express_bucket, get_standard_bucket}; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] -use fuser::BackgroundSession; -#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3::data_cache::{build_prefix, get_s3_key, BlockIndex, ExpressDataCache}; #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))] use mountpoint_s3::object::ObjectId;