Skip to content

Commit

Permalink
fix: file writing is canceled for ANY errors
Browse files Browse the repository at this point in the history
This ensures that unless we signal full success in the reader (and for each
slot), the writer will not promote the file.

For example, if the file changed sizes while reading, it will now properly be
ignored.
  • Loading branch information
Dr-Emann committed Jan 2, 2025
1 parent b960b70 commit faec4b6
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 161 deletions.
30 changes: 0 additions & 30 deletions crates/applesauce/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,36 +268,6 @@ fn try_read_all<R: Read>(mut r: R, buf: &mut [u8]) -> io::Result<usize> {
Ok(read_len)
}

struct InstrumentedIter<I> {
inner: I,
span: tracing::Span,
}

impl<I> Iterator for InstrumentedIter<I>
where
I: Iterator,
{
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
let _enter = self.span.enter();
self.inner.next()
}
}

pub(crate) fn instrumented_iter<IntoIt>(
inner: IntoIt,
span: tracing::Span,
) -> InstrumentedIter<IntoIt::IntoIter>
where
IntoIt: IntoIterator,
{
InstrumentedIter {
inner: inner.into_iter(),
span,
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
254 changes: 191 additions & 63 deletions crates/applesauce/src/seq_queue.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,182 @@
use std::fmt;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
use std::{fmt, io};

#[derive(Debug)]
pub struct Sender<T>(crossbeam_channel::Sender<oneshot::Receiver<T>>);

#[derive(Debug)]
pub struct Receiver<T>(crossbeam_channel::Receiver<oneshot::Receiver<T>>);

pub struct Slot<T>(oneshot::Sender<T>);
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct UnknownError;

pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = crossbeam_channel::bounded(cap);
(Sender(tx), Receiver(rx))
impl fmt::Display for UnknownError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("unspecified error in sender for sequential queue")
}
}

impl<T> Sender<T> {
pub fn prepare_send(&self) -> Option<Slot<T>> {
let (tx, rx) = oneshot::channel();
self.0.send(rx).ok()?;
Some(Slot(tx))
impl std::error::Error for UnknownError {}

impl From<UnknownError> for io::Error {
fn from(value: UnknownError) -> Self {
io::Error::new(io::ErrorKind::Other, value)
}
}

impl<T> Clone for Sender<T> {
type FinalSuccessData<E> = Option<Result<(), Option<E>>>;

#[derive(Debug)]
struct FinalSuccess<E>(Arc<Mutex<FinalSuccessData<E>>>);

// Clone doesn't depend on E being clone
impl<E> Clone for FinalSuccess<E> {
fn clone(&self) -> Self {
Self(self.0.clone())
Self(Arc::clone(&self.0))
}
}

impl<T> Slot<T> {
pub fn finish(self, item: T) -> Result<(), oneshot::SendError<T>> {
self.0.send(item)
impl<E> FinalSuccess<E> {
fn new() -> Self {
Self(Arc::new(Mutex::new(None)))
}
}

impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T, RecvError> {
let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?;
inner_chan.recv().map_err(|_| RecvError::ItemRecvError)
fn make_success(self) {
let mutex = &*self.0;
let mut lock = mutex.lock().unwrap();
match *lock {
Some(_) => {}
None => {
*lock = Some(Ok(()));
}
}
}
}

pub struct Iter<'a, T> {
receiver: &'a Receiver<T>,
}
fn make_unknown_error(self) {
let mutex = &*self.0;
let mut lock = mutex.lock().unwrap();
match *lock {
Some(Err(Some(_))) => {}
_ => {
*lock = Some(Err(None));
}
}
}

pub struct IntoIter<T> {
receiver: Receiver<T>,
fn make_error(self, error: E) {
let mutex = &*self.0;
let mut lock = mutex.lock().unwrap();
match *lock {
Some(Err(Some(_))) => {}
_ => {
*lock = Some(Err(Some(error)));
}
}
}

fn get_result(self) -> Result<(), Option<E>> {
let mutex = &*self.0;
let mut lock = mutex.lock().unwrap();
lock.take().unwrap_or_else(|| Err(None))
}
}

impl<'a, T> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
struct FinalErrorOnDrop<E>(Option<FinalSuccess<E>>);

fn into_iter(self) -> Self::IntoIter {
Iter { receiver: self }
impl<E> FinalErrorOnDrop<E> {
fn disarm(mut self) {
self.0 = None;
}
}

impl<'a, T> IntoIterator for &'a mut Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
Iter { receiver: self }
impl<E> Drop for FinalErrorOnDrop<E> {
fn drop(&mut self) {
if let Some(final_success) = self.0.take() {
final_success.make_unknown_error();
}
}
}

impl<T> IntoIterator for Receiver<T> {
type Item = T;
type IntoIter = IntoIter<T>;
#[derive(Debug)]
pub struct Sender<T, E>(
crossbeam_channel::Sender<oneshot::Receiver<T>>,
FinalSuccess<E>,
);

#[derive(Debug)]
pub struct Receiver<T, E>(
crossbeam_channel::Receiver<oneshot::Receiver<T>>,
FinalSuccess<E>,
);

pub struct Slot<T, E>(oneshot::Sender<T>, FinalErrorOnDrop<E>);

pub fn bounded<T, E>(cap: usize) -> (Sender<T, E>, Receiver<T, E>) {
let final_success = FinalSuccess::new();
let (tx, rx) = crossbeam_channel::bounded(cap);
(
Sender(tx, final_success.clone()),
Receiver(rx, final_success),
)
}

impl<T, E> Sender<T, E> {
pub fn prepare_send(&self) -> Option<Slot<T, E>> {
let (tx, rx) = oneshot::channel();
self.0.send(rx).ok()?;
Some(Slot(tx, FinalErrorOnDrop(Some(self.1.clone()))))
}

fn into_iter(self) -> Self::IntoIter {
IntoIter { receiver: self }
pub fn finish(self, result: Result<(), E>) {
match result {
Ok(()) => self.1.make_success(),
Err(e) => self.1.make_error(e),
}
}
}

impl<T> Iterator for Iter<'_, T> {
type Item = T;
impl<T, E> Slot<T, E> {
pub fn finish(self, item: T) -> Result<(), oneshot::SendError<T>> {
self.1.disarm();
self.0.send(item)
}

fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
pub fn error(self, error: E) {
let Self(_sender, mut error_on_drop) = self;
if let Some(final_success) = error_on_drop.0.take() {
final_success.make_error(error)
}
}
}

impl<T> Iterator for IntoIter<T> {
type Item = T;
impl<T, E> Receiver<T, E> {
pub fn try_for_each(self, mut f: impl FnMut(T) -> Result<(), E>) -> Result<(), E>
where
UnknownError: Into<E>,
{
loop {
match self.recv() {
Ok(result) => {
f(result)?;
}
Err(_) => {
return self
.finish()
.map_err(|maybe_e| maybe_e.unwrap_or_else(|| UnknownError.into()))
}
}
}
}

pub fn recv(&self) -> Result<T, RecvError> {
let inner_chan = self.0.recv().map_err(|_| RecvError::Finished)?;
inner_chan.recv().map_err(|_| RecvError::ItemRecvError)
}

fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
pub fn finish(self) -> Result<(), Option<E>> {
let Self(receiver, final_success) = self;
if receiver.recv().is_ok() {
tracing::error!("finish on seq queue received an item");
return Err(None);
}
// Make sure to drop the receiver, to make sure the sender won't block trying to send
// anything
drop(receiver);
final_success.get_result()
}
}

Expand All @@ -111,7 +199,7 @@ impl RecvError {
}

impl fmt::Display for RecvError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.message())
}
}
Expand All @@ -125,14 +213,13 @@ mod tests {

#[test]
fn order_after_sending() {
let (tx, rx) = bounded(2);
let (tx, rx) = bounded::<u8, ()>(2);

let first = tx.prepare_send().unwrap();
assert_eq!(rx.0.len(), 1);
let second = tx.prepare_send().unwrap();
assert_eq!(rx.0.len(), 2);

drop(tx);
tx.finish(Ok(()));

second.finish(2).unwrap();
first.finish(1).unwrap();
Expand All @@ -141,12 +228,51 @@ mod tests {

assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap(), 2);
assert!(rx.recv().is_err());
assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
assert_eq!(rx.finish(), Ok(()));
}

#[test]
fn no_success_becomes_err() {
let (tx, rx) = bounded::<u8, ()>(2);

let first = tx.prepare_send().unwrap();
first.finish(1).unwrap();
drop(tx);

assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
assert_eq!(rx.finish(), Err(None));
}

#[test]
fn unfinished_send_becomes_err() {
let (tx, rx) = bounded::<u8, &str>(2);

let first = tx.prepare_send().unwrap();
drop(first);
tx.finish(Ok(()));

assert_eq!(rx.recv().unwrap_err(), RecvError::ItemRecvError);
assert_eq!(rx.finish(), Err(None));
}

#[test]
fn explicit_send_err() {
let (tx, rx) = bounded::<u8, &str>(2);

let first = tx.prepare_send().unwrap();
first.finish(1).unwrap();
tx.finish(Err("error"));

assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
assert_eq!(rx.finish(), Err(Some("error")));
}

#[test]
fn across_threads() {
let (tx, rx) = bounded(2);
let (tx, rx) = bounded::<u32, ()>(2);

let sender_handle = std::thread::spawn(move || {
let tx = tx;
Expand All @@ -160,12 +286,14 @@ mod tests {
slot.finish(i).unwrap();
});
}
tx.finish(Ok(()));
});

for i in 0..1000 {
assert_eq!(rx.recv().unwrap(), i);
}
assert_eq!(rx.recv().unwrap_err(), RecvError::Finished);
assert_eq!(rx.finish(), Ok(()));

sender_handle.join().unwrap();
}
Expand Down
Loading

0 comments on commit faec4b6

Please sign in to comment.