Skip to content

Commit

Permalink
feat: add trait for cache stats and atomic stats (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Dec 3, 2024
1 parent 8798f41 commit f810267
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ futures = "~0.3"
log = "~0.4"
moka = { version = "~0.12", features = ["future"] }
num_cpus = "1.16"
object_store = "~0.11"
object_store = "=0.10.2"
sysinfo = "~0.32"
tokio = { version = "1", features = ["sync"] }

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
pub mod memory;
pub mod paging;
mod read_through;
pub mod stats;

// We reuse `object_store` Error and Result to make this crate work well
// with the rest of object_store implementations.
Expand Down
74 changes: 53 additions & 21 deletions src/read_through.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use object_store::{
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
};

use crate::{paging::PageCache, Result};
use crate::{paging::PageCache, stats::CacheStats, Result};

/// Read-through Page Cache.
///
Expand All @@ -19,6 +19,8 @@ pub struct ReadThroughCache<C: PageCache> {
cache: Arc<C>,

parallelism: usize,

stats: Arc<dyn CacheStats>,
}

impl<C: PageCache> std::fmt::Display for ReadThroughCache<C> {
Expand All @@ -33,10 +35,23 @@ impl<C: PageCache> std::fmt::Display for ReadThroughCache<C> {

impl<C: PageCache> ReadThroughCache<C> {
pub fn new(inner: Arc<dyn ObjectStore>, cache: Arc<C>) -> Self {
Self::new_with_stats(
inner,
cache,
Arc::new(crate::stats::AtomicIntCacheStats::new()),
)
}

pub fn new_with_stats(
inner: Arc<dyn ObjectStore>,
cache: Arc<C>,
stats: Arc<dyn CacheStats>,
) -> Self {
Self {
inner,
cache,
parallelism: num_cpus::get(),
stats,
}
}

Expand All @@ -48,6 +63,7 @@ impl<C: PageCache> ReadThroughCache<C> {
async fn get_range<C: PageCache>(
store: Arc<dyn ObjectStore>,
cache: Arc<C>,
stats: Arc<dyn CacheStats>,
location: &Path,
range: Range<usize>,
parallelism: usize,
Expand All @@ -65,22 +81,28 @@ async fn get_range<C: PageCache>(
let range_in_page = intersection.start - offset..intersection.end - offset;
let page_end = std::cmp::min(offset + page_size, meta.size);
let store = store.clone();
let stats = stats.clone();

stats.inc_total_reads();

async move {
// Actual range in the file.
page_cache
.get_range_with(
location,
page_id as u32,
range_in_page,
store.get_range(location, offset..page_end),
)
.get_range_with(location, page_id as u32, range_in_page, async {
stats.inc_total_misses();
store.get_range(location, offset..page_end).await
})
.await
}
})
.buffered(parallelism)
.try_collect::<Vec<_>>()
.await?;

if pages.len() == 1 {
return Ok(pages.into_iter().next().unwrap());
}

// stick all bytes together.
let mut buf = BytesMut::with_capacity(range.len());
for page in pages {
Expand Down Expand Up @@ -122,24 +144,33 @@ impl<C: PageCache> ObjectStore for ReadThroughCache<C> {
let page_size = self.cache.page_size();
let inner = self.inner.clone();
let cache = self.cache.clone();
let stats = self.stats.clone();
let location = location.clone();
let parallelism = self.parallelism;

// TODO: This might yield too many small reads.
let s =
stream::iter((0..file_size).step_by(page_size))
.map(move |offset| {
let loc = location.clone();
let store = inner.clone();
let c = cache.clone();
let page_size = cache.page_size();

async move {
get_range(store, c, &loc, offset..offset + page_size, parallelism).await
}
})
.buffered(self.parallelism)
.boxed();
let s = stream::iter((0..file_size).step_by(page_size))
.map(move |offset| {
let loc = location.clone();
let store = inner.clone();
let stats = stats.clone();
let c = cache.clone();
let page_size = cache.page_size();

async move {
get_range(
store,
c,
stats,
&loc,
offset..offset + page_size,
parallelism,
)
.await
}
})
.buffered(self.parallelism)
.boxed();

let payload = GetResultPayload::Stream(s);
Ok(GetResult {
Expand All @@ -154,6 +185,7 @@ impl<C: PageCache> ObjectStore for ReadThroughCache<C> {
get_range(
self.inner.clone(),
self.cache.clone(),
self.stats.clone(),
location,
range,
self.parallelism,
Expand Down
124 changes: 124 additions & 0 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! Cache stats
use std::{
fmt::Debug,
sync::atomic::{AtomicU64, Ordering},
};

use log::warn;

/// Read-path statistics of a cache: total reads and total misses.
pub trait CacheReadStats: Sync + Send + Debug {
    /// Total reads on the cache.
    fn total_reads(&self) -> u64;

    /// Total misses, i.e. reads that were not served from the cache.
    fn total_misses(&self) -> u64;

    /// Increase total reads by 1.
    fn inc_total_reads(&self);

    /// Increase total misses by 1.
    fn inc_total_misses(&self);
}

/// Capacity statistics of a cache: configured maximum and current usage.
///
/// The unit is defined by the implementation / caller (presumably bytes —
/// TODO confirm); `max_capacity` and `usage` are expected to share it.
pub trait CacheCapacityStats {
    /// Configured maximum capacity.
    fn max_capacity(&self) -> u64;

    /// Set the maximum capacity to `val`.
    fn set_max_capacity(&self, val: u64);

    /// Current usage.
    fn usage(&self) -> u64;

    /// Set the current usage to the absolute value `val`.
    fn set_usage(&self, val: u64);

    /// Increase usage by `val`.
    fn inc_usage(&self, val: u64);

    /// Decrease usage by `val`.
    fn sub_usage(&self, val: u64);
}

/// Combined cache statistics: capacity stats plus read stats.
pub trait CacheStats: CacheCapacityStats + CacheReadStats {}

/// [`CacheStats`] implementation backed by atomic integer counters.
#[derive(Debug)]
pub struct AtomicIntCacheStats {
    /// Number of reads issued against the cache.
    total_reads: AtomicU64,
    /// Number of reads that missed the cache.
    total_misses: AtomicU64,
    /// Configured maximum capacity.
    max_capacity: AtomicU64,
    /// Current capacity usage.
    capacity_usage: AtomicU64,
}

impl AtomicIntCacheStats {
    /// Create a new stats object with every counter starting at zero.
    pub fn new() -> Self {
        Self {
            total_reads: AtomicU64::new(0),
            total_misses: AtomicU64::new(0),
            max_capacity: AtomicU64::new(0),
            capacity_usage: AtomicU64::new(0),
        }
    }
}

impl Default for AtomicIntCacheStats {
fn default() -> Self {
Self::new()
}
}

impl CacheReadStats for AtomicIntCacheStats {
    fn total_reads(&self) -> u64 {
        self.total_reads.load(Ordering::Acquire)
    }

    fn total_misses(&self) -> u64 {
        self.total_misses.load(Ordering::Acquire)
    }

    fn inc_total_reads(&self) {
        // Independent counter; a Relaxed increment is sufficient since the
        // value does not guard any other data.
        self.total_reads.fetch_add(1, Ordering::Relaxed);
    }

    fn inc_total_misses(&self) {
        // Same reasoning as `inc_total_reads`.
        self.total_misses.fetch_add(1, Ordering::Relaxed);
    }
}

impl CacheCapacityStats for AtomicIntCacheStats {
    fn max_capacity(&self) -> u64 {
        self.max_capacity.load(Ordering::Acquire)
    }

    fn set_max_capacity(&self, val: u64) {
        self.max_capacity.store(val, Ordering::Relaxed);
    }

    fn usage(&self) -> u64 {
        self.capacity_usage.load(Ordering::Acquire)
    }

    fn set_usage(&self, val: u64) {
        self.capacity_usage.store(val, Ordering::Relaxed);
    }

    fn inc_usage(&self, val: u64) {
        self.capacity_usage.fetch_add(val, Ordering::Relaxed);
    }

    /// Decrease usage by `val`, saturating the attempt at zero: if the
    /// current usage is smaller than `val`, the value is left unchanged
    /// and a warning is logged.
    fn sub_usage(&self, val: u64) {
        // NOTE: `fetch_update` may invoke the closure multiple times under
        // contention, so the closure must be side-effect free. Warn exactly
        // once, after the update has definitively failed; `Err` carries the
        // last value observed.
        let res = self
            .capacity_usage
            .fetch_update(Ordering::Acquire, Ordering::Relaxed, |current| {
                // `checked_sub` returns `None` when `current < val`,
                // which makes `fetch_update` fail without writing.
                current.checked_sub(val)
            });
        if let Err(current) = res {
            warn!(
                "cannot decrement cache usage. current val = {:?} and decrement = {:?}",
                current, val
            );
        }
    }
}

// Marker impl: the two supertrait impls above make `AtomicIntCacheStats`
// a full `CacheStats`.
impl CacheStats for AtomicIntCacheStats {}

0 comments on commit f810267

Please sign in to comment.