From 2b822f99146c7082973d99d9a92bc7045e63fdae Mon Sep 17 00:00:00 2001
From: Simon Beal
Date: Thu, 14 Nov 2024 13:04:49 +0000
Subject: [PATCH] Validate that shared cache bucket is writable and supports
 the EXPRESS_ONEZONE storage class

Signed-off-by: Simon Beal
---
 mountpoint-s3/src/cli.rs                      |  3 +
 .../src/data_cache/express_data_cache.rs      | 85 +++++++++++++++++---
 mountpoint-s3/tests/common/fuse.rs            | 22 ++++--
 mountpoint-s3/tests/fuse_tests/cache_test.rs  | 81 ++++++++++++++++--
 4 files changed, 177 insertions(+), 14 deletions(-)

diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs
index f0ead18c0..64435818a 100644
--- a/mountpoint-s3/src/cli.rs
+++ b/mountpoint-s3/src/cli.rs
@@ -11,6 +11,7 @@ use std::time::Duration;
 use anyhow::{anyhow, Context as _};
 use clap::{value_parser, ArgGroup, Parser, ValueEnum};
 use fuser::{MountOption, Session};
+use futures::executor::block_on;
 use futures::task::Spawn;
 use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig};
 use mountpoint_s3_client::error::ObjectClientError;
@@ -895,6 +896,7 @@ where
         (None, Some((config, bucket_name, cache_bucket_name))) => {
             tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
             let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
+            block_on(express_cache.verify_cache_valid())?;
 
             let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
             let fuse_session = create_filesystem(
@@ -934,6 +936,7 @@ where
             tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
             let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
             let express_cache = ExpressDataCache::new(client.clone(), config, bucket_name, cache_bucket_name);
+            block_on(express_cache.verify_cache_valid())?;
 
             let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());
             let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs
index 3401ceb4d..77df24847 100644
--- a/mountpoint-s3/src/data_cache/express_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -72,6 +72,34 @@ where
             bucket_name: bucket_name.to_owned(),
         }
     }
+
+    pub async fn verify_cache_valid(&self) -> anyhow::Result<()> {
+        let object_key = format!("{}/_mountpoint_cache_metadata", &self.prefix);
+        // This data is human-readable, and not expected to be read by Mountpoint.
+        // The file format used here is NOT stable.
+        // For now, only include data that's guaranteed to be correct: the values
+        // that are used to calculate the prefix.
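+        // For example (hypothetical values), a cache created for bucket
+        // "amzn-s3-demo-bucket" with a 1 MiB block size would write:
+        //   source_bucket=amzn-s3-demo-bucket
+        //   block_size=1048576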
+        let data = format!(
+            "source_bucket={}\nblock_size={}",
+            self.bucket_name, self.config.block_size
+        );
+
+        self.client
+            .put_object_single(
+                &self.bucket_name,
+                &object_key,
+                &PutObjectSingleParams::new().storage_class("EXPRESS_ONEZONE".to_string()),
+                data,
+            )
+            .in_current_span()
+            .await?;
+
+        Ok(())
+    }
 }
 
 #[async_trait]
@@ -328,10 +356,12 @@ mod tests {
     use crate::sync::Arc;
 
     use proptest::{prop_assert, proptest};
-    use test_case::test_case;
-
-    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
+    use std::collections::HashMap;
+
+    use mountpoint_s3_client::failure_client::{countdown_failure_client, CountdownFailureConfig};
+    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError};
     use mountpoint_s3_client::types::ETag;
+    use test_case::test_case;
 
     #[test_case(1024, 512 * 1024; "block_size smaller than part_size")]
     #[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")]
@@ -552,6 +582,55 @@ mod tests {
         assert!(matches!(err, DataCacheError::InvalidBlockHeader(_)));
     }
 
+    #[tokio::test]
+    async fn test_verify_cache_valid_success() {
+        let source_bucket = "source-bucket";
+        let bucket = "test-bucket";
+        let config = MockClientConfig {
+            bucket: bucket.to_string(),
+            part_size: 8 * 1024 * 1024,
+            enable_backpressure: true,
+            initial_read_window_size: 8 * 1024 * 1024,
+            ..Default::default()
+        };
+        let client = Arc::new(MockClient::new(config));
+        let cache = ExpressDataCache::new(client.clone(), Default::default(), source_bucket, bucket);
+
+        cache.verify_cache_valid().await.expect("cache should work");
+    }
+
+    #[tokio::test]
+    async fn test_verify_cache_valid_failure() {
+        let source_bucket = "source-bucket";
+        let bucket = "test-bucket";
+        let config = MockClientConfig {
+            bucket: bucket.to_string(),
+            part_size: 8 * 1024 * 1024,
+            enable_backpressure: true,
+            initial_read_window_size: 8 * 1024 * 1024,
+            ..Default::default()
+        };
+        let client = Arc::new(MockClient::new(config));
+
+        let mut put_single_failures = HashMap::new();
+        put_single_failures.insert(1, MockClientError("error".to_owned().into()).into());
+
+        let failure_client = Arc::new(countdown_failure_client(
+            client.clone(),
+            CountdownFailureConfig {
+                put_single_failures,
+                ..Default::default()
+            },
+        ));
+
+        let cache = ExpressDataCache::new(failure_client, Default::default(), source_bucket, bucket);
+
+        cache
+            .verify_cache_valid()
+            .await
+            .expect_err("cache should not report valid if cannot write");
+    }
+
     proptest! {
         #[test]
         fn proptest_creates_small_s3_keys(key: String, etag: String, block_idx: BlockIndex, source_description: String, block_size: u64) {
diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs
index 29d568fac..8040e08b6 100644
--- a/mountpoint-s3/tests/common/fuse.rs
+++ b/mountpoint-s3/tests/common/fuse.rs
@@ -310,6 +310,9 @@ pub mod mock_session {
 
 pub mod s3_session {
     use super::*;
+    use crate::common::s3::{
+        get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
+    };
     use aws_sdk_s3::operation::head_object::HeadObjectError;
     use aws_sdk_s3::primitives::ByteStream;
     use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier};
@@ -319,10 +322,6 @@ pub mod s3_session {
     use mountpoint_s3_client::types::{Checksum, PutObjectTrailingChecksums};
     use mountpoint_s3_client::S3CrtClient;
 
-    use crate::common::s3::{
-        get_test_bucket_and_prefix, get_test_endpoint_config, get_test_region, get_test_sdk_client,
-    };
-
     /// Create a FUSE mount backed by a real S3 client
     pub fn new(test_name: &str, test_config: TestSessionConfig) -> TestSession {
         let mount_dir = tempfile::tempdir().unwrap();
@@ -363,7 +362,11 @@ pub mod s3_session {
         let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
         let region = get_test_region();
 
-        let client = create_crt_client(test_config.part_size, test_config.initial_read_window_size);
+        let client = create_crt_client(
+            test_config.part_size,
+            test_config.initial_read_window_size,
+            Default::default(),
+        );
         let runtime = client.event_loop_group();
         let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config);
         let session = create_fuse_session(
@@ -380,12 +383,17 @@ pub mod s3_session {
         }
     }
 
-    pub fn create_crt_client(part_size: usize, initial_read_window_size: usize) -> S3CrtClient {
+    pub fn create_crt_client(
+        part_size: usize,
+        initial_read_window_size: usize,
+        auth_config: S3ClientAuthConfig,
+    ) -> S3CrtClient {
         let client_config = S3ClientConfig::default()
             .part_size(part_size)
             .endpoint_config(get_test_endpoint_config())
             .read_backpressure(true)
-            .initial_read_window(initial_read_window_size);
+            .initial_read_window(initial_read_window_size)
+            .auth_config(auth_config);
 
         S3CrtClient::new(client_config).unwrap()
     }
diff --git a/mountpoint-s3/tests/fuse_tests/cache_test.rs b/mountpoint-s3/tests/fuse_tests/cache_test.rs
index 35e32c958..b30330a69 100644
--- a/mountpoint-s3/tests/fuse_tests/cache_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/cache_test.rs
@@ -1,11 +1,18 @@
 use crate::common::cache::CacheTestWrapper;
+use crate::common::creds::get_scoped_down_credentials;
 use crate::common::fuse::create_fuse_session;
 use crate::common::fuse::s3_session::create_crt_client;
 use crate::common::s3::{get_test_bucket, get_test_prefix};
 use mountpoint_s3::data_cache::{DataCache, DiskDataCache, DiskDataCacheConfig};
 use mountpoint_s3::prefetch::caching_prefetch;
-use mountpoint_s3_client::S3CrtClient;
+use mountpoint_s3_client::config::S3ClientAuthConfig;
+use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
+use mountpoint_s3_client::S3RequestError::CrtError;
+use mountpoint_s3_client::S3RequestError::ResponseError;
+use mountpoint_s3_client::{S3CrtClient, S3RequestError};
+use mountpoint_s3_crt::auth::credentials::{CredentialsProvider, CredentialsProviderStaticOptions};
+use mountpoint_s3_crt::common::allocator::Allocator;
 
 use fuser::BackgroundSession;
 use rand::{Rng, RngCore, SeedableRng};
@@ -40,7 +47,7 @@ async fn express_invalid_block_read() {
     let prefix = get_test_prefix("express_invalid_block_read");
 
     // Mount the bucket
-    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
+    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
     let cache = CacheTestWrapper::new(ExpressDataCache::new(
         client.clone(),
         Default::default(),
@@ -100,7 +107,7 @@ async fn express_invalid_block_read() {
 #[test_case("key", 100, 1024 * 1024; "big file")]
 #[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
 fn express_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize) {
-    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
+    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
     let bucket_name = get_standard_bucket();
     let express_bucket_name = get_express_bucket();
     let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &express_bucket_name);
@@ -129,7 +136,7 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
     };
     let cache = DiskDataCache::new(cache_dir.path().to_path_buf(), cache_config);
 
-    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE);
+    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
     let bucket_name = get_test_bucket();
 
     cache_write_read_base(
@@ -143,6 +150,72 @@ fn disk_cache_write_read(key_suffix: &str, key_size: usize, object_size: usize)
     );
 }
 
+#[tokio::test]
+#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
+async fn express_cache_verify_fail_non_express() {
+    let client = create_crt_client(CLIENT_PART_SIZE, CLIENT_PART_SIZE, Default::default());
+    let bucket_name = get_standard_bucket();
+    let cache_bucket_name = get_standard_bucket();
+    let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
+    let err = cache
+        .verify_cache_valid()
+        .await
+        .expect_err("cannot use standard bucket as shared cache");
+
+    match err
+        .downcast::<ObjectClientError<PutObjectError, S3RequestError>>()
+        .expect("error should be ObjectClientError")
+    {
+        ObjectClientError::ClientError(ResponseError(request_result)) => {
+            let body = request_result.error_response_body.as_ref().expect("should have body");
+            let body = body.clone().into_string().unwrap();
+            assert!(body.contains("InvalidStorageClass"));
+        }
+        _ => panic!("wrong error type"),
+    }
+}
+
+#[tokio::test]
+#[cfg(all(feature = "s3_tests", feature = "s3express_tests"))]
+async fn express_cache_verify_fail_forbidden() {
+    let bucket_name = get_standard_bucket();
+    let cache_bucket_name = get_express_bucket();
+
+    let policy = r#"{"Statement": [
+        {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload"], "Resource": "arn:aws:s3:::__BUCKET__/*"},
+        {"Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::__BUCKET__"}
+    ]}"#;
+    let policy = policy.replace("__BUCKET__", &cache_bucket_name);
+    let credentials = get_scoped_down_credentials(policy).await;
+
+    // Build an S3CrtClient that uses a static credentials provider with the creds we just got
+    let config = CredentialsProviderStaticOptions {
+        access_key_id: credentials.access_key_id(),
+        secret_access_key: credentials.secret_access_key(),
+        session_token: credentials.session_token(),
+    };
+    let provider = CredentialsProvider::new_static(&Allocator::default(), config).unwrap();
+
+    let client = create_crt_client(
+        CLIENT_PART_SIZE,
+        CLIENT_PART_SIZE,
+        S3ClientAuthConfig::Provider(provider),
+    );
+
+    let cache = ExpressDataCache::new(client.clone(), Default::default(), &bucket_name, &cache_bucket_name);
+    let err = cache.verify_cache_valid().await.expect_err("cache must be writable");
+
+    match err
+        .downcast::<ObjectClientError<PutObjectError, S3RequestError>>()
+        .expect("error should be ObjectClientError")
+    {
+        ObjectClientError::ClientError(CrtError(err)) => {
+            assert!(err.to_string().contains("AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED"))
+        }
+        _ => panic!("wrong error type"),
+    }
+}
+
 fn cache_write_read_base(
     client: S3CrtClient,
     bucket: &str,