diff --git a/protos/transaction.proto b/protos/transaction.proto index 9959c5e75a..dd735fd0eb 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -173,6 +173,12 @@ message Transaction { } } + // An operation that replaces the data in a region of the table with new data. + message DataReplacement { + repeated uint64 old_fragment_ids = 1; + repeated DataFile new_datafiles = 2; + } + // The operation of this transaction. oneof operation { Append append = 100; @@ -186,6 +192,7 @@ message Transaction { Update update = 108; Project project = 109; UpdateConfig update_config = 110; + DataReplacement data_replacement = 111; } // An operation to apply to the blob dataset @@ -193,4 +200,4 @@ message Transaction { Append blob_append = 200; Overwrite blob_overwrite = 202; } -} \ No newline at end of file +} diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 01358195af..30d337619b 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -45,7 +45,7 @@ ) from .dependencies import numpy as np from .dependencies import pandas as pd -from .fragment import FragmentMetadata, LanceFragment +from .fragment import DataFile, FragmentMetadata, LanceFragment from .lance import ( CleanupStats, Compaction, @@ -1927,7 +1927,7 @@ def create_index( valid_index_types = ["IVF_FLAT", "IVF_PQ", "IVF_HNSW_PQ", "IVF_HNSW_SQ"] if index_type not in valid_index_types: raise NotImplementedError( - f"Only {valid_index_types} index types supported. " f"Got {index_type}" + f"Only {valid_index_types} index types supported. Got {index_type}" ) if index_type != "IVF_PQ" and one_pass_ivfpq: raise ValueError( @@ -2247,8 +2247,7 @@ def _commit( commit_lock: Optional[CommitLock] = None, ) -> LanceDataset: warnings.warn( - "LanceDataset._commit() is deprecated, use LanceDataset.commit()" - " instead", + "LanceDataset._commit() is deprecated, use LanceDataset.commit() instead", DeprecationWarning, ) return LanceDataset.commit(base_uri, operation, read_version, commit_lock) @@ -2935,6 +2934,15 @@ class CreateIndex(BaseOperation): dataset_version: int fragment_ids: Set[int] + @dataclass + class DataReplacement(BaseOperation): + """ + Operation that replaces existing datafiles in the dataset. + """ + + old_fragment_ids: List[int] + new_datafiles: List[DataFile] + class ScannerBuilder: def __init__(self, ds: LanceDataset): @@ -3203,7 +3211,7 @@ def nearest( if q_dim != dim: raise ValueError( - f"Query vector size {len(q)} does not match index column size" f" {dim}" + f"Query vector size {len(q)} does not match index column size {dim}" ) if k is not None and int(k) <= 0: diff --git a/python/python/lance/file.py b/python/python/lance/file.py index a36d8a4d7d..e81b61d7b5 100644 --- a/python/python/lance/file.py +++ b/python/python/lance/file.py @@ -134,7 +134,7 @@ def take_rows( if indices[i] > indices[i + 1]: raise ValueError( f"Indices must be sorted in ascending order for \ - file API, got {indices[i]} > {indices[i+1]}" + file API, got {indices[i]} > {indices[i + 1]}" ) return ReaderResults( diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index 20765f3e83..cfcde00f46 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -161,7 +161,7 @@ def on_write_complete( if len(write_results) == 0: warnings.warn( - "write results is empty. please check ray version " "or internal error", + "write results is empty. 
please check ray version or internal error", DeprecationWarning, ) return diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 30ab84b929..dc8957a916 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2913,3 +2913,26 @@ def test_dataset_schema(tmp_path: Path): ds = lance.write_dataset(table, str(tmp_path)) # noqa: F841 ds._default_scan_options = {"with_row_id": True} assert ds.schema == ds.to_table().schema + + +def test_data_replacement(tmp_path: Path): + table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + table = pa.Table.from_pydict({"a": range(100, 200), "b": range(100, 200)}) + fragment = lance.fragment.LanceFragment.create(base_dir, table) + data_file = fragment.files[0] + data_replacement = lance.LanceOperation.DataReplacement([0], [data_file]) + dataset = lance.LanceDataset.commit(dataset, data_replacement, read_version=1) + + tbl = dataset.to_table() + + expected = pa.Table.from_pydict( + { + "a": list(range(100, 200)), + "b": list(range(100, 200)), + } + ) + assert tbl == expected diff --git a/python/src/transaction.rs b/python/src/transaction.rs index ee549503d1..2a3c21c8b4 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -118,6 +118,17 @@ impl FromPyObject<'_> for PyLance { }; Ok(Self(op)) } + "DataReplacement" => { + let old_fragment_ids = ob.getattr("old_fragment_ids")?.extract::>()?; + let new_datafiles = extract_vec(&ob.getattr("new_datafiles")?)?; + + let op = Operation::DataReplacement { + old_fragment_ids, + new_datafiles, + }; + + Ok(Self(op)) + } unsupported => Err(PyValueError::new_err(format!( "Unsupported operation: {unsupported}", ))), @@ -172,6 +183,19 @@ impl ToPyObject for PyLance<&Operation> { .unwrap() .to_object(py) } + Operation::DataReplacement { + old_fragment_ids, + new_datafiles, + } => { + let old_fragment_ids = old_fragment_ids.to_object(py); + let new_datafiles = export_vec(py, new_datafiles.as_slice()); + let cls = namespace + .getattr("DataReplacement") + .expect("Failed to get DataReplacement class"); + cls.call1((old_fragment_ids, new_datafiles)) + .unwrap() + .to_object(py) + } _ => todo!(), } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 5d8d0ddf3b..65d70e635a 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1744,12 +1744,13 @@ mod tests { use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use lance_core::datatypes::LANCE_STORAGE_CLASS_SCHEMA_META_KEY; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; + use lance_file::v2::writer::FileWriter; use lance_file::version::LanceFileVersion; use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams}; use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; use lance_table::feature_flags; - use lance_table::format::WriterVersion; + use lance_table::format::{DataFile, WriterVersion}; use lance_table::io::commit::RenameCommitHandler; use lance_table::io::deletion::read_deletion_file; use lance_testing::datagen::generate_random_array; @@ -5148,4 +5149,453 @@ mod tests { assert!(result.is_err()); assert!(matches!(result, Err(Error::SchemaMismatch { .. 
}))); } + + #[tokio::test] + async fn test_datafile_replacement() { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + true, + )])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + let dataset = Arc::new( + Dataset::write(empty_reader, "memory://", None) + .await + .unwrap(), + ); + dataset.validate().await.unwrap(); + + // Test empty replacement should commit a new manifest and do nothing + let mut dataset = Dataset::commit( + WriteDestination::Dataset(dataset.clone()), + Operation::DataReplacement { + old_fragment_ids: vec![], + new_datafiles: vec![], + }, + Some(1), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + dataset.validate().await.unwrap(); + + assert_eq!(dataset.version().version, 2); + assert_eq!(dataset.get_fragments().len(), 0); + + // try the same thing on a non-empty dataset + let vals: Int32Array = vec![1, 2, 3].into(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap(); + dataset + .append( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + None, + ) + .await + .unwrap(); + + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + old_fragment_ids: vec![], + new_datafiles: vec![], + }, + Some(3), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + dataset.validate().await.unwrap(); + + assert_eq!(dataset.version().version, 4); + assert_eq!(dataset.get_fragments().len(), 1); + + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[1, 2, 3] + ); + + // write a new datafile + let object_writer = dataset + .object_store + .create(&Path::from("data/test.lance")) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + + let vals: Int32Array = vec![4, 5, 6].into(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap(); + writer.write_batch(&batch).await.unwrap(); + writer.finish().await.unwrap(); + + // find the datafile we want to replace + let frag = dataset.get_fragment(0).unwrap(); + let data_file = frag.data_file_for_field(0).unwrap(); + let mut new_data_file = data_file.clone(); + new_data_file.path = "test.lance".to_string(); + + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![new_data_file], + }, + Some(5), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + assert_eq!(dataset.version().version, 5); + assert_eq!(dataset.get_fragments().len(), 1); + + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[4, 5, 6] + ); + } + + #[tokio::test] + async fn test_datafile_partial_replacement() { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + true, + )])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + let mut dataset = Dataset::write(empty_reader, "memory://", None) + .await + .unwrap(); + dataset.validate().await.unwrap(); + + let vals: Int32Array = vec![1, 2, 3].into(); + let batch = RecordBatch::try_new(schema.clone(), 
vec![Arc::new(vals)]).unwrap(); + dataset + .append( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + None, + ) + .await + .unwrap(); + + let fragment = dataset.get_fragments().pop().unwrap().metadata; + + let extended_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ])); + + // add all null column + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::Merge { + fragments: vec![fragment], + schema: extended_schema.as_ref().try_into().unwrap(), + }, + Some(2), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + let partial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "b", + DataType::Int32, + true, + )])); + + // write a new datafile + let object_writer = dataset + .object_store + .create(&Path::from("data/test.lance")) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + partial_schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + + let vals: Int32Array = vec![4, 5, 6].into(); + let batch = RecordBatch::try_new(partial_schema.clone(), vec![Arc::new(vals)]).unwrap(); + writer.write_batch(&batch).await.unwrap(); + writer.finish().await.unwrap(); + + // find the datafile we want to replace + let new_data_file = DataFile { + path: "test.lance".to_string(), + // the second column in the dataset + fields: vec![1], + // is located in the first column of this datafile + column_indices: vec![0], + file_major_version: 2, + file_minor_version: 0, + }; + + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![new_data_file], + }, + Some(3), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + assert_eq!(dataset.version().version, 4); + assert_eq!(dataset.get_fragments().len(), 1); + assert_eq!(dataset.get_fragments()[0].metadata.files.len(), 2); + + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[1, 2, 3] + ); + assert_eq!( + batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[4, 5, 6] + ); + + // do it again but on the first column + // find the datafile we want to replace + let new_data_file = DataFile { + path: "test.lance".to_string(), + // the first column in the dataset + fields: vec![0], + // is located in the first column of this datafile + column_indices: vec![0], + file_major_version: 2, + file_minor_version: 0, + }; + + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![new_data_file], + }, + Some(4), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + assert_eq!(dataset.version().version, 5); + assert_eq!(dataset.get_fragments().len(), 1); + assert_eq!(dataset.get_fragments()[0].metadata.files.len(), 2); + + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[4, 5, 6] + ); + assert_eq!( + batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .values(), + &[4, 5, 6] + ); + } + + #[tokio::test] + async fn test_datafile_replacement_error() { + let schema = 
Arc::new(ArrowSchema::new(vec![ArrowField::new( + "a", + DataType::Int32, + true, + )])); + let empty_reader = RecordBatchIterator::new(vec![], schema.clone()); + let mut dataset = Dataset::write(empty_reader, "memory://", None) + .await + .unwrap(); + dataset.validate().await.unwrap(); + + let vals: Int32Array = vec![1, 2, 3].into(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(vals)]).unwrap(); + dataset + .append( + RecordBatchIterator::new(vec![Ok(batch)], schema.clone()), + None, + ) + .await + .unwrap(); + + let fragment = dataset.get_fragments().pop().unwrap().metadata; + + let extended_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ])); + + // add all null column + let dataset = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::Merge { + fragments: vec![fragment], + schema: extended_schema.as_ref().try_into().unwrap(), + }, + Some(2), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap(); + + let partial_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "b", + DataType::Int32, + true, + )])); + + // write a new datafile + let object_writer = dataset + .object_store + .create(&Path::from("data/test.lance")) + .await + .unwrap(); + let mut writer = FileWriter::try_new( + object_writer, + partial_schema.as_ref().try_into().unwrap(), + Default::default(), + ) + .unwrap(); + + let vals: Int32Array = vec![4, 5, 6].into(); + let batch = RecordBatch::try_new(partial_schema.clone(), vec![Arc::new(vals)]).unwrap(); + writer.write_batch(&batch).await.unwrap(); + writer.finish().await.unwrap(); + + // find the datafile we want to replace + let new_data_file = DataFile { + path: "test.lance".to_string(), + // the second column in the dataset + fields: vec![1], + // is located in the first column of this datafile + column_indices: vec![0], + file_major_version: 2, + file_minor_version: 0, + }; + + let err: Error = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset.clone())), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![new_data_file.clone(), new_data_file.clone()], + }, + Some(3), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap_err(); + + assert!(err + .to_string() + .contains("Number of old fragments must match number of new data files")); + + let err = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset.clone())), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![], + }, + Some(4), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap_err(); + + assert!(err + .to_string() + .contains("Number of old fragments must match number of new data files")); + + let new_data_file = DataFile { + fields: vec![0, 1], + ..new_data_file + }; + + let err = Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset.clone())), + Operation::DataReplacement { + old_fragment_ids: vec![0], + new_datafiles: vec![new_data_file], + }, + Some(4), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap_err(); + assert!(err + .to_string() + .contains("Expected to modify the fragment but no changes were made")); + } } diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index e390ecddcc..2ce5ad7be6 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -22,22 +22,29 @@ //! a conflict. 
Some operations have additional conditions that must be met for
 //! them to be compatible.
 //!
-//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig |
-//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------|
-//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ |
-//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ |
-//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) |
-//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
-//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ |
-//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ |
-//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
-//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) |
+//! NOTE/TODO(rmeng): DataReplacement conflict resolution is not fully implemented
 //!
-//! (1) Delete, update, and rewrite are compatible with each other and themselves only if
+//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | DataReplacement |
+//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------|-----------------|
+//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ |
+//! | Delete / Update | ✅ | 1️⃣ | ❌ | ✅ | 1️⃣ | ❌ | ❌ | ✅ | ✅ |
+//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
+//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | 3️⃣ |
+//! | Rewrite | ✅ | 1️⃣ | ❌ | ❌ | 1️⃣ | ❌ | ❌ | ✅ | 3️⃣ |
+//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ |
+//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
+//! | UpdateConfig | ✅ | ✅ | 2️⃣ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
+//! | DataReplacement | ✅ | ✅ | ❌ | 3️⃣ | 3️⃣ | ✅ | ❌* | ✅ | 3️⃣ |
+//!
+//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if
 //! they affect distinct fragments. Otherwise, they conflict.
-//! (2) Operations that mutate the config conflict if one of the operations upserts a key
+//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key
 //! that if referenced by another concurrent operation or if both operations modify the schema
 //! metadata or the same field metadata.
+//! 3️⃣ DataReplacement on a column without an index is compatible with any operation AS LONG AS
+//! the operation does not modify the region of the column being replaced.
+//! * This could become allowed in the future.
+//!
 
 use std::{
     collections::{HashMap, HashSet},
@@ -51,7 +58,7 @@ use lance_io::object_store::ObjectStore;
 use lance_table::{
     format::{
         pb::{self, IndexMetadata},
-        DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
+        DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
     },
     io::{
         commit::CommitHandler,
@@ -136,6 +143,25 @@ pub enum Operation {
         /// Indices that have been updated with the new row addresses
         rewritten_indices: Vec<RewrittenIndex>,
     },
+    /// Replace data in a column in the dataset with new data. This is used for
+    /// null column population, where we replace an entirely null column with a
+    /// new column that has data.
+    ///
+    /// This operation will only allow replacing files that contain the same schema,
+    /// e.g. if the original files contain columns A, B, C and the new files contain
+    /// only columns A and B, then the operation is not allowed, as we would need to split
+    /// the original files into two files, one with columns A and B and the other with column C.
+    ///
+    /// Corollary to the above: the operation also does not allow replacing file layouts
+    /// that are not uniform across all fragments.
+    /// e.g. if the fragments being replaced contain files with different schema layouts on
+    /// the column being replaced, the operation is not allowed.
+    /// Say frag_1: [A] [B, C] and frag_2: [A, B] [C], and we are trying to replace column A
+    /// with a new column A, the operation is not allowed.
+    DataReplacement {
+        old_fragment_ids: Vec<u64>,
+        new_datafiles: Vec<DataFile>,
+    },
     /// Merge a new column in
     Merge {
         fragments: Vec<Fragment>,
@@ -229,6 +255,9 @@ impl Operation {
                     .map(|f| f.id)
                     .chain(removed_fragment_ids.iter().copied()),
             ),
+            Self::DataReplacement {
+                old_fragment_ids, ..
+            } => Box::new(old_fragment_ids.iter().copied()),
         }
     }
@@ -332,6 +361,7 @@ impl Operation {
             Self::Update { .. } => "Update",
             Self::Project { .. } => "Project",
             Self::UpdateConfig { .. } => "UpdateConfig",
+            Self::DataReplacement { .. } => "DataReplacement",
         }
     }
 }
@@ -370,6 +400,7 @@ impl Transaction {
                 Operation::ReserveFragments { .. } => false,
                 Operation::Project { .. } => false,
                 Operation::UpdateConfig { .. } => false,
+                Operation::DataReplacement { .. } => false,
                 _ => true,
             },
             Operation::Rewrite { .. } => match &other.operation {
@@ -385,6 +416,10 @@
                 }
                 Operation::Project { .. } => false,
                 Operation::UpdateConfig { .. } => false,
+                Operation::DataReplacement { .. } => {
+                    // TODO(rmeng): check that the fragments being replaced are not part of the groups
+                    true
+                }
                 _ => true,
             },
             // Restore always succeeds
@@ -411,6 +446,10 @@
                 // if the rewrite changed more than X% of row ids.
                 Operation::Rewrite { .. } => true,
                 Operation::UpdateConfig { .. } => false,
+                Operation::DataReplacement { .. } => {
+                    // TODO(rmeng): check that the new index isn't on the column being replaced
+                    true
+                }
                 _ => true,
             },
             Operation::Delete { .. } | Operation::Update { .. } => match &other.operation {
@@ -467,6 +506,26 @@
                 Operation::UpdateConfig { .. } => false,
                 _ => true,
             },
+            Operation::DataReplacement { .. } => match &other.operation {
+                Operation::Append { .. }
+                | Operation::Delete { .. }
+                | Operation::Update { .. }
+                | Operation::Merge { .. }
+                | Operation::UpdateConfig { .. } => false,
+                Operation::CreateIndex { .. } => {
+                    // TODO(rmeng): check that the new index isn't on the column being replaced
+                    true
+                }
+                Operation::Rewrite { .. } => {
+                    // TODO(rmeng): check that the fragments being replaced are not part of the groups
+                    true
+                }
+                Operation::DataReplacement { .. } => {
+                    // TODO(rmeng): check cell conflicts
+                    true
+                }
+                _ => true,
+            },
         }
     }
@@ -744,6 +803,110 @@
             Operation::Restore { .. } => {
                 unreachable!()
             }
+            Operation::DataReplacement {
+                old_fragment_ids,
+                new_datafiles,
+            } => {
+                // 0. check we have the same number of old fragments as new data files
+                if old_fragment_ids.len() != new_datafiles.len() {
+                    return Err(Error::invalid_input(
+                        "Number of old fragments must match number of new data files",
+                        location!(),
+                    ));
+                }
+
+                // 1. make sure the new files all have the same fields (or are empty)
+                // NOTE: arguably this requirement could be relaxed in the future;
+                // for the sake of simplicity, we require the new files to have the same fields
+                if new_datafiles
+                    .iter()
+                    .map(|f| f.fields.clone())
+                    .collect::<HashSet<_>>()
+                    .len()
+                    > 1
+                {
+                    let field_info = new_datafiles
+                        .iter()
+                        .enumerate()
+                        .map(|(id, f)| (id, f.fields.clone()))
+                        .fold("".to_string(), |acc, (id, fields)| {
+                            format!("{}File {}: {:?}\n", acc, id, fields)
+                        });
+
+                    return Err(Error::invalid_input(
+                        format!(
+                            "All new data files must have the same fields, but found different fields:\n{field_info}"
+                        ),
+                        location!(),
+                    ));
+                }
+
+                let existing_fragments = maybe_existing_fragments?;
+
+                // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced
+                // 3. add modified fragments to final_fragments
+                for (frag_id, new_file) in old_fragment_ids.iter().zip(new_datafiles) {
+                    let frag = existing_fragments
+                        .iter()
+                        .find(|f| f.id == *frag_id)
+                        .ok_or_else(|| {
+                            Error::invalid_input(
+                                "Fragment being replaced not found in existing fragments",
+                                location!(),
+                            )
+                        })?;
+                    let mut new_frag = frag.clone();
+
+                    // TODO(rmeng): check that the new file and the fragment are the same length
+
+                    let mut columns_covered = HashSet::new();
+                    for file in &mut new_frag.files {
+                        if file.fields == new_file.fields
+                            && file.file_major_version == new_file.file_major_version
+                            && file.file_minor_version == new_file.file_minor_version
+                        {
+                            // assign the new file path to the fragment
+                            file.path = new_file.path.clone();
+                        }
+                        columns_covered.extend(file.fields.iter());
+                    }
+                    // SPECIAL CASE: if the column(s) being replaced are not covered by the fragment,
+                    // then an all-NULL column is being replaced with real data --
+                    // just add the new file to the fragment
+                    if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
+                        new_frag.add_file(
+                            new_file.path.clone(),
+                            new_file.fields.clone(),
+                            new_file.column_indices.clone(),
+                            &LanceFileVersion::try_from_major_minor(
+                                new_file.file_major_version,
+                                new_file.file_minor_version,
+                            )
+                            .expect("Expected valid file version"),
+                        );
+                    }
+
+                    // Nothing changed in the current fragment, which is not expected -- error out
+                    if &new_frag == frag {
+                        return Err(Error::invalid_input(
+                            "Expected to modify the fragment but no changes were made. This means the new data file does not align with any existing datafile. Please check that the schema of the new data file matches the schema of the old data files, including the file major and minor versions",
+                            location!(),
+                        ));
+                    }
+                    final_fragments.push(new_frag);
+                }
+
+                let fragments_changed = old_fragment_ids.iter().collect::<HashSet<_>>();
+
+                // 4. push fragments that didn't change back to final_fragments
+                let unmodified_fragments = existing_fragments
+                    .iter()
+                    .filter(|f| !fragments_changed.contains(&f.id))
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                final_fragments.extend(unmodified_fragments);
+            }
         };
 
         // If a fragment was reserved then it may not belong at the end of the fragments list.
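To make the field bookkeeping in the apply logic above concrete, here is a minimal sketch of how a caller describes a partial-column replacement: `fields` lists dataset field ids, and `column_indices` lists where each of those fields lives inside the new file. It mirrors `test_datafile_partial_replacement` in the test changes above; the file path, ids, and the `Operation` import path are illustrative assumptions, not part of the patch.

```rust
use lance_table::format::DataFile;

// Path assumed; the tests in this diff use `Operation` from within the `lance` crate.
use lance::dataset::transaction::Operation;

// Hypothetical layout: fragment 0 already stores dataset field 0 ("a") in its
// original file; the new file "test.lance" carries only dataset field id 1
// ("b"), stored at column index 0 inside that file.
fn partial_replacement() -> Operation {
    let new_data_file = DataFile {
        path: "test.lance".to_string(),
        // dataset field ids covered by this file
        fields: vec![1],
        // position of each listed field inside the file
        column_indices: vec![0],
        file_major_version: 2,
        file_minor_version: 0,
    };

    Operation::DataReplacement {
        old_fragment_ids: vec![0],
        new_datafiles: vec![new_data_file],
    }
}
```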
@@ -1164,6 +1327,18 @@ impl TryFrom<pb::Transaction> for Transaction {
                     field_metadata,
                 }
             }
+            Some(pb::transaction::Operation::DataReplacement(
+                pb::transaction::DataReplacement {
+                    old_fragment_ids,
+                    new_datafiles,
+                },
+            )) => Operation::DataReplacement {
+                old_fragment_ids,
+                new_datafiles: new_datafiles
+                    .into_iter()
+                    .map(DataFile::try_from)
+                    .collect::<Result<Vec<_>>>()?,
+            },
             None => {
                 return Err(Error::Internal {
                     message: "Transaction message did not contain an operation".to_string(),
@@ -1380,6 +1555,13 @@ impl From<&Transaction> for pb::Transaction {
                 })
                 .unwrap_or(Default::default()),
             }),
+            Operation::DataReplacement {
+                old_fragment_ids,
+                new_datafiles,
+            } => pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
+                old_fragment_ids: old_fragment_ids.clone(),
+                new_datafiles: new_datafiles.iter().map(pb::DataFile::from).collect(),
+            }),
         };
 
         let blob_operation = value.blobs_op.as_ref().map(|op| match op {
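For the simple whole-file replacement path, a rough end-to-end sketch follows `test_datafile_replacement` above: write a new file with the same schema and file version, clone the existing `DataFile` metadata, point it at the new path, and commit. The import paths, the helper name, and the trailing positional `Dataset::commit` arguments are assumptions based on the tests in this diff.

```rust
use std::sync::Arc;

// Paths assumed from the crate layout used by the tests in this diff.
use lance::dataset::{transaction::Operation, Dataset, WriteDestination};

// Hypothetical helper: replace the single data file backing fragment 0 with
// "test.lance", assumed to already exist under the dataset's data/ directory
// with an identical schema and file version.
async fn replace_fragment_zero(dataset: Arc<Dataset>) -> lance::Result<Dataset> {
    let frag = dataset.get_fragment(0).expect("fragment 0 exists");
    // Reuse the existing metadata (fields, column indices, file version) and
    // point it at the new file, exactly as test_datafile_replacement does.
    let mut new_data_file = frag
        .data_file_for_field(0)
        .expect("field 0 has a data file")
        .clone();
    new_data_file.path = "test.lance".to_string();

    Dataset::commit(
        WriteDestination::Dataset(dataset.clone()),
        Operation::DataReplacement {
            old_fragment_ids: vec![0],
            new_datafiles: vec![new_data_file],
        },
        Some(dataset.version().version), // read version this change was based on
        None,
        None,
        Arc::new(Default::default()),
        false,
    )
    .await
}
```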