Skip to content

Commit

Permalink
temporarily disable string spilling until we get the fix from df
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Feb 21, 2025
1 parent f508cbd commit a4aa77c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
10 changes: 9 additions & 1 deletion rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,13 @@ impl TrainingSource for BTreeUpdater {
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
let data_type = self.new_data.schema().field(0).data_type().clone();
// Datafusion currently has bugs with spilling on string columns
// See https://github.com/apache/datafusion/issues/10073
//
// Once we upgrade we can remove this
let use_spilling = !matches!(data_type, DataType::Utf8 | DataType::LargeUtf8);

let new_input = Arc::new(OneShotExec::new(self.new_data));
let old_input = Self::into_old_input(self.index);
debug_assert_eq!(
Expand All @@ -1285,10 +1292,11 @@ impl TrainingSource for BTreeUpdater {
LexOrdering::new(vec![sort_expr]),
all_data,
));

let unchunked = execute_plan(
ordered,
LanceExecutionOptions {
use_spilling: true,
use_spilling,
..Default::default()
},
)?;
Expand Down
20 changes: 19 additions & 1 deletion rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ impl TrainingRequest {
) -> Result<SendableRecordBatchStream> {
let mut scan = self.dataset.scan();

let column_field =
self.dataset
.schema()
.field(&self.column)
.ok_or(Error::InvalidInput {
source: format!("No column with name {}", self.column).into(),
location: location!(),
})?;

// Datafusion currently has bugs with spilling on string columns
// See https://github.com/apache/datafusion/issues/10073
//
// Once we upgrade we can remove this
let use_spilling = !matches!(
column_field.data_type(),
DataType::Utf8 | DataType::LargeUtf8
);

let ordering = match sort {
true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]),
false => None,
Expand All @@ -74,7 +92,7 @@ impl TrainingRequest {

let batches = scan
.try_into_dfstream(LanceExecutionOptions {
use_spilling: true,
use_spilling,
..Default::default()
})
.await?;
Expand Down

0 comments on commit a4aa77c

Please sign in to comment.