Skip to content

Commit

Permalink
vsock: add buffer space management
Browse files Browse the repository at this point in the history
`buf_alloc` and `fwd_cnt` in packet header are used for buffer space
management of stream sockets. If peer has insufficient buffer space, the
sender waits until new packets are returned and checks `buf_alloc` and
`fwd_cnt` again.

`VIRTIO_VSOCK_OP_CREDIT_REQUEST` packet is used to query how much buffer
space is available. `VIRTIO_VSOCK_OP_CREDIT_UPDATE` replies the query
and it can also be sent without previous `VIRTIO_VSOCK_OP_CREDIT_REQUEST`.

This patch tracks the `fwd_cnt` and `buf_alloc` to calculate the free
space of peer. The credite request and update packets are also handled.

Signed-off-by: Jiaqi Gao <[email protected]>
  • Loading branch information
gaojiaqi7 authored and jyao1 committed Oct 31, 2024
1 parent 91ad64e commit 97b95be
Showing 1 changed file with 96 additions and 38 deletions.
134 changes: 96 additions & 38 deletions src/devices/vsock/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ pub struct VsockStream {
addr: VsockAddrPair,
data_queue: VecDeque<Vec<u8>>,
rx_cnt: u32,
tx_cnt: u32,
peer_fwd_cnt: u32,
peer_buf_alloc: u32,
}

impl Read for VsockStream {
Expand Down Expand Up @@ -124,6 +127,9 @@ impl VsockStream {
},
data_queue: VecDeque::new(),
rx_cnt: 0,
tx_cnt: 0,
peer_fwd_cnt: 0,
peer_buf_alloc: 0,
})
}

Expand Down Expand Up @@ -201,6 +207,9 @@ impl VsockStream {
},
data_queue: VecDeque::new(),
rx_cnt: 0,
tx_cnt: 0,
peer_fwd_cnt: packet.fwd_cnt(),
peer_buf_alloc: packet.buf_alloc(),
};

add_stream_to_connection_map(&new_stream);
Expand Down Expand Up @@ -254,6 +263,8 @@ impl VsockStream {
&& packet.src_cid() == self.addr.remote.cid() as u64
{
self.state = State::Establised;
self.peer_buf_alloc = packet.buf_alloc();
self.peer_fwd_cnt = packet.fwd_cnt();
Ok(())
} else {
Err(VsockError::REFUSED)
Expand Down Expand Up @@ -297,6 +308,11 @@ impl VsockStream {
if state != State::Establised {
return Err(VsockError::Illegal);
}

while self.has_free_space() == 0 {
self.recv_packet_connected()?;
}

let mut header_buf = [0u8; HEADER_LEN];
let mut packet = Packet::new_unchecked(&mut header_buf[..]);
packet.set_src_cid(self.addr.local.cid() as u64);
Expand All @@ -315,6 +331,7 @@ impl VsockStream {
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.enqueue(self, packet.as_ref(), buf, DEFAULT_TIMEOUT)?;
self.tx_cnt += buf.len() as u32;

Ok(buf.len())
}
Expand All @@ -325,44 +342,8 @@ impl VsockStream {
return Err(VsockError::Illegal);
}

if self.data_queue.is_empty() {
let recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;
let packet = Packet::new_checked(recv.as_slice())?;

if packet.op() == field::OP_SHUTDOWN {
self.shutdown()?;
return Ok(0);
}
if packet.op() == field::OP_RST {
self.reset()?;
return Err(VsockError::Illegal);
}
if packet.op() != field::OP_RW {
return Err(VsockError::Illegal);
}

if packet.data_len() > 0 {
let mut recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;

self.rx_cnt += packet.data_len();
if packet.data_len() as usize <= recv.len() {
recv.truncate(packet.data_len() as usize);
} else {
return Err(VsockError::Illegal);
}

self.data_queue.push_back(recv);
}
while self.data_queue.is_empty() {
self.recv_packet_connected()?;
}

let mut recvd = 0;
Expand Down Expand Up @@ -424,6 +405,83 @@ impl VsockStream {
}
}

fn recv_packet_connected(&mut self) -> Result<()> {
let recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;
let packet = Packet::new_checked(recv.as_slice())?;

self.peer_buf_alloc = packet.buf_alloc();
self.peer_fwd_cnt = packet.fwd_cnt();
match packet.op() {
field::OP_SHUTDOWN => {
self.shutdown()?;
}
field::OP_RST => {
self.reset()?;
return Err(VsockError::Illegal);
}
field::OP_RW => {
if packet.data_len() > 0 {
let mut recv = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.dequeue(self, DEFAULT_TIMEOUT)?;

self.rx_cnt += packet.data_len();
if packet.data_len() as usize <= recv.len() {
recv.truncate(packet.data_len() as usize);
} else {
return Err(VsockError::Illegal);
}

self.data_queue.push_back(recv);
}
}
field::OP_CREDIT_UPDATE => {
self.peer_fwd_cnt = packet.fwd_cnt();
self.peer_buf_alloc = packet.buf_alloc();
}
field::OP_CREDIT_REQUEST => {
self.send_credit_update()?;
}
_ => return Err(VsockError::Illegal),
}
Ok(())
}

fn send_credit_update(&self) -> Result<()> {
let mut header_buf = [0u8; HEADER_LEN];
let mut packet = Packet::new_unchecked(&mut header_buf[..]);
packet.set_src_cid(self.addr.local.cid() as u64);
packet.set_dst_cid(self.addr.remote.cid() as u64);
packet.set_src_port(self.addr.local.port());
packet.set_dst_port(self.addr.remote.port());
packet.set_type(field::TYPE_STREAM);
packet.set_op(field::OP_CREDIT_UPDATE);
packet.set_data_len(0);
packet.set_flags(0);
packet.set_fwd_cnt(self.rx_cnt);
packet.set_buf_alloc(VSOCK_BUF_ALLOC);
let _ = VSOCK_DEVICE
.lock()
.get_mut()
.ok_or(VsockError::DeviceNotAvailable)?
.transport
.enqueue(self, packet.as_ref(), &[], DEFAULT_TIMEOUT)?;
Ok(())
}

fn has_free_space(&self) -> u32 {
self.peer_buf_alloc
.saturating_sub(self.tx_cnt.saturating_sub(self.peer_fwd_cnt))
}

pub(crate) fn addr(&self) -> VsockAddrPair {
self.addr
}
Expand Down

0 comments on commit 97b95be

Please sign in to comment.