Skip to content

Commit

Permalink
fix: don't compare metadata in merge insert to detect if partial schema (#3412)
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace authored Jan 23, 2025
1 parent 43cd830 commit 6432a6b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
16 changes: 14 additions & 2 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,12 +1306,23 @@ def check_merge_stats(merge_dict, expected):

def test_merge_insert(tmp_path: Path):
nrows = 1000
# Create a schema with some metadata as a regression test for an issue where
# schema metadata caused schema comparison problems in merge_insert.
schema = pa.schema(
[
pa.field("a", pa.int64()),
pa.field("b", pa.int64()),
pa.field("c", pa.int64()),
],
metadata={"foo": "bar"},
)
table = pa.Table.from_pydict(
{
"a": range(nrows),
"b": [1 for _ in range(nrows)],
"c": [x % 2 for x in range(nrows)],
}
},
schema=schema,
)
dataset = lance.write_dataset(
table, tmp_path / "dataset", mode="create", max_rows_per_file=100
Expand All @@ -1323,7 +1334,8 @@ def test_merge_insert(tmp_path: Path):
"a": range(300, 300 + nrows),
"b": [2 for _ in range(nrows)],
"c": [0 for _ in range(nrows)],
}
},
schema=schema,
)

is_new = pc.field("b") == 2
Expand Down
17 changes: 12 additions & 5 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,13 +1018,20 @@ impl MergeInsertJob {
self,
source: SendableRecordBatchStream,
) -> Result<(Transaction, MergeStats)> {
let schema = source.schema();

let full_schema = Schema::from(self.dataset.local_schema());
let is_full_schema = &full_schema == schema.as_ref();
// Compare the source and dataset schemas while ignoring metadata, so that a metadata-only difference is not treated as a partial schema
let schema = lance_core::datatypes::Schema::try_from(source.schema().as_ref())?;
let full_schema = self.dataset.local_schema();
let is_full_schema = full_schema.compare_with_options(
&schema,
&SchemaCompareOptions {
compare_metadata: false,
..Default::default()
},
);

let source_schema = source.schema();
let joined = self.create_joined_stream(source).await?;
let merger = Merger::try_new(self.params.clone(), schema.clone(), !is_full_schema)?;
let merger = Merger::try_new(self.params.clone(), source_schema, !is_full_schema)?;
let merge_statistics = merger.merge_stats.clone();
let deleted_rows = merger.deleted_rows.clone();
let merger_schema = merger.output_schema().clone();
Expand Down

0 comments on commit 6432a6b

Please sign in to comment.