Skip to content

Commit

Permalink
discovery: add Avahi reconnection logic
Browse files Browse the repository at this point in the history
This deals gracefully with the case where the Avahi daemon is restarted
or not running initially.
  • Loading branch information
wisp3rwind committed Oct 6, 2024
1 parent e3f815a commit 33964da
Showing 1 changed file with 115 additions and 25 deletions.
140 changes: 115 additions & 25 deletions discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,38 +179,128 @@ async fn avahi_task(
port: u16,
entry_group: &mut Option<avahi::EntryGroupProxy<'_>>,
) -> Result<(), DiscoveryError> {
use self::avahi::ServerProxy;
use self::avahi::{EntryGroupState, ServerProxy};
use futures_util::StreamExt;

let conn = zbus::Connection::system().await?;

// Connect to Avahi and publish the service
let avahi_server = ServerProxy::new(&conn).await?;
log::trace!("Connected to Avahi");

*entry_group = Some(avahi_server.entry_group_new().await?);

entry_group
.as_mut()
.unwrap()
.add_service(
-1, // AVAHI_IF_UNSPEC
-1, // IPv4 and IPv6
0, // flags
&name,
DNS_SD_SERVICE_NAME, // type
"", // domain: let the server choose
"", // host: let the server choose
port,
&TXT_RECORD.map(|s| s.as_bytes()),
)
// Wait for the daemon to show up.
// On error: Failed to listen for NameOwnerChanged signal => Fatal DBus issue
let bus = zbus::fdo::DBusProxy::new(&conn).await?;
let mut stream = bus
.receive_name_owner_changed_with_args(&[(0, "org.freedesktop.Avahi")])
.await?;

entry_group.as_mut().unwrap().commit().await?;
log::debug!("Commited zeroconf service with name {}", &name);
loop {
// Wait for Avahi daemon to be started
'wait_avahi: {
while let Poll::Ready(Some(_)) = futures_util::poll!(stream.next()) {
// Drain queued name owner changes, since we're going to connect in a second
}

// Ping after we connected to the signal since it might have shown up in the meantime
if let Ok(avahi_peer) =
zbus::fdo::PeerProxy::new(&conn, "org.freedesktop.Avahi", "/").await
{
if avahi_peer.ping().await.is_ok() {
log::debug!("Pinged Avahi: Available");
break 'wait_avahi;
}
}
log::warn!("Failed to connect to Avahi, zeroconf discovery will not work until avahi-daemon is started. Check that it is installed and running");

// If it didn't, wait for the signal
match stream.next().await {
Some(_signal) => {
log::debug!("Avahi appeared");
break 'wait_avahi;
}
// The stream ended, but this should never happen
None => {
return Err(zbus::Error::Failure("DBus disappeared".to_owned()).into());
}
}
}

// Connect to Avahi and publish the service
let avahi_server = ServerProxy::new(&conn).await?;
log::trace!("Connected to Avahi");

*entry_group = Some(avahi_server.entry_group_new().await?);

let _: () = std::future::pending().await;
let mut entry_group_state_stream = entry_group
.as_mut()
.unwrap()
.receive_state_changed()
.await?;

Ok(())
entry_group
.as_mut()
.unwrap()
.add_service(
-1, // AVAHI_IF_UNSPEC
-1, // IPv4 and IPv6
0, // flags
&name,
DNS_SD_SERVICE_NAME, // type
"", // domain: let the server choose
"", // host: let the server choose
port,
&TXT_RECORD.map(|s| s.as_bytes()),
)
.await?;

entry_group.as_mut().unwrap().commit().await?;
log::debug!("Commited zeroconf service with name {}", &name);

'monitor_service: loop {
tokio::select! {
Some(state_changed) = entry_group_state_stream.next() => {
let (state, error) = match state_changed.args() {
Ok(sc) => (sc.state, sc.error),
Err(e) => {
log::warn!("Error on receiving EntryGroup state from Avahi: {}", e);
continue 'monitor_service;
}
};
match state {
EntryGroupState::Uncommited | EntryGroupState::Registering => {
// Not yet registered, ignore.
}
EntryGroupState::Established => {
log::info!("Published zeroconf service");
}
EntryGroupState::Collision => {
// This most likely means that librespot has unintentionally been started twice.
// Thus, don't retry with a new name, but abort.
//
// Note that the error would usually already be returned by
// entry_group.add_service above, so this state_changed handler
// won't be hit.
//
// EntryGroup has been withdrawn at this point already!
log::error!("zeroconf collision for name '{}'", &name);
return Err(zbus::Error::Failure(format!("zeroconf collision for name: {}", name)).into());
}
EntryGroupState::Failure => {
// TODO: Back off/treat as fatal?
// EntryGroup has been withdrawn at this point already!
// There seems to be no code in Avahi that actually sets this state.
log::error!("zeroconf failure: {}", error);
return Err(zbus::Error::Failure(format!("zeroconf failure: {}", error)).into());
}
}
}
_name_owner_change = stream.next() => {
break 'monitor_service;
}
}
}

// Avahi disappeared (or the service was immediately taken over by a
// new daemon) => drop all handles, and reconnect
log::info!("Avahi disappeared, trying to reconnect");
}
}

#[cfg(feature = "with-avahi")]
Expand Down

0 comments on commit 33964da

Please sign in to comment.