fix: file writing is canceled for ANY errors #102

Merged 1 commit on Jan 2, 2025
fix: file writing is canceled for ANY errors
This ensures that unless the reader signals full success (and each slot is finished), the writer will not promote the file.

For example, if the file changes size while being read, it will now properly be ignored.
Dr-Emann committed Jan 2, 2025

commit cb8302e36c74c13b5a37efd9c2196f909e192c48
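
For readers skimming the diff, here is a minimal sketch of the contract the new seq_queue API enforces (only the seq_queue names come from the diff below; the helper function and the Vec<u8> payload are made up for illustration): every Slot must be finished and the Sender must explicitly report success, otherwise the receiving side ends with an error.

// Sketch only: assumes the crate-internal `seq_queue` module from this diff is in
// scope, and uses io::Error as the error type (which `UnknownError` converts into).
use crate::seq_queue;
use std::io;

fn produce_and_consume() -> io::Result<()> {
    let (tx, rx) = seq_queue::bounded::<Vec<u8>, io::Error>(4);

    let producer = std::thread::spawn(move || {
        for i in 0..3u8 {
            // Every slot must be finished; dropping one unfinished records an error.
            let slot = tx.prepare_send().expect("receiver hung up");
            slot.finish(vec![i]).expect("receiver hung up");
        }
        // Only this call marks the whole stream as successful.
        tx.finish(Ok(()));
    });

    // try_for_each yields items in order and, once the channel closes, returns
    // Err unless the sender explicitly signaled success.
    let result = rx.try_for_each(|block| {
        println!("got {} bytes", block.len());
        Ok(())
    });

    producer.join().unwrap();
    result
}
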
30 changes: 0 additions & 30 deletions crates/applesauce/src/lib.rs
@@ -268,36 +268,6 @@ fn try_read_all<R: Read>(mut r: R, buf: &mut [u8]) -> io::Result<usize> {
     Ok(read_len)
 }
 
-struct InstrumentedIter<I> {
-    inner: I,
-    span: tracing::Span,
-}
-
-impl<I> Iterator for InstrumentedIter<I>
-where
-    I: Iterator,
-{
-    type Item = I::Item;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let _enter = self.span.enter();
-        self.inner.next()
-    }
-}
-
-pub(crate) fn instrumented_iter<IntoIt>(
-    inner: IntoIt,
-    span: tracing::Span,
-) -> InstrumentedIter<IntoIt::IntoIter>
-where
-    IntoIt: IntoIterator,
-{
-    InstrumentedIter {
-        inner: inner.into_iter(),
-        span,
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
254 changes: 191 additions & 63 deletions crates/applesauce/src/seq_queue.rs
@@ -1,94 +1,182 @@
-use std::fmt;
-use std::fmt::Formatter;
+use std::sync::{Arc, Mutex};
+use std::{fmt, io};
 
-#[derive(Debug)]
-pub struct Sender<T>(crossbeam_channel::Sender<oneshot::Receiver<T>>);
-
-#[derive(Debug)]
-pub struct Receiver<T>(crossbeam_channel::Receiver<oneshot::Receiver<T>>);
-
-pub struct Slot<T>(oneshot::Sender<T>);
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct UnknownError;
 
-pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
-    let (tx, rx) = crossbeam_channel::bounded(cap);
-    (Sender(tx), Receiver(rx))
+impl fmt::Display for UnknownError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str("unspecified error in sender for sequential queue")
+    }
 }
 
-impl<T> Sender<T> {
-    pub fn prepare_send(&self) -> Option<Slot<T>> {
-        let (tx, rx) = oneshot::channel();
-        self.0.send(rx).ok()?;
-        Some(Slot(tx))
+impl std::error::Error for UnknownError {}
+
+impl From<UnknownError> for io::Error {
+    fn from(value: UnknownError) -> Self {
+        io::Error::new(io::ErrorKind::Other, value)
     }
 }
 
-impl<T> Clone for Sender<T> {
+type FinalSuccessData<E> = Option<Result<(), Option<E>>>;
+
+#[derive(Debug)]
+struct FinalSuccess<E>(Arc<Mutex<FinalSuccessData<E>>>);
+
+// Clone doesn't depend on E being clone
+impl<E> Clone for FinalSuccess<E> {
     fn clone(&self) -> Self {
-        Self(self.0.clone())
+        Self(Arc::clone(&self.0))
     }
 }
 
-impl<T> Slot<T> {
-    pub fn finish(self, item: T) -> Result<(), oneshot::SendError<T>> {
-        self.0.send(item)
+impl<E> FinalSuccess<E> {
+    fn new() -> Self {
+        Self(Arc::new(Mutex::new(None)))
    }
-}
 
-impl<T> Receiver<T> {
-    pub fn recv(&self) -> Result<T, RecvError> {
-        let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?;
-        inner_chan.recv().map_err(|_| RecvError::ItemRecvError)
+    fn make_success(self) {
+        let mutex = &*self.0;
+        let mut lock = mutex.lock().unwrap();
+        match *lock {
+            Some(_) => {}
+            None => {
+                *lock = Some(Ok(()));
+            }
+        }
     }
-}
 
-pub struct Iter<'a, T> {
-    receiver: &'a Receiver<T>,
-}
+    fn make_unknown_error(self) {
+        let mutex = &*self.0;
+        let mut lock = mutex.lock().unwrap();
+        match *lock {
+            Some(Err(Some(_))) => {}
+            _ => {
+                *lock = Some(Err(None));
+            }
+        }
+    }
 
-pub struct IntoIter<T> {
-    receiver: Receiver<T>,
-}
+    fn make_error(self, error: E) {
+        let mutex = &*self.0;
+        let mut lock = mutex.lock().unwrap();
+        match *lock {
+            Some(Err(Some(_))) => {}
+            _ => {
+                *lock = Some(Err(Some(error)));
+            }
+        }
+    }
+
+    fn get_result(self) -> Result<(), Option<E>> {
+        let mutex = &*self.0;
+        let mut lock = mutex.lock().unwrap();
+        lock.take().unwrap_or_else(|| Err(None))
+    }
+}
 
-impl<'a, T> IntoIterator for &'a Receiver<T> {
-    type Item = T;
-    type IntoIter = Iter<'a, T>;
+struct FinalErrorOnDrop<E>(Option<FinalSuccess<E>>);
 
-    fn into_iter(self) -> Self::IntoIter {
-        Iter { receiver: self }
+impl<E> FinalErrorOnDrop<E> {
+    fn disarm(mut self) {
+        self.0 = None;
     }
 }
 
-impl<'a, T> IntoIterator for &'a mut Receiver<T> {
-    type Item = T;
-    type IntoIter = Iter<'a, T>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        Iter { receiver: self }
+impl<E> Drop for FinalErrorOnDrop<E> {
+    fn drop(&mut self) {
+        if let Some(final_success) = self.0.take() {
+            final_success.make_unknown_error();
+        }
     }
 }
 
-impl<T> IntoIterator for Receiver<T> {
-    type Item = T;
-    type IntoIter = IntoIter<T>;
+#[derive(Debug)]
+pub struct Sender<T, E>(
+    crossbeam_channel::Sender<oneshot::Receiver<T>>,
+    FinalSuccess<E>,
+);
+
+#[derive(Debug)]
+pub struct Receiver<T, E>(
+    crossbeam_channel::Receiver<oneshot::Receiver<T>>,
+    FinalSuccess<E>,
+);
+
+pub struct Slot<T, E>(oneshot::Sender<T>, FinalErrorOnDrop<E>);
+
+pub fn bounded<T, E>(cap: usize) -> (Sender<T, E>, Receiver<T, E>) {
+    let final_success = FinalSuccess::new();
+    let (tx, rx) = crossbeam_channel::bounded(cap);
+    (
+        Sender(tx, final_success.clone()),
+        Receiver(rx, final_success),
+    )
+}
+
+impl<T, E> Sender<T, E> {
+    pub fn prepare_send(&self) -> Option<Slot<T, E>> {
+        let (tx, rx) = oneshot::channel();
+        self.0.send(rx).ok()?;
+        Some(Slot(tx, FinalErrorOnDrop(Some(self.1.clone()))))
+    }
 
-    fn into_iter(self) -> Self::IntoIter {
-        IntoIter { receiver: self }
+    pub fn finish(self, result: Result<(), E>) {
+        match result {
+            Ok(()) => self.1.make_success(),
+            Err(e) => self.1.make_error(e),
+        }
     }
 }
 
-impl<T> Iterator for Iter<'_, T> {
-    type Item = T;
+impl<T, E> Slot<T, E> {
+    pub fn finish(self, item: T) -> Result<(), oneshot::SendError<T>> {
+        self.1.disarm();
+        self.0.send(item)
+    }
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.receiver.recv().ok()
+    pub fn error(self, error: E) {
+        let Self(_sender, mut error_on_drop) = self;
+        if let Some(final_success) = error_on_drop.0.take() {
+            final_success.make_error(error)
+        }
     }
 }
 
-impl<T> Iterator for IntoIter<T> {
-    type Item = T;
+impl<T, E> Receiver<T, E> {
+    pub fn try_for_each(self, mut f: impl FnMut(T) -> Result<(), E>) -> Result<(), E>
+    where
+        UnknownError: Into<E>,
+    {
+        loop {
+            match self.recv() {
+                Ok(result) => {
+                    f(result)?;
+                }
+                Err(_) => {
+                    return self
+                        .finish()
+                        .map_err(|maybe_e| maybe_e.unwrap_or_else(|| UnknownError.into()))
+                }
+            }
+        }
+    }
+
+    pub fn recv(&self) -> Result<T, RecvError> {
+        let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?;
+        inner_chan.recv().map_err(|_| RecvError::ItemRecvError)
+    }
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.receiver.recv().ok()
+    pub fn finish(self) -> Result<(), Option<E>> {
+        let Self(receiver, final_success) = self;
+        if receiver.recv().is_ok() {
+            tracing::error!("finish on seq queue received an item");
+            return Err(None);
+        }
+        // Make sure to drop the receiver, to make sure the sender won't block trying to send
+        // anything
+        drop(receiver);
+        final_success.get_result()
     }
 }

@@ -111,7 +199,7 @@ impl RecvError {
 }
 
 impl fmt::Display for RecvError {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.write_str(self.message())
     }
 }
@@ -125,14 +213,13 @@ mod tests {

     #[test]
     fn order_after_sending() {
-        let (tx, rx) = bounded(2);
+        let (tx, rx) = bounded::<u8, ()>(2);
 
         let first = tx.prepare_send().unwrap();
         assert_eq!(rx.0.len(), 1);
         let second = tx.prepare_send().unwrap();
         assert_eq!(rx.0.len(), 2);
 
-        drop(tx);
+        tx.finish(Ok(()));
 
         second.finish(2).unwrap();
         first.finish(1).unwrap();
@@ -141,12 +228,51 @@ mod tests {

         assert_eq!(rx.recv().unwrap(), 1);
         assert_eq!(rx.recv().unwrap(), 2);
-        assert!(rx.recv().is_err());
+        assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
+        assert_eq!(rx.finish(), Ok(()));
     }
 
+    #[test]
+    fn no_success_becomes_err() {
+        let (tx, rx) = bounded::<u8, ()>(2);
+
+        let first = tx.prepare_send().unwrap();
+        first.finish(1).unwrap();
+        drop(tx);
+
+        assert_eq!(rx.recv().unwrap(), 1);
+        assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
+        assert_eq!(rx.finish(), Err(None));
+    }
+
+    #[test]
+    fn unfinished_send_becomes_err() {
+        let (tx, rx) = bounded::<u8, &str>(2);
+
+        let first = tx.prepare_send().unwrap();
+        drop(first);
+        tx.finish(Ok(()));
+
+        assert_eq!(rx.recv().unwrap_err(), RecvError::ItemRecvError);
+        assert_eq!(rx.finish(), Err(None));
+    }
+
+    #[test]
+    fn explicit_send_err() {
+        let (tx, rx) = bounded::<u8, &str>(2);
+
+        let first = tx.prepare_send().unwrap();
+        first.finish(1).unwrap();
+        tx.finish(Err("error"));
+
+        assert_eq!(rx.recv().unwrap(), 1);
+        assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
+        assert_eq!(rx.finish(), Err(Some("error")));
+    }
+
     #[test]
     fn across_threads() {
-        let (tx, rx) = bounded(2);
+        let (tx, rx) = bounded::<u32, ()>(2);
 
         let sender_handle = std::thread::spawn(move || {
             let tx = tx;
@@ -160,12 +286,14 @@ mod tests {
                     slot.finish(i).unwrap();
                 });
             }
+            tx.finish(Ok(()));
         });
 
         for i in 0..1000 {
             assert_eq!(rx.recv().unwrap(), i);
         }
+        assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
+        assert_eq!(rx.finish(), Ok(()));
 
         sender_handle.join().unwrap();
     }
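
As a usage note (a sketch, not part of the diff): a consumer of the new Receiver only needs try_for_each. A Sender dropped without calling finish, or a Slot dropped unfinished, surfaces as an io::Error built from UnknownError, while an explicit Sender::finish(Err(..)) surfaces that error instead.

// Sketch only: assumes the seq_queue module above, with io::Error as the error type.
use crate::seq_queue;
use std::io;

fn consume(rx: seq_queue::Receiver<Vec<u8>, io::Error>) -> io::Result<()> {
    // Returns Ok(()) only if every chunk arrived and the sender called finish(Ok(())).
    rx.try_for_each(|_block| Ok(()))
}
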
9 changes: 3 additions & 6 deletions crates/applesauce/src/threads/compressing.rs
@@ -11,7 +11,7 @@ pub(super) struct WorkItem {
     pub context: Arc<Context>,
     pub data: Vec<u8>,
     pub kind: compressor::Kind,
-    pub slot: seq_queue::Slot<io::Result<writer::Chunk>>,
+    pub slot: seq_queue::Slot<writer::Chunk, io::Error>,
 }
 
 pub(super) struct Work;
@@ -59,10 +59,7 @@ impl WorkHandler<WorkItem> for Handler {
         let size = match size {
             Ok(size) => size,
             Err(e) => {
-                if item.slot.finish(Err(e)).is_err() {
-                    // This should only be because of a failure already reported by the writer
-                    tracing::debug!("unable to finish slot");
-                }
+                item.slot.error(e);
                 return;
             }
         };
@@ -72,7 +69,7 @@ impl WorkHandler<WorkItem> for Handler {
             block: self.buf[..size].to_vec(),
             orig_size: item.data.len().try_into().unwrap(),
         };
-        if item.slot.finish(Ok(chunk)).is_err() {
+        if item.slot.finish(chunk).is_err() {
             // This should only be because of a failure already reported by the writer
             tracing::debug!("unable to finish slot");
         }
94 changes: 44 additions & 50 deletions crates/applesauce/src/threads/reader.rs
@@ -1,5 +1,4 @@
 use crate::seq_queue::Slot;
-use crate::threads::writer::Chunk;
 use crate::threads::{compressing, writer, BgWork, Context, Mode, WorkHandler};
 use crate::{rfork_storage, seq_queue, try_read_all};
 use applesauce_core::BLOCK_SIZE;
@@ -42,54 +41,12 @@ impl Handler {
         Self { compressor, writer }
     }
 
-    fn try_handle(&mut self, item: WorkItem) -> io::Result<()> {
-        let WorkItem { context } = item;
-        let _guard = tracing::info_span!("reading file", path=%context.path.display()).entered();
-        let file = Arc::new(File::open(&context.path)?);
-
-        let file_size = context.orig_metadata.len();
-        let (tx, rx) = seq_queue::bounded(
-            thread::available_parallelism()
-                .map(NonZeroUsize::get)
-                .unwrap_or(4),
-        );
-
-        {
-            let _enter = tracing::debug_span!("waiting for space in writer").entered();
-            self.writer
-                .send(writer::WorkItem {
-                    context: Arc::clone(&context),
-                    file: Arc::clone(&file),
-                    blocks: rx,
-                })
-                .unwrap();
-        }
-
-        if let Err(e) = self.read_file_into(&context, &file, file_size, &tx) {
-            context
-                .progress
-                .error(&format!("Error reading {}: {}", context.path.display(), e));
-            // If we can't get a slot, the writer already had an error, so we can just return.
-            if let Some(slot) = tx.prepare_send() {
-                // Likewise, if we get a slot, but the channel closes, the writer must have seen an error
-                let _ = slot.finish(Err(io::Error::new(io::ErrorKind::Other, "Error in reader")));
-            }
-            return Err(e);
-        }
-        // Ensure the file Arc is dropped before finishing sending on the channel,
-        // so the writer can have the only remaining reference, and take ownership of the file.
-        drop(file);
-        drop(tx);
-
-        Ok(())
-    }
-
     fn read_file_into(
         &mut self,
         context: &Arc<Context>,
         file: &File,
         expected_len: u64,
-        tx: &seq_queue::Sender<io::Result<writer::Chunk>>,
+        tx: &seq_queue::Sender<writer::Chunk, io::Error>,
     ) -> io::Result<()> {
         match context.operation.mode {
             Mode::Compress { kind, .. } => {
@@ -132,10 +89,10 @@ impl Handler {
             Mode::DecompressByReading => {
                 self.with_file_chunks(file, expected_len, tx, |slot, data| {
                     let orig_size = data.len() as u64;
-                    let res = slot.finish(Ok(Chunk {
+                    let res = slot.finish(writer::Chunk {
                         block: data,
                         orig_size,
-                    }));
+                    });
                     if let Err(e) = res {
                         // This should only happen if the writer had an error
                         tracing::debug!("error finishing chunk: {e}");
@@ -153,8 +110,8 @@ impl Handler {
         &mut self,
         file: &File,
         expected_len: u64,
-        tx: &seq_queue::Sender<io::Result<writer::Chunk>>,
-        mut f: impl FnMut(Slot<io::Result<writer::Chunk>>, Vec<u8>) -> io::Result<()>,
+        tx: &seq_queue::Sender<writer::Chunk, io::Error>,
+        mut f: impl FnMut(Slot<writer::Chunk, io::Error>, Vec<u8>) -> io::Result<()>,
     ) -> io::Result<bool> {
         let mut total_read = 0;
         let block_span = tracing::debug_span!("reading blocks");
@@ -218,8 +175,45 @@ impl Handler {

 impl WorkHandler<WorkItem> for Handler {
     fn handle_item(&mut self, item: WorkItem) {
-        if let Err(e) = self.try_handle(item) {
-            tracing::error!("unable to compress file: {}", e);
-        }
+        let WorkItem { context } = item;
+        let _guard = tracing::info_span!("reading file", path=%context.path.display()).entered();
+        let file = match File::open(&context.path) {
+            Ok(file) => file,
+            Err(e) => {
+                context
+                    .progress
+                    .error(&format!("Error opening {}: {}", context.path.display(), e));
+                return;
+            }
+        };
+        let file = Arc::new(file);
+
+        let file_size = context.orig_metadata.len();
+        let (tx, rx) = seq_queue::bounded(
+            thread::available_parallelism()
+                .map(NonZeroUsize::get)
+                .unwrap_or(4),
+        );
+
+        {
+            let _enter = tracing::debug_span!("waiting for space in writer").entered();
+            self.writer
+                .send(writer::WorkItem {
+                    context: Arc::clone(&context),
+                    file: Arc::clone(&file),
+                    blocks: rx,
+                })
+                .unwrap();
+        }
+
+        let result = self.read_file_into(&context, &file, file_size, &tx);
+        // ensure the file is dropped before tx is finished
+        drop(file);
+        if let Err(e) = &result {
+            context
+                .progress
+                .error(&format!("Error reading {}: {}", context.path.display(), e));
+        }
+        tx.finish(result);
     }
 }
20 changes: 8 additions & 12 deletions crates/applesauce/src/threads/writer.rs
@@ -21,7 +21,7 @@ pub(super) struct Chunk {
 pub(super) struct WorkItem {
     pub context: Arc<Context>,
     pub file: Arc<File>,
-    pub blocks: seq_queue::Receiver<io::Result<Chunk>>,
+    pub blocks: seq_queue::Receiver<Chunk, io::Error>,
 }
 
 pub(super) struct Work;
@@ -56,7 +56,7 @@ impl Handler {
         &mut self,
         context: &Context,
         writer: &mut applesauce_core::writer::Writer<impl applesauce_core::writer::Open>,
-        chunks: seq_queue::Receiver<io::Result<Chunk>>,
+        chunks: seq_queue::Receiver<Chunk, io::Error>,
     ) -> io::Result<()> {
         let block_span = tracing::debug_span!("write block");

@@ -71,10 +71,7 @@ impl Handler {
         let max_compressed_size =
             (context.orig_metadata.len() as f64 * minimum_compression_ratio) as u64;
 
-        let chunks = crate::instrumented_iter(chunks, tracing::debug_span!("waiting for chunk"));
-        for chunk in chunks {
-            let chunk = chunk?;
-
+        chunks.try_for_each(|chunk| {
             total_compressed_size += u64::try_from(chunk.block.len()).unwrap();
             if total_compressed_size > max_compressed_size {
                 context.progress.not_compressible_enough(&context.path);
@@ -92,7 +89,8 @@ impl Handler {

             writer.add_block(&block)?;
             context.progress.increment(orig_size);
-        }
+            Ok(())
+        })?;
 
         Ok(())
     }

@@ -170,15 +168,13 @@ impl Handler {
         let mut tmp_file = tmp_file_for(&item)?;
         copy_xattrs(&item.file, tmp_file.as_file())?;
 
-        let chunks =
-            crate::instrumented_iter(item.blocks, tracing::debug_span!("waiting for chunk"));
-        for chunk in chunks {
-            let chunk = chunk?;
+        item.blocks.try_for_each(|chunk| {
             tmp_file.write_all(&chunk.block)?;
             // Increment progress by the uncompressed size of the block,
             // not the "original" (compressed) size
             item.context.progress.increment(chunk.block.len() as u64);
-        }
+            Ok(())
+        })?;
 
         copy_metadata(&item.file, tmp_file.as_file())?;
         set_flags(