Skip to content

Commit

Permalink
Block shutdown until all pending messages are sent (Fixes librespot-o…
Browse files Browse the repository at this point in the history
…rg#27).

Ideally the task within our "mdns-responder" thread would block
until all the futures actually return Poll::Ready but I can't get
that to work, hence the explicit extra blocking I have added.
  • Loading branch information
kingosticks committed Mar 25, 2021
1 parent 4829c5c commit d36ebb7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
27 changes: 21 additions & 6 deletions src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};

use tokio::{net::UdpSocket, sync::mpsc};

Expand All @@ -31,6 +32,7 @@ pub enum Command {
}

pub struct FSM<AF: AddressFamily> {
shutdown_done: Receiver<bool>,
socket: UdpSocket,
services: Services,
commands: mpsc::UnboundedReceiver<Command>,
Expand All @@ -40,21 +42,23 @@ pub struct FSM<AF: AddressFamily> {

impl<AF: AddressFamily> FSM<AF> {
// Will panic if called from outside the context of a runtime
pub fn new(services: &Services) -> io::Result<(FSM<AF>, mpsc::UnboundedSender<Command>)> {
pub fn new(services: &Services) -> io::Result<(FSM<AF>, mpsc::UnboundedSender<Command>, SyncSender<bool>)> {
let std_socket = AF::bind()?;
let socket = UdpSocket::from_std(std_socket)?;

let (tx, rx) = mpsc::unbounded_channel();
let (tx_fin, rx_fin) = sync_channel::<bool>(0);

let fsm = FSM {
shutdown_done: rx_fin,
socket: socket,
services: services.clone(),
commands: rx,
outgoing: VecDeque::new(),
_af: PhantomData,
};

Ok((fsm, tx))
Ok((fsm, tx, tx_fin))
}

fn recv_packets(&mut self, cx: &mut Context) -> io::Result<()> {
Expand Down Expand Up @@ -221,9 +225,13 @@ impl<AF: Unpin + AddressFamily> Future for FSM<AF> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
let pinned = Pin::get_mut(self);
let mut shutdown = false;
while let Poll::Ready(cmd) = Pin::new(&mut pinned.commands).poll_recv(cx) {
match cmd {
Some(Command::Shutdown) => return Poll::Ready(()),
Some(Command::Shutdown) => {
shutdown = true;
break;
}
Some(Command::SendUnsolicited {
svc,
ttl,
Expand All @@ -238,9 +246,11 @@ impl<AF: Unpin + AddressFamily> Future for FSM<AF> {
}
}

match pinned.recv_packets(cx) {
Ok(_) => (),
Err(e) => error!("ResponderRecvPacket Error: {:?}", e),
if !shutdown {
match pinned.recv_packets(cx) {
Ok(_) => (),
Err(e) => error!("ResponderRecvPacket Error: {:?}", e),
}
}

while let Some((ref response, addr)) = pinned.outgoing.pop_front() {
Expand All @@ -255,6 +265,11 @@ impl<AF: Unpin + AddressFamily> Future for FSM<AF> {
}
}

if shutdown {
pinned.shutdown_done.recv().unwrap();
return Poll::Ready(());
}

Poll::Pending
}
}
20 changes: 12 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::future::Future;
use std::io;
use std::marker::Unpin;
use std::sync::{Arc, RwLock};
use std::sync::mpsc::SyncSender;

use std::thread;
use tokio::{runtime::Handle, sync::mpsc};
Expand Down Expand Up @@ -105,15 +106,15 @@ impl Responder {
let v4 = FSM::<Inet>::new(&services);
let v6 = FSM::<Inet6>::new(&services);

let (task, commands): (ResponderTask, _) = match (v4, v6) {
(Ok((v4_task, v4_command)), Ok((v6_task, v6_command))) => {
let (task, commands, shutdown): (ResponderTask, _, _) = match (v4, v6) {
(Ok((v4_task, v4_command, v4_shutdown)), Ok((v6_task, v6_command, v6_shutdown))) => {
let tasks = future::join(v4_task, v6_task).map(|((), ())| ());
(Box::new(tasks), vec![v4_command, v6_command])
(Box::new(tasks), vec![v4_command, v6_command], vec![v4_shutdown, v6_shutdown])
}

(Ok((v4_task, v4_command)), Err(err)) => {
(Ok((v4_task, v4_command, v4_shutdown)), Err(err)) => {
warn!("Failed to register IPv6 receiver: {:?}", err);
(Box::new(v4_task), vec![v4_command])
(Box::new(v4_task), vec![v4_command], vec![v4_shutdown])
}

(Err(err), _) => return Err(err),
Expand All @@ -123,7 +124,7 @@ impl Responder {
let responder = Responder {
services: services,
commands: RefCell::new(commands.clone()),
shutdown: Arc::new(Shutdown(commands)),
shutdown: Arc::new(Shutdown(commands, shutdown)),
};

Ok((responder, task))
Expand Down Expand Up @@ -197,12 +198,15 @@ impl Drop for Service {
}
}

struct Shutdown(CommandSender);
struct Shutdown(CommandSender, Vec<SyncSender<bool>>);

impl Drop for Shutdown {
fn drop(&mut self) {
self.0.send_shutdown();
// TODO wait for tasks to shutdown
for x in self.1.iter() {
// Blocks until service has acked the shutdown.
let _ = x.send(false).unwrap();
}
}
}

Expand Down

0 comments on commit d36ebb7

Please sign in to comment.