Skip to content

Commit

Permalink
add support on Windows (#19)
Browse files Browse the repository at this point in the history
* use Read trait instead of recv_from in socket2 0.4 version
* service info should be resolve even if properties is empty
  • Loading branch information
keepsimple1 authored Jun 15, 2022
1 parent 38a1dea commit 7d94326
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 97 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ default = ["async"]
[dependencies]
flume = { version = "0.10", default-features = false } # channel between threads
log = "0.4.14" # logging
nix = "0.24.1" # socket APIs
polling = "2.1.0" # select/poll sockets
socket2 = { version = "0.4", features = ["all"] } # socket APIs
176 changes: 81 additions & 95 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,19 @@
// corresponds to a set of DNS Resource Records.
use flume::{bounded, Sender, TrySendError};
use log::{debug, error};
use nix::{
errno, fcntl,
sys::{
select::{select, FdSet},
socket::{
bind, recvfrom, sendto, setsockopt, socket, sockopt, AddressFamily,
IpMembershipRequest, MsgFlags, SockFlag, SockType, SockaddrIn,
},
time::{TimeVal, TimeValLike},
},
};
use polling::Poller;
use socket2::{SockAddr, Socket};
use std::{
any::Any,
cmp,
collections::{HashMap, HashSet},
convert::TryInto,
fmt,
net::Ipv4Addr,
os::unix::io::RawFd,
io::Read,
net::{Ipv4Addr, SocketAddrV4},
str::{self, FromStr},
thread,
time::SystemTime,
time::{Duration, SystemTime},
vec,
};

Expand Down Expand Up @@ -393,30 +384,38 @@ impl ServiceDaemon {
/// 4. announce its registered services.
/// 5. process retransmissions if any.
fn run(mut zc: Zeroconf, receiver: Receiver<Command>) {
let poller_key = 17; // An arbitrary number to identify the events we are interested in.
if let Err(e) = zc
.poller
.add(&zc.listen_socket, polling::Event::readable(poller_key))
{
error!("failed to add socket to poller: {}", e);
return;
}
let mut events = Vec::new();
let timeout = Duration::from_millis(10);

loop {
let mut read_fds = FdSet::new();
read_fds.insert(zc.listen_socket);

// read incoming packets with a small timeout
let mut timeout = TimeVal::milliseconds(10);

// From POSIX select():
// If the readfds argument is not a null pointer,
// it points to an object of type fd_set that on input
// specifies the file descriptors to be checked for
// being ready to read, and on output indicates which
// file descriptors are ready to read.
match select(None, Some(&mut read_fds), None, None, Some(&mut timeout)) {
events.clear();
match zc.poller.wait(&mut events, Some(timeout)) {
Ok(_) => {
for fd in read_fds.fds(None) {
// Read from `fd` until no more packets available.
let count = events.iter().filter(|ev| ev.key == poller_key).count();
if count > 0 {
// Read until no more packets available.
loop {
let rc = zc.handle_read(fd);
let rc = zc.handle_read();
if !rc {
break;
}
}
}
if let Err(e) = zc
.poller
.modify(&zc.listen_socket, polling::Event::readable(poller_key))
{
error!("failed to poll listen_socket again: {}", e);
break;
}
}
Err(e) => error!("failed to select from sockets: {}", e),
}
Expand Down Expand Up @@ -550,11 +549,13 @@ impl ServiceDaemon {
None => error!("StopBrowse: cannot find querier for {}", &ty_domain),
Some((ty, sender)) => {
// Remove pending browse commands in the reruns.
debug!("StopBrowse: removed queryer for {}", &ty);
let mut i = 0;
while i < zc.retransmissions.len() {
if let Command::Browse(t, _, _) = &zc.retransmissions[i].command {
if t == &ty {
zc.retransmissions.remove(i);
debug!("StopBrowse: removed retransmission for {}", &ty);
continue;
}
}
Expand Down Expand Up @@ -583,29 +584,27 @@ impl ServiceDaemon {

/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
/// `non_block` indicates whether to set O_NONBLOCK for the socket.
fn new_socket(port: u16, non_block: bool) -> Result<RawFd> {
let fd = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None,
)
.map_err(|e| e_fmt!("nix::sys::socket failed: {}", e))?;

setsockopt(fd, sockopt::ReuseAddr, &true)
.map_err(|e| e_fmt!("nix::sys::setsockopt ReuseAddr failed: {}", e))?;
setsockopt(fd, sockopt::ReusePort, &true)
.map_err(|e| e_fmt!("nix::sys::setsockopt ReusePort failed: {}", e))?;
fn new_socket(port: u16, non_block: bool) -> Result<Socket> {
let fd = Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)
.map_err(|e| e_fmt!("create socket failed: {}", e))?;

fd.set_reuse_address(true)
.map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
#[cfg(unix)] // this is currently restricted to Unix's in socket2
fd.set_reuse_port(true)
.map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;

if non_block {
fcntl::fcntl(fd, fcntl::FcntlArg::F_SETFL(fcntl::OFlag::O_NONBLOCK))
.map_err(|e| e_fmt!("nix::fcntl O_NONBLOCK: {}", e))?;
fd.set_nonblocking(true)
.map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
}

let ipv4_any = SockaddrIn::new(0, 0, 0, 0, port);
bind(fd, &ipv4_any).map_err(|e| e_fmt!("nix::sys::socket::bind failed: {}", e))?;
let ipv4_any = Ipv4Addr::new(0, 0, 0, 0);
let inet_addr = SocketAddrV4::new(ipv4_any, port);
fd.bind(&inet_addr.into())
.map_err(|e| e_fmt!("socket bind failed: {}", e))?;

debug!("new socket {} bind to {}", &fd, &ipv4_any);
debug!("new socket bind to {}", &inet_addr);
Ok(fd)
}

Expand All @@ -627,19 +626,19 @@ struct ReRun {
struct Zeroconf {
/// One socket to receive all mDNS packets incoming, regardless interface.
/// This socket will not be able to read unicast packets.
listen_socket: RawFd,
listen_socket: Socket,

/// Sockets for outgoing packets.
/// NOTE: For now we only support multicast and this Vec has only one socket.
/// If we support unicast, we will have one respond socket for each
/// valid interface, and read unicast packets from these sockets.
respond_sockets: Vec<RawFd>,
respond_sockets: Vec<Socket>,

/// Local registered services
my_services: HashMap<String, ServiceInfo>,

/// Well-known mDNS IPv4 address and port
broadcast_addr: SockaddrIn,
broadcast_addr: SockAddr,

cache: DnsCache,

Expand All @@ -653,25 +652,28 @@ struct Zeroconf {
retransmissions: Vec<ReRun>,

counters: Metrics,

poller: Poller,
}

impl Zeroconf {
fn new(udp_port: u16) -> Result<Self> {
let poller = Poller::new().map_err(|e| e_fmt!("create Poller: {}", e))?;
let listen_socket = new_socket(udp_port, true)?;
debug!("created listening socket: {}", &listen_socket);
debug!("created listening socket: {:?}", &listen_socket);

let group_addr = Ipv4Addr::new(224, 0, 0, 251);
let request = IpMembershipRequest::new(group_addr, None);
setsockopt(listen_socket, sockopt::IpAddMembership, &request)
.map_err(|e| e_fmt!("nix::sys::setsockopt failed: {}", e))?;
listen_socket
.join_multicast_v4(&group_addr, &Ipv4Addr::new(0, 0, 0, 0))
.map_err(|e| e_fmt!("join multicast group: {}", e))?;

// We are not setting specific outgoing interface for this socket.
// It will use the default outgoing interface set by the OS.
let mut respond_sockets = Vec::new();
let respond_socket = new_socket(udp_port, false)?;
respond_sockets.push(respond_socket);

let broadcast_addr = SockaddrIn::new(224, 0, 0, 251, MDNS_PORT);
let broadcast_addr = SocketAddrV4::new(group_addr, MDNS_PORT).into();

Ok(Self {
listen_socket,
Expand All @@ -683,6 +685,7 @@ impl Zeroconf {
instances_to_resolve: HashMap::new(),
retransmissions: Vec::new(),
counters: HashMap::new(),
poller,
})
}

Expand Down Expand Up @@ -831,10 +834,10 @@ impl Zeroconf {
}

/// Sends an outgoing packet, and returns the packet bytes.
fn send(&self, out: &DnsOutgoing, addr: &SockaddrIn) -> Vec<u8> {
fn send(&self, out: &DnsOutgoing, addr: &SockAddr) -> Vec<u8> {
let qtype = if out.is_query() { "query" } else { "response" };
debug!(
"Sending {} to {}: {} questions {} answers {} authorities {} additional",
"Sending {} to {:?}: {} questions {} answers {} authorities {} additional",
qtype,
addr,
out.questions.len(),
Expand All @@ -852,10 +855,10 @@ impl Zeroconf {
packet
}

fn send_packet(&self, packet: &[u8], addr: &SockaddrIn) {
fn send_packet(&self, packet: &[u8], addr: &SockAddr) {
for s in self.respond_sockets.iter() {
match sendto(*s, packet, addr, MsgFlags::empty()) {
Ok(sz) => debug!("sent out {} bytes on socket {}", sz, s),
match s.send_to(packet, addr) {
Ok(sz) => debug!("sent out {} bytes on socket {:?}", sz, s),
Err(e) => error!("send failed: {}", e),
}
}
Expand All @@ -870,37 +873,26 @@ impl Zeroconf {

/// Returns false if failed to receive a packet,
/// otherwise returns true.
/// `sockfd` is expected to be connectionless (i.e. UDP socket).
fn handle_read(&mut self, sockfd: RawFd) -> bool {
let mut buf = vec![0; MAX_MSG_ABSOLUTE];
let (sz, src_addr) = match recvfrom(sockfd, &mut buf) {
Ok((sz, Some(addr))) => (sz, addr),
Ok((_, None)) => {
error!("recvfrom could not find source address");
return false;
}
Err(errno::Errno::EAGAIN) => {
// Simply means the fd has no more packets to read.
// No need to log an error.
return false;
}
fn handle_read(&mut self) -> bool {
let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];
let sz = match self.listen_socket.read(&mut buf) {
Ok(sz) => sz,
Err(e) => {
error!("recvfrom failed: {}", e);
if e.kind() != std::io::ErrorKind::WouldBlock {
error!("listening socket read failed: {}", e);
}
return false;
}
};

debug!(
"socket fd {} received {} bytes from {}",
&sockfd, sz, src_addr
);
debug!("received {} bytes", sz);

match DnsIncoming::new(buf) {
Ok(msg) => {
if msg.is_query() {
self.handle_query(msg, &src_addr);
self.handle_query(msg);
} else if msg.is_response() {
self.handle_response(msg, &src_addr);
self.handle_response(msg);
} else {
error!("Invalid message: not query and not response");
}
Expand Down Expand Up @@ -1087,10 +1079,10 @@ impl Zeroconf {

/// Deal with incoming response packets. All answers
/// are held in the cache, and listeners are notified.
fn handle_response(&mut self, mut msg: DnsIncoming, src: &SockaddrIn) {
fn handle_response(&mut self, mut msg: DnsIncoming) {
debug!(
"handle_response from {}: {} answers {} authorities {} additionals",
src, &msg.num_answers, &msg.num_authorities, &msg.num_additionals
"handle_response: {} answers {} authorities {} additionals",
&msg.num_answers, &msg.num_authorities, &msg.num_additionals
);
let now = current_time_millis();

Expand All @@ -1116,8 +1108,7 @@ impl Zeroconf {
}
}

fn handle_query(&mut self, msg: DnsIncoming, addr: &SockaddrIn) {
debug!("handle_query from {}", &addr);
fn handle_query(&mut self, msg: DnsIncoming) {
let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);

// Special meta-query "_services._dns-sd._udp.<Domain>".
Expand Down Expand Up @@ -1408,10 +1399,7 @@ impl AsIpv4Addrs for &str {
fn as_ipv4_addrs(&self) -> Result<HashSet<Ipv4Addr>> {
let mut addrs = HashSet::new();

let iter = self
.split(',')
.map(str::trim)
.map(std::net::Ipv4Addr::from_str);
let iter = self.split(',').map(str::trim).map(Ipv4Addr::from_str);

for addr in iter {
let addr = addr.map_err(|err| Error::ParseIpAddr(err.to_string()))?;
Expand Down Expand Up @@ -1500,9 +1488,7 @@ impl ServiceInfo {
let fullname = format!("{}.{}", my_name, ty_domain);
let ty_domain = ty_domain.to_string();
let server = host_name.to_string();

let addresses = host_ipv4.as_ipv4_addrs()?;

let properties = properties.unwrap_or_default();

let this = Self {
Expand Down Expand Up @@ -1558,13 +1544,13 @@ impl ServiceInfo {
self.other_ttl
}

/// Returns whether the service info is ready to be resolved.
fn is_ready(&self) -> bool {
let some_missing = self.ty_domain.is_empty()
|| self.fullname.is_empty()
|| self.server.is_empty()
|| self.port == 0
|| self.addresses.is_empty()
|| self.properties.is_empty();
|| self.addresses.is_empty();
!some_missing
}

Expand Down
Loading

0 comments on commit 7d94326

Please sign in to comment.