From aa9d47963c2a15670c3797c1d477a7bb2943c95e Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 22 Jan 2024 16:16:02 -0300 Subject: [PATCH 1/4] List and ListStream implementation --- src/lib.rs | 303 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 3f2132d..dc608ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +use futures_util::stream::BoxStream; use futures_util::StreamExt; use object_store::RetryConfig; use once_cell::sync::OnceCell; @@ -633,6 +634,308 @@ pub extern "C" fn delete( } } +// Any non-Copy fields of ListEntry must be properly destroyed on destroy_list_entries +#[repr(C)] +pub struct ListEntry { + location: *const c_char, + last_modified: u64, + size: u64, + e_tag: *const c_char, + version: *const c_char +} + +#[repr(C)] +pub struct ListResponse { + result: CResult, + entries: *const ListEntry, + entry_count: u64, + error_message: *mut c_char +} + +unsafe impl Send for ListResponse {} + +impl ListResponse { + fn from_error(&mut self, error: impl std::fmt::Display) { + self.result = CResult::Error; + self.entries = std::ptr::null(); + self.entry_count = 0; + let c_string = CString::new(format!("{}", error)).expect("should not have nulls"); + self.error_message = c_string.into_raw(); + } +} + +#[no_mangle] +pub extern "C" fn destroy_list_entries( + entries: *mut ListEntry, + entry_count: u64 +) -> CResult { + let boxed_slice = unsafe { Box::from_raw(std::slice::from_raw_parts_mut(entries, entry_count as usize)) }; + for entry in &*boxed_slice { + // Safety: must properly drop all allocated fields from ListEntry here + let _ = unsafe { CString::from_raw(entry.location.cast_mut()) }; + if !entry.e_tag.is_null() { + let _ = unsafe { CString::from_raw(entry.e_tag.cast_mut()) }; + } + if !entry.version.is_null() { + let _ = unsafe { CString::from_raw(entry.version.cast_mut()) }; + } + } + CResult::Ok +} + +impl From for ListEntry { + fn from(meta: object_store::ObjectMeta) -> Self { + ListEntry { + location: CString::new(meta.location.to_string()) + .expect("should not have nulls") + .into_raw(), + last_modified: meta.last_modified + .timestamp() + .try_into() + .expect("is positive"), + size: meta.size as u64, + e_tag: match meta.e_tag { + None => std::ptr::null(), + Some(s) => { + CString::new(s) + .expect("should not have nulls") + .into_raw() + } + }, + version: match meta.version { + None => std::ptr::null(), + Some(s) => { + CString::new(s) + .expect("should not have nulls") + .into_raw() + } + } + } + } +} + +#[no_mangle] +pub extern "C" fn list( + prefix: *const c_char, + config: *const Config, + response: *mut ListResponse, + handle: *const c_void +) -> CResult { + let response = unsafe { &mut (*response) }; + response.result = CResult::Uninitialized; + let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; + let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); + let config = unsafe { & (*config) }; + let notifier = Notifier { handle }; + + match RT.get() { + Some(runtime) => { + runtime.spawn(async move { + let list_op = async move { + let client = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let stream = client.list(Some(&prefix)); + + let entries: Vec<_> = stream.collect().await; + let entries = entries.into_iter().collect::, _>>()?; + let entries_slice = entries.into_iter() + .map(Into::into) + .collect::>() + .into_boxed_slice(); + + let entry_count = entries_slice.len() as u64; + let entries_ptr = entries_slice.as_ptr(); + std::mem::forget(entries_slice); + + Ok::<_, anyhow::Error>((entry_count, entries_ptr)) + }; + + match list_op.await { + Ok((entry_count, entries_ptr)) => { + response.result = CResult::Ok; + response.entry_count = entry_count; + response.entries = entries_ptr; + response.error_message = std::ptr::null_mut(); + notifier.notify(); + }, + Err(e) => { + tracing::warn!("{}", e); + response.from_error(e); + notifier.notify(); + } + } + }); + CResult::Ok + } + None => { + return CResult::Error; + } + } +} + +pub struct StreamWrapper { + client: Arc, + stream: Option>>> +} + +#[no_mangle] +pub extern "C" fn destroy_list_stream( + stream: *mut StreamWrapper +) -> CResult { + let mut boxed = unsafe { Box::from_raw(stream) }; + // Safety: Must drop the stream before the client here + drop(boxed.stream.take()); + drop(boxed); + CResult::Ok +} + +#[repr(C)] +pub struct ListStreamResponse { + result: CResult, + stream: *mut StreamWrapper, + error_message: *mut c_char +} + +unsafe impl Send for ListStreamResponse {} + +impl ListStreamResponse { + fn from_error(&mut self, error: impl std::fmt::Display) { + self.result = CResult::Error; + self.stream = std::ptr::null_mut(); + let c_string = CString::new(format!("{}", error)).expect("should not have nulls"); + self.error_message = c_string.into_raw(); + } +} + +#[no_mangle] +pub extern "C" fn list_stream( + prefix: *const c_char, + config: *const Config, + response: *mut ListStreamResponse, + handle: *const c_void +) -> CResult { + let response = unsafe { &mut (*response) }; + response.result = CResult::Uninitialized; + let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; + let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); + let config = unsafe { & (*config) }; + let notifier = Notifier { handle }; + + match RT.get() { + Some(runtime) => { + runtime.spawn(async move { + let list_op = async move { + let client = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let mut wrapper = Box::new(StreamWrapper { + client, + stream: None + }); + + let stream = wrapper.client.list(Some(&prefix)).chunks(1000).boxed(); + + // Safety: This is needed because the compiler cannot infer that the stream + // will outlive the client. We ensure this happens + // by droping the stream before droping the Arc on destroy_list_stream + wrapper.stream = Some(unsafe { std::mem::transmute(stream) }); + + Ok::<_, anyhow::Error>(wrapper) + }; + + match list_op.await { + Ok(wrapper) => { + response.result = CResult::Ok; + response.stream = Box::into_raw(wrapper); + response.error_message = std::ptr::null_mut(); + notifier.notify(); + }, + Err(e) => { + tracing::warn!("{}", e); + response.from_error(e); + notifier.notify(); + } + } + }); + CResult::Ok + } + None => { + return CResult::Error; + } + } +} + +#[no_mangle] +pub extern "C" fn next_list_stream_chunk( + stream: *mut StreamWrapper, + response: *mut ListResponse, + handle: *const c_void +) -> CResult { + let response = unsafe { &mut (*response) }; + response.result = CResult::Uninitialized; + let notifier = Notifier { handle }; + let wrapper = match unsafe { stream.as_mut() } { + Some(w) => w, + None => { + tracing::error!("null stream pointer"); + return CResult::Error; + } + }; + + match RT.get() { + Some(runtime) => { + runtime.spawn(async move { + let list_op = async move { + let stream_ref = wrapper.stream.as_mut().unwrap(); + let result = match stream_ref.next().await { + Some(vec) => { + let entries = vec.into_iter().collect::, _>>()?; + let entries_slice = entries.into_iter() + .map(Into::into) + .collect::>() + .into_boxed_slice(); + + let entry_count = entries_slice.len() as u64; + let entries_ptr = entries_slice.as_ptr(); + std::mem::forget(entries_slice); + + (entry_count, entries_ptr) + } + None => { + (0, std::ptr::null()) + } + }; + Ok::<_, anyhow::Error>(result) + }; + + match list_op.await { + Ok((entry_count, entries_ptr)) => { + response.result = CResult::Ok; + response.entries = entries_ptr; + response.entry_count = entry_count; + response.error_message = std::ptr::null_mut(); + notifier.notify(); + }, + Err(e) => { + tracing::warn!("{}", e); + response.from_error(e); + notifier.notify(); + } + } + }); + CResult::Ok + } + None => { + return CResult::Error; + } + } +} + #[no_mangle] pub extern "C" fn destroy_cstring(string: *mut c_char) -> CResult { let string = unsafe { std::ffi::CString::from_raw(string) }; From 7181414b850c656688e156a1830691c0d4341f3a Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Wed, 31 Jan 2024 17:26:05 -0300 Subject: [PATCH 2/4] Adapt to use response guards and new retry logic --- src/lib.rs | 281 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 173 insertions(+), 108 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dc608ba..1744acb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ use futures_util::stream::BoxStream; use futures_util::StreamExt; -use object_store::RetryConfig; +use object_store::{RetryConfig, ObjectMeta}; use once_cell::sync::OnceCell; use tokio::io::AsyncWriteExt; use tokio::runtime::Runtime; @@ -654,16 +654,61 @@ pub struct ListResponse { unsafe impl Send for ListResponse {} -impl ListResponse { - fn from_error(&mut self, error: impl std::fmt::Display) { - self.result = CResult::Error; - self.entries = std::ptr::null(); - self.entry_count = 0; +// RAII Guard for a ListResponse that ensures the awaiting Julia task will be notified +// even if this is dropped on a panic. +pub struct ListResponseGuard { + response: &'static mut ListResponse, + handle: *const c_void +} + +impl NotifyGuard for ListResponseGuard { + fn is_uninitialized(&self) -> bool { + self.response.result == CResult::Uninitialized + } + fn condition_handle(&self) -> *const c_void { + self.handle + } + fn set_error(&mut self, error: impl std::fmt::Display) { + self.response.result = CResult::Error; + self.response.entries = std::ptr::null(); + self.response.entry_count = 0; let c_string = CString::new(format!("{}", error)).expect("should not have nulls"); - self.error_message = c_string.into_raw(); + self.response.error_message = c_string.into_raw(); + } +} + +impl ListResponseGuard { + unsafe fn new(response_ptr: *mut ListResponse, handle: *const c_void) -> ListResponseGuard { + let response = unsafe { &mut (*response_ptr) }; + response.result = CResult::Uninitialized; + + ListResponseGuard { response, handle } + } + fn success(self, entries: Vec) { // TODO entries + let entries_slice = entries.into_iter() + .map(Into::into) + .collect::>() + .into_boxed_slice(); + + let entry_count = entries_slice.len() as u64; + let entries_ptr = entries_slice.as_ptr(); + std::mem::forget(entries_slice); + + self.response.result = CResult::Ok; + self.response.entry_count = entry_count; + self.response.entries = entries_ptr; + self.response.error_message = std::ptr::null_mut(); } } +impl Drop for ListResponseGuard { + fn drop(&mut self) { + self.notify_on_drop() + } +} + +unsafe impl Send for ListResponseGuard {} + #[no_mangle] pub extern "C" fn destroy_list_entries( entries: *mut ListEntry, @@ -721,50 +766,46 @@ pub extern "C" fn list( response: *mut ListResponse, handle: *const c_void ) -> CResult { - let response = unsafe { &mut (*response) }; - response.result = CResult::Uninitialized; + let response = unsafe { ListResponseGuard::new(response, handle) }; let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); let config = unsafe { & (*config) }; - let notifier = Notifier { handle }; match RT.get() { Some(runtime) => { runtime.spawn(async move { - let list_op = async move { - let client = clients() - .try_get_with(config.get_hash(), dyn_connect(config)) - .await - .map_err(|e| anyhow!(e))?; - - let stream = client.list(Some(&prefix)); - - let entries: Vec<_> = stream.collect().await; - let entries = entries.into_iter().collect::, _>>()?; - let entries_slice = entries.into_iter() - .map(Into::into) - .collect::>() - .into_boxed_slice(); - - let entry_count = entries_slice.len() as u64; - let entries_ptr = entries_slice.as_ptr(); - std::mem::forget(entries_slice); - - Ok::<_, anyhow::Error>((entry_count, entries_ptr)) - }; + let start_instant = Instant::now(); + let mut retries = 0; + 'retry: loop { + let list_op = async { + let (client, _) = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let stream = client.list(Some(&prefix)); + + let entries: Vec<_> = stream.collect().await; + let entries = entries.into_iter().collect::, _>>()?; + Ok::<_, anyhow::Error>(entries) + }; - match list_op.await { - Ok((entry_count, entries_ptr)) => { - response.result = CResult::Ok; - response.entry_count = entry_count; - response.entries = entries_ptr; - response.error_message = std::ptr::null_mut(); - notifier.notify(); - }, - Err(e) => { - tracing::warn!("{}", e); - response.from_error(e); - notifier.notify(); + match list_op.await { + Ok(entries) => { + response.success(entries); + return; + }, + Err(e) => { + if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { + retries += 1; + tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); + tokio::time::sleep(duration).await; + continue 'retry; + } + tracing::warn!("{}", e); + response.into_error(e); + return; + } } } }); @@ -801,15 +842,50 @@ pub struct ListStreamResponse { unsafe impl Send for ListStreamResponse {} -impl ListStreamResponse { - fn from_error(&mut self, error: impl std::fmt::Display) { - self.result = CResult::Error; - self.stream = std::ptr::null_mut(); +// RAII Guard for a ListResponse that ensures the awaiting Julia task will be notified +// even if this is dropped on a panic. +pub struct ListStreamResponseGuard { + response: &'static mut ListStreamResponse, + handle: *const c_void +} + +impl NotifyGuard for ListStreamResponseGuard { + fn is_uninitialized(&self) -> bool { + self.response.result == CResult::Uninitialized + } + fn condition_handle(&self) -> *const c_void { + self.handle + } + fn set_error(&mut self, error: impl std::fmt::Display) { + self.response.result = CResult::Error; + self.response.stream = std::ptr::null_mut(); let c_string = CString::new(format!("{}", error)).expect("should not have nulls"); - self.error_message = c_string.into_raw(); + self.response.error_message = c_string.into_raw(); + } +} + +impl ListStreamResponseGuard { + unsafe fn new(response_ptr: *mut ListStreamResponse, handle: *const c_void) -> ListStreamResponseGuard { + let response = unsafe { &mut (*response_ptr) }; + response.result = CResult::Uninitialized; + + ListStreamResponseGuard { response, handle } + } + fn success(self, stream: Box) { + self.response.result = CResult::Ok; + self.response.stream = Box::into_raw(stream); + self.response.error_message = std::ptr::null_mut(); + } +} + +impl Drop for ListStreamResponseGuard { + fn drop(&mut self) { + self.notify_on_drop() } } +unsafe impl Send for ListStreamResponseGuard {} + #[no_mangle] pub extern "C" fn list_stream( prefix: *const c_char, @@ -817,48 +893,54 @@ pub extern "C" fn list_stream( response: *mut ListStreamResponse, handle: *const c_void ) -> CResult { - let response = unsafe { &mut (*response) }; - response.result = CResult::Uninitialized; + let response = unsafe { ListStreamResponseGuard::new(response, handle) }; let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); let config = unsafe { & (*config) }; - let notifier = Notifier { handle }; match RT.get() { Some(runtime) => { runtime.spawn(async move { - let list_op = async move { - let client = clients() - .try_get_with(config.get_hash(), dyn_connect(config)) - .await - .map_err(|e| anyhow!(e))?; - - let mut wrapper = Box::new(StreamWrapper { - client, - stream: None - }); - - let stream = wrapper.client.list(Some(&prefix)).chunks(1000).boxed(); - - // Safety: This is needed because the compiler cannot infer that the stream - // will outlive the client. We ensure this happens - // by droping the stream before droping the Arc on destroy_list_stream - wrapper.stream = Some(unsafe { std::mem::transmute(stream) }); - - Ok::<_, anyhow::Error>(wrapper) - }; + let start_instant = Instant::now(); + let mut retries = 0; + 'retry: loop { + let list_op = async { + let (client, _) = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let mut wrapper = Box::new(StreamWrapper { + client, + stream: None + }); + + let stream = wrapper.client.list(Some(&prefix)).chunks(1000).boxed(); + + // Safety: This is needed because the compiler cannot infer that the stream + // will outlive the client. We ensure this happens + // by droping the stream before droping the Arc on destroy_list_stream + wrapper.stream = Some(unsafe { std::mem::transmute(stream) }); + + Ok::<_, anyhow::Error>(wrapper) + }; - match list_op.await { - Ok(wrapper) => { - response.result = CResult::Ok; - response.stream = Box::into_raw(wrapper); - response.error_message = std::ptr::null_mut(); - notifier.notify(); - }, - Err(e) => { - tracing::warn!("{}", e); - response.from_error(e); - notifier.notify(); + match list_op.await { + Ok(wrapper) => { + response.success(wrapper); + return; + }, + Err(e) => { + if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { + retries += 1; + tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); + tokio::time::sleep(duration).await; + continue 'retry; + } + tracing::warn!("{}", e); + response.into_error(e); + return; + } } } }); @@ -876,9 +958,7 @@ pub extern "C" fn next_list_stream_chunk( response: *mut ListResponse, handle: *const c_void ) -> CResult { - let response = unsafe { &mut (*response) }; - response.result = CResult::Uninitialized; - let notifier = Notifier { handle }; + let response = unsafe { ListResponseGuard::new(response, handle) }; let wrapper = match unsafe { stream.as_mut() } { Some(w) => w, None => { @@ -890,41 +970,26 @@ pub extern "C" fn next_list_stream_chunk( match RT.get() { Some(runtime) => { runtime.spawn(async move { - let list_op = async move { + let list_op = async { let stream_ref = wrapper.stream.as_mut().unwrap(); - let result = match stream_ref.next().await { + let option = match stream_ref.next().await { Some(vec) => { - let entries = vec.into_iter().collect::, _>>()?; - let entries_slice = entries.into_iter() - .map(Into::into) - .collect::>() - .into_boxed_slice(); - - let entry_count = entries_slice.len() as u64; - let entries_ptr = entries_slice.as_ptr(); - std::mem::forget(entries_slice); - - (entry_count, entries_ptr) + vec.into_iter().collect::, _>>()? } None => { - (0, std::ptr::null()) + vec![] } }; - Ok::<_, anyhow::Error>(result) + Ok::<_, anyhow::Error>(option) }; match list_op.await { - Ok((entry_count, entries_ptr)) => { - response.result = CResult::Ok; - response.entries = entries_ptr; - response.entry_count = entry_count; - response.error_message = std::ptr::null_mut(); - notifier.notify(); + Ok(entries) => { + response.success(entries); }, Err(e) => { tracing::warn!("{}", e); - response.from_error(e); - notifier.notify(); + response.into_error(e); } } }); From 815cbd7e46381e66c08a364c34342932ec164939 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Mon, 5 Feb 2024 13:04:25 -0300 Subject: [PATCH 3/4] Remove TODO --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 1744acb..b4a7716 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -684,7 +684,7 @@ impl ListResponseGuard { ListResponseGuard { response, handle } } - fn success(self, entries: Vec) { // TODO entries + fn success(self, entries: Vec) { let entries_slice = entries.into_iter() .map(Into::into) .collect::>() From f143013cb8f1a0e5523d3c359f1458efebae8a67 Mon Sep 17 00:00:00 2001 From: Andre Guedes Date: Wed, 7 Feb 2024 16:55:02 -0300 Subject: [PATCH 4/4] Makes list ops go through submission queue --- src/lib.rs | 186 ++++++++++++++++++++++++++++------------------------- 1 file changed, 99 insertions(+), 87 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b4a7716..268eb02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,7 +66,9 @@ pub enum CResult { enum Request { Get(Path, &'static mut [u8], &'static Config, ResponseGuard), Put(Path, &'static [u8], &'static Config, ResponseGuard), - Delete(Path, &'static Config, ResponseGuard) + Delete(Path, &'static Config, ResponseGuard), + List(Path, &'static Config, ListResponseGuard), + ListStream(Path, &'static Config, ListStreamResponseGuard) } unsafe impl Send for Request {} @@ -412,6 +414,40 @@ async fn handle_delete(path: &Path, config: &Config) -> anyhow::Result<()> { Ok(()) } +async fn handle_list(prefix: &Path, config: &Config) -> anyhow::Result> { + let (client, _) = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let stream = client.list(Some(&prefix)); + + let entries: Vec<_> = stream.collect().await; + let entries = entries.into_iter().collect::, _>>()?; + Ok(entries) +} + +async fn handle_list_stream(prefix: &Path, config: &Config) -> anyhow::Result> { + let (client, _) = clients() + .try_get_with(config.get_hash(), dyn_connect(config)) + .await + .map_err(|e| anyhow!(e))?; + + let mut wrapper = Box::new(StreamWrapper { + client, + stream: None + }); + + let stream = wrapper.client.list(Some(&prefix)).chunks(1000).boxed(); + + // Safety: This is needed because the compiler cannot infer that the stream + // will outlive the client. We ensure this happens + // by droping the stream before droping the Arc on destroy_list_stream + wrapper.stream = Some(unsafe { std::mem::transmute(stream) }); + + Ok(wrapper) +} + #[no_mangle] pub extern "C" fn start( config: StaticConfig, @@ -534,6 +570,48 @@ pub extern "C" fn start( } } } + Request::List(prefix, config, response) => { + 'retry: loop { + match handle_list(&prefix, config).await { + Ok(entries) => { + response.success(entries); + return; + } + Err(e) => { + if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { + retries += 1; + tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); + tokio::time::sleep(duration).await; + continue 'retry; + } + tracing::warn!("{}", e); + response.into_error(e); + return; + } + } + } + } + Request::ListStream(prefix, config, response) => { + 'retry: loop { + match handle_list_stream(&prefix, config).await { + Ok(stream) => { + response.success(stream); + return; + } + Err(e) => { + if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { + retries += 1; + tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); + tokio::time::sleep(duration).await; + continue 'retry; + } + tracing::warn!("{}", e); + response.into_error(e); + return; + } + } + } + } } } }).buffer_unordered(static_config().concurrency_limit as usize).for_each(|_| async {}).await; @@ -770,46 +848,17 @@ pub extern "C" fn list( let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); let config = unsafe { & (*config) }; - - match RT.get() { - Some(runtime) => { - runtime.spawn(async move { - let start_instant = Instant::now(); - let mut retries = 0; - 'retry: loop { - let list_op = async { - let (client, _) = clients() - .try_get_with(config.get_hash(), dyn_connect(config)) - .await - .map_err(|e| anyhow!(e))?; - - let stream = client.list(Some(&prefix)); - - let entries: Vec<_> = stream.collect().await; - let entries = entries.into_iter().collect::, _>>()?; - Ok::<_, anyhow::Error>(entries) - }; - - match list_op.await { - Ok(entries) => { - response.success(entries); - return; - }, - Err(e) => { - if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { - retries += 1; - tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); - tokio::time::sleep(duration).await; - continue 'retry; - } - tracing::warn!("{}", e); - response.into_error(e); - return; - } - } + match SQ.get() { + Some(sq) => { + match sq.try_send(Request::List(prefix, config, response)) { + Ok(_) => CResult::Ok, + Err(async_channel::TrySendError::Full(_)) => { + CResult::Backoff } - }); - CResult::Ok + Err(async_channel::TrySendError::Closed(_)) => { + CResult::Error + } + } } None => { return CResult::Error; @@ -897,54 +946,17 @@ pub extern "C" fn list_stream( let prefix = unsafe { std::ffi::CStr::from_ptr(prefix) }; let prefix: Path = prefix.to_str().expect("invalid utf8").try_into().unwrap(); let config = unsafe { & (*config) }; - - match RT.get() { - Some(runtime) => { - runtime.spawn(async move { - let start_instant = Instant::now(); - let mut retries = 0; - 'retry: loop { - let list_op = async { - let (client, _) = clients() - .try_get_with(config.get_hash(), dyn_connect(config)) - .await - .map_err(|e| anyhow!(e))?; - - let mut wrapper = Box::new(StreamWrapper { - client, - stream: None - }); - - let stream = wrapper.client.list(Some(&prefix)).chunks(1000).boxed(); - - // Safety: This is needed because the compiler cannot infer that the stream - // will outlive the client. We ensure this happens - // by droping the stream before droping the Arc on destroy_list_stream - wrapper.stream = Some(unsafe { std::mem::transmute(stream) }); - - Ok::<_, anyhow::Error>(wrapper) - }; - - match list_op.await { - Ok(wrapper) => { - response.success(wrapper); - return; - }, - Err(e) => { - if let Some(duration) = should_retry(retries, &e, start_instant.elapsed(), config).await { - retries += 1; - tracing::info!("retrying error (reason: {:?}) after {:?}: {}", extract_error_info(&e).reason, duration, e); - tokio::time::sleep(duration).await; - continue 'retry; - } - tracing::warn!("{}", e); - response.into_error(e); - return; - } - } + match SQ.get() { + Some(sq) => { + match sq.try_send(Request::ListStream(prefix, config, response)) { + Ok(_) => CResult::Ok, + Err(async_channel::TrySendError::Full(_)) => { + CResult::Backoff } - }); - CResult::Ok + Err(async_channel::TrySendError::Closed(_)) => { + CResult::Error + } + } } None => { return CResult::Error;