Object_store: put_file and get_file methods #5277
Comments
I think adding some free functions that perform this logic for an arbitrary I would suggest the method take a
FWIW you should be able to make use of https://docs.rs/object_store/latest/object_store/struct.GetResult.html#method.into_stream to get a stream of bytes, and avoid buffering the entire file in memory |
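As a rough illustration of the streaming idea above, here is a minimal standard-library sketch that writes chunks to a destination as they arrive instead of collecting the whole object first. The function name `write_chunks` and the use of a plain iterator of `io::Result<Vec<u8>>` are assumptions made for this example; they stand in for the async byte stream that `GetResult::into_stream` returns and are not part of object_store.

```rust
use std::io::{self, Write};

/// Hypothetical helper: writes each chunk to `dest` as soon as it is produced,
/// so only one chunk is held in memory at a time.
/// Returns the total number of bytes written.
pub fn write_chunks<W, I>(chunks: I, dest: &mut W) -> io::Result<u64>
where
    W: Write,
    I: IntoIterator<Item = io::Result<Vec<u8>>>,
{
    let mut written = 0u64;
    for chunk in chunks {
        // Propagate a failed read before touching the destination.
        let chunk = chunk?;
        dest.write_all(&chunk)?;
        written += chunk.len() as u64;
    }
    dest.flush()?;
    Ok(written)
}
```

With the real crate, the same loop shape would apply to the stream items, with `.await` on each chunk.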
Sounds good. I think we can have four methods:
get_file_opts(&self, location: &Path, file: &std::fs::File, options: GetOptions) -> Result<()> // implemented with get
get_file(&self, location: &Path, file: &std::fs::File) -> Result<()> // calls get_file_opts with the default options
put_file_opts(&self, location: &Path, file: &std::fs::File, options: PutOptions) -> Result<()> // implemented with put or put_multipart, depending on the file size
put_file(&self, location: &Path, file: &std::fs::File) -> Result<()> // calls put_file_opts with the default options
What do you think? |
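The "put or put_multipart depending on the file size" decision could look roughly like the sketch below. Everything here is hypothetical: `UploadPlan`, `plan_upload`, and the threshold and part-size parameters are names invented for this example, and the thread does not specify what values an implementation would use.

```rust
/// Hypothetical description of how a file would be uploaded.
#[derive(Debug, PartialEq, Eq)]
pub enum UploadPlan {
    /// Small file: a single `put` request.
    Single,
    /// Large file: a multipart upload split into `parts` parts of at most `part_size` bytes.
    Multipart { part_size: u64, parts: u64 },
}

/// Chooses an upload strategy from the file length.
/// `multipart_threshold` and `part_size` are assumed, caller-supplied values.
pub fn plan_upload(file_len: u64, multipart_threshold: u64, part_size: u64) -> UploadPlan {
    assert!(part_size > 0, "part_size must be non-zero");
    if file_len <= multipart_threshold {
        UploadPlan::Single
    } else {
        // Ceiling division without risking overflow on very large lengths.
        let parts = file_len / part_size + u64::from(file_len % part_size != 0);
        UploadPlan::Multipart { part_size, parts }
    }
}
```

A caller would read the length from the local file's metadata and then dispatch to the matching upload path.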
I would expect something along the lines of the following free functions added to
I would like to avoid adding these methods to the |
Oh, I got what you meant. Sounds good to me. Thank you. |
I can try to implement them! |
take |
Hello, I've been reading through what is needed and how it could be implemented. I have a signature like this in mind:
pub async fn download(
store: &dyn ObjectStore,
location: &Path,
opts: GetOptions,
file: &mut std::fs::File,
transfer_opts: Option<&DownloadTransferConfig>,
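) -> Result<()>
The idea of transfer_opts (I can rename it to anything else) is to implement something similar to this:
#[derive(Debug, Clone, Copy)] // Copy lets download() copy the config out of the Option<&DownloadTransferConfig>
pub struct DownloadTransferConfig {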
/// The maximum number of concurrent chunks to download
pub max_concurrent_chunks: usize,
/// The maximum number of chunks to buffer in memory (the capacity of the channel between reader and writer)
pub chunk_queue_size: usize,
}
impl Default for DownloadTransferConfig {
fn default() -> Self {
Self {
max_concurrent_chunks: 1,
chunk_queue_size: 2,
}
}
}
So in the download function, it would be possible to use something like this:
pub async fn download(
store: &dyn ObjectStore,
location: &Path,
opts: GetOptions,
file: &mut std::fs::File,
transfer_opts: Option<&DownloadTransferConfig>,
) -> Result<()> {
let result = store.get_opts(&location, opts).await?;
let transfer_opts = *transfer_opts.unwrap_or(&DownloadTransferConfig::default());
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<Bytes>(transfer_opts.chunk_queue_size);
match result.payload {
GetResultPayload::Stream(stream) => {
let sender_task = tokio::spawn(async move {
let mut buffered_stream = stream
.map(|chunk| async move {
let chunk = chunk.map_err(crate::Error::from)?;
Ok::<Bytes, crate::Error>(chunk)
})
.buffered(transfer_opts.max_concurrent_chunks);
while let Some(chunk) = buffered_stream.next().await {
let chunk = chunk?;
if let Err(e) = sender.send(chunk).await {
eprintln!("Error sending the chunk: {:?}", e);
break;
}
}
drop(sender);
Ok::<(), crate::Error>(())
});
while let Some(chunk) = receiver.recv().await {
file.write_all(&chunk).context(UnableToWriteFileSnafu)?;
}
sender_task.await.context(UnableToJoinTaskSnafu)??;
Ok(())
}
GetResultPayload::File(mut source_file, _path) => {
std::io::copy(&mut source_file, file).context(UnableToWriteFileSnafu)?;
Ok(())
}
}
}
I'm still running tests to find the best way to do it, but I was wondering whether making the buffering or the channel size configurable would fit the expectations of this issue. Regards |
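To make the role of the channel size concrete, here is a minimal sketch of the same producer/consumer shape using only the standard library. It swaps in `std::sync::mpsc::sync_channel` and a thread for the tokio channel and task used in the comment above, and `copy_through_bounded_queue` is a hypothetical name. The point it shows is that the producer blocks once `queue_size` chunks are waiting, which bounds memory use.

```rust
use std::io::{self, Write};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: a producer thread feeds chunks through a bounded queue
/// while the calling thread writes them to `dest`.
pub fn copy_through_bounded_queue<W: Write>(
    chunks: Vec<Vec<u8>>,
    queue_size: usize,
    dest: &mut W,
) -> io::Result<u64> {
    let (sender, receiver) = sync_channel::<Vec<u8>>(queue_size);

    let producer = thread::spawn(move || {
        for chunk in chunks {
            // `send` blocks while the queue is full and fails once the receiver is gone.
            if sender.send(chunk).is_err() {
                break;
            }
        }
        // `sender` is dropped here, which closes the channel and ends the consumer loop.
    });

    let mut written = 0u64;
    for chunk in receiver {
        // An early return drops the receiver, so the producer stops on its next send.
        dest.write_all(&chunk)?;
        written += chunk.len() as u64;
    }

    producer
        .join()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "producer thread panicked"))?;
    Ok(written)
}
```

Note that this only controls buffering between one reader and one writer; it does not issue concurrent requests.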
Making the behaviour configurable and automatically handling concurrency sounds valuable to me. You may be able to take inspiration from the S3 TransferManager. That being said, I'm not sure your implementation will actually make concurrent requests; rather, it distributes chunks from the same streaming request, which is unlikely to be any faster. You'd need to make the initial request and then inspect the returned object size to determine whether spawning parallel range requests is warranted. |
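One possible way to sketch the "inspect the object size, then decide on parallel range requests" step is shown below. The helper names `should_parallelize` and `split_ranges`, and the idea of a fixed `chunk_size`, are assumptions for illustration only; the thread does not prescribe how ranges should be chosen.

```rust
use std::ops::Range;

/// Hypothetical check: parallel range requests only pay off when the object
/// spans more than one chunk.
pub fn should_parallelize(object_size: u64, chunk_size: u64) -> bool {
    object_size > chunk_size
}

/// Splits `0..object_size` into consecutive byte ranges of at most `chunk_size` bytes.
/// Each range could then be fetched with its own range request.
pub fn split_ranges(object_size: u64, chunk_size: u64) -> Vec<Range<u64>> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut ranges = Vec::new();
    let mut start = 0u64;
    while start < object_size {
        // The last range is shortened so it never runs past the end of the object.
        let end = start.saturating_add(chunk_size).min(object_size);
        ranges.push(start..end);
        start = end;
    }
    ranges
}
```

Because every range carries its own start offset, each fetched chunk can be written at the right position in the local file regardless of completion order.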
Thanks for the feedback. I will check the S3 TransferManager and look in the direction you suggested regarding concurrency. Thank you. |
I've created another issue to implement the upload method, for easier review: #6832. Tomorrow I will create a draft to discuss the download method. Regards |
I've created a draft pull request: #6837. I still have to add tests, but if anyone is interested in checking it and giving feedback, it would be welcome. |
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As a user, I would like to utilize the object_store to put and retrieve files. Currently, I can employ the put and get methods for this purpose. However, I am responsible for handling any issues that arise. For instance, when uploading a file to the store, I must verify its size to determine whether multipart upload is necessary. Similarly, when retrieving a large file, I cannot download it in its entirety before writing it to the local file system, as this would cause excessive memory usage. Instead, I may need to use get_range and write specific portions of the file locally. Moreover, downloading different ranges could potentially be executed in parallel.
Describe the solution you'd like
Expose get_file(local_path, remote_path) and put_file(local_path, remote_path) to be more user-friendly.
Describe alternatives you've considered
Stay with what we have now, which means users must implement this functionality themselves.