diff --git a/CHANGES.md b/CHANGES.md index 7ca8823..e1884e5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.1.0] - 2024-03-08 + +* Add proper delivery handling on receiver side + ## [2.0.0] - 2024-03-06 * Add proper delivery handling diff --git a/Cargo.toml b/Cargo.toml index 7b1a93a..a16d0e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "2.0.0" +version = "2.1.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" diff --git a/src/delivery.rs b/src/delivery.rs index c08433c..bef9d56 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -19,6 +19,7 @@ bitflags::bitflags! { } } +#[derive(Debug)] pub struct Delivery { id: DeliveryNumber, session: Session, @@ -34,12 +35,28 @@ pub(crate) struct DeliveryInner { } impl Delivery { + pub(crate) fn new_rcv(id: DeliveryNumber, settled: bool, session: Session) -> Delivery { + Delivery { + id, + session, + flags: StdCell::new(if settled { + Flags::LOCAL_SETTLED + } else { + Flags::empty() + }), + } + } + + pub fn id(&self) -> DeliveryNumber { + self.id + } + pub fn remote_state(&self) -> Option { if let Some(inner) = self .session .inner .get_mut() - .unsettled_deliveries + .unsettled_deliveries(self.is_set(Flags::SENDER)) .get_mut(&self.id) { inner.state.clone() @@ -113,7 +130,7 @@ impl Delivery { .session .inner .get_mut() - .unsettled_deliveries + .unsettled_deliveries(self.is_set(Flags::SENDER)) .get_mut(&self.id) { if let Some(st) = self.check_inner(inner) { @@ -134,7 +151,7 @@ impl Delivery { .session .inner .get_mut() - .unsettled_deliveries + .unsettled_deliveries(self.is_set(Flags::SENDER)) .get_mut(&self.id) { if inner.settled { @@ -170,8 +187,11 @@ impl Delivery { impl Drop for Delivery { fn drop(&mut self) { let inner = self.session.inner.get_mut(); + let deliveries = inner.unsettled_deliveries(self.is_set(Flags::SENDER)); + + if deliveries.contains_key(&self.id) { + deliveries.remove(&self.id); - if inner.unsettled_deliveries.contains_key(&self.id) { if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) { let err = Error::build() .condition(ErrorCondition::Custom(Symbol(Str::Static( @@ -193,8 +213,6 @@ impl Drop for Delivery { })); inner.post_frame(disp.into()); } - - inner.unsettled_deliveries.remove(&self.id); } } } diff --git a/src/rcvlink.rs b/src/rcvlink.rs index 4b12eec..514deec 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -9,11 +9,10 @@ use ntex_amqp_codec::protocol::{ self as codec, Attach, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role, SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer, TransferBody, }; -use ntex_amqp_codec::types::{Symbol, Variant}; -use ntex_amqp_codec::Encode; +use ntex_amqp_codec::{types::Symbol, types::Variant, Encode}; use crate::session::{Session, SessionInner}; -use crate::{cell::Cell, error::AmqpProtocolError, types::Action}; +use crate::{cell::Cell, error::AmqpProtocolError, types::Action, Delivery}; #[derive(Clone, Debug)] pub struct ReceiverLink { @@ -28,7 +27,7 @@ pub(crate) struct ReceiverLinkInner { session: Session, closed: bool, reader_task: LocalWaker, - queue: VecDeque, + queue: VecDeque<(Delivery, Transfer)>, credit: u32, delivery_count: u32, error: Option, @@ -112,13 +111,13 @@ impl ReceiverLink { self.inner.get_mut().set_max_partial_transfer(size); } - /// Check transfer frame - pub fn has_transfers(&self) -> bool { + /// Check deliveries + pub fn has_deliveries(&self) -> bool { !self.inner.get_mut().queue.is_empty() } - /// Get transfer frame - pub fn get_transfer(&self) -> Option { + /// Get delivery + pub fn get_delivery(&self) -> Option<(Delivery, Transfer)> { self.inner.get_mut().queue.pop_front() } @@ -162,7 +161,7 @@ impl ReceiverLink { /// Attempt to pull out the next value of this receiver, registering /// the current task for wakeup if the value is not yet available, /// and returning None if the stream is exhausted. - pub async fn recv(&self) -> Option> { + pub async fn recv(&self) -> Option> { poll_fn(|cx| self.poll_recv(cx)).await } @@ -172,7 +171,7 @@ impl ReceiverLink { pub fn poll_recv( &self, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let inner = self.inner.get_mut(); if inner.partial_body.is_some() && inner.queue.len() == 1 { @@ -202,7 +201,7 @@ impl ReceiverLink { } impl Stream for ReceiverLink { - type Item = Result; + type Item = Result<(Delivery, Transfer), AmqpProtocolError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.poll_recv(cx) @@ -305,15 +304,18 @@ impl ReceiverLinkInner { let _ = self.close(Some(err)); Ok(Action::None) } else { - self.credit -= 1; + if !transfer.0.more { + self.credit -= 1; + } + // handle batched transfer if let Some(ref mut body) = self.partial_body { if transfer.0.delivery_id.is_some() { // if delivery_id is set, then it should be equal to first transfer if self .queue .back() - .map_or(true, |back| back.0.delivery_id != transfer.0.delivery_id) + .map_or(true, |back| Some(back.0.id()) != transfer.0.delivery_id) { let err = Error(Box::new(codec::ErrorInner { condition: LinkError::DetachForced.into(), @@ -340,14 +342,15 @@ impl ReceiverLinkInner { transfer_body.encode(body); } - // received last partial transfer - if transfer.0.more { + if !transfer.more() { + // dont need to update queue, we use first transfer frame as primary Ok(Action::None) } else { + // received last partial transfer self.delivery_count += 1; let partial_body = self.partial_body.take(); if partial_body.is_some() && !self.queue.is_empty() { - self.queue.back_mut().unwrap().0.body = + self.queue.back_mut().unwrap().1 .0.body = Some(TransferBody::Data(partial_body.unwrap().freeze())); if self.queue.len() == 1 { self.wake(); @@ -367,15 +370,8 @@ impl ReceiverLinkInner { } } } else if transfer.more() { - if transfer.delivery_id().is_none() { - let err = Error(Box::new(codec::ErrorInner { - condition: LinkError::DetachForced.into(), - description: Some(ByteString::from_static("delivery_id is required")), - info: None, - })); - let _ = self.close(Some(err)); - Ok(Action::None) - } else { + // handle first transfer in batch + if let Some(id) = transfer.delivery_id() { let body = if let Some(body) = transfer.0.body.take() { match body { TransferBody::Data(data) => BytesMut::copy_from_slice(&data), @@ -389,18 +385,45 @@ impl ReceiverLinkInner { self.pool.buf_with_capacity(16) }; self.partial_body = Some(body); - self.queue.push_back(transfer); + + let delivery = Delivery::new_rcv( + id, + transfer.settled().unwrap_or_default(), + self.session.clone(), + ); + self.queue.push_back((delivery, transfer)); + Ok(Action::None) + } else { + let err = Error(Box::new(codec::ErrorInner { + condition: LinkError::DetachForced.into(), + description: Some(ByteString::from_static("delivery_id is required")), + info: None, + })); + let _ = self.close(Some(err)); Ok(Action::None) } - } else { + } else if let Some(id) = transfer.delivery_id() { self.delivery_count += 1; - self.queue.push_back(transfer); + let delivery = Delivery::new_rcv( + id, + transfer.settled().unwrap_or_default(), + self.session.clone(), + ); + self.queue.push_back((delivery, transfer)); if self.queue.len() == 1 { self.wake(); } Ok(Action::Transfer(ReceiverLink { inner: inner.clone(), })) + } else { + let err = Error(Box::new(codec::ErrorInner { + condition: LinkError::DetachForced.into(), + description: Some(ByteString::from_static("delivery_id is required")), + info: None, + })); + let _ = self.close(Some(err)); + Ok(Action::None) } } } @@ -491,6 +514,15 @@ impl ReceiverLinkBuilder { self } + /// Modify attach frame + pub fn with_frame(mut self, f: F) -> Self + where + F: FnOnce(&mut Attach), + { + f(&mut self.frame); + self + } + /// Attach receiver link pub async fn attach(self) -> Result { let cell = self.session.clone(); diff --git a/src/router.rs b/src/router.rs index 0ab3549..d1e253d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -7,12 +7,10 @@ use ntex::service::{ }; use ntex::util::{join_all, HashMap, Ready}; -use crate::codec::protocol::{ - self, DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role, Transfer, -}; +use crate::codec::protocol::{DeliveryState, Error, Rejected, Transfer}; use crate::error::LinkError; use crate::types::{Link, Message, Outcome}; -use crate::{cell::Cell, rcvlink::ReceiverLink, State}; +use crate::{cell::Cell, rcvlink::ReceiverLink, Delivery, State}; type Handle = boxed::BoxServiceFactory, Transfer, Outcome, Error, Error>; type HandleService = boxed::BoxService; @@ -105,8 +103,8 @@ impl Service for RouterService { .get_mut() .handlers .insert(rcv_link.clone(), Some(Pipeline::new(srv))); - if let Some(tr) = rcv_link.get_transfer() { - service_call(rcv_link, tr, &self.0).await + if let Some((delivery, tr)) = rcv_link.get_delivery() { + service_call(rcv_link, delivery, tr, &self.0).await } else { Ok(()) } @@ -182,8 +180,8 @@ impl Service for RouterService { } Message::Transfer(link) => { if let Some(Some(_)) = self.0.get_ref().handlers.get(&link) { - if let Some(tr) = link.get_transfer() { - service_call(link, tr, &self.0).await?; + if let Some((delivery, tr)) = link.get_delivery() { + service_call(link, delivery, tr, &self.0).await?; } } Ok(()) @@ -193,7 +191,8 @@ impl Service for RouterService { } async fn service_call( - mut link: ReceiverLink, + link: ReceiverLink, + mut delivery: Delivery, tr: Transfer, inner: &Cell>, ) -> Result<(), Error> { @@ -206,53 +205,25 @@ async fn service_call( return Ok(()); } - let delivery_id = match tr.delivery_id() { - None => { - // #2.7.5 delivery_id MUST be set. batching is handled on lower level - let _ = link.close_with_error( - LinkError::force_detach().description("delivery_id MUST be set"), - ); - return Ok(()); - } - Some(delivery_id) => { - if link.credit() == 0 { - // self.has_credit = self.link.credit() != 0; - link.set_link_credit(50); - } - delivery_id - } - }; + if link.credit() == 0 { + // self.has_credit = self.link.credit() != 0; + link.set_link_credit(50); + } match srv.call(tr).await { Ok(outcome) => { log::trace!("Outcome is ready {:?} for {}", outcome, link.name()); - settle(&mut link, delivery_id, outcome.into_delivery_state()); + delivery.settle(outcome.into_delivery_state()); } Err(e) => { log::trace!("Service response error: {:?}", e); - settle( - &mut link, - delivery_id, - DeliveryState::Rejected(Rejected { error: Some(e) }), - ); + delivery.settle(DeliveryState::Rejected(Rejected { error: Some(e) })); } } } Ok(()) } -fn settle(link: &mut ReceiverLink, id: DeliveryNumber, state: DeliveryState) { - let disposition = Disposition(Box::new(protocol::DispositionInner { - state: Some(state), - role: Role::Receiver, - first: id, - last: None, - settled: true, - batchable: false, - })); - link.send_disposition(disposition); -} - struct ResourceServiceFactory { factory: T, _t: marker::PhantomData, diff --git a/src/session.rs b/src/session.rs index 7a7269a..2bc8b0e 100644 --- a/src/session.rs +++ b/src/session.rs @@ -45,7 +45,8 @@ pub(crate) struct SessionInner { closed: condition::Condition, pending_transfers: VecDeque, - pub(crate) unsettled_deliveries: HashMap, + pub(crate) unsettled_snd_deliveries: HashMap, + pub(crate) unsettled_rcv_deliveries: HashMap, pub(crate) pool_notify: pool::Pool<()>, pub(crate) pool_credit: pool::Pool>, @@ -294,7 +295,8 @@ impl SessionInner { remote_outgoing_window, flags: if local { Flags::LOCAL } else { Flags::empty() }, next_outgoing_id: INITIAL_OUTGOING_ID, - unsettled_deliveries: HashMap::default(), + unsettled_snd_deliveries: HashMap::default(), + unsettled_rcv_deliveries: HashMap::default(), links: Slab::new(), links_by_name: HashMap::default(), remote_handles: HashMap::default(), @@ -319,6 +321,17 @@ impl SessionInner { self.sink.0.memory_pool() } + pub(crate) fn unsettled_deliveries( + &mut self, + sender: bool, + ) -> &mut HashMap { + if sender { + &mut self.unsettled_snd_deliveries + } else { + &mut self.unsettled_rcv_deliveries + } + } + /// Set error. New operations will return error. pub(crate) fn set_error(&mut self, err: AmqpProtocolError) { log::trace!( @@ -333,7 +346,10 @@ impl SessionInner { } // drop unsettled deliveries - for (_, mut promise) in self.unsettled_deliveries.drain() { + for (_, mut promise) in self.unsettled_snd_deliveries.drain() { + promise.set_error(err.clone()); + } + for (_, mut promise) in self.unsettled_rcv_deliveries.drain() { promise.set_error(err.clone()); } @@ -1074,21 +1090,27 @@ impl SessionInner { ); } + let deliveries = if disp.role() == Role::Receiver { + &mut self.unsettled_snd_deliveries + } else { + &mut self.unsettled_rcv_deliveries + }; + if let Some(to) = to { for no in from..=to { - if let Some(delivery) = self.unsettled_deliveries.get_mut(&no) { + if let Some(delivery) = deliveries.get_mut(&no) { delivery.handle_disposition(disp.clone()); } else { log::trace!( "{}: Unknown deliveryid: {:?} disp: {:?}", - self.tag(), + self.sink.tag(), no, disp ); } } } else { - if let Some(delivery) = self.unsettled_deliveries.get_mut(&from) { + if let Some(delivery) = deliveries.get_mut(&from) { delivery.handle_disposition(disp); } } @@ -1237,7 +1259,7 @@ impl SessionInner { transfer.0.message_format = message_format; if !settled { - self.unsettled_deliveries + self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } @@ -1283,7 +1305,7 @@ impl SessionInner { transfer.0.message_format = message_format; if !settled { - self.unsettled_deliveries + self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } self.post_frame(Frame::Transfer(transfer));