Skip to content

Commit

Permalink
Emit shared cache durations in cache hit, miss and error conditions
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Beal <[email protected]>
  • Loading branch information
muddyfish committed Nov 22, 2024
1 parent 47e1d56 commit d1bee2f
Showing 1 changed file with 89 additions and 68 deletions.
157 changes: 89 additions & 68 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,31 +97,21 @@ where

Ok(())
}
}

#[async_trait]
impl<Client> DataCache for ExpressDataCache<Client>
where
Client: ObjectClient + Send + Sync + 'static,
{
async fn get_block(
async fn read_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let start = Instant::now();

) -> Result<Option<ChecksummedBytes>, (DataCacheError, &'static str)> {
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.over_max_object_size", "type" => "read").increment(1);
return Ok(None);
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
}

let object_key = get_s3_key(&self.prefix, cache_key, block_idx);
Expand All @@ -136,12 +126,10 @@ where
{
Ok(result) => result,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
return Err((DataCacheError::IoFailure(e.into()), "get_object_failure"));
}
};

Expand All @@ -155,83 +143,69 @@ where
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
emit_failure_metric_read("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
result.as_mut().increment_read_window(self.config.block_size as usize);
}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
return Ok(None);
}
Err(e) => {
emit_failure_metric_read("get_object_failure");
return Err(DataCacheError::IoFailure(e.into()));
return Err((DataCacheError::IoFailure(e.into()), "get_object_failure"));
}
}
}
let buffer = buffer.freeze();

let object_metadata = result.get_object_metadata().await.map_err(|err| {
emit_failure_metric_read("invalid_block_metadata");
DataCacheError::IoFailure(err.into())
})?;
let object_metadata = result
.get_object_metadata()
.await
.map_err(|err| (DataCacheError::IoFailure(err.into()), "invalid_block_metadata"))?;

let checksum = result.get_object_checksum().await.map_err(|err| {
emit_failure_metric_read("invalid_block_checksum");
DataCacheError::IoFailure(err.into())
})?;
let crc32c_b64 = checksum.checksum_crc32c.ok_or_else(|| {
emit_failure_metric_read("missing_block_checksum");
DataCacheError::BlockChecksumMissing
})?;
let crc32c = crc32c_from_base64(&crc32c_b64).map_err(|err| {
emit_failure_metric_read("unparsable_block_checksum");
DataCacheError::IoFailure(err.into())
})?;
let checksum = result
.get_object_checksum()
.await
.map_err(|err| (DataCacheError::IoFailure(err.into()), "invalid_block_checksum"))?;
let crc32c_b64 = checksum
.checksum_crc32c
.ok_or_else(|| (DataCacheError::BlockChecksumMissing, "missing_block_checksum"))?;
let crc32c = crc32c_from_base64(&crc32c_b64)
.map_err(|err| (DataCacheError::IoFailure(err.into()), "unparsable_block_checksum"))?;

let block_metadata = BlockMetadata::new(block_idx, block_offset, cache_key, &self.bucket_name, crc32c);
block_metadata
.validate_object_metadata(&object_metadata)
.inspect_err(|_| emit_failure_metric_read("invalid_block_metadata"))?;

metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(buffer.len() as u64);
metrics::histogram!("express_data_cache.read_duration_us").record(start.elapsed().as_micros() as f64);
.map_err(|err| (err, "invalid_block_metadata"))?;

Ok(Some(ChecksummedBytes::new_from_inner_data(buffer, crc32c)))
}

async fn put_block(
async fn write_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()> {
let start = Instant::now();
) -> Result<(), (DataCacheError, &'static str)> {
if object_size > self.config.max_object_size {
metrics::counter!("express_data_cache.over_max_object_size", "type" => "write").increment(1);
return Ok(());
}

if block_offset != block_idx * self.config.block_size {
emit_failure_metric_write("invalid_block_offset");
return Err(DataCacheError::InvalidBlockOffset);
return Err((DataCacheError::InvalidBlockOffset, "invalid_block_offset"));
}

let object_key = get_s3_key(&self.prefix, &cache_key, block_idx);

let (data, checksum) = bytes.into_inner().map_err(|_| {
emit_failure_metric_write("invalid_block_content");
DataCacheError::InvalidBlockContent
})?;
let (data, checksum) = bytes
.into_inner()
.map_err(|_| (DataCacheError::InvalidBlockContent, "invalid_block_content"))?;
let block_metadata = BlockMetadata::new(block_idx, block_offset, &cache_key, &self.bucket_name, checksum);
let data_length = data.len() as u64;

self.client
.put_object_single(
Expand All @@ -242,29 +216,76 @@ where
)
.in_current_span()
.await
.inspect_err(|_| {
emit_failure_metric_write("put_object_failure");
})?;
.map_err(|err| (err.into(), "put_object_failure"))?;

metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(data_length);
metrics::histogram!("express_data_cache.write_duration_us").record(start.elapsed().as_micros() as f64);
Ok(())
}
}

fn block_size(&self) -> u64 {
self.config.block_size
#[async_trait]
impl<Client> DataCache for ExpressDataCache<Client>
where
Client: ObjectClient + Send + Sync + 'static,
{
async fn get_block(
&self,
cache_key: &ObjectId,
block_idx: BlockIndex,
block_offset: u64,
object_size: usize,
) -> DataCacheResult<Option<ChecksummedBytes>> {
let start = Instant::now();
let (result, result_type) = match self.read_block(cache_key, block_idx, block_offset, object_size).await {
Ok(Some(data)) => {
metrics::counter!("express_data_cache.block_hit").increment(1);
metrics::counter!("express_data_cache.total_bytes", "type" => "read").increment(data.len() as u64);
(Ok(Some(data)), "cache_hit")
}
Ok(None) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
(Ok(None), "cache_miss")
}
Err((err, reason)) => {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
(Err(err), "cache_error")
}
};
metrics::histogram!("express_data_cache.read_duration_us", "type" => result_type)
.record(start.elapsed().as_micros() as f64);
result
}
}

#[inline]
fn emit_failure_metric_read(reason: &'static str) {
metrics::counter!("express_data_cache.block_hit").increment(0);
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "read").increment(1);
}
async fn put_block(
&self,
cache_key: ObjectId,
block_idx: BlockIndex,
block_offset: u64,
bytes: ChecksummedBytes,
object_size: usize,
) -> DataCacheResult<()> {
let start = Instant::now();
let (result, result_type) = match self
.write_block(cache_key, block_idx, block_offset, bytes, object_size)
.await
{
Ok(()) => {
metrics::counter!("express_data_cache.total_bytes", "type" => "write").increment(object_size as u64);
(Ok(()), "cache_hit")
}
Err((err, reason)) => {
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
(Err(err), "cache_error")
}
};
metrics::histogram!("express_data_cache.write_duration_us", "type" => result_type)
.record(start.elapsed().as_micros() as f64);
result
}

#[inline]
fn emit_failure_metric_write(reason: &'static str) {
metrics::counter!("express_data_cache.block_err", "reason" => reason, "type" => "write").increment(1);
fn block_size(&self) -> u64 {
self.config.block_size
}
}

/// Metadata about the cached object to ensure that the object we've retrieved is the one we were
Expand Down

0 comments on commit d1bee2f

Please sign in to comment.