Skip to content

Commit

Permalink
fix: ensure that 'block_size' parameter is properly propagated in the…
Browse files Browse the repository at this point in the history
… ObjectStore (#3403)

Previously, setting a block_size parameter did not change the range
coalescing being done in FileScheduler's [submit_request
](https://github.com/lancedb/lance/blob/main/rust/lance-io/src/scheduler.rs#L717)
function. This was because the FileScheduler was using the block_size
parameter in the underlying ObjectStore, but that parameter was not
being properly propagated in the configure_store method.

This change keeps the same defaults but always uses the parameter value
if it is set.

Verified via unit tests and profiling to determine increasing block size
now does actually decrease the number of get_range queries.
  • Loading branch information
vjc578db authored Jan 23, 2025
1 parent aae351b commit 3f26e60
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ parquet.workspace = true
tempfile.workspace = true
test-log.workspace = true
mockall.workspace = true
rstest.workspace = true

[target.'cfg(target_os = "linux")'.dev-dependencies]
pprof.workspace = true
Expand Down
81 changes: 75 additions & 6 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,8 @@ async fn configure_store(
// Block size: On local file systems, we use 4KB block size. On cloud
// object stores, we use 64KB block size. This is generally the largest
// block size where we don't see a latency penalty.
let file_block_size = options.block_size.unwrap_or(4 * 1024);
let cloud_block_size = options.block_size.unwrap_or(64 * 1024);
match url.scheme() {
"s3" | "s3+ddb" => {
storage_options.with_env_s3();
Expand Down Expand Up @@ -916,7 +918,7 @@ async fn configure_store(
Ok(ObjectStore {
inner: Arc::new(store).traced(),
scheme: String::from(url.scheme()),
block_size: 64 * 1024,
block_size: cloud_block_size,
use_constant_size_upload_parts,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
Expand All @@ -935,7 +937,7 @@ async fn configure_store(
Ok(ObjectStore {
inner: store,
scheme: String::from("gs"),
block_size: 64 * 1024,
block_size: cloud_block_size,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
Expand All @@ -950,7 +952,7 @@ async fn configure_store(
Ok(ObjectStore {
inner: store,
scheme: String::from("az"),
block_size: 64 * 1024,
block_size: cloud_block_size,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
Expand All @@ -961,14 +963,21 @@ async fn configure_store(
// however this makes testing harder as we can't use the same code path
// "file-object-store" forces local file system dataset to use the same
// code path as cloud object stores
"file" => Ok(ObjectStore::from_path(url.path())?.0),
"file" => {
let mut object_store = ObjectStore::from_path(url.path())?.0;
object_store.set_block_size(file_block_size);
Ok(object_store)
}
"file-object-store" => {
Ok(ObjectStore::from_path_with_scheme(url.path(), "file-object-store")?.0)
let mut object_store =
ObjectStore::from_path_with_scheme(url.path(), "file-object-store")?.0;
object_store.set_block_size(file_block_size);
Ok(object_store)
}
"memory" => Ok(ObjectStore {
inner: Arc::new(InMemory::new()).traced(),
scheme: String::from("memory"),
block_size: 64 * 1024,
block_size: cloud_block_size,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: true,
io_parallelism: get_num_compute_intensive_cpus(),
Expand Down Expand Up @@ -1112,6 +1121,7 @@ lazy_static::lazy_static! {
mod tests {
use super::*;
use parquet::data_type::AsBytes;
use rstest::rstest;
use std::env::set_current_dir;
use std::fs::{create_dir_all, write};
use std::path::Path as StdPath;
Expand Down Expand Up @@ -1177,6 +1187,65 @@ mod tests {
assert_eq!(path.to_string(), "foo.lance");
}

async fn test_block_size_used_test_helper(
uri: &str,
storage_options: Option<HashMap<String, String>>,
default_expected_block_size: usize,
) {
// Test the default
let registry = Arc::new(ObjectStoreRegistry::default());
let params = ObjectStoreParams {
storage_options: storage_options.clone(),
..ObjectStoreParams::default()
};
let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
.await
.unwrap();
assert_eq!(store.block_size, default_expected_block_size);

// Ensure param is used
let registry = Arc::new(ObjectStoreRegistry::default());
let params = ObjectStoreParams {
block_size: Some(1024),
storage_options: storage_options.clone(),
..ObjectStoreParams::default()
};
let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
.await
.unwrap();
assert_eq!(store.block_size, 1024);
}

#[rstest]
#[case("s3://bucket/foo.lance", None)]
#[case("gs://bucket/foo.lance", None)]
#[case("memory:///bucket/foo.lance", None)]
#[case("az://account/bucket/foo.lance",
Some(HashMap::from([
(String::from("account_name"), String::from("account")),
(String::from("container_name"), String::from("container"))
])))]
#[tokio::test]
async fn test_block_size_used_cloud(
#[case] uri: &str,
#[case] storage_options: Option<HashMap<String, String>>,
) {
test_block_size_used_test_helper(&uri, storage_options, 64 * 1024).await;
}

#[rstest]
#[case("file")]
#[case("file-object-store")]
#[tokio::test]
async fn test_block_size_used_file(#[case] prefix: &str) {
let tmp_dir = tempfile::tempdir().unwrap();
let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
let path = format!("{tmp_path}/bar/foo.lance/test_file");
write_to_file(&path, "URL").unwrap();
let uri = format!("{prefix}:///{path}");
test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
}

#[tokio::test]
async fn test_relative_paths() {
let tmp_dir = tempfile::tempdir().unwrap();
Expand Down

0 comments on commit 3f26e60

Please sign in to comment.