diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index f3d031d54..3afe16ce9 100644 --- a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -32,8 +32,8 @@ pub enum DataCacheError { IoFailure(#[source] anyhow::Error), #[error("Block header was not valid: {0}")] InvalidBlockHeader(String), - #[error("Block checksum was not present")] - BlockChecksumMissing, + #[error("Block checksum was not valid")] + InvalidBlockChecksum, #[error("Block content was not valid/readable")] InvalidBlockContent, #[error("Block offset does not match block index")] @@ -42,6 +42,19 @@ pub enum DataCacheError { EvictionFailure, } +impl DataCacheError { + fn reason(&self) -> &'static str { + match self { + DataCacheError::IoFailure(_) => "io_failure", + DataCacheError::InvalidBlockHeader(_) => "invalid_block_header", + DataCacheError::InvalidBlockChecksum => "invalid_block_checksum", + DataCacheError::InvalidBlockContent => "invalid_block_content", + DataCacheError::InvalidBlockOffset => "invalid_block_offset", + DataCacheError::EvictionFailure => "eviction_failure", + } + } +} + pub type DataCacheResult = Result; /// Data cache for fixed-size checksummed buffers. diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs index cf59359af..b3e3c79ff 100644 --- a/mountpoint-s3/src/data_cache/express_data_cache.rs +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -105,30 +105,20 @@ where backpressure_handle.increment_read_window(self.config.block_size as usize); } } -} -#[async_trait] -impl DataCache for ExpressDataCache -where - Client: ObjectClient + Send + Sync + 'static, -{ - async fn get_block( + async fn read_block( &self, cache_key: &ObjectId, block_idx: BlockIndex, block_offset: u64, object_size: usize, ) -> DataCacheResult> { - let start = Instant::now(); - if object_size > self.config.max_object_size { - metrics::counter!("express_data_cache.block_hit").increment(0); metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1); return Ok(None); } if block_offset != block_idx * self.config.block_size { - emit_failure_metric_read("invalid_block_offset"); return Err(DataCacheError::InvalidBlockOffset); } @@ -144,11 +134,9 @@ where { Ok(result) => result, Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => { - metrics::counter!("express_data_cache.block_hit").increment(0); return Ok(None); } Err(e) => { - emit_failure_metric_read("get_object_failure"); return Err(DataCacheError::IoFailure(e.into())); } }; @@ -163,7 +151,6 @@ where match chunk { Ok((offset, body)) => { if offset != buffer.len() as u64 { - emit_failure_metric_read("invalid_block_offset"); return Err(DataCacheError::InvalidBlockOffset); } @@ -180,11 +167,9 @@ where self.ensure_read_window(backpressure_handle.as_mut()); } Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => { - metrics::counter!("express_data_cache.block_hit").increment(0); return Ok(None); } Err(e) => { - emit_failure_metric_read("get_object_failure"); return Err(DataCacheError::IoFailure(e.into())); } } @@ -192,32 +177,21 @@ where let object_metadata = result.get_object_metadata(); - let checksum = result.get_object_checksum().map_err(|err| { - emit_failure_metric_read("invalid_block_checksum"); - DataCacheError::IoFailure(err.into()) - })?; - let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| { - emit_failure_metric_read("missing_block_checksum"); - DataCacheError::BlockChecksumMissing - })?; - let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| { - emit_failure_metric_read("unparsable_block_checksum"); - DataCacheError::IoFailure(err.into()) - })?; + let checksum = result + .get_object_checksum() + .map_err(|_| DataCacheError::InvalidBlockChecksum)?; + let crc32c_b64 = checksum + .checksum_crc32c + .ok_or_else(|| DataCacheError::InvalidBlockChecksum)?; + let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|_| DataCacheError::InvalidBlockChecksum)?; let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c); - block_metadata - .validate_object_metadata(&object_metadata) - .inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?; - - metrics::counter!("express_data_cache.block_hit").increment(1); - metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64); - metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64); + block_metadata.validate_object_metadata(&object_metadata)?; Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c))) } - async fn put_block( + async fn write_block( &self, cache_key: ObjectId, block_idx: BlockIndex, @@ -225,25 +199,19 @@ where bytes: ChecksummedBytes, object_size: usize, ) -> DataCacheResult<()> { - let start = Instant::now(); if object_size > self.config.max_object_size { metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1); return Ok(()); } if block_offset != block_idx * self.config.block_size { - emit_failure_metric_write("invalid_block_offset"); return Err(DataCacheError::InvalidBlockOffset); } let object_key = get_s3_key(&self.prefix, &cache_key, block_idx); - let (data, checksum) = bytes.into_inner().map_err(|_| { - emit_failure_metric_write("invalid_block_content"); - DataCacheError::InvalidBlockContent - })?; + let (data, checksum) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum); - let data_length = data.len() as u64; self.client .put_object_single( @@ -254,29 +222,78 @@ where ) .in_current_span() .await - .inspect_err(|_| { - emit_failure_metric_write("put_object_failure"); - })?; + .map_err(|err| DataCacheError::IoFailure(err.into()))?; - metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length); - metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64); Ok(()) } +} - fn block_size(&self) -> u64 { - self.config.block_size +#[async_trait] +impl DataCache for ExpressDataCache +where + Client: ObjectClient + Send + Sync + 'static, +{ + async fn get_block( + &self, + cache_key: &ObjectId, + block_idx: BlockIndex, + block_offset: u64, + object_size: usize, + ) -> DataCacheResult> { + let start = Instant::now(); + let (result, result_type) = match self.read_block(cache_key, block_idx, block_offset, object_size).await { + Ok(Some(data)) => { + metrics::counter!("express_data_cache.block_hit").increment(1); + metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(data.len() as u64); + (Ok(Some(data)), "ok") + } + Ok(None) => { + metrics::counter!("express_data_cache.block_hit").increment(0); + (Ok(None), "miss") + } + Err(err) => { + metrics::counter!("express_data_cache.block_hit").increment(0); + metrics::counter!("express_data_cache.block_err", "reason" => err.reason(), "type" => "read") + .increment(1); + (Err(err), "error") + } + }; + metrics::histogram!("express_data_cache.read_duration_us", "type" => result_type) + .record(start.elapsed().as_micros() as f64); + result } -} -#[inline] -fn emit_failure_metric_read(reason: &'static str) { - metrics::counter!("express_data_cache.block_hit").increment(0); - metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1); -} + async fn put_block( + &self, + cache_key: ObjectId, + block_idx: BlockIndex, + block_offset: u64, + bytes: ChecksummedBytes, + object_size: usize, + ) -> DataCacheResult<()> { + let start = Instant::now(); + let (result, result_type) = match self + .write_block(cache_key, block_idx, block_offset, bytes, object_size) + .await + { + Ok(()) => { + metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(object_size as u64); + (Ok(()), "ok") + } + Err(err) => { + metrics::counter!("express_data_cache.block_err", "reason" => err.reason(), "type" => "write") + .increment(1); + (Err(err), "error") + } + }; + metrics::histogram!("express_data_cache.write_duration_us", "type" => result_type) + .record(start.elapsed().as_micros() as f64); + result + } -#[inline] -fn emit_failure_metric_write(reason: &'static str) { - metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1); + fn block_size(&self) -> u64 { + self.config.block_size + } } /// Metadata about the cached object to ensure that the object we've retrieved is the one we were @@ -335,7 +352,7 @@ impl BlockMetadata { } /// Validate the object metadata headers received match this BlockMetadata object. - pub fn validate_object_metadata(&self, headers: &HashMap) -> Result<(), DataCacheError> { + pub fn validate_object_metadata(&self, headers: &HashMap) -> DataCacheResult<()> { self.validate_header(headers, "cache-version", |version| version == CACHE_VERSION)?; self.validate_header(headers, "block-idx", |block_idx| { block_idx.parse() == Ok(self.block_idx) @@ -362,7 +379,7 @@ impl BlockMetadata { headers: &HashMap, header: &str, is_valid: F, - ) -> Result<(), DataCacheError> { + ) -> DataCacheResult<()> { let value = headers .get(header) .ok_or(DataCacheError::InvalidBlockHeader(header.to_string()))?; @@ -600,7 +617,7 @@ mod tests { .get_block(&cache_key, 0, 0, data.len()) .await .expect_err("cache should return error if checksum isn't present"); - assert!(matches!(err, DataCacheError::BlockChecksumMissing)); + assert!(matches!(err, DataCacheError::InvalidBlockChecksum)); // Remove the object metadata when writing. client