Skip to content

Commit

Permalink
Add proper delivery handling on receiver side
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Mar 8, 2024
1 parent 7d0cb48 commit 9e681ca
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 86 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.0] - 2024-03-08

* Add proper delivery handling on receiver side

## [2.0.0] - 2024-03-06

* Add proper delivery handling
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "2.0.0"
version = "2.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
30 changes: 24 additions & 6 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bitflags::bitflags! {
}
}

#[derive(Debug)]
pub struct Delivery {
id: DeliveryNumber,
session: Session,
Expand All @@ -34,12 +35,28 @@ pub(crate) struct DeliveryInner {
}

impl Delivery {
pub(crate) fn new_rcv(id: DeliveryNumber, settled: bool, session: Session) -> Delivery {
Delivery {
id,
session,
flags: StdCell::new(if settled {
Flags::LOCAL_SETTLED
} else {
Flags::empty()
}),
}
}

pub fn id(&self) -> DeliveryNumber {
self.id

Check warning on line 51 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L50-L51

Added lines #L50 - L51 were not covered by tests
}

pub fn remote_state(&self) -> Option<DeliveryState> {
if let Some(inner) = self
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))

Check warning on line 59 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L59

Added line #L59 was not covered by tests
.get_mut(&self.id)
{
inner.state.clone()
Expand Down Expand Up @@ -113,7 +130,7 @@ impl Delivery {
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))
.get_mut(&self.id)
{
if let Some(st) = self.check_inner(inner) {
Expand All @@ -134,7 +151,7 @@ impl Delivery {
.session
.inner
.get_mut()
.unsettled_deliveries
.unsettled_deliveries(self.is_set(Flags::SENDER))
.get_mut(&self.id)
{
if inner.settled {
Expand Down Expand Up @@ -170,8 +187,11 @@ impl Delivery {
impl Drop for Delivery {
fn drop(&mut self) {
let inner = self.session.inner.get_mut();
let deliveries = inner.unsettled_deliveries(self.is_set(Flags::SENDER));

if deliveries.contains_key(&self.id) {
deliveries.remove(&self.id);

if inner.unsettled_deliveries.contains_key(&self.id) {
if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) {
let err = Error::build()
.condition(ErrorCondition::Custom(Symbol(Str::Static(
Expand All @@ -193,8 +213,6 @@ impl Drop for Delivery {
}));
inner.post_frame(disp.into());
}

inner.unsettled_deliveries.remove(&self.id);
}
}
}
Expand Down
88 changes: 60 additions & 28 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use ntex_amqp_codec::protocol::{
self as codec, Attach, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role,
SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer, TransferBody,
};
use ntex_amqp_codec::types::{Symbol, Variant};
use ntex_amqp_codec::Encode;
use ntex_amqp_codec::{types::Symbol, types::Variant, Encode};

use crate::session::{Session, SessionInner};
use crate::{cell::Cell, error::AmqpProtocolError, types::Action};
use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Delivery};

#[derive(Clone, Debug)]
pub struct ReceiverLink {
Expand All @@ -28,7 +27,7 @@ pub(crate) struct ReceiverLinkInner {
session: Session,
closed: bool,
reader_task: LocalWaker,
queue: VecDeque<Transfer>,
queue: VecDeque<(Delivery, Transfer)>,
credit: u32,
delivery_count: u32,
error: Option<Error>,
Expand Down Expand Up @@ -112,13 +111,13 @@ impl ReceiverLink {
self.inner.get_mut().set_max_partial_transfer(size);
}

/// Check transfer frame
pub fn has_transfers(&self) -> bool {
/// Check deliveries
pub fn has_deliveries(&self) -> bool {

Check warning on line 115 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L115

Added line #L115 was not covered by tests
!self.inner.get_mut().queue.is_empty()
}

/// Get transfer frame
pub fn get_transfer(&self) -> Option<Transfer> {
/// Get delivery
pub fn get_delivery(&self) -> Option<(Delivery, Transfer)> {
self.inner.get_mut().queue.pop_front()
}

Expand Down Expand Up @@ -162,7 +161,7 @@ impl ReceiverLink {
/// Attempt to pull out the next value of this receiver, registering
/// the current task for wakeup if the value is not yet available,
/// and returning None if the stream is exhausted.
pub async fn recv(&self) -> Option<Result<Transfer, AmqpProtocolError>> {
pub async fn recv(&self) -> Option<Result<(Delivery, Transfer), AmqpProtocolError>> {

Check warning on line 164 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L164

Added line #L164 was not covered by tests
poll_fn(|cx| self.poll_recv(cx)).await
}

Expand All @@ -172,7 +171,7 @@ impl ReceiverLink {
pub fn poll_recv(
&self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Transfer, AmqpProtocolError>>> {
) -> Poll<Option<Result<(Delivery, Transfer), AmqpProtocolError>>> {
let inner = self.inner.get_mut();

if inner.partial_body.is_some() && inner.queue.len() == 1 {
Expand Down Expand Up @@ -202,7 +201,7 @@ impl ReceiverLink {
}

impl Stream for ReceiverLink {
type Item = Result<Transfer, AmqpProtocolError>;
type Item = Result<(Delivery, Transfer), AmqpProtocolError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_recv(cx)
Expand Down Expand Up @@ -305,15 +304,18 @@ impl ReceiverLinkInner {
let _ = self.close(Some(err));
Ok(Action::None)
} else {
self.credit -= 1;
if !transfer.0.more {
self.credit -= 1;
}

// handle batched transfer
if let Some(ref mut body) = self.partial_body {
if transfer.0.delivery_id.is_some() {
// if delivery_id is set, then it should be equal to first transfer
if self
.queue
.back()
.map_or(true, |back| back.0.delivery_id != transfer.0.delivery_id)
.map_or(true, |back| Some(back.0.id()) != transfer.0.delivery_id)

Check warning on line 318 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L318

Added line #L318 was not covered by tests
{
let err = Error(Box::new(codec::ErrorInner {
condition: LinkError::DetachForced.into(),
Expand All @@ -340,14 +342,15 @@ impl ReceiverLinkInner {
transfer_body.encode(body);
}

// received last partial transfer
if transfer.0.more {
if !transfer.more() {

Check warning on line 345 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L345

Added line #L345 was not covered by tests
// dont need to update queue, we use first transfer frame as primary
Ok(Action::None)
} else {
// received last partial transfer
self.delivery_count += 1;
let partial_body = self.partial_body.take();
if partial_body.is_some() && !self.queue.is_empty() {
self.queue.back_mut().unwrap().0.body =
self.queue.back_mut().unwrap().1 .0.body =

Check warning on line 353 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L353

Added line #L353 was not covered by tests
Some(TransferBody::Data(partial_body.unwrap().freeze()));
if self.queue.len() == 1 {
self.wake();
Expand All @@ -367,15 +370,8 @@ impl ReceiverLinkInner {
}
}
} else if transfer.more() {
if transfer.delivery_id().is_none() {
let err = Error(Box::new(codec::ErrorInner {
condition: LinkError::DetachForced.into(),
description: Some(ByteString::from_static("delivery_id is required")),
info: None,
}));
let _ = self.close(Some(err));
Ok(Action::None)
} else {
// handle first transfer in batch
if let Some(id) = transfer.delivery_id() {

Check warning on line 374 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L374

Added line #L374 was not covered by tests
let body = if let Some(body) = transfer.0.body.take() {
match body {
TransferBody::Data(data) => BytesMut::copy_from_slice(&data),
Expand All @@ -389,18 +385,45 @@ impl ReceiverLinkInner {
self.pool.buf_with_capacity(16)
};
self.partial_body = Some(body);
self.queue.push_back(transfer);

let delivery = Delivery::new_rcv(
id,
transfer.settled().unwrap_or_default(),
self.session.clone(),

Check warning on line 392 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L391-L392

Added lines #L391 - L392 were not covered by tests
);
self.queue.push_back((delivery, transfer));
Ok(Action::None)

Check warning on line 395 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L394-L395

Added lines #L394 - L395 were not covered by tests
} else {
let err = Error(Box::new(codec::ErrorInner {
condition: LinkError::DetachForced.into(),
description: Some(ByteString::from_static("delivery_id is required")),
info: None,

Check warning on line 400 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L397-L400

Added lines #L397 - L400 were not covered by tests
}));
let _ = self.close(Some(err));

Check warning on line 402 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L402

Added line #L402 was not covered by tests
Ok(Action::None)
}
} else {
} else if let Some(id) = transfer.delivery_id() {
self.delivery_count += 1;
self.queue.push_back(transfer);
let delivery = Delivery::new_rcv(
id,
transfer.settled().unwrap_or_default(),
self.session.clone(),
);
self.queue.push_back((delivery, transfer));
if self.queue.len() == 1 {
self.wake();
}
Ok(Action::Transfer(ReceiverLink {
inner: inner.clone(),
}))
} else {
let err = Error(Box::new(codec::ErrorInner {
condition: LinkError::DetachForced.into(),
description: Some(ByteString::from_static("delivery_id is required")),
info: None,

Check warning on line 423 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L420-L423

Added lines #L420 - L423 were not covered by tests
}));
let _ = self.close(Some(err));
Ok(Action::None)

Check warning on line 426 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L425-L426

Added lines #L425 - L426 were not covered by tests
}
}
}
Expand Down Expand Up @@ -491,6 +514,15 @@ impl ReceiverLinkBuilder {
self
}

/// Modify attach frame
pub fn with_frame<F>(mut self, f: F) -> Self
where
F: FnOnce(&mut Attach),
{
f(&mut self.frame);
self

Check warning on line 523 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L522-L523

Added lines #L522 - L523 were not covered by tests
}

/// Attach receiver link
pub async fn attach(self) -> Result<ReceiverLink, AmqpProtocolError> {
let cell = self.session.clone();
Expand Down
57 changes: 14 additions & 43 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use ntex::service::{
};
use ntex::util::{join_all, HashMap, Ready};

use crate::codec::protocol::{
self, DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role, Transfer,
};
use crate::codec::protocol::{DeliveryState, Error, Rejected, Transfer};
use crate::error::LinkError;
use crate::types::{Link, Message, Outcome};
use crate::{cell::Cell, rcvlink::ReceiverLink, State};
use crate::{cell::Cell, rcvlink::ReceiverLink, Delivery, State};

type Handle<S> = boxed::BoxServiceFactory<Link<S>, Transfer, Outcome, Error, Error>;
type HandleService = boxed::BoxService<Transfer, Outcome, Error>;
Expand Down Expand Up @@ -105,8 +103,8 @@ impl<S: 'static> Service<Message> for RouterService<S> {
.get_mut()
.handlers
.insert(rcv_link.clone(), Some(Pipeline::new(srv)));
if let Some(tr) = rcv_link.get_transfer() {
service_call(rcv_link, tr, &self.0).await
if let Some((delivery, tr)) = rcv_link.get_delivery() {
service_call(rcv_link, delivery, tr, &self.0).await

Check warning on line 107 in src/router.rs

View check run for this annotation

Codecov / codecov/patch

src/router.rs#L107

Added line #L107 was not covered by tests
} else {
Ok(())
}
Expand Down Expand Up @@ -182,8 +180,8 @@ impl<S: 'static> Service<Message> for RouterService<S> {
}
Message::Transfer(link) => {
if let Some(Some(_)) = self.0.get_ref().handlers.get(&link) {
if let Some(tr) = link.get_transfer() {
service_call(link, tr, &self.0).await?;
if let Some((delivery, tr)) = link.get_delivery() {
service_call(link, delivery, tr, &self.0).await?;
}
}
Ok(())
Expand All @@ -193,7 +191,8 @@ impl<S: 'static> Service<Message> for RouterService<S> {
}

async fn service_call<S>(
mut link: ReceiverLink,
link: ReceiverLink,
mut delivery: Delivery,
tr: Transfer,
inner: &Cell<RouterServiceInner<S>>,
) -> Result<(), Error> {
Expand All @@ -206,53 +205,25 @@ async fn service_call<S>(
return Ok(());
}

let delivery_id = match tr.delivery_id() {
None => {
// #2.7.5 delivery_id MUST be set. batching is handled on lower level
let _ = link.close_with_error(
LinkError::force_detach().description("delivery_id MUST be set"),
);
return Ok(());
}
Some(delivery_id) => {
if link.credit() == 0 {
// self.has_credit = self.link.credit() != 0;
link.set_link_credit(50);
}
delivery_id
}
};
if link.credit() == 0 {
// self.has_credit = self.link.credit() != 0;
link.set_link_credit(50);

Check warning on line 210 in src/router.rs

View check run for this annotation

Codecov / codecov/patch

src/router.rs#L210

Added line #L210 was not covered by tests
}

match srv.call(tr).await {
Ok(outcome) => {
log::trace!("Outcome is ready {:?} for {}", outcome, link.name());
settle(&mut link, delivery_id, outcome.into_delivery_state());
delivery.settle(outcome.into_delivery_state());
}
Err(e) => {
log::trace!("Service response error: {:?}", e);
settle(
&mut link,
delivery_id,
DeliveryState::Rejected(Rejected { error: Some(e) }),
);
delivery.settle(DeliveryState::Rejected(Rejected { error: Some(e) }));

Check warning on line 220 in src/router.rs

View check run for this annotation

Codecov / codecov/patch

src/router.rs#L220

Added line #L220 was not covered by tests
}
}
}
Ok(())
}

fn settle(link: &mut ReceiverLink, id: DeliveryNumber, state: DeliveryState) {
let disposition = Disposition(Box::new(protocol::DispositionInner {
state: Some(state),
role: Role::Receiver,
first: id,
last: None,
settled: true,
batchable: false,
}));
link.send_disposition(disposition);
}

struct ResourceServiceFactory<S, T> {
factory: T,
_t: marker::PhantomData<S>,
Expand Down
Loading

0 comments on commit 9e681ca

Please sign in to comment.