Skip to content

Commit

Permalink
fix: propagate storage_options to LanceFragment.create and LanceDatas…
Browse files Browse the repository at this point in the history
…et.commit (#2547)

## How is this tested?

Install the package with local change:

`maturin develop`

Verified the following code did not work before and worked after the
changes.

```
import pyarrow as pa
import lance
from lance.fragment import LanceFragment, FragmentMetadata

uri = 's3://xxxxx'
storage_options = {
    'aws_access_key_id': 'xxx',
    'aws_secret_access_key': 'xxx'
}

data = pa.table({"a": [1, 2], "b": ["a", "b"]})
fragment = LanceFragment.create(uri, data, storage_options=storage_options)
fragment = FragmentMetadata.from_json(fragment._metadata.json())

operation = lance.LanceOperation.Overwrite(tab1.schema, fragments=[fragment])
dataset = lance.LanceDataset.commit(uri, operation, storage_options=storage_options)
```
  • Loading branch information
jiachengdb authored Jul 1, 2024
1 parent f2ca181 commit c132ec9
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
14 changes: 12 additions & 2 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,7 @@ def commit(
operation: LanceOperation.BaseOperation,
read_version: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> LanceDataset:
"""Create a new version of dataset
Expand Down Expand Up @@ -1623,6 +1624,9 @@ def commit(
commit_lock : CommitLock, optional
A custom commit lock. Only needed if your object store does not support
atomic commits. See the user guide for more details.
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
Returns
-------
Expand Down Expand Up @@ -1660,8 +1664,14 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

_Dataset.commit(base_uri, operation._to_inner(), read_version, commit_lock)
return LanceDataset(base_uri)
_Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
)
return LanceDataset(base_uri, storage_options=storage_options)

def validate(self):
"""
Expand Down
5 changes: 5 additions & 0 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def create(
mode: str = "append",
*,
use_legacy_format=True,
storage_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
"""Create a :class:`FragmentMetadata` from the given data.
Expand Down Expand Up @@ -180,6 +181,9 @@ def create(
use_legacy_format: bool, default True
Use the legacy format to write Lance files. The default is True
while the v2 format is still in beta.
storage_options : optional, dict
Extra options that make sense for a particular storage connection. This is
used to store connection parameters like credentials, endpoint, etc.
See Also
--------
Expand Down Expand Up @@ -219,6 +223,7 @@ def create(
progress=progress,
mode=mode,
use_legacy_format=use_legacy_format,
storage_options=storage_options,
)
return FragmentMetadata(inner_meta.json())

Expand Down
16 changes: 14 additions & 2 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,13 @@ impl Dataset {
operation: Operation,
read_version: Option<u64>,
commit_lock: Option<&PyAny>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
let object_store_params = storage_options.map(|storage_options| ObjectStoreParams {
storage_options: Some(storage_options),
..Default::default()
});

let commit_handler = commit_lock.map(|commit_lock| {
Arc::new(PyCommitLock::new(commit_lock.to_object(commit_lock.py())))
as Arc<dyn CommitHandler>
Expand All @@ -1008,8 +1014,14 @@ impl Dataset {
};
let manifest = dataset.as_ref().map(|ds| ds.manifest());
validate_operation(manifest, &operation.0)?;
LanceDataset::commit(dataset_uri, operation.0, read_version, None, commit_handler)
.await
LanceDataset::commit(
dataset_uri,
operation.0,
read_version,
object_store_params,
commit_handler,
)
.await
})?
.map_err(|e| PyIOError::new_err(e.to_string()))?;
Ok(Self {
Expand Down
6 changes: 5 additions & 1 deletion rust/lance/src/dataset/fragment/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ impl<'a> FragmentCreateBuilder<'a> {

Self::validate_schema(&schema, stream.schema().as_ref())?;

let (object_store, base_path) = ObjectStore::from_uri(self.dataset_uri).await?;
let (object_store, base_path) = ObjectStore::from_uri_and_params(
self.dataset_uri,
&params.store_params.clone().unwrap_or_default(),
)
.await?;
let filename = format!("{}.lance", Uuid::new_v4());
let mut fragment = Fragment::with_file_legacy(id, &filename, &schema, None);
let full_path = base_path.child(DATA_DIR).child(filename.clone());
Expand Down

0 comments on commit c132ec9

Please sign in to comment.