Skip to content

Commit

Permalink
Implement RTP over TCP on output (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
wkozyra95 authored Feb 23, 2024
1 parent 97ecbc1 commit 2633054
Show file tree
Hide file tree
Showing 16 changed files with 495 additions and 322 deletions.
6 changes: 6 additions & 0 deletions compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub enum OutputInitError {

#[error(transparent)]
SocketError(#[from] std::io::Error),

#[error("Failed to register output. Port: {0} is already used or not available.")]
PortAlreadyInUse(u16),

#[error("Failed to register output. All ports in range {lower_bound} to {upper_bound} are already used or not available.")]
AllPortsAlreadyInUse { lower_bound: u16, upper_bound: u16 },
}

#[derive(Debug, thiserror::Error)]
Expand Down
9 changes: 2 additions & 7 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,17 @@ pub mod input;
pub mod output;
mod pipeline_input;
mod pipeline_output;
pub mod rtp;
mod structs;

use self::pipeline_input::new_pipeline_input;
use self::pipeline_output::new_pipeline_output;
pub use self::structs::AudioCodec;
pub use self::structs::VideoCodec;

#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Port(pub u16);

#[derive(Debug, Clone, Copy)]
pub enum RequestedPort {
Exact(u16),
Range((u16, u16)),
}

pub struct RegisterInputOptions {
pub input_options: InputOptions,
pub queue_options: queue::InputOptions,
Expand Down
77 changes: 20 additions & 57 deletions compositor_pipeline/src/pipeline/input/rtp.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{
net,
sync::{atomic::AtomicBool, Arc},
thread,
};

use crate::pipeline::{
decoder::{self, DecoderOptions},
encoder,
rtp::{bind_to_requested_port, BindToPortError, RequestedPort, TransportProtocol},
structs::EncodedChunkKind,
Port, RequestedPort,
Port,
};
use compositor_render::InputId;
use crossbeam_channel::{unbounded, Receiver};
Expand Down Expand Up @@ -52,12 +52,6 @@ pub struct RtpReceiverOptions {
pub stream: RtpStream,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportProtocol {
Udp,
TcpServer,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputVideoStream {
pub options: decoder::VideoDecoderOptions,
Expand Down Expand Up @@ -137,7 +131,7 @@ impl RtpReceiver {
)
.map_err(RtpReceiverError::SocketOptions)?;

let port = Self::bind_to_port(opts.port, &socket)?;
let port = bind_to_requested_port(opts.port, &socket)?;

socket.listen(1).map_err(RtpReceiverError::SocketBind)?;

Expand Down Expand Up @@ -174,7 +168,7 @@ impl RtpReceiver {
}
}

let port = Self::bind_to_port(opts.port, &socket)?;
let port = bind_to_requested_port(opts.port, &socket)?;

socket
.set_read_timeout(Some(std::time::Duration::from_millis(50)))
Expand All @@ -189,53 +183,6 @@ impl RtpReceiver {

Ok((port, receiver_thread, packets_rx))
}

fn bind_to_port(
requested_port: RequestedPort,
socket: &socket2::Socket,
) -> Result<Port, RtpReceiverError> {
let port = match requested_port {
RequestedPort::Exact(port) => {
socket
.bind(
&net::SocketAddr::V4(net::SocketAddrV4::new(
net::Ipv4Addr::UNSPECIFIED,
port,
))
.into(),
)
.map_err(|err| match err.kind() {
std::io::ErrorKind::AddrInUse => RtpReceiverError::PortAlreadyInUse(port),
_ => RtpReceiverError::SocketBind(err),
})?;
port
}
RequestedPort::Range((lower_bound, upper_bound)) => {
let port = (lower_bound..upper_bound).find(|port| {
let bind_res = socket.bind(
&net::SocketAddr::V4(net::SocketAddrV4::new(
net::Ipv4Addr::UNSPECIFIED,
*port,
))
.into(),
);

bind_res.is_ok()
});

match port {
Some(port) => port,
None => {
return Err(RtpReceiverError::AllPortsAlreadyInUse {
lower_bound,
upper_bound,
})
}
}
}
};
Ok(Port(port))
}
}

impl Drop for RtpReceiver {
Expand Down Expand Up @@ -357,3 +304,19 @@ pub enum DepayloadingError {
#[error(transparent)]
Rtp(#[from] rtp::Error),
}

impl From<BindToPortError> for RtpReceiverError {
fn from(value: BindToPortError) -> Self {
match value {
BindToPortError::SocketBind(err) => RtpReceiverError::SocketBind(err),
BindToPortError::PortAlreadyInUse(port) => RtpReceiverError::PortAlreadyInUse(port),
BindToPortError::AllPortsAlreadyInUse {
lower_bound,
upper_bound,
} => RtpReceiverError::AllPortsAlreadyInUse {
lower_bound,
upper_bound,
},
}
}
}
116 changes: 61 additions & 55 deletions compositor_pipeline/src/pipeline/input/rtp/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::VecDeque,
io::{self, Read},
net::TcpStream,
sync::{atomic::AtomicBool, Arc},
Expand All @@ -9,14 +10,13 @@ use std::{
use bytes::BytesMut;
use compositor_render::error::ErrorStack;
use crossbeam_channel::Sender;
use log::info;
use log::error;

pub(super) fn run_tcp_server_receiver(
socket: std::net::TcpListener,
packets_tx: Sender<bytes::Bytes>,
should_close: Arc<AtomicBool>,
) {
let mut buffer = BytesMut::zeroed(65536);
// make accept non blocking so we have a chance to handle should_close value
socket
.set_nonblocking(true)
Expand All @@ -28,73 +28,79 @@ pub(super) fn run_tcp_server_receiver(
}

// accept only one connection at the time
let accept_result = socket.accept();
let Ok((socket, _)) = accept_result else {
let Ok((socket, _)) = socket.accept() else {
thread::sleep(Duration::from_millis(50));
continue;
};
info!("Connection accepted");

socket
.set_read_timeout(Some(Duration::from_millis(50)))
.expect("Cannot set read timeout");

let mut socket = TcpReadPacketStream::new(socket, should_close.clone());
loop {
let mut len_bytes = [0u8; 2];
if let Err(err) = (&socket).read_exact_with_should_close(&mut len_bytes, &should_close)
{
maybe_log_err(err);
break;
};
let len = u16::from_be_bytes(len_bytes) as usize;

if let Err(err) =
(&socket).read_exact_with_should_close(&mut buffer[..len], &should_close)
{
maybe_log_err(err);
break;
};
packets_tx
.send(bytes::Bytes::copy_from_slice(&buffer[..len]))
.unwrap();
match socket.read_packet() {
Ok(packet) => {
packets_tx.send(packet).unwrap();
}
Err(err) => {
error!(
"Error while reading from TCP socket: {}",
ErrorStack::new(&err).into_string()
);
break;
}
}
}
}
}

fn maybe_log_err(err: io::Error) {
if err.kind() != io::ErrorKind::WouldBlock {
log::error!(
"Unknown error when reading from TCP socket. {}",
ErrorStack::new(&err).into_string()
);
}
struct TcpReadPacketStream {
socket: TcpStream,
buf: VecDeque<u8>,
read_buf: Vec<u8>,
should_close: Arc<AtomicBool>,
}

trait TcpStreamExt {
fn read_exact_with_should_close(
&mut self,
buf: &mut [u8],
should_close: &Arc<AtomicBool>,
) -> io::Result<()>;
}
impl TcpReadPacketStream {
fn new(socket: TcpStream, should_close: Arc<AtomicBool>) -> Self {
socket
.set_read_timeout(Some(Duration::from_millis(50)))
.expect("Cannot set read timeout");
Self {
socket,
buf: VecDeque::new(),
read_buf: vec![0; 65536],
should_close,
}
}
fn read_packet(&mut self) -> io::Result<bytes::Bytes> {
self.read_until_buffer_size(2)?;

let mut len_bytes = [0u8; 2];
self.buf.read_exact(&mut len_bytes)?;
let len = u16::from_be_bytes(len_bytes) as usize;

self.read_until_buffer_size(len)?;
let mut packet = BytesMut::zeroed(len);
self.buf.read_exact(&mut packet[..])?;
Ok(packet.freeze())
}

impl TcpStreamExt for &TcpStream {
fn read_exact_with_should_close(
&mut self,
buf: &mut [u8],
should_close: &Arc<AtomicBool>,
) -> io::Result<()> {
fn read_until_buffer_size(&mut self, buf_size: usize) -> io::Result<()> {
loop {
match self.read_exact(buf) {
Ok(val) => return Ok(val),
Err(err) => match err.kind() {
std::io::ErrorKind::WouldBlock
if should_close.load(std::sync::atomic::Ordering::Relaxed) =>
{
continue;
if self.buf.len() >= buf_size {
return Ok(());
}
match self.socket.read(&mut self.read_buf) {
Ok(read_bytes) => {
self.buf.extend(self.read_buf[0..read_bytes].iter());
}
Err(err) => {
let should_close = self.should_close.load(std::sync::atomic::Ordering::Relaxed);
match err.kind() {
std::io::ErrorKind::WouldBlock if !should_close => {
continue;
}
_ => return io::Result::Err(err),
}
_ => return io::Result::Err(err),
},
}
};
}
}
Expand Down
4 changes: 2 additions & 2 deletions compositor_pipeline/src/pipeline/input/rtp/udp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use crossbeam_channel::Sender;

pub(super) fn run_udp_receiver(
Expand Down Expand Up @@ -28,7 +28,7 @@ pub(super) fn run_udp_receiver(
};

packets_tx
.send(bytes::Bytes::copy_from_slice(&buffer[..received_bytes]))
.send(Bytes::copy_from_slice(&buffer[..received_bytes]))
.unwrap();
}
}
Loading

0 comments on commit 2633054

Please sign in to comment.