Skip to content

Commit

Permalink
Merge pull request #508 from splitgraph/clade-store-cache
Browse files Browse the repository at this point in the history
Add caching on top of clade-provided object stores too
  • Loading branch information
gruuya authored Mar 11, 2024
2 parents 71306b2 + 4ecaf2a commit af67f65
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 70 deletions.
31 changes: 27 additions & 4 deletions src/catalog/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::catalog::{
CatalogError, CatalogResult, CatalogStore, CreateFunctionError, FunctionStore,
SchemaStore, TableStore,
};
use crate::config::schema::ObjectCacheProperties;
use crate::object_store::wrapped::InternalObjectStore;
use crate::provider::{SeafowlDatabase, SeafowlFunction, SeafowlSchema};
use crate::repository::interface::{AllDatabaseFunctionsResult, Repository};
Expand Down Expand Up @@ -38,6 +39,7 @@ pub struct Metastore {
pub functions: Arc<dyn FunctionStore>,
staging_schema: Arc<MemorySchemaProvider>,
default_store: Arc<InternalObjectStore>,
object_store_cache: Option<ObjectCacheProperties>,
}

impl Metastore {
Expand All @@ -55,6 +57,7 @@ impl Metastore {
functions: repository_store,
staging_schema,
default_store,
object_store_cache: None,
}
}

Expand All @@ -70,9 +73,18 @@ impl Metastore {
functions: external_store,
staging_schema,
default_store,
object_store_cache: None,
}
}

pub fn with_object_store_cache(
mut self,
object_store_cache: Option<ObjectCacheProperties>,
) -> Self {
self.object_store_cache = object_store_cache;
self
}

pub async fn build_catalog(
&self,
catalog_name: &str,
Expand Down Expand Up @@ -147,11 +159,22 @@ impl Metastore {
let table_log_store = match table.location {
// Use the provided customized location
Some(location) => {
let store = stores.get(&location).ok_or(CatalogError::Generic {
reason: format!("Object store for location {location} not found"),
})?;
let mut store = stores
.get(&location)
.ok_or(CatalogError::Generic {
reason: format!("Object store for location {location} not found"),
})?
.clone();

if (location.starts_with("file://") || location.starts_with("memory://"))
&& let Some(ref cache) = self.object_store_cache
{
// Wrap the non-local store with the caching layer
store = cache.wrap_store(store);
}

let prefixed_store: PrefixStore<Arc<dyn ObjectStore>> =
PrefixStore::new(store.clone(), &*table.path);
PrefixStore::new(store, &*table.path);

let url = Url::parse(&format!("{location}/{}", table.path))?;

Expand Down
13 changes: 8 additions & 5 deletions src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use object_store::aws::AmazonS3Builder;
use object_store::gcp::GoogleCloudStorageBuilder;
use url::Url;

use super::schema::{self, GCS, MEBIBYTES, MEMORY_FRACTION, S3};
use super::schema::{self, ObjectCacheProperties, GCS, MEBIBYTES, MEMORY_FRACTION, S3};

#[cfg(feature = "metrics")]
pub const HTTP_REQUESTS: &str = "http_requests";
Expand Down Expand Up @@ -78,7 +78,9 @@ async fn build_metastore(
.await
.expect("Error setting up remote store"),
);
return Metastore::new_from_external(external, object_store);

return Metastore::new_from_external(external, object_store)
.with_object_store_cache(config.misc.object_store_cache.clone());
}
};

Expand All @@ -87,6 +89,7 @@ async fn build_metastore(

pub fn build_object_store(
object_store_cfg: &schema::ObjectStore,
cache_properties: &Option<ObjectCacheProperties>,
) -> Result<Arc<dyn ObjectStore>> {
Ok(match &object_store_cfg {
schema::ObjectStore::Local(schema::Local { data_dir }) => {
Expand All @@ -100,7 +103,6 @@ pub fn build_object_store(
secret_access_key,
endpoint,
bucket,
cache_properties,
..
}) => {
let mut builder = AmazonS3Builder::new()
Expand Down Expand Up @@ -134,7 +136,6 @@ pub fn build_object_store(
schema::ObjectStore::GCS(GCS {
bucket,
google_application_credentials,
cache_properties,
..
}) => {
let gcs_builder: GoogleCloudStorageBuilder =
Expand Down Expand Up @@ -212,7 +213,8 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>

// We're guaranteed by config validation that this is Some
let object_store_cfg = cfg.object_store.clone().unwrap();
let object_store = build_object_store(&object_store_cfg)?;
let object_store =
build_object_store(&object_store_cfg, &cfg.misc.object_store_cache)?;
let internal_object_store = Arc::new(InternalObjectStore::new(
object_store.clone(),
object_store_cfg,
Expand Down Expand Up @@ -305,6 +307,7 @@ mod tests {
gc_interval: 0,
ssl_cert_file: None,
metrics: None,
object_store_cache: None,
},
};

Expand Down
102 changes: 47 additions & 55 deletions src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ pub struct S3 {
pub endpoint: Option<String>,
pub bucket: String,
pub prefix: Option<String>,
pub cache_properties: Option<ObjectCacheProperties>,
}

impl S3 {
Expand All @@ -146,7 +145,6 @@ impl S3 {
endpoint: map.remove("endpoint"),
bucket,
prefix: None,
cache_properties: Some(ObjectCacheProperties::default()),
})
}
}
Expand All @@ -156,7 +154,6 @@ pub struct GCS {
pub bucket: String,
pub prefix: Option<String>,
pub google_application_credentials: Option<String>,
pub cache_properties: Option<ObjectCacheProperties>,
}

impl GCS {
Expand All @@ -168,44 +165,10 @@ impl GCS {
bucket,
prefix: None,
google_application_credentials: map.remove("google_application_credentials"),
cache_properties: Some(ObjectCacheProperties::default()),
}
}
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(default)]
pub struct ObjectCacheProperties {
pub capacity: u64,
pub min_fetch_size: u64,
pub ttl_s: u64, // We could use humantime_serde crate to parse directly into `std::time::Duration`
}

impl Default for ObjectCacheProperties {
fn default() -> Self {
Self {
capacity: DEFAULT_CACHE_CAPACITY,
min_fetch_size: DEFAULT_MIN_FETCH_SIZE,
ttl_s: DEFAULT_CACHE_ENTRY_TTL.as_secs(),
}
}
}

impl ObjectCacheProperties {
pub fn wrap_store(&self, inner: Arc<DynObjectStore>) -> Arc<DynObjectStore> {
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.into_path();

Arc::new(CachingObjectStore::new(
inner,
&path,
self.min_fetch_size,
self.capacity,
Duration::from_secs(self.ttl_s),
))
}
}

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Catalog {
Expand Down Expand Up @@ -384,6 +347,7 @@ pub struct Misc {
pub ssl_cert_file: Option<String>,
#[cfg(feature = "metrics")]
pub metrics: Option<Metrics>,
pub object_store_cache: Option<ObjectCacheProperties>,
}

impl Default for Misc {
Expand All @@ -394,6 +358,7 @@ impl Default for Misc {
ssl_cert_file: None,
#[cfg(feature = "metrics")]
metrics: None,
object_store_cache: None,
}
}
}
Expand All @@ -416,6 +381,39 @@ impl Default for Metrics {
}
}

/// Configuration for the on-disk object store cache; all fields fall back to
/// their `Default` values when omitted from the config (`#[serde(default)]`).
#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(default)]
pub struct ObjectCacheProperties {
    /// Maximum cache size in bytes (defaults to DEFAULT_CACHE_CAPACITY).
    pub capacity: u64,
    /// Minimum size in bytes of a single fetch from the inner store
    /// (defaults to DEFAULT_MIN_FETCH_SIZE).
    pub min_fetch_size: u64,
    /// Cache entry time-to-live in whole seconds
    /// (defaults to DEFAULT_CACHE_ENTRY_TTL).
    pub ttl: u64,
}

// Defaults mirror the crate-level cache constants; `ttl` is stored as plain
// seconds, so the Duration constant is converted via `as_secs`.
impl Default for ObjectCacheProperties {
    fn default() -> Self {
        Self {
            capacity: DEFAULT_CACHE_CAPACITY,
            min_fetch_size: DEFAULT_MIN_FETCH_SIZE,
            ttl: DEFAULT_CACHE_ENTRY_TTL.as_secs(),
        }
    }
}

impl ObjectCacheProperties {
    /// Wrap `inner` with an on-disk caching layer configured from these
    /// properties.
    ///
    /// The cache directory is a freshly created temp directory whose ownership
    /// is released via `into_path`, so it is deliberately *not* deleted when
    /// the `TempDir` handle goes out of scope — the cache must outlive this
    /// call. NOTE(review): this means the directory persists until the OS
    /// cleans it up; confirm that is the intended lifecycle.
    ///
    /// # Panics
    /// Panics if the temporary cache directory cannot be created.
    pub fn wrap_store(&self, inner: Arc<DynObjectStore>) -> Arc<DynObjectStore> {
        let path = TempDir::new()
            .expect("failed to create a temporary directory for the object store cache")
            .into_path();

        Arc::new(CachingObjectStore::new(
            inner,
            &path,
            self.min_fetch_size,
            self.capacity,
            Duration::from_secs(self.ttl),
        ))
    }
}

#[derive(Default, Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(default)]
pub struct Runtime {
Expand Down Expand Up @@ -453,6 +451,10 @@ pub fn validate_config(mut config: SeafowlConfig) -> Result<SeafowlConfig, Confi
"You are trying to connect to a GCS bucket without providing credentials.
If Seafowl is running on GCP a token should be fetched using the GCP metadata endpoint."
),
Some(ObjectStore::Local(_))
| Some(ObjectStore::InMemory(_)) if config.misc.object_store_cache.is_some() => warn!(
"The provided caching properties take no effect on local and in-memory object stores"
),
None if !matches!(config.catalog, Catalog::Clade(Clade { dsn: _ })) => return Err(ConfigError::Message(
"Cannot omit the object_store section unless Clade catalog is configured"
.to_string(),
Expand Down Expand Up @@ -549,13 +551,13 @@ secret_access_key = "ABC..."
endpoint = "https://s3.amazonaws.com:9000"
bucket = "seafowl"
[object_store.cache_properties]
min_fetch_size = 4096
ttl_s = 10
[catalog]
type = "postgres"
dsn = "postgresql://user:pass@localhost:5432/somedb"
[misc.object_store_cache]
min_fetch_size = 4096
ttl = 10
"#;

const TEST_CONFIG_BASIC: &str = r#"
Expand Down Expand Up @@ -628,26 +630,15 @@ cache_control = "private, max-age=86400"
#[case::basic_s3(TEST_CONFIG_S3, None)]
#[case::basic_s3_with_cache(
TEST_CONFIG_S3_WITH_CACHE,
Some(ObjectCacheProperties{ capacity: DEFAULT_CACHE_CAPACITY, min_fetch_size: 4096, ttl_s: 10 }))
Some(ObjectCacheProperties{ capacity: DEFAULT_CACHE_CAPACITY, min_fetch_size: 4096, ttl: 10 }))
]
fn test_parse_config_with_s3(
#[case] config_str: &str,
#[case] cache_props: Option<ObjectCacheProperties>,
) {
let config = load_config_from_string(config_str, false, None).unwrap();

assert_eq!(
config.object_store,
Some(ObjectStore::S3(S3 {
region: None,
access_key_id: Some("AKI...".to_string()),
secret_access_key: Some("ABC...".to_string()),
endpoint: Some("https://s3.amazonaws.com:9000".to_string()),
bucket: "seafowl".to_string(),
prefix: None,
cache_properties: cache_props,
}))
);
assert_eq!(config.misc.object_store_cache, cache_props);
}

#[test]
Expand All @@ -663,7 +654,6 @@ cache_control = "private, max-age=86400"
endpoint: Some("https://s3.amazonaws.com:9000".to_string()),
bucket: "seafowl".to_string(),
prefix: None,
cache_properties: None,
}))
);
}
Expand Down Expand Up @@ -716,6 +706,7 @@ cache_control = "private, max-age=86400"
gc_interval: 0,
ssl_cert_file: None,
metrics: None,
object_store_cache: None,
},
}
)
Expand Down Expand Up @@ -814,6 +805,7 @@ cache_control = "private, max-age=86400"
gc_interval: 0,
ssl_cert_file: None,
metrics: None,
object_store_cache: None,
},
}
)
Expand Down
5 changes: 4 additions & 1 deletion src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ impl SeafowlContext {
schema::ObjectStore::GCS(gcs)
};

let object_store = build_object_store(&config)?;
let object_store = build_object_store(
&config,
&self.config.misc.object_store_cache,
)?;
self.inner
.runtime_env()
.register_object_store(url, object_store);
Expand Down
3 changes: 1 addition & 2 deletions src/object_store/wrapped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,10 @@ mod tests {
bucket: bucket.to_string(),
prefix: prefix.map(|p| p.to_string()),
endpoint: endpoint.clone(),
cache_properties: None,
});
// In principle for this test we could use any object store since we only exercise the
// prefix/log store uri logic
let inner_store = build_object_store(&config)?;
let inner_store = build_object_store(&config, &None)?;

let store = InternalObjectStore::new(inner_store, config);

Expand Down
7 changes: 4 additions & 3 deletions tests/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ secret_access_key = "minioadmin"
endpoint = "http://127.0.0.1:9000"
bucket = "seafowl-test-bucket"
{}
[object_store.cache_properties]
ttl = 30
"#,
if let Some(path) = path {
format!("prefix = \"{path}\"")
Expand Down Expand Up @@ -122,7 +120,10 @@ google_application_credentials = "{}"
[catalog]
type = "postgres"
dsn = "{dsn}"
schema = "{schema}""#
schema = "{schema}"
[misc.object_store_cache]
ttl = 30"#
);

// Ignore the "in-memory object store / persistent catalog" error in e2e tests (we'll discard
Expand Down

0 comments on commit af67f65

Please sign in to comment.