diff --git a/crates/applesauce/src/lib.rs b/crates/applesauce/src/lib.rs index a826c11..1ee5eee 100644 --- a/crates/applesauce/src/lib.rs +++ b/crates/applesauce/src/lib.rs @@ -268,36 +268,6 @@ fn try_read_all(mut r: R, buf: &mut [u8]) -> io::Result { Ok(read_len) } -struct InstrumentedIter { - inner: I, - span: tracing::Span, -} - -impl Iterator for InstrumentedIter -where - I: Iterator, -{ - type Item = I::Item; - - fn next(&mut self) -> Option { - let _enter = self.span.enter(); - self.inner.next() - } -} - -pub(crate) fn instrumented_iter( - inner: IntoIt, - span: tracing::Span, -) -> InstrumentedIter -where - IntoIt: IntoIterator, -{ - InstrumentedIter { - inner: inner.into_iter(), - span, - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/applesauce/src/seq_queue.rs b/crates/applesauce/src/seq_queue.rs index 4da9060..a66111f 100644 --- a/crates/applesauce/src/seq_queue.rs +++ b/crates/applesauce/src/seq_queue.rs @@ -1,94 +1,182 @@ -use std::fmt; -use std::fmt::Formatter; +use std::sync::{Arc, Mutex}; +use std::{fmt, io}; -#[derive(Debug)] -pub struct Sender(crossbeam_channel::Sender>); - -#[derive(Debug)] -pub struct Receiver(crossbeam_channel::Receiver>); - -pub struct Slot(oneshot::Sender); +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct UnknownError; -pub fn bounded(cap: usize) -> (Sender, Receiver) { - let (tx, rx) = crossbeam_channel::bounded(cap); - (Sender(tx), Receiver(rx)) +impl fmt::Display for UnknownError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("unspecified error in sender for sequential queue") + } } -impl Sender { - pub fn prepare_send(&self) -> Option> { - let (tx, rx) = oneshot::channel(); - self.0.send(rx).ok()?; - Some(Slot(tx)) +impl std::error::Error for UnknownError {} + +impl From for io::Error { + fn from(value: UnknownError) -> Self { + io::Error::new(io::ErrorKind::Other, value) } } -impl Clone for Sender { +type FinalSuccessData = Option>>; + +#[derive(Debug)] +struct FinalSuccess(Arc>>); + +// Clone doesn't depend on E being clone +impl Clone for FinalSuccess { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } -impl Slot { - pub fn finish(self, item: T) -> Result<(), oneshot::SendError> { - self.0.send(item) +impl FinalSuccess { + fn new() -> Self { + Self(Arc::new(Mutex::new(None))) } -} -impl Receiver { - pub fn recv(&self) -> Result { - let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?; - inner_chan.recv().map_err(|_| RecvError::ItemRecvError) + fn make_success(self) { + let mutex = &*self.0; + let mut lock = mutex.lock().unwrap(); + match *lock { + Some(_) => {} + None => { + *lock = Some(Ok(())); + } + } } -} -pub struct Iter<'a, T> { - receiver: &'a Receiver, -} + fn make_unknown_error(self) { + let mutex = &*self.0; + let mut lock = mutex.lock().unwrap(); + match *lock { + Some(Err(Some(_))) => {} + _ => { + *lock = Some(Err(None)); + } + } + } -pub struct IntoIter { - receiver: Receiver, + fn make_error(self, error: E) { + let mutex = &*self.0; + let mut lock = mutex.lock().unwrap(); + match *lock { + Some(Err(Some(_))) => {} + _ => { + *lock = Some(Err(Some(error))); + } + } + } + + fn get_result(self) -> Result<(), Option> { + let mutex = &*self.0; + let mut lock = mutex.lock().unwrap(); + lock.take().unwrap_or_else(|| Err(None)) + } } -impl<'a, T> IntoIterator for &'a Receiver { - type Item = T; - type IntoIter = Iter<'a, T>; +struct FinalErrorOnDrop(Option>); - fn into_iter(self) -> Self::IntoIter { - Iter { receiver: self } +impl FinalErrorOnDrop { + fn disarm(mut self) { + self.0 = None; } } -impl<'a, T> IntoIterator for &'a mut Receiver { - type Item = T; - type IntoIter = Iter<'a, T>; - - fn into_iter(self) -> Self::IntoIter { - Iter { receiver: self } +impl Drop for FinalErrorOnDrop { + fn drop(&mut self) { + if let Some(final_success) = self.0.take() { + final_success.make_unknown_error(); + } } } -impl IntoIterator for Receiver { - type Item = T; - type IntoIter = IntoIter; +#[derive(Debug)] +pub struct Sender( + crossbeam_channel::Sender>, + FinalSuccess, +); + +#[derive(Debug)] +pub struct Receiver( + crossbeam_channel::Receiver>, + FinalSuccess, +); + +pub struct Slot(oneshot::Sender, FinalErrorOnDrop); + +pub fn bounded(cap: usize) -> (Sender, Receiver) { + let final_success = FinalSuccess::new(); + let (tx, rx) = crossbeam_channel::bounded(cap); + ( + Sender(tx, final_success.clone()), + Receiver(rx, final_success), + ) +} + +impl Sender { + pub fn prepare_send(&self) -> Option> { + let (tx, rx) = oneshot::channel(); + self.0.send(rx).ok()?; + Some(Slot(tx, FinalErrorOnDrop(Some(self.1.clone())))) + } - fn into_iter(self) -> Self::IntoIter { - IntoIter { receiver: self } + pub fn finish(self, result: Result<(), E>) { + match result { + Ok(()) => self.1.make_success(), + Err(e) => self.1.make_error(e), + } } } -impl Iterator for Iter<'_, T> { - type Item = T; +impl Slot { + pub fn finish(self, item: T) -> Result<(), oneshot::SendError> { + self.1.disarm(); + self.0.send(item) + } - fn next(&mut self) -> Option { - self.receiver.recv().ok() + pub fn error(self, error: E) { + let Self(_sender, mut error_on_drop) = self; + if let Some(final_success) = error_on_drop.0.take() { + final_success.make_error(error) + } } } -impl Iterator for IntoIter { - type Item = T; +impl Receiver { + pub fn try_for_each(self, mut f: impl FnMut(T) -> Result<(), E>) -> Result<(), E> + where + UnknownError: Into, + { + loop { + match self.recv() { + Ok(result) => { + f(result)?; + } + Err(_) => { + return self + .finish() + .map_err(|maybe_e| maybe_e.unwrap_or_else(|| UnknownError.into())) + } + } + } + } + + pub fn recv(&self) -> Result { + let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?; + inner_chan.recv().map_err(|_| RecvError::ItemRecvError) + } - fn next(&mut self) -> Option { - self.receiver.recv().ok() + pub fn finish(self) -> Result<(), Option> { + let Self(receiver, final_success) = self; + if receiver.recv().is_ok() { + tracing::error!("finish on seq queue received an item"); + return Err(None); + } + // Make sure to drop the receiver, to make sure the sender won't block trying to send + // anything + drop(receiver); + final_success.get_result() } } @@ -111,7 +199,7 @@ impl RecvError { } impl fmt::Display for RecvError { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(self.message()) } } @@ -125,14 +213,13 @@ mod tests { #[test] fn order_after_sending() { - let (tx, rx) = bounded(2); + let (tx, rx) = bounded::(2); let first = tx.prepare_send().unwrap(); assert_eq!(rx.0.len(), 1); let second = tx.prepare_send().unwrap(); assert_eq!(rx.0.len(), 2); - - drop(tx); + tx.finish(Ok(())); second.finish(2).unwrap(); first.finish(1).unwrap(); @@ -141,12 +228,51 @@ mod tests { assert_eq!(rx.recv().unwrap(), 1); assert_eq!(rx.recv().unwrap(), 2); - assert!(rx.recv().is_err()); + assert_eq!(rx.recv().unwrap_err(), RecvError::Finished); + assert_eq!(rx.finish(), Ok(())); + } + + #[test] + fn no_success_becomes_err() { + let (tx, rx) = bounded::(2); + + let first = tx.prepare_send().unwrap(); + first.finish(1).unwrap(); + drop(tx); + + assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv().unwrap_err(), RecvError::Finished); + assert_eq!(rx.finish(), Err(None)); + } + + #[test] + fn unfinished_send_becomes_err() { + let (tx, rx) = bounded::(2); + + let first = tx.prepare_send().unwrap(); + drop(first); + tx.finish(Ok(())); + + assert_eq!(rx.recv().unwrap_err(), RecvError::ItemRecvError); + assert_eq!(rx.finish(), Err(None)); + } + + #[test] + fn explicit_send_err() { + let (tx, rx) = bounded::(2); + + let first = tx.prepare_send().unwrap(); + first.finish(1).unwrap(); + tx.finish(Err("error")); + + assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv().unwrap_err(), RecvError::Finished); + assert_eq!(rx.finish(), Err(Some("error"))); } #[test] fn across_threads() { - let (tx, rx) = bounded(2); + let (tx, rx) = bounded::(2); let sender_handle = std::thread::spawn(move || { let tx = tx; @@ -160,12 +286,14 @@ mod tests { slot.finish(i).unwrap(); }); } + tx.finish(Ok(())); }); for i in 0..1000 { assert_eq!(rx.recv().unwrap(), i); } assert_eq!(rx.recv().unwrap_err(), RecvError::Finished); + assert_eq!(rx.finish(), Ok(())); sender_handle.join().unwrap(); } diff --git a/crates/applesauce/src/threads/compressing.rs b/crates/applesauce/src/threads/compressing.rs index 99f76b8..62864f3 100644 --- a/crates/applesauce/src/threads/compressing.rs +++ b/crates/applesauce/src/threads/compressing.rs @@ -11,7 +11,7 @@ pub(super) struct WorkItem { pub context: Arc, pub data: Vec, pub kind: compressor::Kind, - pub slot: seq_queue::Slot>, + pub slot: seq_queue::Slot, } pub(super) struct Work; @@ -59,10 +59,7 @@ impl WorkHandler for Handler { let size = match size { Ok(size) => size, Err(e) => { - if item.slot.finish(Err(e)).is_err() { - // This should only be because of a failure already reported by the writer - tracing::debug!("unable to finish slot"); - } + item.slot.error(e); return; } }; @@ -72,7 +69,7 @@ impl WorkHandler for Handler { block: self.buf[..size].to_vec(), orig_size: item.data.len().try_into().unwrap(), }; - if item.slot.finish(Ok(chunk)).is_err() { + if item.slot.finish(chunk).is_err() { // This should only be because of a failure already reported by the writer tracing::debug!("unable to finish slot"); } diff --git a/crates/applesauce/src/threads/reader.rs b/crates/applesauce/src/threads/reader.rs index 1f05dbc..f69e017 100644 --- a/crates/applesauce/src/threads/reader.rs +++ b/crates/applesauce/src/threads/reader.rs @@ -1,5 +1,4 @@ use crate::seq_queue::Slot; -use crate::threads::writer::Chunk; use crate::threads::{compressing, writer, BgWork, Context, Mode, WorkHandler}; use crate::{rfork_storage, seq_queue, try_read_all}; use applesauce_core::BLOCK_SIZE; @@ -42,54 +41,12 @@ impl Handler { Self { compressor, writer } } - fn try_handle(&mut self, item: WorkItem) -> io::Result<()> { - let WorkItem { context } = item; - let _guard = tracing::info_span!("reading file", path=%context.path.display()).entered(); - let file = Arc::new(File::open(&context.path)?); - - let file_size = context.orig_metadata.len(); - let (tx, rx) = seq_queue::bounded( - thread::available_parallelism() - .map(NonZeroUsize::get) - .unwrap_or(4), - ); - - { - let _enter = tracing::debug_span!("waiting for space in writer").entered(); - self.writer - .send(writer::WorkItem { - context: Arc::clone(&context), - file: Arc::clone(&file), - blocks: rx, - }) - .unwrap(); - } - - if let Err(e) = self.read_file_into(&context, &file, file_size, &tx) { - context - .progress - .error(&format!("Error reading {}: {}", context.path.display(), e)); - // If we can't get a slot, the writer already had an error, so we can just return. - if let Some(slot) = tx.prepare_send() { - // Likewise, if we get a slot, but the channel closes, the writer must have seen an error - let _ = slot.finish(Err(io::Error::new(io::ErrorKind::Other, "Error in reader"))); - } - return Err(e); - } - // Ensure the file Arc is dropped before finishing sending on the channel, - // so the writer can have the only remaining reference, and take ownership of the file. - drop(file); - drop(tx); - - Ok(()) - } - fn read_file_into( &mut self, context: &Arc, file: &File, expected_len: u64, - tx: &seq_queue::Sender>, + tx: &seq_queue::Sender, ) -> io::Result<()> { match context.operation.mode { Mode::Compress { kind, .. } => { @@ -132,10 +89,10 @@ impl Handler { Mode::DecompressByReading => { self.with_file_chunks(file, expected_len, tx, |slot, data| { let orig_size = data.len() as u64; - let res = slot.finish(Ok(Chunk { + let res = slot.finish(writer::Chunk { block: data, orig_size, - })); + }); if let Err(e) = res { // This should only happen if the writer had an error tracing::debug!("error finishing chunk: {e}"); @@ -153,8 +110,8 @@ impl Handler { &mut self, file: &File, expected_len: u64, - tx: &seq_queue::Sender>, - mut f: impl FnMut(Slot>, Vec) -> io::Result<()>, + tx: &seq_queue::Sender, + mut f: impl FnMut(Slot, Vec) -> io::Result<()>, ) -> io::Result { let mut total_read = 0; let block_span = tracing::debug_span!("reading blocks"); @@ -218,8 +175,45 @@ impl Handler { impl WorkHandler for Handler { fn handle_item(&mut self, item: WorkItem) { - if let Err(e) = self.try_handle(item) { - tracing::error!("unable to compress file: {}", e); + let WorkItem { context } = item; + let _guard = tracing::info_span!("reading file", path=%context.path.display()).entered(); + let file = match File::open(&context.path) { + Ok(file) => file, + Err(e) => { + context + .progress + .error(&format!("Error opening {}: {}", context.path.display(), e)); + return; + } + }; + let file = Arc::new(file); + + let file_size = context.orig_metadata.len(); + let (tx, rx) = seq_queue::bounded( + thread::available_parallelism() + .map(NonZeroUsize::get) + .unwrap_or(4), + ); + + { + let _enter = tracing::debug_span!("waiting for space in writer").entered(); + self.writer + .send(writer::WorkItem { + context: Arc::clone(&context), + file: Arc::clone(&file), + blocks: rx, + }) + .unwrap(); + } + + let result = self.read_file_into(&context, &file, file_size, &tx); + // ensure the file is dropped before tx is finished + drop(file); + if let Err(e) = &result { + context + .progress + .error(&format!("Error reading {}: {}", context.path.display(), e)); } + tx.finish(result); } } diff --git a/crates/applesauce/src/threads/writer.rs b/crates/applesauce/src/threads/writer.rs index 0a80963..5d9096b 100644 --- a/crates/applesauce/src/threads/writer.rs +++ b/crates/applesauce/src/threads/writer.rs @@ -21,7 +21,7 @@ pub(super) struct Chunk { pub(super) struct WorkItem { pub context: Arc, pub file: Arc, - pub blocks: seq_queue::Receiver>, + pub blocks: seq_queue::Receiver, } pub(super) struct Work; @@ -56,7 +56,7 @@ impl Handler { &mut self, context: &Context, writer: &mut applesauce_core::writer::Writer, - chunks: seq_queue::Receiver>, + chunks: seq_queue::Receiver, ) -> io::Result<()> { let block_span = tracing::debug_span!("write block"); @@ -71,10 +71,7 @@ impl Handler { let max_compressed_size = (context.orig_metadata.len() as f64 * minimum_compression_ratio) as u64; - let chunks = crate::instrumented_iter(chunks, tracing::debug_span!("waiting for chunk")); - for chunk in chunks { - let chunk = chunk?; - + chunks.try_for_each(|chunk| { total_compressed_size += u64::try_from(chunk.block.len()).unwrap(); if total_compressed_size > max_compressed_size { context.progress.not_compressible_enough(&context.path); @@ -92,7 +89,8 @@ impl Handler { writer.add_block(&block)?; context.progress.increment(orig_size); - } + Ok(()) + })?; Ok(()) } @@ -170,15 +168,13 @@ impl Handler { let mut tmp_file = tmp_file_for(&item)?; copy_xattrs(&item.file, tmp_file.as_file())?; - let chunks = - crate::instrumented_iter(item.blocks, tracing::debug_span!("waiting for chunk")); - for chunk in chunks { - let chunk = chunk?; + item.blocks.try_for_each(|chunk| { tmp_file.write_all(&chunk.block)?; // Increment progress by the uncompressed size of the block, // not the "original" (compressed) size item.context.progress.increment(chunk.block.len() as u64); - } + Ok(()) + })?; copy_metadata(&item.file, tmp_file.as_file())?; set_flags(