Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split put and put_multipart #49

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api/put.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

::: obstore.put
::: obstore.put_async
::: obstore.put_multipart
::: obstore.put_multipart_async
::: obstore.PutResult
2 changes: 2 additions & 0 deletions obstore/python/obstore/_obstore.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ from ._list import list_with_delimiter_async as list_with_delimiter_async
from ._put import PutResult as PutResult
from ._put import put as put
from ._put import put_async as put_async
from ._put import put_multipart as put_multipart
from ._put import put_multipart_async as put_multipart_async
from ._rename import rename as rename
from ._rename import rename_async as rename_async
from ._sign import HTTP_METHOD as HTTP_METHOD
Expand Down
36 changes: 31 additions & 5 deletions obstore/python/obstore/_put.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,35 @@ def put(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
) -> PutResult:
"""Save the provided bytes to the specified location

The operation is guaranteed to be atomic, it will either successfully write the
entirety of `file` to `location`, or fail. No clients should be able to observe a
partially written object.

Args:
store: The ObjectStore instance to use.
path: The path within ObjectStore for where to save the file.
file: The object to upload. Can either be file-like, a `Path` to a local file,
or a `bytes` object.
"""

async def put_async(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
) -> PutResult:
"""Call `put` asynchronously.

Refer to the documentation for [put][obstore.put].
"""

def put_multipart(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
*,
use_multipart: bool | None = None,
chunk_size: int = 5 * 1024 * 1024,
max_concurrency: int = 12,
) -> PutResult:
Expand All @@ -40,12 +67,11 @@ def put(
or a `bytes` object.

Keyword args:
use_multipart: Whether to use a multipart upload under the hood. Defaults using a multipart upload if the length of the file is greater than `chunk_size`.
chunk_size: The size of chunks to use within each part of the multipart upload. Defaults to 5 MB.
max_concurrency: The maximum number of chunks to upload concurrently. Defaults to 12.
"""

async def put_async(
async def put_multipart_async(
store: ObjectStore,
path: str,
file: IO[bytes] | Path | bytes,
Expand All @@ -54,7 +80,7 @@ async def put_async(
chunk_size: int = 5 * 1024 * 1024,
max_concurrency: int = 12,
) -> PutResult:
"""Call `put` asynchronously.
"""Call `put_multipart` asynchronously.

Refer to the documentation for [put][obstore.put].
Refer to the documentation for [put_multipart][obstore.put_multipart].
"""
2 changes: 2 additions & 0 deletions obstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ fn _obstore(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(list::list_with_delimiter))?;
m.add_wrapped(wrap_pyfunction!(list::list))?;
m.add_wrapped(wrap_pyfunction!(put::put_async))?;
m.add_wrapped(wrap_pyfunction!(put::put_multipart_async))?;
m.add_wrapped(wrap_pyfunction!(put::put_multipart))?;
m.add_wrapped(wrap_pyfunction!(put::put))?;
m.add_wrapped(wrap_pyfunction!(rename::rename_async))?;
m.add_wrapped(wrap_pyfunction!(rename::rename))?;
Expand Down
96 changes: 47 additions & 49 deletions obstore/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ impl MultipartPutInput {
self.seek(SeekFrom::Start(origin_pos))?;
Ok(size.try_into().unwrap())
}

/// Whether to use multipart uploads.
fn use_multipart(&mut self, chunk_size: usize) -> PyObjectStoreResult<bool> {
Ok(self.nbytes()? > chunk_size)
}
}

impl<'py> FromPyObject<'py> for MultipartPutInput {
Expand Down Expand Up @@ -88,65 +83,25 @@ impl IntoPy<PyObject> for PyPutResult {
}

#[pyfunction]
#[pyo3(signature = (store, path, file, *, use_multipart = None, chunk_size = 5242880, max_concurrency = 12))]
pub(crate) fn put(
py: Python,
store: PyObjectStore,
path: String,
mut file: MultipartPutInput,
use_multipart: Option<bool>,
chunk_size: usize,
max_concurrency: usize,
file: MultipartPutInput,
) -> PyObjectStoreResult<PyPutResult> {
let use_multipart = if let Some(use_multipart) = use_multipart {
use_multipart
} else {
file.use_multipart(chunk_size)?
};
let runtime = get_runtime(py)?;
if use_multipart {
runtime.block_on(put_multipart_inner(
store.into_inner(),
&path.into(),
file,
chunk_size,
max_concurrency,
))
} else {
runtime.block_on(put_inner(store.into_inner(), &path.into(), file))
}
runtime.block_on(put_inner(store.into_inner(), &path.into(), file))
}

#[pyfunction]
#[pyo3(signature = (store, path, file, *, use_multipart = None, chunk_size = 5242880, max_concurrency = 12))]
pub(crate) fn put_async(
py: Python,
store: PyObjectStore,
path: String,
mut file: MultipartPutInput,
use_multipart: Option<bool>,
chunk_size: usize,
max_concurrency: usize,
file: MultipartPutInput,
) -> PyResult<Bound<PyAny>> {
let use_multipart = if let Some(use_multipart) = use_multipart {
use_multipart
} else {
file.use_multipart(chunk_size)?
};
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let result = if use_multipart {
put_multipart_inner(
store.into_inner(),
&path.into(),
file,
chunk_size,
max_concurrency,
)
.await?
} else {
put_inner(store.into_inner(), &path.into(), file).await?
};
Ok(result)
Ok(put_inner(store.into_inner(), &path.into(), file).await?)
})
}

Expand All @@ -162,6 +117,49 @@ async fn put_inner(
Ok(PyPutResult(store.put(path, payload).await?))
}

#[pyfunction]
#[pyo3(signature = (store, path, file, *, chunk_size = 5242880, max_concurrency = 12))]
pub(crate) fn put_multipart(
py: Python,
store: PyObjectStore,
path: String,
file: MultipartPutInput,
chunk_size: usize,
max_concurrency: usize,
) -> PyObjectStoreResult<PyPutResult> {
let runtime = get_runtime(py)?;
runtime.block_on(put_multipart_inner(
store.into_inner(),
&path.into(),
file,
chunk_size,
max_concurrency,
))
}

#[pyfunction]
#[pyo3(signature = (store, path, file, *, chunk_size = 5242880, max_concurrency = 12))]
pub(crate) fn put_multipart_async(
py: Python,
store: PyObjectStore,
path: String,
file: MultipartPutInput,
chunk_size: usize,
max_concurrency: usize,
) -> PyResult<Bound<PyAny>> {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let result = put_multipart_inner(
store.into_inner(),
&path.into(),
file,
chunk_size,
max_concurrency,
)
.await?;
Ok(result)
})
}

async fn put_multipart_inner<R: Read>(
store: Arc<dyn ObjectStore>,
path: &Path,
Expand Down
Loading