
feat(rumqttc): batch processing
Felix Obenhuber committed Mar 26, 2024
1 parent d4fe99b commit 32bc49e
Showing 4 changed files with 217 additions and 133 deletions.
259 changes: 145 additions & 114 deletions rumqttc/src/v5/eventloop.rs
@@ -33,9 +33,6 @@ use {
#[cfg(feature = "proxy")]
use crate::proxy::ProxyError;

-/// Number of packets or requests processed before flushing the network
-const BATCH_SIZE: usize = 10;

/// Critical errors during eventloop polling
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
@@ -56,6 +53,8 @@ pub enum ConnectionError {
Io(#[from] io::Error),
#[error("Connection refused, return code: `{0:?}`")]
ConnectionRefused(ConnectReturnCode),
#[error("Connection closed")]
ConnectionClosed,
#[error("Expected ConnAck packet, received: {0:?}")]
NotConnAck(Box<Packet>),
#[error("Requests done")]
@@ -77,10 +76,12 @@ pub struct EventLoop {
pub options: MqttOptions,
/// Current state of the connection
pub state: MqttState,
+/// Batch size
+batch_size: usize,
/// Request stream
-request_rx: Receiver<Request>,
+requests: Receiver<Request>,
/// Pending requests from the last session
-pending: PendingRequests,
+pending: IntervalQueue<Request>,
/// Network connection to the broker
network: Option<Network>,
/// Keep alive time
@@ -96,18 +97,18 @@ pub enum Event {

impl EventLoop {
/// New MQTT `EventLoop`
-///
-/// When connection encounters critical errors (like auth failure), user has a choice to
-/// access and update `options`, `state` and `requests`.
-pub fn new(options: MqttOptions, request_rx: Receiver<Request>) -> EventLoop {
+pub(crate) fn new(options: MqttOptions, requests: Receiver<Request>) -> EventLoop {
let inflight_limit = options.outgoing_inflight_upper_limit.unwrap_or(u16::MAX);
let manual_acks = options.manual_acks;
-let pending = PendingRequests::new(options.pending_throttle);
+let pending = IntervalQueue::new(options.pending_throttle);
+let batch_size = options.max_batch_size;
+let state = MqttState::new(inflight_limit, manual_acks);

EventLoop {
options,
-state: MqttState::new(inflight_limit, manual_acks),
-request_rx,
+state,
+batch_size,
+requests,
pending,
network: None,
keepalive_timeout: None,
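
The batch size now comes from `MqttOptions` rather than the removed `BATCH_SIZE` const. A minimal configuration sketch; the `set_max_batch_size` setter name is an assumption for illustration, only the `max_batch_size` option itself is visible in this diff:

```rust
use std::time::Duration;
use rumqttc::v5::MqttOptions;

fn main() {
    let mut options = MqttOptions::new("client-id", "broker.example.com", 1883);
    options.set_keep_alive(Duration::from_secs(30));
    // Hypothetical setter for the new option read by `EventLoop::new` above.
    options.set_max_batch_size(32); // handle up to 32 packets per poll before flushing
}
```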
@@ -127,8 +128,7 @@ impl EventLoop {
self.pending.extend(self.state.clean());

// drain requests from channel which weren't yet received
-let requests_in_channel = self.request_rx.drain();
-self.pending.extend(requests_in_channel);
+self.pending.extend(self.requests.drain());
}
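
`clean` gathers unfinished work from two sources: unacked packets returned by `state.clean()` and requests still buffered in the channel. A standalone sketch of the same pattern, assuming the flume channel crate (which provides the `drain()` used above) and `String` standing in for `Request`:

```rust
use std::collections::VecDeque;

fn main() {
    let (tx, rx) = flume::unbounded::<String>();
    tx.send("queued while disconnected".into()).unwrap();

    let mut pending: VecDeque<String> = VecDeque::new();
    // Unacked messages from the previous session are re-queued first...
    pending.extend(["unacked publish".to_string()]);
    // ...followed by everything still sitting in the request channel.
    pending.extend(rx.drain());

    assert_eq!(pending.len(), 2);
}
```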

/// Yields the next notification or outgoing request and periodically pings
@@ -148,142 +148,176 @@ impl EventLoop {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.options.keep_alive)));
}

-// A connack never produces a response packet. Safe to ignore the return value
-// of `handle_incoming_packet`
self.state.handle_incoming_packet(connack)?;
+self.pending.reset();
}

-match self.select().await {
-    Ok(v) => Ok(v),
-    Err(e) => {
-        self.clean();
-        Err(e)
-    }
-}
+// Read buffered events from previous polls before calling a new poll
+if let Some(event) = self.state.events.pop_front() {
+    Ok(event)
+} else {
+    match self.poll_process().await {
+        Ok(v) => Ok(v),
+        Err(e) => {
+            self.clean();
+            Err(e)
+        }
+    }
+}
}

/// Select on network and requests and generate keepalive pings when necessary
-async fn select(&mut self) -> Result<Event, ConnectionError> {
+async fn poll_process(&mut self) -> Result<Event, ConnectionError> {
let network = self.network.as_mut().unwrap();
-let inflight_full = self.state.is_inflight_full();
-let collision = self.state.has_collision();

-// Read buffered events from previous polls before calling a new poll
-if let Some(event) = self.state.events.pop_front() {
-    return Ok(event);
-}

-// this loop is necessary since self.incoming.pop_front() might return None. In that case,
-// instead of returning a None event, we try again.
-select! {
-    Some(request) = self.pending.next(), if !inflight_full && !collision => {
-        self.state.handle_outgoing_packet(request)?;
-        network.flush().await?;
-        Ok(self.state.events.pop_front().unwrap())
-    },
-    request = self.request_rx.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => {
-        // Process first request
-        let request = request.map_err(|_| ConnectionError::RequestsDone)?;
-        self.state.handle_outgoing_packet(request)?;
-        network.flush().await?;
-        Ok(self.state.events.pop_front().unwrap())
-    },
-    // Pull a bunch of packets from network, reply in bunch and yield the first item
-    packet = network.read() => {
-        let packet = packet?;
-        if let Some(packet) = self.state.handle_incoming_packet(packet)? {
-            network.write(packet).await?;
+for _ in 0..self.batch_size {
+    let inflight_full = self.state.is_inflight_full();
+    let collision = self.state.has_collision();

+    select! {
+        // Handles pending and new requests.
+        // If available, prioritises pending requests from previous session.
+        // Else, pulls next request from user requests channel.
+        // The `if` conditions in the branch below are for flow control.
+        // The branch is disabled if there are no pending messages and new user requests
+        // cannot be serviced due to flow control.
+        // We read the next user request only when inflight messages are < configured inflight
+        // and there are no collisions while handling previous outgoing requests.
+        //
+        // Flow control is based on ack count. If inflight packet count in the buffer is
+        // less than the max_inflight setting, the next outgoing request will progress. For this
+        // to work correctly, the broker should ack in sequence (a lot of brokers won't)
+        //
+        // E.g. if max inflight = 5, user requests will be blocked when the inflight queue
+        // looks like this -> [1, 2, 3, 4, 5].
+        // If the broker acks 2 instead of 1 -> [1, x, 3, 4, 5].
+        // This pulls the next user request. But because max packet id = max_inflight, the next
+        // user request's packet id will roll to 1. This replaces existing packet id 1,
+        // resulting in a collision.
+        //
+        // The eventloop can stop receiving outgoing user requests when a previous outgoing
+        // request collided, i.e. collision state. Collision state will be cleared only
+        // when the correct ack is received.
+        // A full inflight queue will look like -> [1a, 2, 3, 4, 5].
+        // If 3 is acked instead of 1 first -> [1a, 2, x, 4, 5].
+        // After collision with pkid 1 -> [1b, 2, x, 4, 5].
+        // 1a is saved to state and the event loop is set to collision mode, stopping new
+        // outgoing requests (along with 1b).
+        Some(request) = self.pending.next(), if !inflight_full && !collision => {
+            if let Some(packet) = self.state.handle_outgoing_packet(request)? {
+                network.write(packet).await?;
+            }
+        },
+        request = self.requests.recv_async(), if self.pending.is_empty() && !inflight_full && !collision => {
+            let request = request.map_err(|_| ConnectionError::RequestsDone)?;
+            if let Some(packet) = self.state.handle_outgoing_packet(request)? {
+                network.write(packet).await?;
+            }
+        },
+        // Process next packet received from io
+        packet = network.read() => {
+            match packet? {
+                Some(packet) => if let Some(packet) = self.state.handle_incoming_packet(packet)? {
+                    let flush = matches!(packet, Packet::PingResp(_));
+                    network.write(packet).await?;
+                    if flush {
+                        break;
+                    }
+                }
+                None => return Err(ConnectionError::ConnectionClosed),
+            }
+        },
+        // We generate pings irrespective of network activity. This keeps the ping logic
+        // simple. We can change this behavior in future if necessary (to prevent extra pings)
+        _ = self.keepalive_timeout.as_mut().unwrap() => {
+            let timeout = self.keepalive_timeout.as_mut().unwrap();
+            timeout.as_mut().reset(Instant::now() + self.options.keep_alive);
+            if let Some(packet) = self.state.handle_outgoing_packet(Request::PingReq)? {
+                network.write(packet).await?;
+            }
+        }
-        }
-        // flush all the acks and return first incoming packet
-        network.flush().await?;
-        Ok(self.state.events.pop_front().unwrap())
-    },
-    // We generate pings irrespective of network activity. This keeps the ping logic
-    // simple. We can change this behavior in future if necessary (to prevent extra pings)
-    _ = self.keepalive_timeout.as_mut().unwrap() => {
-        let timeout = self.keepalive_timeout.as_mut().unwrap();
-        timeout.as_mut().reset(Instant::now() + self.options.keep_alive);
-
-        self.state.handle_outgoing_packet(Request::PingReq)?;
-        network.flush().await?;
-        Ok(self.state.events.pop_front().unwrap())
-    }
-    else => unreachable!("Eventloop select is exhaustive"),
-}
+        else => unreachable!("Eventloop select is exhaustive"),
+    }
+
+    // Break early if there is no request pending and no more incoming bytes polled into the read buffer.
+    // This implementation is suboptimal: the loop is *not* broken if an incomplete packet resides in the
+    // rx buffer of `Network`. Until that frame is complete, the outgoing queue is *not* flushed.
+    // Since the incomplete packet has already started to appear in the buffer, it should be fine to await
+    // more data on the stream before flushing.
+    if self.pending.is_empty()
+        && self.requests.is_empty()
+        && network.read_buffer_remaining() == 0
+    {
+        break;
+    }
+}
+
+network.flush().await?;
+
+self.state
+    .events
+    .pop_front()
+    .ok_or_else(|| unreachable!("empty event queue"))
}
}
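
The core of the change: instead of one flush per outgoing packet, up to `batch_size` packets are written per call to `poll_process` and the socket is flushed once at the end. A self-contained sketch of that batching pattern; the names are illustrative stand-ins, not rumqttc internals:

```rust
use std::collections::VecDeque;

/// Illustrative stand-in for the buffered network handle.
struct Network {
    buffer: Vec<u8>,
    flushes: usize,
}

impl Network {
    /// Writes only append to an in-memory buffer, not the socket.
    fn write(&mut self, packet: &[u8]) {
        self.buffer.extend_from_slice(packet);
    }

    /// A single flush pushes the whole batch out at once.
    fn flush(&mut self) {
        self.flushes += 1;
        self.buffer.clear();
    }
}

fn process_batch(requests: &mut VecDeque<Vec<u8>>, network: &mut Network, batch_size: usize) {
    for _ in 0..batch_size {
        // Stop early when there is nothing left to service, mirroring the
        // `pending.is_empty() && requests.is_empty()` break above.
        let Some(packet) = requests.pop_front() else { break };
        network.write(&packet);
    }
    network.flush(); // exactly one flush per batch
}

fn main() {
    let mut network = Network { buffer: Vec::new(), flushes: 0 };
    let mut requests: VecDeque<Vec<u8>> = (0u8..25).map(|i| vec![i]).collect();
    while !requests.is_empty() {
        process_batch(&mut requests, &mut network, 10);
    }
    assert_eq!(network.flushes, 3); // 25 packets in batches of 10 => 3 flushes
}
```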

-/// Pending requests yielded with a configured rate. If the queue is empty the stream will yield `Pending`.
-struct PendingRequests {
+/// Pending items yielded with a configured rate. If the queue is empty the stream will yield `Pending`.
+struct IntervalQueue<T> {
/// Interval
interval: Option<time::Interval>,
/// Pending requests
-requests: VecDeque<Request>,
+queue: VecDeque<T>,
}

-impl PendingRequests {
+impl<T> IntervalQueue<T> {
/// Construct a new Pending instance
pub fn new(interval: Duration) -> Self {
let interval = (!interval.is_zero()).then(|| time::interval(interval));
-PendingRequests {
+IntervalQueue {
interval,
-requests: VecDeque::new(),
+queue: VecDeque::new(),
}
}

/// Returns true if the queue is empty
pub fn is_empty(&self) -> bool {
-    self.requests.is_empty()
+    self.queue.is_empty()
}

-pub fn extend(&mut self, requests: impl IntoIterator<Item = Request>) {
-    self.requests.extend(requests);
+/// Extend the request queue
+pub fn extend(&mut self, requests: impl IntoIterator<Item = T>) {
+    self.queue.extend(requests);
}

/// Reset the pending interval tick
pub fn reset(&mut self) {
if let Some(interval) = self.interval.as_mut() {
interval.reset();
}
}
}

-impl Stream for PendingRequests {
-    type Item = Request;
+impl<T: Unpin> Stream for IntervalQueue<T> {
+    type Item = T;

-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Request>> {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
if self.is_empty() {
Poll::Pending
} else {
match self.interval.as_mut() {
Some(interval) => match interval.poll_tick(cx) {
-Poll::Ready(_) => Poll::Ready(self.requests.pop_front()),
+Poll::Ready(_) => Poll::Ready(self.queue.pop_front()),
Poll::Pending => Poll::Pending,
},
-None => Poll::Ready(self.requests.pop_front()),
+None => Poll::Ready(self.queue.pop_front()),
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
-(self.requests.len(), Some(self.requests.len()))
+(self.queue.len(), Some(self.queue.len()))
}
}
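
A usage sketch for `IntervalQueue` as defined above, assuming a tokio runtime and `futures::StreamExt` for `.next()`. Note that the stream never terminates with `None`: when the queue is empty, `poll_next` returns `Poll::Pending`, so the consumer has to check `is_empty()` itself, exactly as the `select!` guards above do:

```rust
use std::time::Duration;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Release at most one pending item per 100 ms tick; a zero duration
    // disables the interval and yields items as fast as they are polled.
    let mut pending: IntervalQueue<&str> = IntervalQueue::new(Duration::from_millis(100));
    pending.extend(["republish 1", "republish 2", "republish 3"]);

    while !pending.is_empty() {
        // Safe to await here: the stream only returns Pending, never None,
        // and the queue is known to be non-empty.
        let item = pending.next().await.expect("non-empty queue");
        println!("{item}");
    }
}
```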

Expand All @@ -298,12 +332,6 @@ async fn connect(options: &mut MqttOptions) -> Result<(Network, Incoming), Conne

// make MQTT connection request (which internally awaits the ack)
let packet = mqtt_connect(options, &mut network).await?;

-// Last session might contain packets which aren't acked. MQTT says these packets should be
-// republished in the next session
-// move pending messages from state to eventloop
-// let pending = self.state.clean();
-// self.pending = pending.into_iter();
Ok((network, packet))
}

@@ -434,17 +462,20 @@ async fn mqtt_connect(

// validate connack
match network.read().await? {
-Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
-    // Override local keep_alive value if set by server.
+Some(Incoming::ConnAck(connack)) if connack.code == ConnectReturnCode::Success => {
    if let Some(props) = &connack.properties {
+        // Override local keep_alive value if set by server.
if let Some(keep_alive) = props.server_keep_alive {
options.keep_alive = Duration::from_secs(keep_alive as u64);
}

// Override max packet size
network.set_max_outgoing_size(props.max_packet_size);
}
Ok(Packet::ConnAck(connack))
}
-Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
-packet => Err(ConnectionError::NotConnAck(Box::new(packet))),
+Some(Incoming::ConnAck(connack)) => Err(ConnectionError::ConnectionRefused(connack.code)),
+Some(packet) => Err(ConnectionError::NotConnAck(Box::new(packet))),
+None => Err(ConnectionError::ConnectionClosed),
}
}
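
With the new `ConnectionClosed` variant, a clean EOF from the broker surfaces as a dedicated error instead of a generic I/O failure. A hedged sketch of caller-side handling, assuming rumqttc's usual v5 API surface (`AsyncClient::new`, `EventLoop::poll`):

```rust
use rumqttc::v5::{AsyncClient, ConnectionError, MqttOptions};

#[tokio::main]
async fn main() {
    let options = MqttOptions::new("client-id", "broker.example.com", 1883);
    let (_client, mut eventloop) = AsyncClient::new(options, 10);

    loop {
        match eventloop.poll().await {
            Ok(event) => println!("event: {event:?}"),
            // The broker closed the stream without a socket-level error.
            Err(ConnectionError::ConnectionClosed) => {
                eprintln!("connection closed by broker; polling again reconnects");
                // A reconnect backoff could go here.
            }
            Err(e) => {
                eprintln!("fatal: {e}");
                break;
            }
        }
    }
}
```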
