From 6432a6b73f8c6870f1a174949206abacc07c5fa5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 23 Jan 2025 13:43:51 -0800 Subject: [PATCH] fix: don't compare metadata in merge insert to detect if partial schema (#3412) --- python/python/tests/test_dataset.py | 16 ++++++++++++++-- rust/lance/src/dataset/write/merge_insert.rs | 17 ++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index de5f8513d3..30ab84b929 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1306,12 +1306,23 @@ def check_merge_stats(merge_dict, expected): def test_merge_insert(tmp_path: Path): nrows = 1000 + # Create a schema with some metadata to regress an issue where the metadata + # caused schema comparison problems in merge_insert. + schema = pa.schema( + [ + pa.field("a", pa.int64()), + pa.field("b", pa.int64()), + pa.field("c", pa.int64()), + ], + metadata={"foo": "bar"}, + ) table = pa.Table.from_pydict( { "a": range(nrows), "b": [1 for _ in range(nrows)], "c": [x % 2 for x in range(nrows)], - } + }, + schema=schema, ) dataset = lance.write_dataset( table, tmp_path / "dataset", mode="create", max_rows_per_file=100 @@ -1323,7 +1334,8 @@ def test_merge_insert(tmp_path: Path): "a": range(300, 300 + nrows), "b": [2 for _ in range(nrows)], "c": [0 for _ in range(nrows)], - } + }, + schema=schema, ) is_new = pc.field("b") == 2 diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 0f2394bbcb..df800c11b7 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -1018,13 +1018,20 @@ impl MergeInsertJob { self, source: SendableRecordBatchStream, ) -> Result<(Transaction, MergeStats)> { - let schema = source.schema(); - - let full_schema = Schema::from(self.dataset.local_schema()); - let is_full_schema = &full_schema == schema.as_ref(); + // Erase metadata on source / dataset schemas to avoid comparing metadata + let schema = lance_core::datatypes::Schema::try_from(source.schema().as_ref())?; + let full_schema = self.dataset.local_schema(); + let is_full_schema = full_schema.compare_with_options( + &schema, + &SchemaCompareOptions { + compare_metadata: false, + ..Default::default() + }, + ); + let source_schema = source.schema(); let joined = self.create_joined_stream(source).await?; - let merger = Merger::try_new(self.params.clone(), schema.clone(), !is_full_schema)?; + let merger = Merger::try_new(self.params.clone(), source_schema, !is_full_schema)?; let merge_statistics = merger.merge_stats.clone(); let deleted_rows = merger.deleted_rows.clone(); let merger_schema = merger.output_schema().clone();