Skip to content

Commit

Permalink
Refactor based on PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish committed Dec 6, 2024
1 parent 25f0455 commit 3705db8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
17 changes: 15 additions & 2 deletions mountpoint-s3/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub enum DataCacheError {
IoFailure(#[source] anyhow::Error),
#[error("Block header was not valid: {0}")]
InvalidBlockHeader(String),
#[error("Block checksum was not present")]
BlockChecksumMissing,
#[error("Block checksum was not valid")]
InvalidBlockChecksum,
#[error("Block content was not valid/readable")]
InvalidBlockContent,
#[error("Block offset does not match block index")]
Expand All @@ -42,6 +42,19 @@ pub enum DataCacheError {
EvictionFailure,
}

impl DataCacheError {
fn get_reason(&self) -> &'static str {
match self {
DataCacheError::IoFailure(_) => "io_failure",
DataCacheError::InvalidBlockHeader(_) => "invalid_block_header",
DataCacheError::InvalidBlockChecksum => "invalid_block_checksum",
DataCacheError::InvalidBlockContent => "invalid_block_content",
DataCacheError::InvalidBlockOffset => "invalid_block_offset",
DataCacheError::EvictionFailure => "eviction_failure",
}
}
}

pub type DataCacheResult<Value> = Result<Value, DataCacheError>;

/// Data cache for fixed-size checksummed buffers.
Expand Down
41 changes: 20 additions & 21 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ where
block_idx: BlockIndex,
block_offset: u64,
object_size: usize,
) -> Result<Option<ChecksummedBytes>, (DataCacheError, &'static str)> {
) -> Result<Option<ChecksummedBytes>, DataCacheError> {
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
return Ok(None);
}

if block_offset != block_idx * self.config.block_size {
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
return Err(DataCacheError::InvalidBlockOffset);
}

let object_key = get_s3_key(&self.prefix, cache_key, block_idx);
Expand All @@ -129,7 +129,7 @@ where
return Ok(None);
}
Err(e) => {
return Err((DataCacheError::IoFailure(e.into()), "get_object_failure"));
return Err(DataCacheError::IoFailure(e.into()));
}
};

Expand All @@ -142,7 +142,7 @@ where
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
return Err(DataCacheError::InvalidBlockOffset);
}

buffer = if buffer.is_empty() {
Expand All @@ -161,7 +161,7 @@ where
return Ok(None);
}
Err(e) => {
return Err((DataCacheError::IoFailure(e.into()), "get_object_failure"));
return Err(DataCacheError::IoFailure(e.into()));
}
}
}
Expand All @@ -170,17 +170,16 @@ where

let checksum = result
.get_object_checksum()
.map_err(|err| (DataCacheError::IoFailure(err.into()), "invalid_block_checksum"))?;
.map_err(|_| DataCacheError::InvalidBlockChecksum)?;
let crc32c_b64 = checksum

Check warning on line 174 in mountpoint-s3/src/data_cache/express_data_cache.rs

View workflow job for this annotation

GitHub Actions / Formatting

Diff in /home/runner/work/mountpoint-s3/mountpoint-s3/mountpoint-s3/src/data_cache/express_data_cache.rs
.checksum_crc32c
.ok_or_else(|| (DataCacheError::BlockChecksumMissing, "missing_block_checksum"))?;
.ok_or_else(|| DataCacheError::InvalidBlockChecksum)?;
let crc32c = crc32c_from_base64(&crc32c_b64)
.map_err(|err| (DataCacheError::IoFailure(err.into()), "unparsable_block_checksum"))?;
.map_err(|_| DataCacheError::InvalidBlockChecksum)?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
block_metadata
.validate_object_metadata(&object_metadata)
.map_err(|err| (err, "invalid_block_metadata"))?;
.validate_object_metadata(&object_metadata)?;

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
}
Expand All @@ -192,21 +191,21 @@ where
block_offset: u64,
bytes: ChecksummedBytes,
object_size: usize,
) -> Result<(), (DataCacheError, &'static str)> {
) -> Result<(), DataCacheError> {
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
return Ok(());
}

if block_offset != block_idx * self.config.block_size {
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
return Err(DataCacheError::InvalidBlockOffset);
}

Check warning on line 203 in mountpoint-s3/src/data_cache/express_data_cache.rs

View workflow job for this annotation

GitHub Actions / Formatting

Diff in /home/runner/work/mountpoint-s3/mountpoint-s3/mountpoint-s3/src/data_cache/express_data_cache.rs
let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes
.into_inner()
.map_err(|_| (DataCacheError::InvalidBlockContent, "invalid_block_content"))?;
.map_err(|_| DataCacheError::InvalidBlockContent)?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);

self.client
Expand All @@ -218,7 +217,7 @@ where
)
.in_current_span()
.await
.map_err(|err| (err.into(), "put_object_failure"))?;
.map_err(|err| DataCacheError::IoFailure(err.into()))?;

Ok(())
}
Expand All @@ -241,15 +240,15 @@ where
Ok(Some(data)) => {
metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(data.len() as u64);
(Ok(Some(data)), "hit")
(Ok(Some(data)), "ok")
}
Ok(None) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
(Ok(None), "miss")
}

Check warning on line 248 in mountpoint-s3/src/data_cache/express_data_cache.rs

View workflow job for this annotation

GitHub Actions / Formatting

Diff in /home/runner/work/mountpoint-s3/mountpoint-s3/mountpoint-s3/src/data_cache/express_data_cache.rs
Err((err, reason)) => {
Err(err) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
metrics::counter!("express_data_cache.block_err", "reason" => err.get_reason(), "type" => "read").increment(1);
(Err(err), "error")
}
};
Expand All @@ -273,10 +272,10 @@ where
{
Ok(()) => {
metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(object_size as u64);
(Ok(()), "hit")
(Ok(()), "ok")

Check warning on line 275 in mountpoint-s3/src/data_cache/express_data_cache.rs

View workflow job for this annotation

GitHub Actions / Formatting

Diff in /home/runner/work/mountpoint-s3/mountpoint-s3/mountpoint-s3/src/data_cache/express_data_cache.rs
}
Err((err, reason)) => {
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
Err(err) => {
metrics::counter!("express_data_cache.block_err", "reason" => err.get_reason(), "type" => "write").increment(1);
(Err(err), "error")
}
};
Expand Down Expand Up @@ -611,7 +610,7 @@ mod tests {
.get_block(&cache_key, 0, 0, data.len())
.await
.expect_err("cache should return error if checksum isn't present");
assert!(matches!(err, DataCacheError::BlockChecksumMissing));
assert!(matches!(err, DataCacheError::InvalidBlockChecksum));

// Remove the object metadata when writing.
client
Expand Down

0 comments on commit 3705db8

Please sign in to comment.