Merge pull request #515 from splitgraph/best-effort-chunk-coalescing
Best effort chunk coalescing implementation
gruuya authored Apr 17, 2024
2 parents 7d2289b + c480358 commit 64f7356
Showing 1 changed file with 156 additions and 127 deletions.
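For orientation before the diff: the cache splits every object into fixed-size chunks of min_fetch_size bytes, and a requested byte range is first mapped onto the chunk indices it touches. A minimal sketch of that arithmetic, standalone and with a hypothetical chunk_span helper (the real code inlines this in get_range):

use std::ops::Range;

/// Map a byte range onto the inclusive span of fixed-size chunks it touches.
fn chunk_span(range: &Range<usize>, chunk_size: usize) -> (usize, usize) {
    let start_chunk = range.start / chunk_size;
    // Subtracting 1 keeps an exact multiple from spilling into the next chunk.
    let end_chunk = (range.end - 1) / chunk_size;
    (start_chunk, end_chunk)
}

fn main() {
    // With 16-byte chunks, bytes 25..64 touch chunks 1..=3 (bytes 16..64),
    // so three potential cache misses can be coalesced into one fetch of 16..64.
    assert_eq!(chunk_span(&(25..64), 16), (1, 3));
    assert_eq!(chunk_span(&(0..65), 16), (0, 4)); // byte 64 spills into chunk 4
}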
283 changes: 156 additions & 127 deletions src/object_store/cache.rs
@@ -3,7 +3,7 @@
/// with some additions to weigh it by the file size.
use crate::config::schema::str_to_hex_hash;
use async_trait::async_trait;
use bytes::{Buf, BufMut, Bytes};
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::BoxStream;
use moka::future::{Cache, CacheBuilder, FutureExt};
use moka::notification::RemovalCause;
@@ -18,7 +18,6 @@ use std::fmt::{Debug, Formatter};

use std::fs::remove_dir_all;
use std::io;
use std::io::ErrorKind;
use std::ops::Range;
use std::path::{Path, PathBuf};

@@ -41,20 +40,14 @@ impl CacheFileManager {
Self { base_path }
}

async fn write_file(
&self,
cache_key: &CacheKey,
data: Arc<Bytes>,
) -> io::Result<PathBuf> {
async fn write_file(&self, cache_key: &CacheKey, data: Bytes) -> io::Result<PathBuf> {
let mut path = self.base_path.to_path_buf();
path.push(cache_key.as_filename());

// Should this happen normally?
// TODO: when does this happen?
if path.exists() {
return Err(io::Error::new(
ErrorKind::Other,
"Internal error: cached file path already exists",
));
debug!("{cache_key:?} file already exists, skipping write");
return Ok(path.clone());
}

tokio::fs::write(&path, data.as_ref()).await?;
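The behavioral change in write_file above: a pre-existing cache file is no longer an error but an idempotent no-op, since a racing task may already have persisted the same chunk. A reduced sketch of just that guard, under hypothetical names (write_once, the /tmp path):

use std::path::Path;

/// Persist data unless a racing writer already did; an existing file is success.
async fn write_once(path: &Path, data: &[u8]) -> std::io::Result<()> {
    if path.exists() {
        // Another task persisted this chunk first; skip the redundant write.
        return Ok(());
    }
    tokio::fs::write(path, data).await
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let path = Path::new("/tmp/chunk-0"); // hypothetical cache path
    write_once(path, b"data").await?;
    write_once(path, b"data").await // second call is a no-op
}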
@@ -88,12 +81,18 @@ impl Drop for CacheFileManager {
}
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct CacheKey {
path: object_store::path::Path,
range: Range<usize>,
}

impl Debug for CacheKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{:?}", self.path, self.range)
}
}

impl CacheKey {
fn as_filename(&self) -> String {
format!(
@@ -108,7 +107,7 @@ impl CacheKey {
#[derive(Clone)]
pub enum CacheValue {
File(PathBuf, usize),
Memory(Arc<Bytes>),
Memory(Bytes),
}

impl CacheValue {
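The Memory variant drops the Arc wrapper above because bytes::Bytes is already a reference-counted view: cloning it is a cheap refcount bump, not a payload copy, so Arc<Bytes> only added a second layer of indirection. A small demonstration of the bytes crate semantics (values arbitrary):

use bytes::Bytes;

fn main() {
    let a = Bytes::from_static(b"chunk data");
    let b = a.clone(); // shallow: bumps a refcount, copies no bytes
    // Both handles view the same backing memory.
    assert_eq!(a.as_ptr(), b.as_ptr());
}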
@@ -180,7 +179,6 @@ impl CachingObjectStore {
// Clone the pointer since we can't pass the whole struct to the cache
let eviction_file_manager = file_manager.clone();

// TODO: experiment with time-to-idle
let cache: Cache<CacheKey, CacheValue> = CacheBuilder::new(max_cache_size)
.weigher(|_, v: &CacheValue| v.size() as u32)
.async_eviction_listener(move |k, v, cause| {
@@ -217,88 +215,145 @@ impl CachingObjectStore {
}
}

/// Get a certain range chunk, delineated in units of self.min_fetch_size. If the chunk
/// is cached, return it directly. Otherwise, fetch and return it.
/// Get a contiguous range of chunks, each delineated in units of self.min_fetch_size. The main
/// goal is to coalesce the fetching of any missing chunks into batches of maximal range, to
/// minimize the outgoing calls needed to satisfy the incoming call.
///
/// There is some nuance in the fetching process worth elaborating on. Namely, when there is a
/// cache miss:
/// - All concurrent calls for the same missing chunk are coalesced, thanks to `Cache::try_get_with`,
/// and so there is only one outbound request for the data.
/// - As soon as the data is fetched we:
/// a) spawn a task to persist the data to the disk
/// b) add a cache entry with the in-memory data (which is shared by the write task)
/// - Subsequently, all waiting calls are unblocked and will get the new cache value that is
/// either read from disk (if the write task finished quickly enough), or from memory (if the
/// write task is still running).
/// The algorithm works as follows:
/// - If a chunk is missing from the cache, add it to a batch of pending chunks to fetch at once.
/// - If the chunk is present in the cache, fetch the entire pending chunk batch (if any) and
/// a) add a cache entry with the in-memory data (which is shared by the write task)
/// b) spawn a task to persist the data to the disk
/// - All subsequent calls will get the new cache value, either read from disk (if the write
/// task finished quickly enough) or served from memory (if the write task is still running).
/// - Once the write task completes it will either replace the cache value with a file pointer
/// (if it completed successfully) or invalidate the in-memory entry (if it didn't).
async fn get_chunk(
///
/// NB: This is a best-effort implementation, i.e. no synchronization primitives are used,
/// so there is no guarantee that another thread won't duplicate some of the requests.
async fn get_chunk_range(
&self,
path: &object_store::path::Path,
chunk: u32,
) -> Result<Bytes, object_store::Error> {
let range = (chunk as usize * self.min_fetch_size as usize)
..((chunk + 1) as usize * self.min_fetch_size as usize);

let key = CacheKey {
path: path.to_owned(),
range: range.clone(),
};

let value = self
.cache
.try_get_with::<_, object_store::Error>(key.clone(), async move {
// The Arc here is solely to avoid copying the data into the closure below, as the
// writing can be done through a reference as well.
debug!("Fetching data for {key:?}");
let data = Arc::new(self.inner.get_range(path, range).await?);

// Run the blocking write + cache value insert in a separate task
let cache = self.cache.clone();
let file_manager = self.file_manager.clone();
let size = data.len();
let data_to_write = data.clone();
tokio::task::spawn(async move {
match file_manager.write_file(&key, data_to_write).await {
Ok(path) => {
// Write task completed successfully, replace the in-memory cache entry
// with the file-pointer one.
debug!("Upserting file pointer for {key:?} into the cache");
let value = CacheValue::File(path, size);
cache.insert(key, value).await;
location: &object_store::path::Path,
start_chunk: usize,
end_chunk: usize,
) -> object_store::Result<Bytes> {
let mut result = BytesMut::with_capacity(
(end_chunk.saturating_sub(start_chunk) + 1) * self.min_fetch_size as usize,
);

let mut chunk_batch = vec![];
for chunk in start_chunk..=end_chunk {
let chunk_range = (chunk * self.min_fetch_size as usize)
..((chunk + 1) * self.min_fetch_size as usize);

let key = CacheKey {
path: location.to_owned(),
range: chunk_range.clone(),
};

let chunk_data = match self.cache.get(&key).await {
// If the value is missing, add it to the pending chunk batch and continue
None => {
chunk_batch.push(key);
None
}
Some(value) => {
// Inspect the cache value for the current chunk
match value {
CacheValue::Memory(data) => {
debug!("Cache value for {key:?} fetched from memory");
Some(data.clone())
}
Err(err) => {
// Write task failed, remove the cache entry; we could also defer that to
// TTL/LRU eviction, but then we risk ballooning the memory usage.
warn!("Failed writing value for {key:?} to a file: {err}");
warn!("Removing cache entry");
cache.invalidate(&key).await;
CacheValue::File(path, _) => {
debug!("Cache value for {key:?} fetching from the file");
match self.file_manager.read_file(path).await {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Re-downloading cache value for {key:?}: {err}"
);
let data = self
.inner
.get_range(location, chunk_range.clone())
.await?;
self.cache_chunk_data(key, data.clone()).await;
Some(data)
}
}
}
}
});
}
};

// While the write task runs cache the in-memory bytes and serve that to all calls
// prior to the write task completing.
debug!("Caching value for ({path:?}, {chunk}) in memory");
Ok(CacheValue::Memory(data))
})
.await
.map_err(|e| object_store::Error::Generic {
store: "cache_store",
source: Box::new(e),
})?;

match value {
CacheValue::File(path, _) => {
debug!("Cache value for ({path:?}, {chunk}) fetched from the file");
let data = self.file_manager.read_file(path).await.unwrap();
Ok(data)
if (chunk_data.is_some() || chunk == end_chunk) && !chunk_batch.is_empty() {
// We either got a value, or are at the last chunk, so first we need to resolve any
// outstanding coalesced chunk requests thus far.
let first = chunk_batch.first().unwrap();
let last = chunk_batch.last().unwrap();

let batch_range = first.range.start..last.range.end;
debug!("{location}-{batch_range:?} fetching");
let mut batch_data =
self.inner.get_range(location, batch_range.clone()).await?;
debug!("{location}-{batch_range:?} fetched");
result.extend_from_slice(&batch_data);

for key in &chunk_batch {
// Split the next chunk from the batch
let data = if batch_data.len() < self.min_fetch_size as usize {
batch_data.clone()
} else {
batch_data.split_to(self.min_fetch_size as usize)
};

self.cache_chunk_data(key.clone(), data).await;
}

chunk_batch = vec![];
}
CacheValue::Memory(data) => {
debug!("Cache value for ({path:?}, {chunk}) fetched from memory");
Ok(data.as_ref().clone())

// Finally append the current chunk data (if not included in the batch above).
if let Some(data) = chunk_data {
result.extend_from_slice(&data);
}
}

Ok(result.into())
}

async fn cache_chunk_data(&self, key: CacheKey, data: Bytes) {
// Cache the memory value
let entry = self
.cache
.entry_by_ref(&key)
.or_insert(CacheValue::Memory(data.clone()))
.await;

// Finally trigger persisting to disk
if entry.is_fresh() {
let cache = self.cache.clone();
let file_manager = self.file_manager.clone();
tokio::spawn(async move {
// Run pending tasks to avert eviction races.
cache.run_pending_tasks().await;
let size = data.len();
match file_manager.write_file(&key, data).await {
Ok(path) => {
// Write task completed successfully, replace the in-memory cache entry
// with the file-pointer one.
debug!("Upserting file pointer for {key:?} into the cache");
let value = CacheValue::File(path, size);
cache.insert(key, value).await;
}
Err(err) => {
// Write task failed, remove the cache entry; we could also defer that to
// TTL/LRU eviction, but then we risk ballooning the memory usage.
warn!("Invalidating cache entry for {key:?}; failed writing to a file: {err}");
cache.invalidate(&key).await;
}
}
});
}
}
}
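The flush condition in get_chunk_range, (chunk_data.is_some() || chunk == end_chunk) && !chunk_batch.is_empty(), is the heart of the coalescing: runs of consecutive misses are fetched as a single range. A standalone sketch of the same walk, with the cache reduced to a hypothetical presence check and no actual fetching:

use std::ops::Range;

/// Group missing chunk indices into maximal contiguous batches, flushing a
/// batch whenever a cached chunk (or the end of the span) breaks the run.
fn missing_batches(start: usize, end: usize, cached: &dyn Fn(usize) -> bool) -> Vec<Range<usize>> {
    let mut batches = Vec::new();
    let mut pending: Option<Range<usize>> = None;
    for chunk in start..=end {
        if !cached(chunk) {
            // Extend the current run of misses, or start a new one.
            pending = match pending {
                Some(r) => Some(r.start..chunk + 1),
                None => Some(chunk..chunk + 1),
            };
        }
        // Flush on a hit or at the last chunk, mirroring the diff's condition.
        if (cached(chunk) || chunk == end) && pending.is_some() {
            batches.push(pending.take().unwrap());
        }
    }
    batches
}

fn main() {
    // Chunks 2 and 5 cached: misses become three batches, 0..2, 3..5 and 6..7.
    let cached = |c: usize| c == 2 || c == 5;
    assert_eq!(missing_batches(0, 6, &cached), vec![0..2, 3..5, 6..7]);
}

Each returned range corresponds to a single inner.get_range call in the real implementation.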

@@ -362,50 +417,24 @@ impl ObjectStore for CachingObjectStore {
location: &object_store::path::Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
debug!("{location}-{range:?} get_range");
// Expand the range to min_fetch_size chunk boundaries (+ alignment)
let start_chunk = range.start / self.min_fetch_size as usize;
// The final chunk to fetch (inclusively). E.g. with min_fetch_size = 16:
// - range.end == 64 (get bytes 0..=63) -> final chunk is 63 / 16 = 3 (48..64 exclusive)
// - range.end == 65 (get bytes 0..=64) -> final chunk is 64 / 16 = 4 (64..80 exclusive)
let end_chunk = (range.end - 1) / self.min_fetch_size as usize;

let mut result = Vec::with_capacity(range.end - range.start);

for chunk_num in start_chunk..(end_chunk + 1) {
let mut data = self.get_chunk(location, chunk_num as u32).await?;
let data_len = data.len();

let buf_start = if chunk_num == start_chunk {
let buf_start = range.start % self.min_fetch_size as usize;
data.advance(buf_start);
buf_start
} else {
0usize
};
let mut data = self
.get_chunk_range(location, start_chunk, end_chunk)
.await?;

let buf_end = if chunk_num == end_chunk {
let buf_end = range.end % self.min_fetch_size as usize;

// if min_fetch_size = 16 and buf_end = 64, we want to load everything
// from the final buffer, instead of 0.
if buf_end != 0 {
buf_end
} else {
self.min_fetch_size as usize
}
} else {
data_len
};

debug!(
"Read {} bytes from the buffer for chunk {}, slicing out {}..{}",
data_len, chunk_num, buf_start, buf_end
);

result.put(data.take(buf_end - buf_start));
}

Ok(result.into())
// Finally, trim away the parts of the expanded range that fall outside the requested range
let offset = range.start - start_chunk * self.min_fetch_size as usize;
data.advance(offset);
data.truncate(range.end - range.start);
debug!("{location}-{range:?} return");
Ok(data)
}
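A worked check of the trim above, under the assumption min_fetch_size = 16: a request for 25..64 expands to chunks 1..=3 (bytes 16..64), the leading overhang is 25 - 16 = 9 bytes, and truncation caps the result at 64 - 25 = 39 bytes. In code, standalone and with a synthetic buffer standing in for the fetched data:

use bytes::{Buf, Bytes};

fn main() {
    let min_fetch_size = 16usize;
    let range = 25usize..64;
    let start_chunk = range.start / min_fetch_size; // chunk 1

    // Stand-in for the coalesced fetch of chunks 1..=3, i.e. bytes 16..64,
    // where each byte's value equals its offset in the object.
    let mut data = Bytes::from_iter(16u8..64);

    // Same trim as get_range: drop the leading overhang, cap the length.
    let offset = range.start - start_chunk * min_fetch_size; // 9
    data.advance(offset);
    data.truncate(range.end - range.start); // 39

    assert_eq!(data.len(), 39);
    assert_eq!(data.first(), Some(&25)); // starts exactly at the requested offset
}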

async fn head(
@@ -614,8 +643,8 @@ mod tests {
assert_eq!(bytes, body[25..64]);
store.cache.run_pending_tasks().await;

// Mock has had 3 requests
assert_eq!(server.received_requests().await.unwrap().len(), 3);
// Mock has had 1 request that coalesced 3 chunks
assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert_eq!(store.cache.entry_count(), 3);

let on_disk_keys = wait_all_ranges_on_disk(HashSet::new(), &store).await;
@@ -630,7 +659,7 @@
store.cache.run_pending_tasks().await;

// No extra requests
assert_eq!(server.received_requests().await.unwrap().len(), 3);
assert_eq!(server.received_requests().await.unwrap().len(), 1);
assert_eq!(store.cache.entry_count(), 3);
assert_ranges_in_cache(&store.base_path, &url, vec![1, 2, 3]);

@@ -643,7 +672,7 @@
store.cache.run_pending_tasks().await;

// One extra request to fetch chunk 4
assert_eq!(server.received_requests().await.unwrap().len(), 4);
assert_eq!(server.received_requests().await.unwrap().len(), 2);
assert_eq!(store.cache.entry_count(), 4);

let mut on_disk_keys = wait_all_ranges_on_disk(on_disk_keys, &store).await;
@@ -657,7 +686,7 @@
assert_eq!(bytes, body[80..85]);
store.cache.run_pending_tasks().await;

assert_eq!(server.received_requests().await.unwrap().len(), 5);
assert_eq!(server.received_requests().await.unwrap().len(), 3);
assert_eq!(store.cache.entry_count(), 4);

on_disk_keys.retain(|k| k.range.start >= 32); // The first chunk got LRU-evicted
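Stepping back, the single-writer guarantee for the disk persist relies on moka's entry API: entry_by_ref(..).or_insert(..) returns an Entry whose is_fresh() is true only for the call that actually performed the insert, so only that caller spawns the write task. A minimal sketch of the pattern with toy key/value types (assumes moka with the "future" feature, as used by this crate):

use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, String> = Cache::new(100);
    let key = "chunk-1".to_string();

    // Two inserts race for the same key; only the first one is "fresh".
    let first = cache.entry_by_ref(&key).or_insert("data".into()).await;
    let second = cache.entry_by_ref(&key).or_insert("other".into()).await;

    assert!(first.is_fresh());   // this call performed the insert...
    assert!(!second.is_fresh()); // ...so only it should spawn the disk write
    assert_eq!(second.value(), "data"); // the losing value is discarded
}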
