Add a test for an invalid cache block #1139

Merged: 6 commits, Nov 15, 2024

Changes from all commits
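This PR adds coverage for corrupted blocks in the shared (S3 Express) cache: build_prefix and get_s3_key are exported so tests can locate a cache block's backing object, CacheTestWrapper gains a get_block_invalid_count counter, and a new express_invalid_block_read test overwrites a cached block and verifies that the read falls back to the source bucket instead of serving the corrupted data.
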
2 changes: 1 addition & 1 deletion mountpoint-s3/src/data_cache.rs
@@ -16,7 +16,7 @@ use thiserror::Error;
pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::{ExpressDataCache, ExpressDataCacheConfig};
pub use crate::data_cache::express_data_cache::{build_prefix, get_s3_key, ExpressDataCache, ExpressDataCacheConfig};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::data_cache::multilevel_cache::MultilevelDataCache;

4 changes: 2 additions & 2 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -300,7 +300,7 @@ impl BlockMetadata {
}

/// Get the prefix for objects we'll be creating in S3
fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
pub fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
hex::encode(
Sha256::new()
.chain_update(CACHE_VERSION.as_bytes())
@@ -311,7 +311,7 @@ fn build_prefix(source_bucket_name: &str, block_size: u64) -> String {
}

/// Get the S3 key this block should be written to or read from.
fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
pub fn get_s3_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
let hashed_cache_key = hex::encode(
Sha256::new()
.chain_update(cache_key.key())
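
With the two helpers public, a test can compute where a given block lives in the cache bucket. A minimal sketch of how they compose (the bucket name, object key, and etag below are made-up illustrations, not values from this PR):

use mountpoint_s3::data_cache::{build_prefix, get_s3_key};
use mountpoint_s3::object::ObjectId;

// The prefix depends on the cache version, block size, and source bucket name.
let prefix = build_prefix("example-source-bucket", 1024 * 1024);
// The block's key combines the hashed object id with the block index.
let object_id = ObjectId::new("some/object/key".to_string(), "some-etag".into());
let block_key = get_s3_key(&prefix, &object_id, 0);

The new test's get_express_cache_block_key helper composes the pair in exactly this way to find and overwrite block 0 of the object it cached.
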
29 changes: 21 additions & 8 deletions mountpoint-s3/tests/common/cache.rs
@@ -8,7 +8,7 @@ use std::{

use async_trait::async_trait;
use mountpoint_s3::{
data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult},
data_cache::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult},
object::ObjectId,
};

@@ -21,6 +21,8 @@ struct CacheTestWrapperInner<Cache> {
cache: Cache,
/// Number of times the `get_block` succeeded and returned data
get_block_hit_count: AtomicU64,
/// Number of times the `get_block` failed because of an invalid block
get_block_invalid_count: AtomicU64,
/// Number of times the `put_block` was completed
put_block_count: AtomicU64,
}
@@ -39,6 +41,7 @@ impl<Cache> CacheTestWrapper<Cache> {
inner: Arc::new(CacheTestWrapperInner {
cache,
get_block_hit_count: AtomicU64::new(0),
get_block_invalid_count: AtomicU64::new(0),
put_block_count: AtomicU64::new(0),
}),
}
@@ -62,6 +65,11 @@ impl<Cache> CacheTestWrapper<Cache> {
self.inner.get_block_hit_count.load(Ordering::SeqCst)
}

/// Number of times the `get_block` failed because of an invalid block
pub fn get_block_invalid_count(&self) -> u64 {
self.inner.get_block_invalid_count.load(Ordering::SeqCst)
}

/// Number of times the `put_block` was completed
pub fn put_block_count(&self) -> u64 {
self.inner.put_block_count.load(Ordering::SeqCst)
@@ -77,18 +85,23 @@ impl<Cache: DataCache + Send + Sync + 'static> DataCache for CacheTestWrapper<Cache> {
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let result: Option<ChecksummedBytes> = self
let result = self
.inner
.cache
.get_block(cache_key, block_idx, block_offset, object_size)
.await?;
.await;

// The cache hit happens only if the `get_block` was successful and returned data
if result.is_some() {
self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
}
match result.as_ref() {
Ok(Some(_)) => {
self.inner.get_block_hit_count.fetch_add(1, Ordering::SeqCst);
}
Err(DataCacheError::InvalidBlockHeader(_)) => {
self.inner.get_block_invalid_count.fetch_add(1, Ordering::SeqCst);
}
_ => (),
};

Ok(result)
result
}

async fn put_block(
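
A sketch of how a test might consume the new counter; the in-memory cache and its constructor argument (the block size) are assumptions for illustration, not part of this diff:

use std::time::Duration;
use mountpoint_s3::data_cache::InMemoryDataCache;

let cache = CacheTestWrapper::new(InMemoryDataCache::new(1024 * 1024));
let puts_before = cache.put_block_count();
// ... drive reads and writes through `cache`, e.g. via a caching prefetcher ...
cache.wait_for_put(Duration::from_secs(10), puts_before);
assert_eq!(cache.get_block_invalid_count(), 0, "no corrupted blocks expected");
assert!(cache.get_block_hit_count() > 0, "repeat reads should hit the cache");
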
13 changes: 8 additions & 5 deletions mountpoint-s3/tests/common/s3.rs
@@ -8,11 +8,7 @@ use rand_chacha::rand_core::OsRng;
use crate::common::tokio_block_on;

pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) {
#[cfg(not(feature = "s3express_tests"))]
let bucket = get_standard_bucket();
#[cfg(feature = "s3express_tests")]
let bucket = get_express_bucket();

let bucket = get_test_bucket();
let prefix = get_test_prefix(test_name);

(bucket, prefix)
@@ -29,6 +25,13 @@ pub fn get_test_prefix(test_name: &str) -> String {
format!("{prefix}{test_name}/{nonce}/")
}

pub fn get_test_bucket() -> String {
#[cfg(not(feature = "s3express_tests"))]
return get_standard_bucket();
#[cfg(feature = "s3express_tests")]
return get_express_bucket();
}

#[cfg(feature = "s3express_tests")]
pub fn get_express_bucket() -> String {
std::env::var("S3_EXPRESS_ONE_ZONE_BUCKET_NAME")
138 changes: 122 additions & 16 deletions mountpoint-s3/tests/fuse_tests/cache_test.rs
@@ -1,23 +1,104 @@
use crate::common::cache::CacheTestWrapper;
use crate::common::fuse::create_fuse_session;
use crate::common::fuse::s3_session::create_crt_client;
use crate::common::s3::{get_express_bucket, get_standard_bucket, get_test_bucket_and_prefix};
use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig, ExpressDataCache};
use crate::common::s3::{get_test_bucket, get_test_prefix};

use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig};
use mountpoint_s3::prefetch::caching_prefetch;
use mountpoint_s3_client::S3CrtClient;

use fuser::BackgroundSession;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::fs;
use std::time::Duration;
use tempfile::TempDir;
use test_case::test_case;

#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
Review comment (Contributor): not blocking: we should consider moving (some of) the use into the functions that require them (to avoid all the cfg).

use crate::common::s3::{get_express_bucket, get_standard_bucket};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3::data_cache::{build_prefix, get_s3_key, BlockIndex, ExpressDataCache};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3::object::ObjectId;
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_client::types::{PutObjectSingleParams, UploadChecksum};
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_client::ObjectClient;
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
use mountpoint_s3_crt::checksums::crc32c;
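
A sketch of the reviewer's suggestion above (not what this PR does; the function name is a placeholder): scoping the imports inside the one cfg-gated function lets its single attribute cover them too:

#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
#[tokio::test]
async fn some_express_only_test() {
    // These imports are compiled only when the function is, so the
    // repeated file-level cfg lines above go away.
    use crate::common::s3::{get_express_bucket, get_standard_bucket};
    use mountpoint_s3::data_cache::{build_prefix, get_s3_key, ExpressDataCache};
    // ...
}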

const CACHE_BLOCK_SIZE: u64 = 1024 * 1024;
const CLIENT_PART_SIZE: usize = 8 * 1024 * 1024;

/// A test that checks that an invalid block is not served from the cache
#[tokio::test]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
async fn express_invalid_block_read() {
let bucket = get_standard_bucket();
let cache_bucket = get_express_bucket();
let prefix = get_test_prefix("express_invalid_block_read");

// Mount the bucket
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let cache = CacheTestWrapper::new(ExpressDataCache::new(
client.clone(),
Default::default(),
&bucket,
&cache_bucket,
));
let (mount_point, _session) = mount_bucket(client.clone(), cache.clone(), &bucket, &prefix);

// Put an object to the mounted bucket
let object_key = generate_unprefixed_key(&prefix, "key", 100);
Review comment (Contributor): why pass the prefix then? Can't you subtract the prefix's length from 100, if that's what you want?

Review comment (Contributor): Actually, can't we just use a fixed key here? And leave the other test where we generate the key intact?

let full_object_key = format!("{prefix}{object_key}");
let object_data = "object_data";
let result = client
.put_object_single(&bucket, &full_object_key, &Default::default(), object_data)
.await
.expect("put object must succeed");
let object_etag = result.etag.into_inner();

// Read data twice, expect cache hits and no errors
let path = mount_point.path().join(&object_key);

let put_block_count = cache.put_block_count();
let read = fs::read(&path).expect("read should succeed");
assert_eq!(read, object_data.as_bytes());
cache.wait_for_put(Duration::from_secs(10), put_block_count);

let read = fs::read(&path).expect("read should succeed");
assert_eq!(read, object_data.as_bytes());

assert_eq!(cache.get_block_invalid_count(), 0, "no invalid blocks yet");
assert!(cache.get_block_hit_count() > 0, "reads should result in a cache hit");

// Corrupt the cache block by replacing it with an object holding no metadata
let object_id = get_object_id(&prefix, &object_key, &object_etag);
let block_key = get_express_cache_block_key(&bucket, &object_id, 0);
let corrupted_block = "corrupted_block";
let checksum = crc32c::checksum(corrupted_block.as_bytes());
let put_object_params = PutObjectSingleParams::default().checksum(Some(UploadChecksum::Crc32c(checksum)));
client
.put_object_single(&cache_bucket, &block_key, &put_object_params, corrupted_block)
.await
.expect("put object must succeed");

// Expect a successful read from the source bucket. Cache errors should be recorded because of the corrupted block.
let path = mount_point.path().join(&object_key);
let read = fs::read(&path).expect("read should succeed");
assert_eq!(read, object_data.as_bytes());
assert!(
cache.get_block_invalid_count() > 0,
"read should result in cache errors"
);
}

#[test_case("key", 100, 1024; "simple")]
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
let bucket_name = get_standard_bucket();
@@ -26,6 +107,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {

cache_write_read_base(
client,
&bucket_name,
key_suffix,
key_size,
object_size,
@@ -38,6 +120,7 @@ fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
#[test_case("£", 100, 1024; "non-ascii key")]
#[test_case("key", 1024, 1024; "long key")]
#[test_case("key", 100, 1024 * 1024; "big file")]
#[cfg(feature = "s3_tests")]
fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
let cache_dir = tempfile::tempdir().unwrap();
let cache_config = DiskDataCacheConfig {
@@ -48,8 +131,10 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {

let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);

let bucket_name = get_test_bucket();
cache_write_read_base(
client,
&bucket_name,
key_suffix,
key_size,
object_size,
@@ -60,6 +145,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {

fn cache_write_read_base<Cache>(
client: S3CrtClient,
bucket: &str,
key_suffix: &str,
key_size: usize,
object_size: usize,
@@ -68,24 +154,14 @@ fn cache_write_read_base<Cache>(
) where
Cache: DataCache + Send + Sync + 'static,
{
let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
let prefix = get_test_prefix(test_name);

// Mount a bucket
let mount_point = tempfile::tempdir().unwrap();
let runtime = client.event_loop_group();
let cache = CacheTestWrapper::new(cache);
let prefetcher = caching_prefetch(cache.clone(), runtime, Default::default());
let _session = create_fuse_session(
client,
prefetcher,
&bucket,
&prefix,
mount_point.path(),
Default::default(),
);
let (mount_point, _session) = mount_bucket(client, cache.clone(), bucket, &prefix);

// Write an object, no caching happens yet
let key = get_object_key(&prefix, key_suffix, key_size);
let key = generate_unprefixed_key(&prefix, key_suffix, key_size);
let path = mount_point.path().join(&key);
let written = random_binary_data(object_size);
fs::write(&path, &written).expect("write should succeed");
@@ -119,7 +195,8 @@ fn random_binary_data(size_in_bytes: usize) -> Vec<u8> {
}

/// Creates a random key which has a size of at least `min_size_in_bytes`
fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
/// The `key_prefix` is not included in the return value.
fn generate_unprefixed_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize) -> String {
let random_suffix: u64 = rand::thread_rng().gen();
let last_key_part = format!("{key_suffix}{random_suffix}"); // part of the key after all the "/"
let full_key = format!("{key_prefix}{last_key_part}");
@@ -128,3 +205,32 @@ fn get_object_key(key_prefix: &str, key_suffix: &str, min_size_in_bytes: usize)
let padding = "0".repeat(padding_size);
format!("{last_key_part}{padding}")
}
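
For illustration, a hedged snippet of the renamed helper's contract (the prefix value is made up): the returned key excludes the prefix, while the zero-padding guarantees the full prefixed key reaches the minimum size:

let prefix = "mountpoint-test/cache_write_read/1234/";
let key = generate_unprefixed_key(prefix, "key", 100);
// The prefix is not part of the returned key...
assert!(!key.contains(prefix));
// ...but prefix + key together meet the minimum size.
assert!(prefix.len() + key.len() >= 100);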

fn mount_bucket<Cache>(client: S3CrtClient, cache: Cache, bucket: &str, prefix: &str) -> (TempDir, BackgroundSession)
where
Cache: DataCache + Send + Sync + 'static,
{
let mount_point = tempfile::tempdir().unwrap();
let runtime = client.event_loop_group();
let prefetcher = caching_prefetch(cache, runtime, Default::default());
let session = create_fuse_session(
client,
prefetcher,
bucket,
prefix,
mount_point.path(),
Default::default(),
);
(mount_point, session)
}

#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn get_object_id(prefix: &str, key: &str, etag: &str) -> ObjectId {
ObjectId::new(format!("{prefix}{key}"), etag.into())
}

#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
fn get_express_cache_block_key(bucket: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String {
let block_key_prefix = build_prefix(bucket, CACHE_BLOCK_SIZE);
get_s3_key(&block_key_prefix, cache_key, block_idx)
}
2 changes: 1 addition & 1 deletion mountpoint-s3/tests/fuse_tests/mod.rs
@@ -1,4 +1,4 @@
#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
#[cfg(feature = "s3_tests")]
mod cache_test;
Review comment (Contributor): I think this is redundant now. (not blocking)

Reply (Contributor Author, @vladem, Nov 19, 2024): I have to keep it to make cargo test --features fuse_tests --no-run buildable. The file only contains s3_tests, so it's preferable to having multiple snippets like this in the code:

#[cfg(feature = "s3_tests")]
use crate::common::fuse::s3_session::create_crt_client;

mod consistency_test;
mod fork_test;