Merge pull request #708 from splitgraph/prefix-store-fix
Make sure that local FS url points only to the root storage location
gruuya authored Oct 17, 2024
2 parents 9e2b449 + 922ada1 commit a8a8831
Showing 4 changed files with 161 additions and 35 deletions.
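The fix in a nutshell: the factory now caches an object store keyed by the root storage location only, and layers the table path on top as a `PrefixStore` instead of folding it into the store's URL. Below is a minimal sketch of that composition using the `object_store` crate; the `/tmp/store` root and `my_table` prefix are illustrative, not taken from the repo.

```rust
use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::prefix::PrefixStore;
use object_store::ObjectStore;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The cached store points only at the root storage location.
    let root: Arc<dyn ObjectStore> =
        Arc::new(LocalFileSystem::new_with_prefix("/tmp/store")?);

    // The table path is layered on as a prefix: reads and writes are
    // scoped to `/tmp/store/my_table`, while the cache key stays the
    // root location.
    let table_store = PrefixStore::new(root, "my_table");

    // Paths handed to `table_store` resolve relative to the prefix,
    // e.g. `part-0.parquet` -> `/tmp/store/my_table/part-0.parquet`.
    println!("{table_store}");
    Ok(())
}
```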
25 changes: 15 additions & 10 deletions src/object_store/factory.rs
@@ -9,6 +9,7 @@ use deltalake::{
     storage::{FactoryRegistry, ObjectStoreRef, StorageOptions},
     DeltaResult, DeltaTableError, Path,
 };
+use object_store::prefix::PrefixStore;
 use object_store::ObjectStore;
 use object_store_factory;
 use url::Url;
@@ -103,16 +104,6 @@ impl ObjectStoreFactory {
         options: HashMap<String, String>,
         table_path: String,
     ) -> Result<Arc<dyn LogStore>, object_store::Error> {
-        // This is the least surprising way to extend the path, and make the url point to the table
-        // root: https://github.com/servo/rust-url/issues/333
-        url.path_segments_mut()
-            .map_err(|_| object_store::Error::Generic {
-                store: "object_store_factory",
-                source: "The provided URL is a cannot-be-a-base URL".into(),
-            })?
-            .pop_if_empty()
-            .extend(table_path.split("/"));
-
         let store = {
             let used_options = options.clone();
             let key = StoreCacheKey {
@@ -144,6 +135,20 @@
             }
         };
 
+        // The table path provided has not been included in the object store root url, so it
+        // needs to become a part of an additional prefix
+        let store = Arc::new(PrefixStore::new(store, table_path.clone()));
+
+        // This is the least surprising way to extend the path, and make the url point to the table
+        // root: https://github.com/servo/rust-url/issues/333
+        url.path_segments_mut()
+            .map_err(|_| object_store::Error::Generic {
+                store: "object_store_factory",
+                source: "The provided URL is a cannot-be-a-base URL".into(),
+            })?
+            .pop_if_empty()
+            .extend(table_path.split("/"));
+
         Ok(default_logstore(store, &url, &Default::default()))
     }

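For context on the `path_segments_mut` call above (and the rust-url issue it links): `pop_if_empty` drops the empty segment left by a trailing slash, so extending the path does not produce a double slash. A small self-contained illustration with a made-up path:

```rust
use url::Url;

fn main() {
    let mut url = Url::parse("file:///data/root/").unwrap();

    // `pop_if_empty` removes the trailing empty segment left by the
    // final `/`, so extending yields `/data/root/db/table` rather than
    // `/data/root//db/table`.
    url.path_segments_mut()
        .expect("file: URLs are not cannot-be-a-base")
        .pop_if_empty()
        .extend("db/table".split('/'));

    assert_eq!(url.as_str(), "file:///data/root/db/table");
}
```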
52 changes: 27 additions & 25 deletions tests/fixtures.rs
@@ -37,29 +37,6 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
         })
     }
 
-    let minio_options = HashMap::from([
-        (
-            AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
-            "http://127.0.0.1:9000".to_string(),
-        ),
-        (
-            AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
-            "minioadmin".to_string(),
-        ),
-        (
-            AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
-            "minioadmin".to_string(),
-        ),
-        (
-            // This has been removed from the config enum, but it can
-            // still be picked up via `AmazonS3ConfigKey::from_str`
-            AmazonS3ConfigKey::Client(ClientConfigKey::AllowHttp)
-                .as_ref()
-                .to_string(),
-            "true".to_string(),
-        ),
-    ]);
-
     ListSchemaResponse {
         schemas: vec![
             SchemaObject {
@@ -94,12 +71,12 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse {
             StorageLocation {
                 name: "minio".to_string(),
                 location: "s3://seafowl-test-bucket".to_string(),
-                options: minio_options.clone(),
+                options: minio_options(),
             },
             StorageLocation {
                 name: "minio-prefix".to_string(),
                 location: "s3://seafowl-test-bucket/test-data".to_string(),
-                options: minio_options,
+                options: minio_options(),
             },
             StorageLocation {
                 name: "fake-gcs".to_string(),
@@ -120,3 +97,28 @@
         ],
     }
 }
+
+pub fn minio_options() -> HashMap<String, String> {
+    HashMap::from([
+        (
+            AmazonS3ConfigKey::Endpoint.as_ref().to_string(),
+            "http://127.0.0.1:9000".to_string(),
+        ),
+        (
+            AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(),
+            "minioadmin".to_string(),
+        ),
+        (
+            AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(),
+            "minioadmin".to_string(),
+        ),
+        (
+            // This has been removed from the config enum, but it can
+            // still be picked up via `AmazonS3ConfigKey::from_str`
+            AmazonS3ConfigKey::Client(ClientConfigKey::AllowHttp)
+                .as_ref()
+                .to_string(),
+            "true".to_string(),
+        ),
+    ])
+}
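As the comment in the fixture notes, `AllowHttp` now lives under the client config keys but still parses via `AmazonS3ConfigKey::from_str`. A sketch of how a string map like this one can be fed back into a builder; `build_minio_store` is a hypothetical helper for illustration, not part of the test suite:

```rust
use std::collections::HashMap;
use std::str::FromStr;

use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey};

// Hypothetical helper: turn the string map from `minio_options()` back
// into a concrete S3 store via the typed config keys.
fn build_minio_store(
    options: HashMap<String, String>,
) -> Result<AmazonS3, Box<dyn std::error::Error>> {
    let mut builder = AmazonS3Builder::new().with_bucket_name("seafowl-test-bucket");
    for (key, value) in options {
        // `from_str` still recognizes `allow_http`, even though the
        // plain enum variant was removed.
        builder = builder.with_config(AmazonS3ConfigKey::from_str(&key)?, value);
    }
    Ok(builder.build()?)
}
```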
1 change: 1 addition & 0 deletions tests/flight/mod.rs
@@ -76,6 +76,7 @@ bind_host = "127.0.0.1"
 bind_port = {}
 [misc.sync_conf]
 max_in_memory_bytes = 1000000
+max_replication_lag_s = 1
 flush_task_interval_s = 1"#,
         addr.port()
118 changes: 118 additions & 0 deletions tests/flight/sync.rs
@@ -1,5 +1,11 @@
+use crate::fixtures::minio_options;
 use crate::flight::*;
+use clade::schema::StorageLocation;
 use clade::sync::{ColumnDescriptor, ColumnRole};
+use deltalake::DeltaTable;
+use std::collections::HashMap;
+use tempfile::TempDir;
+use url::Url;
 
 pub(crate) fn sync_cmd_to_flight_data(
     cmd: DataSyncCommand,
@@ -489,3 +495,115 @@ async fn test_sync_happy_path() -> std::result::Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[rstest]
+#[tokio::test]
+async fn test_sync_custom_store(
+    #[values("local", "minio")] target_type: &str,
+) -> std::result::Result<(), Box<dyn std::error::Error>> {
+    let (ctx, mut client) = flight_server(TestServerType::Memory).await;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Int32, true),
+        Field::new("c", DataType::Utf8, true),
+    ]));
+    let column_descriptors = vec![
+        ColumnDescriptor {
+            role: ColumnRole::OldPk as _,
+            name: "c1".to_string(),
+        },
+        ColumnDescriptor {
+            role: ColumnRole::NewPk as _,
+            name: "c1".to_string(),
+        },
+        ColumnDescriptor {
+            role: ColumnRole::Value as _,
+            name: "c2".to_string(),
+        },
+    ];
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::new_null(100_000)),
+            Arc::new(Int32Array::from((0..100_000).collect::<Vec<i32>>())),
+            Arc::new(StringArray::from(vec!["a"; 100_000])),
+        ],
+    )?;
+
+    let table_name = "sync_table";
+
+    let temp_dir = TempDir::new().unwrap();
+    let (location, options) = if target_type == "local" {
+        (
+            format!("file://{}", temp_dir.path().to_string_lossy()),
+            HashMap::new(),
+        )
+    } else {
+        (
+            "s3://seafowl-test-bucket/some/path".to_string(),
+            minio_options(),
+        )
+    };
+
+    let log_store = ctx
+        .metastore
+        .object_stores
+        .get_log_store_for_table(
+            Url::parse(&location)?,
+            options.clone(),
+            table_name.to_string(),
+        )
+        .await?;
+
+    let store = StorageLocation {
+        name: "custom-store".to_string(),
+        location,
+        options,
+    };
+
+    let cmd = DataSyncCommand {
+        path: table_name.to_string(),
+        store: Some(store),
+        column_descriptors,
+        origin: "42".to_string(),
+        sequence_number: Some(1000),
+    };
+
+    let sync_result = do_put_sync(cmd.clone(), batch.clone(), &mut client).await?;
+    assert_eq!(
+        sync_result,
+        DataSyncResponse {
+            accepted: true,
+            memory_sequence_number: Some(1000),
+            durable_sequence_number: Some(1000),
+            first: true,
+        }
+    );
+
+    let mut table = DeltaTable::new(log_store, Default::default());
+    table.load().await?;
+
+    ctx.inner.register_table(table_name, Arc::new(table))?;
+    let results = ctx
+        .inner
+        .sql(&format!(
+            "SELECT count(*), count(distinct c1), min(c1), max(c1) FROM {table_name}"
+        ))
+        .await?
+        .collect()
+        .await?;
+
+    let expected = [
+        "+----------+-------------------------------+--------------------+--------------------+",
+        "| count(*) | count(DISTINCT sync_table.c1) | min(sync_table.c1) | max(sync_table.c1) |",
+        "+----------+-------------------------------+--------------------+--------------------+",
+        "| 100000   | 100000                        | 0                  | 99999              |",
+        "+----------+-------------------------------+--------------------+--------------------+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
