Skip to content

Commit

Permalink
feat: add replace_schema_metadata and replace_field_metadata (#3263)
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace authored Dec 17, 2024
1 parent 8a16e2e commit b1ab748
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 45 deletions.
6 changes: 6 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ message Transaction {
message UpdateConfig {
  // Config keys to insert or overwrite; keys not mentioned are untouched.
  map<string, string> upsert_values = 1;
  // Config keys to remove.
  repeated string delete_keys = 2;
  // If set, replaces the schema metadata wholesale.
  map<string, string> schema_metadata = 3;
  // Per-field metadata updates, keyed by field id.
  map<uint32, FieldMetadataUpdate> field_metadata = 4;

  message FieldMetadataUpdate {
    // The replacement metadata for the field. Field numbering restarts
    // inside a nested message, so this is tag 1 (the previous `= 5` was a
    // copy-paste continuation of UpdateConfig's numbering).
    map<string, string> metadata = 1;
  }
}

// The operation of this transaction.
Expand Down
32 changes: 31 additions & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,38 @@ def partition_expression(self):
def replace_schema(self, schema: Schema):
    """
    Not implemented (just override pyarrow dataset to prevent segfault)

    See :py:meth:`replace_schema_metadata` or :py:meth:`replace_field_metadata`
    """
    # Overrides the inherited pyarrow method so calling it fails loudly
    # instead of crashing; point callers at the supported metadata APIs.
    message = (
        "Cannot replace the schema of a dataset. This method exists for backwards"
        " compatibility with pyarrow. Use replace_schema_metadata or "
        "replace_field_metadata to change the metadata"
    )
    raise NotImplementedError(message)

def replace_schema_metadata(self, new_metadata: Dict[str, str]):
    """
    Replace the schema metadata of the dataset

    Parameters
    ----------
    new_metadata: dict
        The new metadata to set
    """
    # Delegate to the native dataset, which commits the metadata change.
    self._ds.replace_schema_metadata(new_metadata)

def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
    """
    Replace the metadata of a field in the schema

    Parameters
    ----------
    field_name: str
        The name of the field to replace the metadata for
    new_metadata: dict
        The new metadata to set
    """
    # The unconditional `raise NotImplementedError` that preceded this call
    # made it unreachable; removed so the feature actually works, matching
    # the behavior exercised by test_schema_metadata.
    self._ds.replace_field_metadata(field_name, new_metadata)

def get_fragments(self, filter: Optional[Expression] = None) -> List[LanceFragment]:
"""Get all fragments from the dataset.
Expand Down
33 changes: 33 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,39 @@ def test_dataset_from_record_batch_iterable(tmp_path: Path):
assert list(dataset.to_batches())[0].to_pylist() == test_pylist


def test_schema_metadata(tmp_path: Path):
    """Schema/field metadata round-trips through writes and can be replaced."""
    original_schema = pa.schema(
        [
            pa.field("a", pa.int64(), metadata={b"thisis": "a"}),
            pa.field("b", pa.int64(), metadata={b"thisis": "b"}),
        ],
        metadata={b"foo": b"bar", b"baz": b"qux"},
    )
    tbl = pa.Table.from_pydict(
        {"a": range(100), "b": range(100)}, schema=original_schema
    )
    dataset = lance.write_dataset(tbl, tmp_path)

    # The metadata carried by the input schema should round-trip unchanged.
    assert dataset.schema.metadata == {b"foo": b"bar", b"baz": b"qux"}
    assert dataset.schema.field("a").metadata == {b"thisis": b"a"}
    assert dataset.schema.field("b").metadata == {b"thisis": b"b"}

    # replace_schema_metadata swaps out the schema-level metadata but leaves
    # per-field metadata alone.
    dataset.replace_schema_metadata({"foo": "baz"})
    assert dataset.schema.metadata == {b"foo": b"baz"}
    assert dataset.schema.field("a").metadata == {b"thisis": b"a"}
    assert dataset.schema.field("b").metadata == {b"thisis": b"b"}

    # replace_field_metadata touches only the named field.
    dataset.replace_field_metadata("a", {"thisis": "c"})
    assert dataset.schema.field("a").metadata == {b"thisis": b"c"}
    assert dataset.schema.field("b").metadata == {b"thisis": b"b"}

    # An overwrite write discards the replaced metadata and restores whatever
    # the incoming table's schema carries.
    dataset = lance.write_dataset(tbl, tmp_path, mode="overwrite")
    assert dataset.schema.metadata == {b"foo": b"bar", b"baz": b"qux"}
    assert dataset.schema.field("a").metadata == {b"thisis": b"a"}
    assert dataset.schema.field("b").metadata == {b"thisis": b"b"}

def test_versions(tmp_path: Path):
table1 = pa.Table.from_pylist([{"a": 1, "b": 2}, {"a": 10, "b": 20}])
base_dir = tmp_path / "test"
Expand Down
26 changes: 26 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,32 @@ impl Dataset {
LanceSchema(self_.ds.schema().clone())
}

/// Replace the dataset schema's metadata with `metadata`.
fn replace_schema_metadata(&mut self, metadata: HashMap<String, String>) -> PyResult<()> {
    // The dataset is shared behind an Arc, so mutate a private clone and
    // only publish it once the operation has succeeded.
    let mut updated = self.ds.as_ref().clone();
    let result = RT.block_on(None, updated.replace_schema_metadata(metadata))?;
    result.map_err(|err| PyIOError::new_err(err.to_string()))?;
    self.ds = Arc::new(updated);
    Ok(())
}

/// Replace the metadata of a single field, identified by name.
///
/// Raises `KeyError` if no field with that name exists in the schema.
fn replace_field_metadata(
    &mut self,
    field_name: &str,
    metadata: HashMap<String, String>,
) -> PyResult<()> {
    let mut new_self = self.ds.as_ref().clone();
    // Copy the id out so the immutable borrow of `new_self` ends before
    // the mutable call below.
    let field_id = new_self
        .schema()
        .field(field_name)
        .ok_or_else(|| PyKeyError::new_err(format!("Field \"{}\" not found", field_name)))?
        .id;
    // Guard against a negative (unassigned) field id instead of letting
    // `as u32` silently wrap it into a huge, nonexistent id.
    let field_id = u32::try_from(field_id).map_err(|_| {
        PyKeyError::new_err(format!("Field \"{}\" has no assigned id", field_name))
    })?;
    let new_field_meta = HashMap::from([(field_id, metadata)]);
    RT.block_on(None, new_self.replace_field_metadata(new_field_meta))?
        .map_err(|err| PyIOError::new_err(err.to_string()))?;
    self.ds = Arc::new(new_self);
    Ok(())
}

#[getter(data_storage_version)]
fn data_storage_version(&self) -> PyResult<String> {
Ok(self.ds.manifest().data_storage_format.version.clone())
Expand Down
13 changes: 13 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,19 @@ impl Field {
self.children.iter_mut().for_each(Self::reset_id);
}

/// Get a mutable reference to the descendant field with the given id.
///
/// Searches this field's children recursively; `self.id` itself is not
/// compared (mirroring `field_by_id`). Returns `None` if no match.
pub fn field_by_id_mut(&mut self, id: impl Into<i32>) -> Option<&mut Self> {
    let id = id.into();
    self.children.iter_mut().find_map(|child| {
        if child.id == id {
            Some(child)
        } else {
            child.field_by_id_mut(id)
        }
    })
}

pub fn field_by_id(&self, id: impl Into<i32>) -> Option<&Self> {
let id = id.into();
for child in self.children.as_slice() {
Expand Down
14 changes: 13 additions & 1 deletion rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,19 @@ impl Schema {
}

/// Get a mutable reference to a field by its id.
///
/// Searches top-level fields and, recursively, their descendants.
/// Returns `None` if no field has the given id.
// TODO: pub(crate)
pub fn field_by_id_mut(&mut self, id: impl Into<i32>) -> Option<&mut Field> {
    let id = id.into();
    for field in self.fields.iter_mut() {
        if field.id == id {
            return Some(field);
        }
        // Despite the name, this may match any descendant, not only a
        // direct grandchild.
        if let Some(grandchild) = field.field_by_id_mut(id) {
            return Some(grandchild);
        }
    }
    None
}

pub fn field_by_id(&self, id: impl Into<i32>) -> Option<&Field> {
let id = id.into();
for field in self.fields.iter() {
Expand Down
14 changes: 14 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ impl Manifest {
.retain(|key, _| !delete_keys.contains(&key.as_str()));
}

/// Replaces the schema metadata with the given key-value pairs.
///
/// This is a wholesale replacement, not a merge: any previously-set keys
/// absent from `new_metadata` are dropped.
pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
    self.schema.metadata = new_metadata;
}

/// Replaces the metadata of the field with the given id with the given
/// key-value pairs (wholesale, not merged).
///
/// If the field does not exist in the schema, this is a no-op.
pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
    // Unknown field ids are silently ignored rather than treated as errors.
    let Some(field) = self.schema.field_by_id_mut(field_id) else {
        return;
    };
    field.metadata = new_metadata;
}

/// Check the current fragment list and update the high water mark
pub fn update_max_fragment_id(&mut self) {
let max_fragment_id = self
Expand Down
88 changes: 51 additions & 37 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,20 +1495,9 @@ impl Dataset {
self.merge_impl(stream, left_on, right_on).await
}

/// Update key-value pairs in config.
pub async fn update_config(
&mut self,
upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
let transaction = Transaction::new(
self.manifest.version,
Operation::UpdateConfig {
upsert_values: Some(HashMap::from_iter(upsert_values)),
delete_keys: None,
},
/*blobs_op=*/ None,
None,
);
async fn update_op(&mut self, op: Operation) -> Result<()> {
let transaction =
Transaction::new(self.manifest.version, op, /*blobs_op=*/ None, None);

let (manifest, manifest_path) = commit_transaction(
self,
Expand All @@ -1527,33 +1516,58 @@ impl Dataset {
Ok(())
}

/// Update key-value pairs in config.
///
/// Keys in `upsert_values` are inserted or overwritten; config keys not
/// mentioned are left as-is (use [`Self::delete_config_keys`] to remove
/// keys). Committed as a single `UpdateConfig` transaction.
pub async fn update_config(
    &mut self,
    upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
    self.update_op(Operation::UpdateConfig {
        upsert_values: Some(HashMap::from_iter(upsert_values)),
        delete_keys: None,
        schema_metadata: None,
        field_metadata: None,
    })
    .await
}

/// Delete keys from the config.
pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
let transaction = Transaction::new(
self.manifest.version,
Operation::UpdateConfig {
upsert_values: None,
delete_keys: Some(Vec::from_iter(delete_keys.iter().map(ToString::to_string))),
},
/*blob_op=*/ None,
None,
);

let (manifest, manifest_path) = commit_transaction(
self,
&self.object_store,
self.commit_handler.as_ref(),
&transaction,
&Default::default(),
&Default::default(),
self.manifest_naming_scheme,
)
.await?;
self.update_op(Operation::UpdateConfig {
upsert_values: None,
delete_keys: Some(Vec::from_iter(delete_keys.iter().map(ToString::to_string))),
schema_metadata: None,
field_metadata: None,
})
.await
}

self.manifest = Arc::new(manifest);
self.manifest_file = manifest_path;
/// Replace the schema metadata with the given key-value pairs.
///
/// The entire metadata map is replaced; keys not present in
/// `upsert_values` are removed. Committed as an `UpdateConfig` transaction.
pub async fn replace_schema_metadata(
    &mut self,
    upsert_values: impl IntoIterator<Item = (String, String)>,
) -> Result<()> {
    let new_metadata: HashMap<String, String> = upsert_values.into_iter().collect();
    self.update_op(Operation::UpdateConfig {
        upsert_values: None,
        delete_keys: None,
        schema_metadata: Some(new_metadata),
        field_metadata: None,
    })
    .await
}

Ok(())
/// Replace the metadata of one or more fields, keyed by field id.
///
/// Each listed field's metadata map is replaced wholesale; fields not
/// listed are untouched. Committed as an `UpdateConfig` transaction.
pub async fn replace_field_metadata(
    &mut self,
    new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
) -> Result<()> {
    let field_metadata: HashMap<u32, HashMap<String, String>> =
        new_values.into_iter().collect();
    self.update_op(Operation::UpdateConfig {
        upsert_values: None,
        delete_keys: None,
        schema_metadata: None,
        field_metadata: Some(field_metadata),
    })
    .await
}
}

Expand Down
Loading

0 comments on commit b1ab748

Please sign in to comment.