Skip to content

Commit

Permalink
fix: move IO tasks off of CPU runtime in merge_insert (#3420)
Browse files Browse the repository at this point in the history
Fixes #3419

We were using the `CPU_RUNTIME`, but there's a lot of IO happening here,
and it wasn't configured to do that. Instead, I moved the operations
onto the main runtime so it can perform the IO.

✅  I've testing this manually against S3 with the repro.
  • Loading branch information
wjones127 authored Jan 28, 2025
1 parent 58c5e27 commit 6d77d14
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ use futures::{
use lance_core::{
datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions},
error::{box_error, InvalidInputSnafu},
utils::{
futures::Capacity,
tokio::{get_num_compute_intensive_cpus, CPU_RUNTIME},
},
utils::{futures::Capacity, tokio::get_num_compute_intensive_cpus},
Error, Result, ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD,
};
use lance_datafusion::{
Expand Down Expand Up @@ -679,10 +676,10 @@ impl MergeInsertJob {
let updated_fragments = Arc::new(Mutex::new(Vec::new()));
let new_fragments = Arc::new(Mutex::new(Vec::new()));
let mut tasks = JoinSet::new();
let task_limit = get_num_compute_intensive_cpus();
let task_limit = dataset.object_store().io_parallelism();
let mut reservation =
MemoryConsumer::new("MergeInsert").register(session_ctx.task_ctx().memory_pool());
let handle = CPU_RUNTIME.handle();

while let Some((frag_id, batches)) = group_stream.next().await.transpose()? {
async fn handle_fragment(
dataset: Arc<Dataset>,
Expand Down Expand Up @@ -938,7 +935,7 @@ impl MergeInsertJob {
updated_fragments.clone(),
memory_size,
);
tasks.spawn_on(fut, handle);
tasks.spawn(fut);
}
Some(ScalarValue::Null | ScalarValue::UInt64(None)) => {
let fut = handle_new_fragments(
Expand All @@ -947,7 +944,7 @@ impl MergeInsertJob {
new_fragments.clone(),
memory_size,
);
tasks.spawn_on(fut, handle);
tasks.spawn(fut);
}
_ => {
return Err(Error::Internal {
Expand Down

0 comments on commit 6d77d14

Please sign in to comment.