From 33aa7517badb60ed623d5024a6fee5cd38ee91a6 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 15:57:36 -0800 Subject: [PATCH 01/22] init --- .gitignore | 6 +++++- Cargo.toml | 16 ++++++++++++++++ src/lib.rs | 1 + 3 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 Cargo.toml create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore index d01bd1a..efe3eb1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,8 @@ Cargo.lock # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ \ No newline at end of file +#.idea/ + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..336cf12 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ocra" +version = "0.1.0" +authors = ["dev@lancedb.com"] +description = "OCRA: A Rust implementation of Cache in arrow-rs' ObjectStore interface" +edition = "2021" + +[dependencies] +async-trait = "0.1" +bytes = "~1.9" +futures = "0.3" +log = "~0.4" +moka = { version = "~0.12", features = ["future"] } +object_store = "~0.11" +sha2 = "~0.10" +num_cpus = "1.16" diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..48bd195 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +//! A Rust Cache implementation using [object_store::ObjectStore] interface. From 5fc3dbefc971a92c182ba6a26db4bcc50b0b5490 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 15:59:51 -0800 Subject: [PATCH 02/22] add license file --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 336cf12..298fd7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["dev@lancedb.com"] description = "OCRA: A Rust implementation of Cache in arrow-rs' ObjectStore interface" edition = "2021" +license-file = "LICENSE" [dependencies] async-trait = "0.1" From e9d5e88bd4709d4fb8b4db3ca3b586ead10754ac Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 16:20:54 -0800 Subject: [PATCH 03/22] rust ci --- .github/workflows/rust.yml | 47 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..97e6bf8 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,47 @@ +name: Rust +on: + push: + branches: + - main + pull_request: + +env: + # This env var is used by Swatinem/rust-cache@v2 for the cache + # key, so we set it to make sure it is always consistent. + CARGO_TERM_COLOR: always + # Disable full debug symbol generation to speed up CI build and keep memory down + # "1" means line tables only, which is useful for panic tracebacks. + RUSTFLAGS: "-C debuginfo=1" + RUST_BACKTRACE: "1" + # according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html + # CI builds are faster with incremental disabled. + CARGO_INCREMENTAL: "0" + CARGO_BUILD_JOBS: "1" + +jobs: + lint: + runs-on: ubuntu-24.04 + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - name: Check formatting + run: cargo fmt -- --check + - name: Check clippy + run: cargo clippy --locked --tests --benches -- -D warnings + test: + strategy: + matrix: + machine: + - ubuntu-24.04 + - ubuntu-2404-4x-arm64 + - macos-14 + runs-on: ${{ matrix.machine }} + steps: + - uses: actions/checkout@v4 + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.83 + - uses: rui314/setup-mold@v1 + - uses: Swatinem/rust-cache@v2 + - name: Run tests + run: cargo test From 1936912c723acb1fe1c3995553b0c46ffcc241f2 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 16:58:42 -0800 Subject: [PATCH 04/22] cargo --- Cargo.toml | 3 +++ src/lib.rs | 32 ++++++++++++++++++++++++- src/memory.rs | 17 +++++++++++++ src/paging.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++ src/read_through.rs | 29 +++++++++++++++++++++++ 5 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 src/memory.rs create mode 100644 src/paging.rs create mode 100644 src/read_through.rs diff --git a/Cargo.toml b/Cargo.toml index 298fd7c..e512f2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,6 @@ moka = { version = "~0.12", features = ["future"] } object_store = "~0.11" sha2 = "~0.10" num_cpus = "1.16" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index 48bd195..1a21e59 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,31 @@ -//! A Rust Cache implementation using [object_store::ObjectStore] interface. +//! **OCRA**: an *arrow-rs* [**O**bjectStore](object_store::ObjectStore) +//! implementation with **C**ache in **R**ust for **A**ll +//! +//! ```no_run +//! # use std::sync::Arc; +//! # use tokio::runtime::Runtime; +//! use object_store::{ObjectStore, local::LocalFileSystem}; +//! use ocra::{ReadThroughCache, memory::InMemoryCache}; +//! +//! # let mut rt = Runtime::new().unwrap(); +//! # rt.block_on(async { +//! let fs = LocalFileSystem::new(); +//! // Use 75% of system memory for cache +//! let memory_cache = InMemoryCache::with_system_memory(0.75, 4 * 1024 * 1024); +//! let cached: Arc = +//! Arc::new(ReadThroughCache::new(fs, memory_cache)); +//! +//! // Now you can use `cached` as a regular ObjectStore +//! let data = cached.get_range("my-key", 1024..2048).await.unwrap(); +//! # }) +//! ``` + +pub mod memory; +mod paging; +mod read_through; + +// We reuse `object_store` Error and Result to make this crate work well +// with the rest of object_store implementations. +pub use object_store::{Error, Result}; + +pub use read_through::ReadThroughCache; diff --git a/src/memory.rs b/src/memory.rs new file mode 100644 index 0000000..335a745 --- /dev/null +++ b/src/memory.rs @@ -0,0 +1,17 @@ +//! In-memory page cache +//! + +/// In-memory Page Cache +pub struct InMemoryCache {} + +impl InMemoryCache { + /// Create a new cache with a size that is a fraction of the system memory + /// + /// warning: does NOT panic if the fraction is greater than 1 + /// but you are responsible for making sure there is + /// 1. no OOM killer, i.e. swap enabled + /// 2. you are okay with the performance of swapping to disk + pub fn with_sys_memory(_fraction: f32, _page_size: usize) -> Self { + Self {} + } +} diff --git a/src/paging.rs b/src/paging.rs new file mode 100644 index 0000000..0644a5d --- /dev/null +++ b/src/paging.rs @@ -0,0 +1,58 @@ +//! Trait for page cache +//! +//! A Page cache caches data in fixed-size pages. + +use std::fmt::Debug; +use std::ops::Range; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::Result; + +/// [PageCache] trait. +/// +/// Caching fixed-size pages. Each page has a unique ID. +#[async_trait] +pub trait PageCache: Sync + Send + Debug { + /// The size of each page. + fn page_size(&self) -> usize; + + /// Cache capacity, in number of pages. + fn capacity(&self) -> usize; + + /// How many pages are cached. + fn len(&self) -> usize; + + /// Returns true if the cache is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Read data from a Page. + /// + /// # Returns + /// - `Ok(Some(Bytes))` if the page exists and the data was read successfully. + /// - `Ok(None)` if the page does not exist. + /// - `Err(Error)` if an error occurred. + async fn get(&self, id: [u8; 32]) -> Result>; + + /// Get range of data in the page. + /// + /// # Parameters + /// - `id`: The ID of the page. + /// - `range`: The range of data to read from the page. The range must be within the page size. + /// + /// # Returns + /// See [Self::get()]. + async fn get_range(&self, id: [u8; 32], range: Range) -> Result>; + + /// Put a page in the cache. + /// + /// # Parameters + /// - `id`: The ID of the page. + /// - `page`: The data to put in the page. The page must not be larger than the page size. + /// If the page is smaller than the page size, the remaining space will be zeroed. + /// + async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()>; +} diff --git a/src/read_through.rs b/src/read_through.rs new file mode 100644 index 0000000..70ceacc --- /dev/null +++ b/src/read_through.rs @@ -0,0 +1,29 @@ +use std::sync::Arc; + +use object_store::ObjectStore; + +use crate::paging::PageCache; + +/// Read-through Page Cache. +/// +#[derive(Debug)] +pub struct ReadThroughCache { + inner: Arc, + cache: Arc, +} + +impl std::fmt::Display for ReadThroughCache { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ReadThroughCache(inner={}, cache={:?})", + self.inner, self.cache + ) + } +} + +impl ReadThroughCache { + pub fn new(inner: Arc, cache: Arc) -> Self { + Self { inner, cache } + } +} From 765d66b798d91d0a57b47c1a22c38a4f432c3a35 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 17:25:51 -0800 Subject: [PATCH 05/22] pass test --- src/lib.rs | 10 +++++---- src/memory.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/read_through.rs | 53 +++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 109 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1a21e59..4f2f5b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,19 +4,21 @@ //! ```no_run //! # use std::sync::Arc; //! # use tokio::runtime::Runtime; -//! use object_store::{ObjectStore, local::LocalFileSystem}; +//! use object_store::{ObjectStore, local::LocalFileSystem, path::Path}; //! use ocra::{ReadThroughCache, memory::InMemoryCache}; //! //! # let mut rt = Runtime::new().unwrap(); //! # rt.block_on(async { -//! let fs = LocalFileSystem::new(); +//! let fs = Arc::new(LocalFileSystem::new()); //! // Use 75% of system memory for cache -//! let memory_cache = InMemoryCache::with_system_memory(0.75, 4 * 1024 * 1024); +//! let memory_cache = Arc::new( +//! InMemoryCache::with_sys_memory(0.75, 4 * 1024 * 1024)); //! let cached: Arc = //! Arc::new(ReadThroughCache::new(fs, memory_cache)); //! //! // Now you can use `cached` as a regular ObjectStore -//! let data = cached.get_range("my-key", 1024..2048).await.unwrap(); +//! let path = Path::from("my-key"); +//! let data = cached.get_range(&path, 1024..2048).await.unwrap(); //! # }) //! ``` diff --git a/src/memory.rs b/src/memory.rs index 335a745..4a8990b 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -1,7 +1,14 @@ //! In-memory page cache //! +use std::ops::Range; + +use bytes::Bytes; + +use crate::{paging::PageCache, Result}; + /// In-memory Page Cache +#[derive(Debug)] pub struct InMemoryCache {} impl InMemoryCache { @@ -15,3 +22,48 @@ impl InMemoryCache { Self {} } } + +#[async_trait::async_trait] +impl PageCache for InMemoryCache { + /// The size of each page. + fn page_size(&self) -> usize { + todo!() + } + + /// Cache capacity, in number of pages. + fn capacity(&self) -> usize { + todo!() + } + + /// How many pages are cached. + fn len(&self) -> usize { + todo!() + } + + async fn get(&self, id: [u8; 32]) -> Result> { + todo!() + } + + /// Get range of data in the page. + /// + /// # Parameters + /// - `id`: The ID of the page. + /// - `range`: The range of data to read from the page. The range must be within the page size. + /// + /// # Returns + /// See [Self::get()]. + async fn get_range(&self, id: [u8; 32], range: Range) -> Result> { + todo!() + } + + /// Put a page in the cache. + /// + /// # Parameters + /// - `id`: The ID of the page. + /// - `page`: The data to put in the page. The page must not be larger than the page size. + /// If the page is smaller than the page size, the remaining space will be zeroed. + /// + async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()> { + todo!() + } +} diff --git a/src/read_through.rs b/src/read_through.rs index 70ceacc..39317d4 100644 --- a/src/read_through.rs +++ b/src/read_through.rs @@ -1,8 +1,13 @@ use std::sync::Arc; -use object_store::ObjectStore; +use async_trait::async_trait; +use futures::stream::BoxStream; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, +}; -use crate::paging::PageCache; +use crate::{paging::PageCache, Result}; /// Read-through Page Cache. /// @@ -27,3 +32,47 @@ impl ReadThroughCache { Self { inner, cache } } } + +#[async_trait] +impl ObjectStore for ReadThroughCache { + async fn put_opts( + &self, + _location: &Path, + _payload: PutPayload, + _options: PutOptions, + ) -> Result { + todo!() + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOpts, + ) -> Result> { + todo!() + } + + async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result { + todo!() + } + + async fn delete(&self, _location: &Path) -> object_store::Result<()> { + todo!() + } + + fn list(&'_ self, _prefix: Option<&Path>) -> BoxStream<'_, Result> { + todo!() + } + + async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result { + todo!() + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy_if_not_exists(from, to).await + } +} From b2333cf5f013ecbe3e42d3a558f411ad56e4f567 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 18:21:47 -0800 Subject: [PATCH 06/22] dont lock --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 97e6bf8..b31d49b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -27,7 +27,7 @@ jobs: - name: Check formatting run: cargo fmt -- --check - name: Check clippy - run: cargo clippy --locked --tests --benches -- -D warnings + run: cargo clippy --tests --benches -- -D warnings test: strategy: matrix: From 8eec09aef6553477ef3854f46a4a2ca1924561e3 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 19:13:11 -0800 Subject: [PATCH 07/22] memory --- Cargo.toml | 3 +- src/lib.rs | 4 +- src/memory.rs | 100 +++++++++++++++++++++++++++++++++++++----- src/memory/builder.rs | 43 ++++++++++++++++++ src/paging.rs | 4 +- 5 files changed, 139 insertions(+), 15 deletions(-) create mode 100644 src/memory/builder.rs diff --git a/Cargo.toml b/Cargo.toml index e512f2f..3fe0c5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,10 @@ bytes = "~1.9" futures = "0.3" log = "~0.4" moka = { version = "~0.12", features = ["future"] } +num_cpus = "1.16" object_store = "~0.11" sha2 = "~0.10" -num_cpus = "1.16" +sysinfo = "0.32" [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index 4f2f5b6..86eff18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ //! let fs = Arc::new(LocalFileSystem::new()); //! // Use 75% of system memory for cache //! let memory_cache = Arc::new( -//! InMemoryCache::with_sys_memory(0.75, 4 * 1024 * 1024)); +//! InMemoryCache::with_sys_memory(0.75).build()); //! let cached: Arc = //! Arc::new(ReadThroughCache::new(fs, memory_cache)); //! @@ -23,7 +23,7 @@ //! ``` pub mod memory; -mod paging; +pub mod paging; mod read_through; // We reuse `object_store` Error and Result to make this crate work well diff --git a/src/memory.rs b/src/memory.rs index 4a8990b..fc30e83 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -1,38 +1,118 @@ -//! In-memory page cache +//! In-memory [PageCache] implementation //! +//! User can specify the capacity of the cache, or specify +//! how much percentage of memory should be allocated to it. +//! +//! ``` +//! use ocra::memory::InMemoryCache; +//! +//! // Use 60% of system memory +//! let cache = InMemoryCache::with_sys_memory(0.6).build(); +//! +//! // Use 32 GB of memory +//! let cache = InMemoryCache::builder(32 * 1024 * 1024 * 1024).build(); +//! ``` -use std::ops::Range; +use std::{ops::Range, time::Duration}; use bytes::Bytes; +use moka::future::Cache; +use sysinfo::{MemoryRefreshKind, RefreshKind}; +mod builder; + +pub use self::builder::InMemoryCacheBuilder; use crate::{paging::PageCache, Result}; -/// In-memory Page Cache +/// Default memory page size is 8 MB +pub const DEFAULT_PAGE_SIZE: u64 = 8 * 1024 * 1024; +const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 30); // 30 minutes + +/// In-memory [PageCache] implementation. +/// +/// This is a LRU mapping of page IDs to page data, with TTL eviction. +/// #[derive(Debug)] -pub struct InMemoryCache {} +pub struct InMemoryCache { + /// Capacity in bytes + capacity: u64, + + /// Size of each page + page_size: u64, + + /// Page cache: a mapping from `(path id, offset)` to data / bytes. + cache: Cache<(u32, u32), Bytes>, +} impl InMemoryCache { + /// Create a [`Builder`](InMemoryCacheBuilder) to construct [InMemoryCache]. + /// + /// Parameters: + /// - *capacity*: capacity in bytes + /// + /// ``` + /// # use std::time::Duration; + /// use ocra::memory::InMemoryCache; + /// + /// let cache = InMemoryCache::builder(8*1024*1024) + /// .page_size(4096) + /// .time_to_idle(Duration::from_secs(60)) + /// .build(); + /// ``` + pub fn builder(capacity_bytes: u64) -> InMemoryCacheBuilder { + InMemoryCacheBuilder::new(capacity_bytes) + } + + /// Create a new [InMemoryCache] with a fixed capacity and page size. + /// + /// Parameters: + /// - `capacity_bytes`: Max capacity in bytes. + /// - `page_size`: The maximum size of each page. + /// + pub fn new(capacity_bytes: u64, page_size: u64) -> Self { + Self::with_params(capacity_bytes, page_size, DEFAULT_TIME_TO_LIVE) + } + /// Create a new cache with a size that is a fraction of the system memory /// /// warning: does NOT panic if the fraction is greater than 1 /// but you are responsible for making sure there is /// 1. no OOM killer, i.e. swap enabled /// 2. you are okay with the performance of swapping to disk - pub fn with_sys_memory(_fraction: f32, _page_size: usize) -> Self { - Self {} + pub fn with_sys_memory(fraction: f32) -> InMemoryCacheBuilder { + let sys = sysinfo::System::new_with_specifics( + RefreshKind::new().with_memory(MemoryRefreshKind::everything()), + ); + let capacity = (sys.total_memory() as f32 * fraction) as u64; + Self::builder(capacity) + } + + fn with_params(capacity_bytes: u64, page_size: u64, time_to_idle: Duration) -> Self { + let cache = Cache::builder() + .max_capacity(capacity_bytes) + // weight each key using the size of the value + .weigher(|_key, value: &Bytes| -> u32 { value.len() as u32 }) + .time_to_idle(time_to_idle) + // .eviction_listener(eviction_listener) + .build(); + Self { + capacity: capacity_bytes, + page_size: page_size, + cache, + } } } #[async_trait::async_trait] impl PageCache for InMemoryCache { /// The size of each page. - fn page_size(&self) -> usize { - todo!() + fn page_size(&self) -> u64 { + self.page_size } /// Cache capacity, in number of pages. - fn capacity(&self) -> usize { - todo!() + fn capacity(&self) -> u64 { + self.capacity } /// How many pages are cached. diff --git a/src/memory/builder.rs b/src/memory/builder.rs new file mode 100644 index 0000000..bacff58 --- /dev/null +++ b/src/memory/builder.rs @@ -0,0 +1,43 @@ +//! Memory Cache Builder +//! + +use std::time::Duration; + +use super::{InMemoryCache, DEFAULT_PAGE_SIZE, DEFAULT_TIME_TO_LIVE}; + +/// Builder for [InMemoryCache] +pub struct InMemoryCacheBuilder { + capacity: u64, + page_size: u64, + + time_to_idle: Duration, +} + +impl InMemoryCacheBuilder { + pub(crate) fn new(capacity: u64) -> Self { + Self { + capacity, + page_size: DEFAULT_PAGE_SIZE, + time_to_idle: DEFAULT_TIME_TO_LIVE, + } + } + + /// Set the page size. + pub fn page_size(&mut self, size: u64) -> &mut Self { + self.page_size = size; + self + } + + /// If an entry has been idle longer than `time_to_idle` seconds, + /// it will be evicted. + /// + /// Default is 30 minutes. + pub fn time_to_idle(&mut self, tti: Duration) -> &mut Self { + self.time_to_idle = tti; + self + } + + pub fn build(&self) -> InMemoryCache { + InMemoryCache::with_params(self.capacity, self.page_size, self.time_to_idle) + } +} diff --git a/src/paging.rs b/src/paging.rs index 0644a5d..c7589c2 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -16,10 +16,10 @@ use crate::Result; #[async_trait] pub trait PageCache: Sync + Send + Debug { /// The size of each page. - fn page_size(&self) -> usize; + fn page_size(&self) -> u64; /// Cache capacity, in number of pages. - fn capacity(&self) -> usize; + fn capacity(&self) -> u64; /// How many pages are cached. fn len(&self) -> usize; From b348979ab1731bc063494f71fe5a07c1df465696 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 29 Nov 2024 19:16:06 -0800 Subject: [PATCH 08/22] a --- src/memory.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/memory.rs b/src/memory.rs index fc30e83..4a27e3c 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -47,7 +47,7 @@ pub struct InMemoryCache { impl InMemoryCache { /// Create a [`Builder`](InMemoryCacheBuilder) to construct [InMemoryCache]. /// - /// Parameters: + /// # Parameters: /// - *capacity*: capacity in bytes /// /// ``` @@ -63,9 +63,9 @@ impl InMemoryCache { InMemoryCacheBuilder::new(capacity_bytes) } - /// Create a new [InMemoryCache] with a fixed capacity and page size. + /// Explicitly create a new [InMemoryCache] with a fixed capacity and page size. /// - /// Parameters: + /// # Parameters: /// - `capacity_bytes`: Max capacity in bytes. /// - `page_size`: The maximum size of each page. /// @@ -110,7 +110,7 @@ impl PageCache for InMemoryCache { self.page_size } - /// Cache capacity, in number of pages. + /// Cache capacity in bytes. fn capacity(&self) -> u64 { self.capacity } From c59b95b11a27f5b806d9f8ac9cf38dc6dff103a8 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 13:16:23 -0800 Subject: [PATCH 09/22] a --- src/lib.rs | 1 + src/memory.rs | 76 +++++++++++++++++++++++++++++-------------- src/memory/builder.rs | 8 ++--- src/paging.rs | 42 ++++++++++++++++++++---- src/read_through.rs | 12 +++---- 5 files changed, 98 insertions(+), 41 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 86eff18..411ea64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ //! # }) //! ``` +// pub mod error; pub mod memory; pub mod paging; mod read_through; diff --git a/src/memory.rs b/src/memory.rs index 4a27e3c..a191ee4 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -13,19 +13,23 @@ //! let cache = InMemoryCache::builder(32 * 1024 * 1024 * 1024).build(); //! ``` -use std::{ops::Range, time::Duration}; +use std::{future::Future, ops::Range, time::Duration}; use bytes::Bytes; use moka::future::Cache; +use object_store::path::Path; use sysinfo::{MemoryRefreshKind, RefreshKind}; mod builder; pub use self::builder::InMemoryCacheBuilder; -use crate::{paging::PageCache, Result}; +use crate::{ + paging::{to_page_key, PageCache, PageKey}, + Error, Result, +}; /// Default memory page size is 8 MB -pub const DEFAULT_PAGE_SIZE: u64 = 8 * 1024 * 1024; +pub const DEFAULT_PAGE_SIZE: usize = 8 * 1024 * 1024; const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 30); // 30 minutes /// In-memory [PageCache] implementation. @@ -35,13 +39,13 @@ const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 30); // 30 minut #[derive(Debug)] pub struct InMemoryCache { /// Capacity in bytes - capacity: u64, + capacity: usize, /// Size of each page - page_size: u64, + page_size: usize, /// Page cache: a mapping from `(path id, offset)` to data / bytes. - cache: Cache<(u32, u32), Bytes>, + cache: Cache, } impl InMemoryCache { @@ -59,7 +63,7 @@ impl InMemoryCache { /// .time_to_idle(Duration::from_secs(60)) /// .build(); /// ``` - pub fn builder(capacity_bytes: u64) -> InMemoryCacheBuilder { + pub fn builder(capacity_bytes: usize) -> InMemoryCacheBuilder { InMemoryCacheBuilder::new(capacity_bytes) } @@ -69,7 +73,7 @@ impl InMemoryCache { /// - `capacity_bytes`: Max capacity in bytes. /// - `page_size`: The maximum size of each page. /// - pub fn new(capacity_bytes: u64, page_size: u64) -> Self { + pub fn new(capacity_bytes: usize, page_size: usize) -> Self { Self::with_params(capacity_bytes, page_size, DEFAULT_TIME_TO_LIVE) } @@ -83,13 +87,13 @@ impl InMemoryCache { let sys = sysinfo::System::new_with_specifics( RefreshKind::new().with_memory(MemoryRefreshKind::everything()), ); - let capacity = (sys.total_memory() as f32 * fraction) as u64; + let capacity = (sys.total_memory() as f32 * fraction) as usize; Self::builder(capacity) } - fn with_params(capacity_bytes: u64, page_size: u64, time_to_idle: Duration) -> Self { + fn with_params(capacity_bytes: usize, page_size: usize, time_to_idle: Duration) -> Self { let cache = Cache::builder() - .max_capacity(capacity_bytes) + .max_capacity(capacity_bytes as u64) // weight each key using the size of the value .weigher(|_key, value: &Bytes| -> u32 { value.len() as u32 }) .time_to_idle(time_to_idle) @@ -106,12 +110,12 @@ impl InMemoryCache { #[async_trait::async_trait] impl PageCache for InMemoryCache { /// The size of each page. - fn page_size(&self) -> u64 { + fn page_size(&self) -> usize { self.page_size } /// Cache capacity in bytes. - fn capacity(&self) -> u64 { + fn capacity(&self) -> usize { self.capacity } @@ -120,20 +124,38 @@ impl PageCache for InMemoryCache { todo!() } - async fn get(&self, id: [u8; 32]) -> Result> { - todo!() + async fn get_with( + &self, + location: &Path, + page_id: u64, + loader: impl Future> + Send, + ) -> Result { + let key = to_page_key(location, page_id); + match self.cache.try_get_with(key, loader).await { + Ok(bytes) => Ok(bytes), + Err(e) => match e.as_ref() { + Error::NotFound { .. } => Err(Error::NotFound { + path: location.to_string(), + source: Box::new(e), + }), + _ => Err(Error::Generic { + store: "InMemoryCache", + source: Box::new(e), + }), + }, + } } - /// Get range of data in the page. - /// - /// # Parameters - /// - `id`: The ID of the page. - /// - `range`: The range of data to read from the page. The range must be within the page size. - /// - /// # Returns - /// See [Self::get()]. - async fn get_range(&self, id: [u8; 32], range: Range) -> Result> { - todo!() + async fn get_range_with( + &self, + location: &Path, + page_id: u64, + range: Range, + loader: impl Future> + Send, + ) -> Result { + assert!(range.start <= range.end && range.end <= self.page_size()); + let bytes = self.get_with(location, page_id, loader).await?; + Ok(bytes.slice(range)) } /// Put a page in the cache. @@ -146,4 +168,8 @@ impl PageCache for InMemoryCache { async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()> { todo!() } + + async fn remove(&self, id: [u8; 32]) -> Result<()> { + todo!() + } } diff --git a/src/memory/builder.rs b/src/memory/builder.rs index bacff58..744665e 100644 --- a/src/memory/builder.rs +++ b/src/memory/builder.rs @@ -7,14 +7,14 @@ use super::{InMemoryCache, DEFAULT_PAGE_SIZE, DEFAULT_TIME_TO_LIVE}; /// Builder for [InMemoryCache] pub struct InMemoryCacheBuilder { - capacity: u64, - page_size: u64, + capacity: usize, + page_size: usize, time_to_idle: Duration, } impl InMemoryCacheBuilder { - pub(crate) fn new(capacity: u64) -> Self { + pub(crate) fn new(capacity: usize) -> Self { Self { capacity, page_size: DEFAULT_PAGE_SIZE, @@ -23,7 +23,7 @@ impl InMemoryCacheBuilder { } /// Set the page size. - pub fn page_size(&mut self, size: u64) -> &mut Self { + pub fn page_size(&mut self, size: usize) -> &mut Self { self.page_size = size; self } diff --git a/src/paging.rs b/src/paging.rs index c7589c2..36991da 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -3,23 +3,35 @@ //! A Page cache caches data in fixed-size pages. use std::fmt::Debug; +use std::future::Future; use std::ops::Range; use async_trait::async_trait; use bytes::Bytes; +use object_store::path::Path; +use sha2::{Digest, Sha256}; use crate::Result; +pub(crate) type PageKey = [u8; 32]; + +pub(crate) fn to_page_key(location: &Path, offset: u64) -> PageKey { + let mut hasher = Sha256::new(); + hasher.update(location.as_ref()); + hasher.update(&offset.to_be_bytes()); + hasher.finalize().into() +} + /// [PageCache] trait. /// /// Caching fixed-size pages. Each page has a unique ID. #[async_trait] pub trait PageCache: Sync + Send + Debug { /// The size of each page. - fn page_size(&self) -> u64; + fn page_size(&self) -> usize; /// Cache capacity, in number of pages. - fn capacity(&self) -> u64; + fn capacity(&self) -> usize; /// How many pages are cached. fn len(&self) -> usize; @@ -29,13 +41,22 @@ pub trait PageCache: Sync + Send + Debug { self.len() == 0 } - /// Read data from a Page. + /// Read data of a page. + /// + /// # Parameters + /// - `location`: the path of the object. + /// - `page_id`: the ID of the page. /// /// # Returns /// - `Ok(Some(Bytes))` if the page exists and the data was read successfully. /// - `Ok(None)` if the page does not exist. /// - `Err(Error)` if an error occurred. - async fn get(&self, id: [u8; 32]) -> Result>; + async fn get_with( + &self, + location: &Path, + page_id: u64, + loader: impl Future> + Send, + ) -> Result; /// Get range of data in the page. /// @@ -44,8 +65,14 @@ pub trait PageCache: Sync + Send + Debug { /// - `range`: The range of data to read from the page. The range must be within the page size. /// /// # Returns - /// See [Self::get()]. - async fn get_range(&self, id: [u8; 32], range: Range) -> Result>; + /// See [Self::get_with()]. + async fn get_range_with( + &self, + location: &Path, + page_id: u64, + range: Range, + loader: impl Future> + Send, + ) -> Result; /// Put a page in the cache. /// @@ -55,4 +82,7 @@ pub trait PageCache: Sync + Send + Debug { /// If the page is smaller than the page size, the remaining space will be zeroed. /// async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()>; + + /// Remove a page from the cache. + async fn remove(&self, key: [u8; 32]) -> Result<()>; } diff --git a/src/read_through.rs b/src/read_through.rs index 39317d4..00173d3 100644 --- a/src/read_through.rs +++ b/src/read_through.rs @@ -12,12 +12,12 @@ use crate::{paging::PageCache, Result}; /// Read-through Page Cache. /// #[derive(Debug)] -pub struct ReadThroughCache { +pub struct ReadThroughCache { inner: Arc, - cache: Arc, + cache: Arc, } -impl std::fmt::Display for ReadThroughCache { +impl std::fmt::Display for ReadThroughCache { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -27,14 +27,14 @@ impl std::fmt::Display for ReadThroughCache { } } -impl ReadThroughCache { - pub fn new(inner: Arc, cache: Arc) -> Self { +impl ReadThroughCache { + pub fn new(inner: Arc, cache: Arc) -> Self { Self { inner, cache } } } #[async_trait] -impl ObjectStore for ReadThroughCache { +impl ObjectStore for ReadThroughCache { async fn put_opts( &self, _location: &Path, From 9982f08e472295885807e38438e4c4c6d7863933 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 13:21:22 -0800 Subject: [PATCH 10/22] all apis --- src/memory.rs | 22 +++++++++------------- src/paging.rs | 11 +---------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/src/memory.rs b/src/memory.rs index a191ee4..992d071 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -121,7 +121,7 @@ impl PageCache for InMemoryCache { /// How many pages are cached. fn len(&self) -> usize { - todo!() + self.cache.entry_count() as usize } async fn get_with( @@ -158,18 +158,14 @@ impl PageCache for InMemoryCache { Ok(bytes.slice(range)) } - /// Put a page in the cache. - /// - /// # Parameters - /// - `id`: The ID of the page. - /// - `page`: The data to put in the page. The page must not be larger than the page size. - /// If the page is smaller than the page size, the remaining space will be zeroed. - /// - async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()> { - todo!() + async fn invalidate(&self, location: &Path, page_id: u64) -> Result<()> { + let key = to_page_key(location, page_id); + self.cache.remove(&key).await; + Ok(()) } +} - async fn remove(&self, id: [u8; 32]) -> Result<()> { - todo!() - } +#[cfg(test)] +mod tests { + use super::*; } diff --git a/src/paging.rs b/src/paging.rs index 36991da..1f0ac79 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -74,15 +74,6 @@ pub trait PageCache: Sync + Send + Debug { loader: impl Future> + Send, ) -> Result; - /// Put a page in the cache. - /// - /// # Parameters - /// - `id`: The ID of the page. - /// - `page`: The data to put in the page. The page must not be larger than the page size. - /// If the page is smaller than the page size, the remaining space will be zeroed. - /// - async fn put(&self, id: [u8; 32], page: Bytes) -> Result<()>; - /// Remove a page from the cache. - async fn remove(&self, key: [u8; 32]) -> Result<()>; + async fn invalidate(&self, location: &Path, page_id: u64) -> Result<()>; } From df3800907ee438e27ef067c2cad64dd836e89888 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 13:44:05 -0800 Subject: [PATCH 11/22] add test --- Cargo.toml | 3 +++ src/memory.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++++---- src/paging.rs | 2 +- 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3fe0c5a..4addb0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,8 @@ authors = ["dev@lancedb.com"] description = "OCRA: A Rust implementation of Cache in arrow-rs' ObjectStore interface" edition = "2021" license-file = "LICENSE" +keywords = ["cache", "object-store", "arrow"] +categories = ["caching"] [dependencies] async-trait = "0.1" @@ -19,3 +21,4 @@ sysinfo = "0.32" [dev-dependencies] tokio = { version = "1", features = ["full"] } +tempfile = "3.2" diff --git a/src/memory.rs b/src/memory.rs index 992d071..ab84bed 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -91,17 +91,17 @@ impl InMemoryCache { Self::builder(capacity) } - fn with_params(capacity_bytes: usize, page_size: usize, time_to_idle: Duration) -> Self { + fn with_params(capacity: usize, page_size: usize, time_to_idle: Duration) -> Self { let cache = Cache::builder() - .max_capacity(capacity_bytes as u64) + .max_capacity(capacity as u64) // weight each key using the size of the value .weigher(|_key, value: &Bytes| -> u32 { value.len() as u32 }) .time_to_idle(time_to_idle) // .eviction_listener(eviction_listener) .build(); Self { - capacity: capacity_bytes, - page_size: page_size, + capacity, + page_size, cache, } } @@ -168,4 +168,54 @@ impl PageCache for InMemoryCache { #[cfg(test)] mod tests { use super::*; + + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + use object_store::{local::LocalFileSystem, ObjectStore}; + use tempfile::tempdir; + + #[tokio::test] + async fn test_get_range() { + let cache = InMemoryCache::new(1024, 512); + let local_fs = Arc::new(LocalFileSystem::new()); + + let tmp_dir = tempdir().unwrap(); + let file_path = tmp_dir.path().join("test.bin"); + std::fs::write(&file_path, "test data").unwrap(); + let location = Path::from(file_path.as_path().to_str().unwrap()); + + let miss = Arc::new(AtomicUsize::new(0)); + + let data = cache + .get_with(&location, 0, { + let miss = miss.clone(); + let local_fs = local_fs.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + local_fs.get(&location).await.unwrap().bytes().await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), 1); + assert_eq!(data, Bytes::from("test data")); + + let data = cache + .get_with(&location, 0, { + let miss = miss.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + local_fs.get(&location).await.unwrap().bytes().await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), 1); + assert_eq!(data, Bytes::from("test data")); + } } diff --git a/src/paging.rs b/src/paging.rs index 1f0ac79..593d299 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -18,7 +18,7 @@ pub(crate) type PageKey = [u8; 32]; pub(crate) fn to_page_key(location: &Path, offset: u64) -> PageKey { let mut hasher = Sha256::new(); hasher.update(location.as_ref()); - hasher.update(&offset.to_be_bytes()); + hasher.update(offset.to_be_bytes()); hasher.finalize().into() } From 918e25aac0b04b30d349727f153bd9ca7df5a6b9 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:12:51 -0800 Subject: [PATCH 12/22] test evicition --- src/memory.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++------- src/paging.rs | 9 +------ 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/src/memory.rs b/src/memory.rs index ab84bed..374a588 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -44,7 +44,7 @@ pub struct InMemoryCache { /// Size of each page page_size: usize, - /// Page cache: a mapping from `(path id, offset)` to data / bytes. + /// In memory page cache: a mapping from `(path id, offset)` to data / bytes. cache: Cache, } @@ -119,11 +119,6 @@ impl PageCache for InMemoryCache { self.capacity } - /// How many pages are cached. - fn len(&self) -> usize { - self.cache.entry_count() as usize - } - async fn get_with( &self, location: &Path, @@ -169,11 +164,15 @@ impl PageCache for InMemoryCache { mod tests { use super::*; - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, + use std::{ + io::Write, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; + use bytes::{BufMut, BytesMut}; use object_store::{local::LocalFileSystem, ObjectStore}; use tempfile::tempdir; @@ -218,4 +217,57 @@ mod tests { assert_eq!(miss.load(Ordering::SeqCst), 1); assert_eq!(data, Bytes::from("test data")); } + + #[tokio::test] + async fn test_eviction() { + const PAGE_SIZE: usize = 512; + let cache = InMemoryCache::new(1024, PAGE_SIZE); + let local_fs = Arc::new(LocalFileSystem::new()); + + let tmp_dir = tempdir().unwrap(); + let file_path = tmp_dir.path().join("test.bin"); + { + let mut file = std::fs::File::create(&file_path).unwrap(); + for i in 0_u64..1024 { + file.write_all(&i.to_be_bytes()).unwrap(); + } + } + let location = Path::from(file_path.as_path().to_str().unwrap()); + cache.cache.run_pending_tasks().await; + + let miss = Arc::new(AtomicUsize::new(0)); + + for (page_id, expected_miss, expected_size) in + [(0, 1, 1), (0, 1, 1), (1, 2, 2), (4, 3, 2), (5, 4, 2)].iter() + { + let data = cache + .get_with(&location, *page_id, { + let miss = miss.clone(); + let local_fs = local_fs.clone(); + let location = location.clone(); + async move { + miss.fetch_add(1, Ordering::SeqCst); + local_fs + .get_range( + &location, + PAGE_SIZE * (*page_id as usize)..PAGE_SIZE * (page_id + 1) as usize, + ) + .await + } + }) + .await + .unwrap(); + assert_eq!(miss.load(Ordering::SeqCst), *expected_miss); + assert_eq!(data.len(), PAGE_SIZE); + + cache.cache.run_pending_tasks().await; + assert_eq!(cache.cache.entry_count(), *expected_size); + + let mut buf = BytesMut::with_capacity(PAGE_SIZE); + for i in page_id * PAGE_SIZE as u64 / 8..(page_id + 1) * PAGE_SIZE as u64 / 8 { + buf.put_u64(i); + } + assert_eq!(data, buf); + } + } } diff --git a/src/paging.rs b/src/paging.rs index 593d299..8707197 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -15,6 +15,7 @@ use crate::Result; pub(crate) type PageKey = [u8; 32]; +/// Convert a location and offset to a page key. pub(crate) fn to_page_key(location: &Path, offset: u64) -> PageKey { let mut hasher = Sha256::new(); hasher.update(location.as_ref()); @@ -33,14 +34,6 @@ pub trait PageCache: Sync + Send + Debug { /// Cache capacity, in number of pages. fn capacity(&self) -> usize; - /// How many pages are cached. - fn len(&self) -> usize; - - /// Returns true if the cache is empty. - fn is_empty(&self) -> bool { - self.len() == 0 - } - /// Read data of a page. /// /// # Parameters From 925fc6a2100e59b8625c1142a553325805444bf9 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:20:40 -0800 Subject: [PATCH 13/22] arm does not like me today --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b31d49b..6732f74 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -33,7 +33,7 @@ jobs: matrix: machine: - ubuntu-24.04 - - ubuntu-2404-4x-arm64 + # - ubuntu-2404-4x-arm64 - macos-14 runs-on: ${{ matrix.machine }} steps: From 1f468c9e65e2002a44f24ea5a56a94d5443a8c44 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:22:32 -0800 Subject: [PATCH 14/22] use cache in lint job --- .github/workflows/rust.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6732f74..7308d5b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -24,6 +24,10 @@ jobs: timeout-minutes: 30 steps: - uses: actions/checkout@v4 + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.83 + - uses: Swatinem/rust-cache@v2 - name: Check formatting run: cargo fmt -- --check - name: Check clippy From a5ee3f8301d4441fe6de562a25d2f1c019345278 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:23:56 -0800 Subject: [PATCH 15/22] component --- .github/workflows/rust.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7308d5b..2248f35 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -27,6 +27,7 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 with: toolchain: 1.83 + components: rustfmt, clippy - uses: Swatinem/rust-cache@v2 - name: Check formatting run: cargo fmt -- --check From 27eceeb5074d72f4c3fe1caadb9a275f05acb2e5 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:29:31 -0800 Subject: [PATCH 16/22] must use --- src/memory.rs | 7 ++++--- src/memory/builder.rs | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/memory.rs b/src/memory.rs index 374a588..cda1a02 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -1,4 +1,4 @@ -//! In-memory [PageCache] implementation +//! In-memory [`PageCache`] implementation //! //! User can specify the capacity of the cache, or specify //! how much percentage of memory should be allocated to it. @@ -32,7 +32,7 @@ use crate::{ pub const DEFAULT_PAGE_SIZE: usize = 8 * 1024 * 1024; const DEFAULT_TIME_TO_LIVE: Duration = Duration::from_secs(60 * 30); // 30 minutes -/// In-memory [PageCache] implementation. +/// In-memory [`PageCache`] implementation. /// /// This is a LRU mapping of page IDs to page data, with TTL eviction. /// @@ -49,7 +49,7 @@ pub struct InMemoryCache { } impl InMemoryCache { - /// Create a [`Builder`](InMemoryCacheBuilder) to construct [InMemoryCache]. + /// Create a [`Builder`](InMemoryCacheBuilder) to construct [`InMemoryCache`]. /// /// # Parameters: /// - *capacity*: capacity in bytes @@ -63,6 +63,7 @@ impl InMemoryCache { /// .time_to_idle(Duration::from_secs(60)) /// .build(); /// ``` + #[must_use] pub fn builder(capacity_bytes: usize) -> InMemoryCacheBuilder { InMemoryCacheBuilder::new(capacity_bytes) } diff --git a/src/memory/builder.rs b/src/memory/builder.rs index 744665e..1e4b19e 100644 --- a/src/memory/builder.rs +++ b/src/memory/builder.rs @@ -5,7 +5,7 @@ use std::time::Duration; use super::{InMemoryCache, DEFAULT_PAGE_SIZE, DEFAULT_TIME_TO_LIVE}; -/// Builder for [InMemoryCache] +/// Builder for [`InMemoryCache`] pub struct InMemoryCacheBuilder { capacity: usize, page_size: usize, @@ -37,6 +37,7 @@ impl InMemoryCacheBuilder { self } + #[must_use] pub fn build(&self) -> InMemoryCache { InMemoryCache::with_params(self.capacity, self.page_size, self.time_to_idle) } From 0e479861c906c1edab2d8ce087460778370c58fa Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:41:00 -0800 Subject: [PATCH 17/22] doc --- src/lib.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 411ea64..da1c0f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,11 @@ -//! **OCRA**: an *arrow-rs* [**O**bjectStore](object_store::ObjectStore) -//! implementation with **C**ache in **R**ust for **A**ll +//! **OCRA**: (**A**) (**R**)ust (**C**)ache implementation for *arrow-rs* +//! [(**O**)bjectStore](object_store::ObjectStore). +//! +//! It offers a few `ObjectStore` implementations that work with +//! caches. +//! +//! For example, you can use [`ReadThroughCache`] to wrap an existing +//! `ObjectStore` instance with a [`PageCache`](paging::PageCache). //! //! ```no_run //! # use std::sync::Arc; @@ -13,12 +19,12 @@ //! // Use 75% of system memory for cache //! let memory_cache = Arc::new( //! InMemoryCache::with_sys_memory(0.75).build()); -//! let cached: Arc = +//! let cached_store: Arc = //! Arc::new(ReadThroughCache::new(fs, memory_cache)); //! -//! // Now you can use `cached` as a regular ObjectStore +//! // Now you can use `cached_store` as a regular ObjectStore //! let path = Path::from("my-key"); -//! let data = cached.get_range(&path, 1024..2048).await.unwrap(); +//! let data = cached_store.get_range(&path, 1024..2048).await.unwrap(); //! # }) //! ``` From 8c83417d8b41f89ab53f0f5e003ea23514f14882 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:42:54 -0800 Subject: [PATCH 18/22] docs --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5e64ac3..828bd39 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # ocra -OCRA: Object-store Cache in Rust for All + +**OCRA**: (**A**) (**R**)ust (**C**)ache implementation using _arrow-rs_ [(**O**)bjectStore](https://docs.rs/object_store/latest/object_store/) trait. From 4b7fbf7b922554673a4915fdba3cdb7c79476ba8 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 14:45:32 -0800 Subject: [PATCH 19/22] s --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 828bd39..8bd45c0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -# ocra +# OCRA **OCRA**: (**A**) (**R**)ust (**C**)ache implementation using _arrow-rs_ [(**O**)bjectStore](https://docs.rs/object_store/latest/object_store/) trait. From 2f4fe6148e342645925c539c59bba28215acbb41 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 16:47:27 -0800 Subject: [PATCH 20/22] add benchmark --- Cargo.toml | 11 ++++++--- benches/memory.rs | 25 +++++++++++++++++++ src/memory.rs | 63 ++++++++++++++++++++++++++++++++++++----------- src/paging.rs | 17 +++---------- 4 files changed, 84 insertions(+), 32 deletions(-) create mode 100644 benches/memory.rs diff --git a/Cargo.toml b/Cargo.toml index 4addb0c..acc73b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,9 +16,14 @@ log = "~0.4" moka = { version = "~0.12", features = ["future"] } num_cpus = "1.16" object_store = "~0.11" -sha2 = "~0.10" -sysinfo = "0.32" +sysinfo = "~0.32" +tokio = { version = "1", features = ["sync"] } [dev-dependencies] +criterion = { version = "~0.5", features = ["async_tokio"] } +tempfile = "~3.14" tokio = { version = "1", features = ["full"] } -tempfile = "3.2" + +[[bench]] +name = "memory" +harness = false diff --git a/benches/memory.rs b/benches/memory.rs new file mode 100644 index 0000000..ee279fe --- /dev/null +++ b/benches/memory.rs @@ -0,0 +1,25 @@ +//! Benchmark for in-memory page cache. +//! +use criterion::{criterion_group, criterion_main, Criterion}; + +use ocra::memory::InMemoryCache; + +fn memory_cache_bench(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().unwrap(); + + const FILE_SiZE: usize = 1024 * 1024 * 1024; + // TODO: support other object store later + let local_fs = object_store::local::LocalFileSystem::new(); + + + c.bench_function("memory_cache,warm", |b| { + b.to_async(&rt).iter(|| async {}); + }); +} + +criterion_group!( + name=benches; + config = Criterion::default().significance_level(0.1).sample_size(10); + targets = memory_cache_bench); + +criterion_main!(benches); diff --git a/src/memory.rs b/src/memory.rs index cda1a02..543701b 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -13,20 +13,24 @@ //! let cache = InMemoryCache::builder(32 * 1024 * 1024 * 1024).build(); //! ``` -use std::{future::Future, ops::Range, time::Duration}; +use std::{ + collections::HashMap, + future::Future, + ops::Range, + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; use bytes::Bytes; use moka::future::Cache; use object_store::path::Path; use sysinfo::{MemoryRefreshKind, RefreshKind}; +use tokio::sync::RwLock; mod builder; pub use self::builder::InMemoryCacheBuilder; -use crate::{ - paging::{to_page_key, PageCache, PageKey}, - Error, Result, -}; +use crate::{paging::PageCache, Error, Result}; /// Default memory page size is 8 MB pub const DEFAULT_PAGE_SIZE: usize = 8 * 1024 * 1024; @@ -45,7 +49,13 @@ pub struct InMemoryCache { page_size: usize, /// In memory page cache: a mapping from `(path id, offset)` to data / bytes. - cache: Cache, + cache: Cache<(u64, u32), Bytes>, + + /// Provide fast lookup of path id + location_lookup: RwLock>, + + /// Next location id to be assigned + next_location_id: AtomicU64, } impl InMemoryCache { @@ -104,8 +114,27 @@ impl InMemoryCache { capacity, page_size, cache, + location_lookup: RwLock::new(HashMap::new()), + next_location_id: AtomicU64::new(0), } } + + async fn location_id(&self, location: &Path) -> u64 { + if let Some(&key) = self.location_lookup.read().await.get(location) { + return key; + } + + let mut id_map = self.location_lookup.write().await; + // on lock-escalation, check if someone else has added it + if let Some(&id) = id_map.get(location) { + return id; + } + + let id = self.next_location_id.fetch_add(1, Ordering::SeqCst); + id_map.insert(location.clone(), id); + + id + } } #[async_trait::async_trait] @@ -123,11 +152,15 @@ impl PageCache for InMemoryCache { async fn get_with( &self, location: &Path, - page_id: u64, + page_id: u32, loader: impl Future> + Send, ) -> Result { - let key = to_page_key(location, page_id); - match self.cache.try_get_with(key, loader).await { + let location_id = self.location_id(location).await; + match self + .cache + .try_get_with((location_id, page_id), loader) + .await + { Ok(bytes) => Ok(bytes), Err(e) => match e.as_ref() { Error::NotFound { .. } => Err(Error::NotFound { @@ -145,7 +178,7 @@ impl PageCache for InMemoryCache { async fn get_range_with( &self, location: &Path, - page_id: u64, + page_id: u32, range: Range, loader: impl Future> + Send, ) -> Result { @@ -154,9 +187,9 @@ impl PageCache for InMemoryCache { Ok(bytes.slice(range)) } - async fn invalidate(&self, location: &Path, page_id: u64) -> Result<()> { - let key = to_page_key(location, page_id); - self.cache.remove(&key).await; + async fn invalidate(&self, location: &Path, page_id: u32) -> Result<()> { + let location_id = self.location_id(location).await; + self.cache.remove(&(location_id, page_id)).await; Ok(()) } } @@ -265,8 +298,8 @@ mod tests { assert_eq!(cache.cache.entry_count(), *expected_size); let mut buf = BytesMut::with_capacity(PAGE_SIZE); - for i in page_id * PAGE_SIZE as u64 / 8..(page_id + 1) * PAGE_SIZE as u64 / 8 { - buf.put_u64(i); + for i in page_id * PAGE_SIZE as u32 / 8..(page_id + 1) * PAGE_SIZE as u32 / 8 { + buf.put_u64(i as u64); } assert_eq!(data, buf); } diff --git a/src/paging.rs b/src/paging.rs index 8707197..7f63322 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -9,20 +9,9 @@ use std::ops::Range; use async_trait::async_trait; use bytes::Bytes; use object_store::path::Path; -use sha2::{Digest, Sha256}; use crate::Result; -pub(crate) type PageKey = [u8; 32]; - -/// Convert a location and offset to a page key. -pub(crate) fn to_page_key(location: &Path, offset: u64) -> PageKey { - let mut hasher = Sha256::new(); - hasher.update(location.as_ref()); - hasher.update(offset.to_be_bytes()); - hasher.finalize().into() -} - /// [PageCache] trait. /// /// Caching fixed-size pages. Each page has a unique ID. @@ -47,7 +36,7 @@ pub trait PageCache: Sync + Send + Debug { async fn get_with( &self, location: &Path, - page_id: u64, + page_id: u32, loader: impl Future> + Send, ) -> Result; @@ -62,11 +51,11 @@ pub trait PageCache: Sync + Send + Debug { async fn get_range_with( &self, location: &Path, - page_id: u64, + page_id: u32, range: Range, loader: impl Future> + Send, ) -> Result; /// Remove a page from the cache. - async fn invalidate(&self, location: &Path, page_id: u64) -> Result<()>; + async fn invalidate(&self, location: &Path, page_id: u32) -> Result<()>; } From 28eada3113400ff4d23b9a2f521b43c83013c27a Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 19:06:06 -0800 Subject: [PATCH 21/22] add benchmark --- Cargo.toml | 1 + benches/memory.rs | 76 ++++++++++++++++++++++++++++++++++++++++++----- src/memory.rs | 14 ++++++++- src/paging.rs | 3 ++ 4 files changed, 85 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index acc73b8..926fe17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "1", features = ["sync"] } criterion = { version = "~0.5", features = ["async_tokio"] } tempfile = "~3.14" tokio = { version = "1", features = ["full"] } +rand = "~0.8" [[bench]] name = "memory" diff --git a/benches/memory.rs b/benches/memory.rs index ee279fe..4cbf7b7 100644 --- a/benches/memory.rs +++ b/benches/memory.rs @@ -1,20 +1,80 @@ //! Benchmark for in-memory page cache. //! +//! + +use std::{fs::File, io::Write, sync::Arc}; + use criterion::{criterion_group, criterion_main, Criterion}; +use object_store::{path::Path, ObjectStore}; +use rand::Rng; -use ocra::memory::InMemoryCache; +use ocra::{memory::InMemoryCache, paging::PageCache}; fn memory_cache_bench(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); + let mut rng = rand::thread_rng(); + + const FILE_SIZE: usize = 1024 * 1024 * 1024; + // TODO: support other object stores later + let store: Arc = Arc::new(object_store::local::LocalFileSystem::new()); + let temp_file = tempfile::NamedTempFile::new().unwrap().into_temp_path(); + { + let mut writer = File::create(temp_file.to_str().unwrap()).unwrap(); + let mut buf = vec![0_u8; 128 * 1024]; + + for _ in 0..FILE_SIZE / (128 * 1024) { + rng.fill(&mut buf[..]); + writer.write_all(&buf).unwrap(); + } + } + + for page_size in &[1024 * 1024, 8 * 1024 * 1024] { + let cache = Arc::new(InMemoryCache::new(FILE_SIZE + 32 * 1024, *page_size)); + let location = Path::from(temp_file.to_str().unwrap()); + + // Warm up the cache + println!("Starting warm up cache with page size: {}", page_size); + rt.block_on(async { + let loc = location.clone(); + for i in 0..FILE_SIZE / page_size { + let data = cache + .get_with(&loc, i as u32, { + let store = store.clone(); + let location = loc.clone(); + async move { + store + .get_range(&location, i * page_size..(i + 1) * page_size) + .await + } + }) + .await + .unwrap(); + assert!(!data.is_empty()); + } + cache.cache.run_pending_tasks().await; + }); - const FILE_SiZE: usize = 1024 * 1024 * 1024; - // TODO: support other object store later - let local_fs = object_store::local::LocalFileSystem::new(); - + c.bench_function( + format!("memory_cache,warm,page_size={}", page_size).as_str(), + |b| { + b.to_async(&rt).iter(|| { + let mut rng = rand::thread_rng(); + let cache = cache.clone(); + let loc = location.clone(); + async move { + let page_id = rng.gen_range(0..FILE_SIZE / page_size); - c.bench_function("memory_cache,warm", |b| { - b.to_async(&rt).iter(|| async {}); - }); + let _data = cache + .get_with(&loc, page_id as u32, async { + panic!("Should not be called page_id={}", page_id) + }) + .await + .unwrap(); + } + }) + }, + ); + } } criterion_group!( diff --git a/src/memory.rs b/src/memory.rs index 543701b..1634947 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -49,7 +49,7 @@ pub struct InMemoryCache { page_size: usize, /// In memory page cache: a mapping from `(path id, offset)` to data / bytes. - cache: Cache<(u64, u32), Bytes>, + pub cache: Cache<(u64, u32), Bytes>, /// Provide fast lookup of path id location_lookup: RwLock>, @@ -149,6 +149,18 @@ impl PageCache for InMemoryCache { self.capacity } + fn size(&self) -> usize { + self.cache.weighted_size() as usize + } + + async fn get(&self, location: &Path, page_id: u32) -> Result> { + let location_id = self.location_id(location).await; + match self.cache.get(&(location_id, page_id)).await { + Some(bytes) => Ok(Some(bytes)), + None => Ok(None), + } + } + async fn get_with( &self, location: &Path, diff --git a/src/paging.rs b/src/paging.rs index 7f63322..b6ecdf9 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -23,6 +23,9 @@ pub trait PageCache: Sync + Send + Debug { /// Cache capacity, in number of pages. fn capacity(&self) -> usize; + fn size(&self) -> usize; + + async fn get(&self, location: &Path, page_id: u32) -> Result>; /// Read data of a page. /// /// # Parameters From a9aacb2bc38c5c7b33aa7ac1e331beccfb4f1c4a Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sun, 1 Dec 2024 19:07:36 -0800 Subject: [PATCH 22/22] add benchmark --- benches/memory.rs | 2 +- src/memory.rs | 10 +--------- src/paging.rs | 2 +- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/benches/memory.rs b/benches/memory.rs index 4cbf7b7..e0074bd 100644 --- a/benches/memory.rs +++ b/benches/memory.rs @@ -51,8 +51,8 @@ fn memory_cache_bench(c: &mut Criterion) { .unwrap(); assert!(!data.is_empty()); } - cache.cache.run_pending_tasks().await; }); + println!("Warm up cache done"); c.bench_function( format!("memory_cache,warm,page_size={}", page_size).as_str(), diff --git a/src/memory.rs b/src/memory.rs index 1634947..d65056b 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -49,7 +49,7 @@ pub struct InMemoryCache { page_size: usize, /// In memory page cache: a mapping from `(path id, offset)` to data / bytes. - pub cache: Cache<(u64, u32), Bytes>, + cache: Cache<(u64, u32), Bytes>, /// Provide fast lookup of path id location_lookup: RwLock>, @@ -153,14 +153,6 @@ impl PageCache for InMemoryCache { self.cache.weighted_size() as usize } - async fn get(&self, location: &Path, page_id: u32) -> Result> { - let location_id = self.location_id(location).await; - match self.cache.get(&(location_id, page_id)).await { - Some(bytes) => Ok(Some(bytes)), - None => Ok(None), - } - } - async fn get_with( &self, location: &Path, diff --git a/src/paging.rs b/src/paging.rs index b6ecdf9..9edea2b 100644 --- a/src/paging.rs +++ b/src/paging.rs @@ -23,9 +23,9 @@ pub trait PageCache: Sync + Send + Debug { /// Cache capacity, in number of pages. fn capacity(&self) -> usize; + /// Total used cache size in bytes. fn size(&self) -> usize; - async fn get(&self, location: &Path, page_id: u32) -> Result>; /// Read data of a page. /// /// # Parameters