Skip to content

Commit

Permalink
Merge pull request #506 from splitgraph/optional-object-store
Browse files Browse the repository at this point in the history
Make `object_store` config section optional
  • Loading branch information
gruuya authored Mar 7, 2024
2 parents dd6f5f2 + 4702e6a commit 9c191f6
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 47 deletions.
8 changes: 5 additions & 3 deletions src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,12 @@ pub async fn build_context(cfg: schema::SeafowlConfig) -> Result<SeafowlContext>
let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
let context = SessionContext::new_with_state(state);

let object_store = build_object_store(&cfg.object_store)?;
// We're guaranteed by config validation that this is Some
let object_store_cfg = cfg.object_store.clone().unwrap();
let object_store = build_object_store(&object_store_cfg)?;
let internal_object_store = Arc::new(InternalObjectStore::new(
object_store.clone(),
cfg.object_store.clone(),
object_store_cfg,
));

// Register the HTTP object store for external tables
Expand Down Expand Up @@ -271,7 +273,7 @@ mod tests {
#[tokio::test]
async fn test_config_to_context() {
let config = schema::SeafowlConfig {
object_store: schema::ObjectStore::InMemory(schema::InMemory {}),
object_store: Some(schema::ObjectStore::InMemory(schema::InMemory {})),
catalog: schema::Catalog::Sqlite(schema::Sqlite {
dsn: "sqlite::memory:".to_string(),
journal_mode: SqliteJournalMode::Wal,
Expand Down
83 changes: 51 additions & 32 deletions src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub const MEBIBYTES: u64 = 1024 * 1024;

#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct SeafowlConfig {
pub object_store: ObjectStore,
pub object_store: Option<ObjectStore>,
pub catalog: Catalog,
#[serde(default)]
pub frontend: Frontend,
Expand Down Expand Up @@ -423,10 +423,10 @@ pub struct Runtime {
pub temp_dir: Option<PathBuf>,
}

pub fn validate_config(config: SeafowlConfig) -> Result<SeafowlConfig, ConfigError> {
pub fn validate_config(mut config: SeafowlConfig) -> Result<SeafowlConfig, ConfigError> {
let in_memory_catalog = matches!(config.catalog, Catalog::Sqlite(Sqlite { ref dsn, journal_mode: _, read_only: _ }) if dsn.contains(":memory:"));

let in_memory_object_store = matches!(config.object_store, ObjectStore::InMemory(_));
let in_memory_object_store =
matches!(config.object_store, Some(ObjectStore::InMemory(_)));

if in_memory_catalog ^ in_memory_object_store {
return Err(ConfigError::Message(
Expand All @@ -435,30 +435,32 @@ pub fn validate_config(config: SeafowlConfig) -> Result<SeafowlConfig, ConfigErr
if the process is restarted."
.to_string(),
));
};
}

if let ObjectStore::S3(S3 {
region: None,
endpoint: None,
..
}) = config.object_store
{
return Err(ConfigError::Message(
match config.object_store {
Some(ObjectStore::S3(S3 {
region: None,
endpoint: None,
..
})) => return Err(ConfigError::Message(
"You need to supply either the region or the endpoint of the S3 object store."
.to_string(),
));
}

if let ObjectStore::GCS(GCS {
google_application_credentials: None,
..
}) = config.object_store
{
warn!(
)),
Some(ObjectStore::GCS(GCS {
google_application_credentials: None,
..
})) => warn!(
"You are trying to connect to a GCS bucket without providing credentials.
If Seafowl is running on GCP a token should be fetched using the GCP metadata endpoint."
)
}
),
None if !matches!(config.catalog, Catalog::Clade(Clade { dsn: _ })) => return Err(ConfigError::Message(
"Cannot omit the object_store section unless Clade catalog is configured"
.to_string(),
)),
// When no object_store section present, default to using in-memory one internally
None if matches!(config.catalog, Catalog::Clade(Clade { dsn: _ })) => config.object_store = Some(ObjectStore::InMemory(InMemory {})),
_ => {}
};

if let Some(max_memory) = config.runtime.max_memory {
if max_memory < MIN_MEMORY {
Expand Down Expand Up @@ -506,7 +508,7 @@ pub fn load_config_from_string(
mod tests {
use super::{
build_default_config, load_config_from_string, AccessSettings, Catalog, Frontend,
HttpFrontend, Local, ObjectStore, Postgres, Runtime, SeafowlConfig, S3,
HttpFrontend, InMemory, Local, ObjectStore, Postgres, Runtime, SeafowlConfig, S3,
};
use crate::config::schema::{Misc, ObjectCacheProperties, Sqlite};
use crate::object_store::cache::DEFAULT_CACHE_CAPACITY;
Expand Down Expand Up @@ -615,6 +617,12 @@ cache_control = "private, max-age=86400"
type = "postgres"
dsn = "postgresql://user:pass@localhost:5432/somedb""#;

// Valid config: no object store with a Clade catalog
const TEST_CONFIG_VALID_CLADE: &str = r#"
[catalog]
type = "clade"
dsn = "http://localhost:54321""#;

#[cfg(feature = "object-store-s3")]
#[rstest]
#[case::basic_s3(TEST_CONFIG_S3, None)]
Expand All @@ -630,15 +638,15 @@ cache_control = "private, max-age=86400"

assert_eq!(
config.object_store,
ObjectStore::S3(S3 {
Some(ObjectStore::S3(S3 {
region: None,
access_key_id: Some("AKI...".to_string()),
secret_access_key: Some("ABC...".to_string()),
endpoint: Some("https://s3.amazonaws.com:9000".to_string()),
bucket: "seafowl".to_string(),
prefix: None,
cache_properties: cache_props,
})
}))
);
}

Expand All @@ -648,15 +656,26 @@ cache_control = "private, max-age=86400"

assert_eq!(
config.object_store,
ObjectStore::S3(S3 {
Some(ObjectStore::S3(S3 {
region: None,
access_key_id: None,
secret_access_key: None,
endpoint: Some("https://s3.amazonaws.com:9000".to_string()),
bucket: "seafowl".to_string(),
prefix: None,
cache_properties: None,
})
}))
);
}

/// A config that omits the `[object_store]` section while using a Clade catalog
/// must load successfully and end up with an in-memory object store.
#[test]
fn test_parse_no_object_store_clade() {
    let expected_store = Some(ObjectStore::InMemory(InMemory {}));

    let parsed = load_config_from_string(TEST_CONFIG_VALID_CLADE, false, None)
        .unwrap();

    assert_eq!(parsed.object_store, expected_store);
}

Expand All @@ -667,9 +686,9 @@ cache_control = "private, max-age=86400"
assert_eq!(
config,
SeafowlConfig {
object_store: ObjectStore::Local(Local {
object_store: Some(ObjectStore::Local(Local {
data_dir: "./seafowl-data".to_string(),
}),
})),
catalog: Catalog::Postgres(Postgres {
dsn: "postgresql://user:pass@localhost:5432/somedb".to_string(),
schema: "public".to_string()
Expand Down Expand Up @@ -760,9 +779,9 @@ cache_control = "private, max-age=86400"
assert_eq!(
config,
SeafowlConfig {
object_store: ObjectStore::Local(Local {
object_store: Some(ObjectStore::Local(Local {
data_dir: "some_other_path".to_string(),
}),
})),
catalog: Catalog::Sqlite(Sqlite {
dsn: "sqlite://file.sqlite".to_string(),
journal_mode: SqliteJournalMode::Wal,
Expand Down
2 changes: 1 addition & 1 deletion src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ pub mod test_utils {
/// Build a real (not mocked) in-memory context that uses SQLite
pub async fn in_memory_context() -> SeafowlContext {
let config = SeafowlConfig {
object_store: schema::ObjectStore::InMemory(schema::InMemory {}),
object_store: Some(schema::ObjectStore::InMemory(schema::InMemory {})),
catalog: Catalog::Sqlite(Sqlite {
dsn: "sqlite://:memory:".to_string(),
journal_mode: SqliteJournalMode::Wal,
Expand Down
16 changes: 11 additions & 5 deletions tests/clade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,22 @@ impl SchemaStoreService for TestCladeMetastore {
}
}

async fn start_clade_server() -> Arc<SeafowlContext> {
// let OS choose a a free port
async fn start_clade_server(object_store: bool) -> Arc<SeafowlContext> {
// let OS choose a free port
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

let object_store_section = if object_store {
r#"[object_store]
type = "local"
data_dir = "tests/data""#
} else {
""
};

let config_text = format!(
r#"
[object_store]
type = "local"
data_dir = "tests/data"
{object_store_section}
[catalog]
type = "clade"
Expand Down
13 changes: 7 additions & 6 deletions tests/clade/query.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::clade::*;

#[rstest]
#[should_panic(expected = "table 'default.local.file' not found")]
#[case("local.file", false)]
#[case("local.file", true)]
#[case("s3.minio", true)]
#[case("gcs.fake", true)]
#[tokio::test]
async fn test_basic_select(
#[values("local.file", "s3.minio", "gcs.fake")] table: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let context = start_clade_server().await;
async fn test_basic_select(#[case] table: &str, #[case] object_store: bool) -> () {
let context = start_clade_server(object_store).await;

// Before proceeding with the test swallow up a single initial
// ConnectError("tcp connect error", Os { code: 61, kind: ConnectionRefused, message: "Connection refused" })
Expand All @@ -32,6 +35,4 @@ async fn test_basic_select(
"+-------+------+-------+-----+",
];
assert_batches_eq!(expected, &results);

Ok(())
}

0 comments on commit 9c191f6

Please sign in to comment.