diff --git a/Cargo.lock b/Cargo.lock index eccbd91907..07bd69be98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3813,6 +3813,7 @@ dependencies = [ "pprof", "prost 0.13.4", "rand", + "rstest", "shellexpand", "snafu 0.7.5", "tempfile", diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index cd4c184eb7..d748989d86 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -52,6 +52,7 @@ parquet.workspace = true tempfile.workspace = true test-log.workspace = true mockall.workspace = true +rstest.workspace = true [target.'cfg(target_os = "linux")'.dev-dependencies] pprof.workspace = true diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index b166873d4c..88e2b8ac1d 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -858,6 +858,8 @@ async fn configure_store( // Block size: On local file systems, we use 4KB block size. On cloud // object stores, we use 64KB block size. This is generally the largest // block size where we don't see a latency penalty. 
+ let file_block_size = options.block_size.unwrap_or(4 * 1024); + let cloud_block_size = options.block_size.unwrap_or(64 * 1024); match url.scheme() { "s3" | "s3+ddb" => { storage_options.with_env_s3(); @@ -916,7 +918,7 @@ async fn configure_store( Ok(ObjectStore { inner: Arc::new(store).traced(), scheme: String::from(url.scheme()), - block_size: 64 * 1024, + block_size: cloud_block_size, use_constant_size_upload_parts, list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, @@ -935,7 +937,7 @@ async fn configure_store( Ok(ObjectStore { inner: store, scheme: String::from("gs"), - block_size: 64 * 1024, + block_size: cloud_block_size, use_constant_size_upload_parts: false, list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, @@ -950,7 +952,7 @@ async fn configure_store( Ok(ObjectStore { inner: store, scheme: String::from("az"), - block_size: 64 * 1024, + block_size: cloud_block_size, use_constant_size_upload_parts: false, list_is_lexically_ordered: true, io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, @@ -961,14 +963,21 @@ async fn configure_store( // however this makes testing harder as we can't use the same code path // "file-object-store" forces local file system dataset to use the same // code path as cloud object stores - "file" => Ok(ObjectStore::from_path(url.path())?.0), + "file" => { + let mut object_store = ObjectStore::from_path(url.path())?.0; + object_store.set_block_size(file_block_size); + Ok(object_store) + } "file-object-store" => { - Ok(ObjectStore::from_path_with_scheme(url.path(), "file-object-store")?.0) + let mut object_store = + ObjectStore::from_path_with_scheme(url.path(), "file-object-store")?.0; + object_store.set_block_size(file_block_size); + Ok(object_store) } "memory" => Ok(ObjectStore { inner: Arc::new(InMemory::new()).traced(), scheme: String::from("memory"), - block_size: 64 * 1024, + block_size: cloud_block_size, use_constant_size_upload_parts: false, list_is_lexically_ordered: true, 
io_parallelism: get_num_compute_intensive_cpus(), @@ -1112,6 +1121,7 @@ lazy_static::lazy_static! { mod tests { use super::*; use parquet::data_type::AsBytes; + use rstest::rstest; use std::env::set_current_dir; use std::fs::{create_dir_all, write}; use std::path::Path as StdPath; @@ -1177,6 +1187,65 @@ mod tests { assert_eq!(path.to_string(), "foo.lance"); } + async fn test_block_size_used_test_helper( + uri: &str, + storage_options: Option<HashMap<String, String>>, + default_expected_block_size: usize, + ) { + // Test the default + let registry = Arc::new(ObjectStoreRegistry::default()); + let params = ObjectStoreParams { + storage_options: storage_options.clone(), + ..ObjectStoreParams::default() + }; + let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params) + .await + .unwrap(); + assert_eq!(store.block_size, default_expected_block_size); + + // Ensure param is used + let registry = Arc::new(ObjectStoreRegistry::default()); + let params = ObjectStoreParams { + block_size: Some(1024), + storage_options: storage_options.clone(), + ..ObjectStoreParams::default() + }; + let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params) + .await + .unwrap(); + assert_eq!(store.block_size, 1024); + } + + #[rstest] + #[case("s3://bucket/foo.lance", None)] + #[case("gs://bucket/foo.lance", None)] + #[case("memory:///bucket/foo.lance", None)] + #[case("az://account/bucket/foo.lance", + Some(HashMap::from([ + (String::from("account_name"), String::from("account")), + (String::from("container_name"), String::from("container")) + ])))] + #[tokio::test] + async fn test_block_size_used_cloud( + #[case] uri: &str, + #[case] storage_options: Option<HashMap<String, String>>, + ) { + test_block_size_used_test_helper(&uri, storage_options, 64 * 1024).await; + } + + #[rstest] + #[case("file")] + #[case("file-object-store")] + #[tokio::test] + async fn test_block_size_used_file(#[case] prefix: &str) { + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path = tmp_dir.path().to_str().unwrap().to_owned(); + 
let path = format!("{tmp_path}/bar/foo.lance/test_file"); + write_to_file(&path, "URL").unwrap(); + let uri = format!("{prefix}:///{path}"); + test_block_size_used_test_helper(&uri, None, 4 * 1024).await; + } + #[tokio::test] async fn test_relative_paths() { let tmp_dir = tempfile::tempdir().unwrap();