diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..2248f35 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,52 @@ +name: Rust +on: + push: + branches: + - main + pull_request: + +env: + # This env var is used by Swatinem/rust-cache@v2 for the cache + # key, so we set it to make sure it is always consistent. + CARGO_TERM_COLOR: always + # Disable full debug symbol generation to speed up CI build and keep memory down + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" + RUST_BACKTRACE: "1" + # according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html + # CI builds are faster with incremental disabled. + CARGO_INCREMENTAL: "0" + CARGO_BUILD_JOBS: "1" + +jobs: + lint: + runs-on: ubuntu-24.04 + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.83 + components: rustfmt, clippy + - uses: Swatinem/rust-cache@v2 + - name: Check formatting + run: cargo fmt -- --check + - name: Check clippy + run: cargo clippy --tests --benches -- -D warnings + test: + strategy: + matrix: + machine: + - ubuntu-24.04 + # - ubuntu-2404-4x-arm64 + - macos-14 + runs-on: ${{ matrix.machine }} + steps: + - uses: actions/checkout@v4 + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.83 + - uses: rui314/setup-mold@v1 + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test diff --git a/.gitignore b/.gitignore index d01bd1a..efe3eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,8 @@ Cargo.lock # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. 
-#.idea/ \ No newline at end of file +#.idea/ + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..926fe17 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "ocra" +version = "0.1.0" +authors = ["dev@lancedb.com"] +description = "OCRA: A Rust cache implementation for the arrow-rs ObjectStore interface" +edition = "2021" +license-file = "LICENSE" +keywords = ["cache", "object-store", "arrow"] +categories = ["caching"] + +[dependencies] +async-trait = "0.1" +bytes = "~1.9" +futures = "0.3" +log = "~0.4" +moka = { version = "~0.12", features = ["future"] } +num_cpus = "1.16" +object_store = "~0.11" +sysinfo = "~0.32" +tokio = { version = "1", features = ["sync"] } + +[dev-dependencies] +criterion = { version = "~0.5", features = ["async_tokio"] } +tempfile = "~3.14" +tokio = { version = "1", features = ["full"] } +rand = "~0.8" + +[[bench]] +name = "memory" +harness = false diff --git a/README.md b/README.md index 5e64ac3..8bd45c0 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ -# ocra -OCRA: Object-store Cache in Rust for All +# OCRA + +**OCRA**: (**A**) (**R**)ust (**C**)ache implementation using the _arrow-rs_ [(**O**)bjectStore](https://docs.rs/object_store/latest/object_store/) trait. diff --git a/benches/memory.rs b/benches/memory.rs new file mode 100644 index 0000000..e0074bd --- /dev/null +++ b/benches/memory.rs @@ -0,0 +1,85 @@ +//! Benchmark for in-memory page cache. +//! +//!
+ +use std::{fs::File, io::Write, sync::Arc}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use object_store::{path::Path, ObjectStore}; +use rand::Rng; + +use ocra::{memory::InMemoryCache, paging::PageCache}; + +fn memory_cache_bench(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut rng = rand::thread_rng(); + + const FILE_SIZE: usize = 1024 * 1024 * 1024; + // TODO: support other object stores later + let store: Arc = Arc::new(object_store::local::LocalFileSystem::new()); + let temp_file = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + { + let mut writer = File::create(temp_file.to_str().unwrap()).unwrap(); + let mut buf = vec![0_u8; 128 * 1024]; + + for _ in 0..FILE_SIZE / (128 * 1024) { + rng.fill(&mut buf[..]); + writer.write_all(&buf).unwrap(); + } + } + + for page_size in &[1024 * 1024, 8 * 1024 * 1024] { + let cache = Arc::new(InMemoryCache::new(FILE_SIZE + 32 * 1024, *page_size)); + let location = Path::from(temp_file.to_str().unwrap()); + + // Warm up the cache + println!("Starting warm up cache with page size: {}", page_size); + rt.block_on(async { + let loc = location.clone(); + for i in 0..FILE_SIZE / page_size { + let data = cache + .get_with(&loc, i as u32, { + let store = store.clone(); + let location = loc.clone(); + async move { + store + .get_range(&location, i * page_size..(i + 1) * page_size) + .await + } + }) + .await + .unwrap(); + assert!(!data.is_empty()); + } + }); + println!("Warm up cache done"); + + c.bench_function( + format!("memory_cache,warm,page_size={}", page_size).as_str(), + |b| { + b.to_async(&rt).iter(|| { + let mut rng = rand::thread_rng(); + let cache = cache.clone(); + let loc = location.clone(); + async move { + let page_id = rng.gen_range(0..FILE_SIZE / page_size); + + let _data = cache + .get_with(&loc, page_id as u32, async { + panic!("Should not be called page_id={}", page_id) + }) + .await + .unwrap(); + } + }) + }, + ); + } +} + +criterion_group!( + 
name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = memory_cache_bench); + +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..da1c0f9 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,40 @@ +//! **OCRA**: (**A**) (**R**)ust (**C**)ache implementation for *arrow-rs* +//! [(**O**)bjectStore](object_store::ObjectStore). +//! +//! It offers a few `ObjectStore` implementations that work with +//! caches. +//! +//! For example, you can use [`ReadThroughCache`] to wrap an existing +//! `ObjectStore` instance with a [`PageCache`](paging::PageCache). +//! +//! ```no_run +//! # use std::sync::Arc; +//! # use tokio::runtime::Runtime; +//! use object_store::{ObjectStore, local::LocalFileSystem, path::Path}; +//! use ocra::{ReadThroughCache, memory::InMemoryCache}; +//! +//! # let mut rt = Runtime::new().unwrap(); +//! # rt.block_on(async { +//! let fs = Arc::new(LocalFileSystem::new()); +//! // Use 75% of system memory for cache +//! let memory_cache = Arc::new( +//! InMemoryCache::with_sys_memory(0.75).build()); +//! let cached_store: Arc = +//! Arc::new(ReadThroughCache::new(fs, memory_cache)); +//! +//! // Now you can use `cached_store` as a regular ObjectStore +//! let path = Path::from("my-key"); +//! let data = cached_store.get_range(&path, 1024..2048).await.unwrap(); +//! # }) +//! ``` + +// pub mod error; +pub mod memory; +pub mod paging; +mod read_through; + +// We reuse `object_store` Error and Result to make this crate work well +// with the rest of object_store implementations. +pub use object_store::{Error, Result}; + +pub use read_through::ReadThroughCache; diff --git a/src/memory.rs b/src/memory.rs new file mode 100644 index 0000000..d65056b --- /dev/null +++ b/src/memory.rs @@ -0,0 +1,311 @@ +//! In-memory [`PageCache`] implementation +//! +//! User can specify the capacity of the cache, or specify +//! how much percentage of memory should be allocated to it. +//! +//! 
``` +//! use ocra::memory::InMemoryCache; +//! +//! // Use 60% of system memory +//! let cache = InMemoryCache::with_sys_memory(0.6).build(); +//! +//! // Use 32 GB of memory +//! let cache = InMemoryCache::builder(32 * 1024 * 1024 * 1024).build(); +//! ``` + +use std::{ + collections::HashMap, + future::Future, + ops::Range, + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +use bytes::Bytes; +use moka::future::Cache; +use object_store::path::Path; +use sysinfo::{MemoryRefreshKind, RefreshKind}; +use tokio::sync::RwLock; + +mod builder; + +pub use self::builder::InMemoryCacheBuilder; +use crate::{paging::PageCache, Error, Result}; + +/// Default memory page size is 8 MB +pub const DEFAULT_PAGE_SIZE: usize = 8 * 1024 * 1024; +const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 30); // 30 minutes + +/// In-memory [`PageCache`] implementation. +/// +/// This is a LRU mapping of page IDs to page data, with TTL eviction. +/// +#[derive(Debug)] +pub struct InMemoryCache { + /// Capacity in bytes + capacity: usize, + + /// Size of each page + page_size: usize, + + /// In memory page cache: a mapping from `(path id, offset)` to data / bytes. + cache: Cache<(u64, u32), Bytes>, + + /// Provide fast lookup of path id + location_lookup: RwLock>, + + /// Next location id to be assigned + next_location_id: AtomicU64, +} + +impl InMemoryCache { + /// Create a [`Builder`](InMemoryCacheBuilder) to construct [`InMemoryCache`]. + /// + /// # Parameters: + /// - *capacity*: capacity in bytes + /// + /// ``` + /// # use std::time::Duration; + /// use ocra::memory::InMemoryCache; + /// + /// let cache = InMemoryCache::builder(8*1024*1024) + /// .page_size(4096) + /// .time_to_idle(Duration::from_secs(60)) + /// .build(); + /// ``` + #[must_use] + pub fn builder(capacity_bytes: usize) -> InMemoryCacheBuilder { + InMemoryCacheBuilder::new(capacity_bytes) + } + + /// Explicitly create a new [InMemoryCache] with a fixed capacity and page size. 
+ /// + /// # Parameters: + /// - `capacity_bytes`: Max capacity in bytes. + /// - `page_size`: The maximum size of each page. + /// + pub fn new(capacity_bytes: usize, page_size: usize) -> Self { + Self::with_params(capacity_bytes, page_size, DEFAULT_TIME_TO_LIVE) + } + + /// Create a new cache with a size that is a fraction of the system memory + /// + /// warning: does NOT panic if the fraction is greater than 1 + /// but you are responsible for making sure there is + /// 1. no OOM killer, i.e. swap enabled + /// 2. you are okay with the performance of swapping to disk + pub fn with_sys_memory(fraction: f32) -> InMemoryCacheBuilder { + let sys = sysinfo::System::new_with_specifics( + RefreshKind::new().with_memory(MemoryRefreshKind::everything()), + ); + let capacity = (sys.total_memory() as f32 * fraction) as usize; + Self::builder(capacity) + } + + fn with_params(capacity: usize, page_size: usize, time_to_idle: Duration) -> Self { + let cache = Cache::builder() + .max_capacity(capacity as u64) + // weight each key using the size of the value + .weigher(|_key, value: &Bytes| -> u32 { value.len() as u32 }) + .time_to_idle(time_to_idle) + // .eviction_listener(eviction_listener) + .build(); + Self { + capacity, + page_size, + cache, + location_lookup: RwLock::new(HashMap::new()), + next_location_id: AtomicU64::new(0), + } + } + + async fn location_id(&self, location: &Path) -> u64 { + if let Some(&key) = self.location_lookup.read().await.get(location) { + return key; + } + + let mut id_map = self.location_lookup.write().await; + // on lock-escalation, check if someone else has added it + if let Some(&id) = id_map.get(location) { + return id; + } + + let id = self.next_location_id.fetch_add(1, Ordering::SeqCst); + id_map.insert(location.clone(), id); + + id + } +} + +#[async_trait::async_trait] +impl PageCache for InMemoryCache { + /// The size of each page. + fn page_size(&self) -> usize { + self.page_size + } + + /// Cache capacity in bytes. 
+ fn capacity(&self) -> usize { + self.capacity + } + + fn size(&self) -> usize { + self.cache.weighted_size() as usize + } + + async fn get_with( + &self, + location: &Path, + page_id: u32, + loader: impl Future> + Send, + ) -> Result { + let location_id = self.location_id(location).await; + match self + .cache + .try_get_with((location_id, page_id), loader) + .await + { + Ok(bytes) => Ok(bytes), + Err(e) => match e.as_ref() { + Error::NotFound { .. } => Err(Error::NotFound { + path: location.to_string(), + source: Box::new(e), + }), + _ => Err(Error::Generic { + store: "InMemoryCache", + source: Box::new(e), + }), + }, + } + } + + async fn get_range_with( + &self, + location: &Path, + page_id: u32, + range: Range, + loader: impl Future> + Send, + ) -> Result { + assert!(range.start <= range.end && range.end <= self.page_size()); + let bytes = self.get_with(location, page_id, loader).await?; + Ok(bytes.slice(range)) + } + + async fn invalidate(&self, location: &Path, page_id: u32) -> Result<()> { + let location_id = self.location_id(location).await; + self.cache.remove(&(location_id, page_id)).await; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::{ + io::Write, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }; + + use bytes::{BufMut, BytesMut}; + use object_store::{local::LocalFileSystem, ObjectStore}; + use tempfile::tempdir; + + #[tokio::test] + async fn test_get_range() { + let cache = InMemoryCache::new(1024, 512); + let local_fs = Arc::new(LocalFileSystem::new()); + + let tmp_dir = tempdir().unwrap(); + let file_path = tmp_dir.path().join("test.bin"); + std::fs::write(&file_path, "test data").unwrap(); + let location = Path::from(file_path.as_path().to_str().unwrap()); + + let miss = Arc::new(AtomicUsize::new(0)); + + let data = cache + .get_with(&location, 0, { + let miss = miss.clone(); + let local_fs = local_fs.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + 
local_fs.get(&location).await.unwrap().bytes().await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), 1); + assert_eq!(data, Bytes::from("test data")); + + let data = cache + .get_with(&location, 0, { + let miss = miss.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + local_fs.get(&location).await.unwrap().bytes().await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), 1); + assert_eq!(data, Bytes::from("test data")); + } + + #[tokio::test] + async fn test_eviction() { + const PAGE_SIZE: usize = 512; + let cache = InMemoryCache::new(1024, PAGE_SIZE); + let local_fs = Arc::new(LocalFileSystem::new()); + + let tmp_dir = tempdir().unwrap(); + let file_path = tmp_dir.path().join("test.bin"); + { + let mut file = std::fs::File::create(&file_path).unwrap(); + for i in 0_u64..1024 { + file.write_all(&i.to_be_bytes()).unwrap(); + } + } + let location = Path::from(file_path.as_path().to_str().unwrap()); + cache.cache.run_pending_tasks().await; + + let miss = Arc::new(AtomicUsize::new(0)); + + for (page_id, expected_miss, expected_size) in + [(0, 1, 1), (0, 1, 1), (1, 2, 2), (4, 3, 2), (5, 4, 2)].iter() + { + let data = cache + .get_with(&location, *page_id, { + let miss = miss.clone(); + let local_fs = local_fs.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + local_fs + .get_range( + &location, + PAGE_SIZE * (*page_id as usize)..PAGE_SIZE * (page_id + 1) as usize, + ) + .await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), *expected_miss); + assert_eq!(data.len(), PAGE_SIZE); + + cache.cache.run_pending_tasks().await; + assert_eq!(cache.cache.entry_count(), *expected_size); + + let mut buf = BytesMut::with_capacity(PAGE_SIZE); + for i in page_id * PAGE_SIZE as u32 / 8..(page_id + 1) * PAGE_SIZE as u32 / 8 { + buf.put_u64(i as u64); + } + assert_eq!(data, buf); + } + } +} diff --git 
a/src/memory/builder.rs b/src/memory/builder.rs new file mode 100644 index 0000000..1e4b19e --- /dev/null +++ b/src/memory/builder.rs @@ -0,0 +1,44 @@ +//! Memory Cache Builder +//! + +use std::time::Duration; + +use super::{InMemoryCache, DEFAULT_PAGE_SIZE, DEFAULT_TIME_TO_LIVE}; + +/// Builder for [`InMemoryCache`] +pub struct InMemoryCacheBuilder { + capacity: usize, + page_size: usize, + + time_to_idle: Duration, +} + +impl InMemoryCacheBuilder { + pub(crate) fn new(capacity: usize) -> Self { + Self { + capacity, + page_size: DEFAULT_PAGE_SIZE, + time_to_idle: DEFAULT_TIME_TO_LIVE, + } + } + + /// Set the page size. + pub fn page_size(&mut self, size: usize) -> &mut Self { + self.page_size = size; + self + } + + /// If an entry has been idle longer than `time_to_idle`, + /// it will be evicted. + /// + /// Default is 30 minutes. + pub fn time_to_idle(&mut self, tti: Duration) -> &mut Self { + self.time_to_idle = tti; + self + } + + #[must_use] + pub fn build(&self) -> InMemoryCache { + InMemoryCache::with_params(self.capacity, self.page_size, self.time_to_idle) + } +} diff --git a/src/paging.rs b/src/paging.rs new file mode 100644 index 0000000..9edea2b --- /dev/null +++ b/src/paging.rs @@ -0,0 +1,64 @@ +//! Trait for page cache +//! +//! A Page cache caches data in fixed-size pages. + +use std::fmt::Debug; +use std::future::Future; +use std::ops::Range; + +use async_trait::async_trait; +use bytes::Bytes; +use object_store::path::Path; + +use crate::Result; + +/// [PageCache] trait. +/// +/// Caching fixed-size pages. Each page has a unique ID. +#[async_trait] +pub trait PageCache: Sync + Send + Debug { + /// The size of each page. + fn page_size(&self) -> usize; + + /// Cache capacity, in bytes. + fn capacity(&self) -> usize; + + /// Total used cache size in bytes. + fn size(&self) -> usize; + + /// Read data of a page. + /// + /// # Parameters + /// - `location`: the path of the object. + /// - `page_id`: the ID of the page.
+ /// + /// # Returns + /// - `Ok(Bytes)` if the page exists and the data was read successfully. + /// - On a cache miss, the page is loaded via `loader` and cached. + /// - `Err(Error)` if an error occurred. + async fn get_with( + &self, + location: &Path, + page_id: u32, + loader: impl Future> + Send, + ) -> Result; + + /// Get range of data in the page. + /// + /// # Parameters + /// - `page_id`: The ID of the page. + /// - `range`: The range of data to read from the page. The range must be within the page size. + /// + /// # Returns + /// See [Self::get_with()]. + async fn get_range_with( + &self, + location: &Path, + page_id: u32, + range: Range, + loader: impl Future> + Send, + ) -> Result; + + /// Remove a page from the cache. + async fn invalidate(&self, location: &Path, page_id: u32) -> Result<()>; +} diff --git a/src/read_through.rs b/src/read_through.rs new file mode 100644 index 0000000..00173d3 --- /dev/null +++ b/src/read_through.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, +}; + +use crate::{paging::PageCache, Result}; + +/// Read-through Page Cache.
+/// +#[derive(Debug)] +pub struct ReadThroughCache { + inner: Arc, + cache: Arc, +} + +impl std::fmt::Display for ReadThroughCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ReadThroughCache(inner={}, cache={:?})", + self.inner, self.cache + ) + } +} + +impl ReadThroughCache { + pub fn new(inner: Arc, cache: Arc) -> Self { + Self { inner, cache } + } +} + +#[async_trait] +impl ObjectStore for ReadThroughCache { + async fn put_opts( + &self, + _location: &Path, + _payload: PutPayload, + _options: PutOptions, + ) -> Result { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> Result> { + todo!() + } + + async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result { + todo!() + } + + async fn delete(&self, _location: &Path) -> object_store::Result<()> { + todo!() + } + + fn list(&'_ self, _prefix: Option<&Path>) -> BoxStream<'_, Result> { + todo!() + } + + async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + todo!() + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy_if_not_exists(from, to).await + } +}