From d54b6ac28275246ff7d68046d8749fc20a49f01b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 4 Mar 2024 22:49:46 +0500 Subject: [PATCH] Add proper delivery handling --- CHANGES.md | 4 + Cargo.toml | 2 +- src/delivery.rs | 294 +++++++++++++++++++++++++++++++ src/lib.rs | 4 +- src/rcvlink.rs | 13 +- src/session.rs | 311 ++++++++++++++++----------------- src/sndlink.rs | 406 +++++++------------------------------------ tests/test_server.rs | 32 ++-- 8 files changed, 528 insertions(+), 538 deletions(-) create mode 100644 src/delivery.rs diff --git a/CHANGES.md b/CHANGES.md index 544ca79..4ffbd5a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.1.0] - 2024-03-04 + +* Add proper delivery handling + ## [codec-0.9.2] - 2024-02-01 * Add more buffer length checks diff --git a/Cargo.toml b/Cargo.toml index c7a8a08..f8629c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-amqp" -version = "1.0.2" +version = "1.1.0" authors = ["ntex contributors "] description = "AMQP 1.0 Client/Server framework" documentation = "https://docs.rs/ntex-amqp" diff --git a/src/delivery.rs b/src/delivery.rs new file mode 100644 index 0000000..c08433c --- /dev/null +++ b/src/delivery.rs @@ -0,0 +1,294 @@ +use std::cell::Cell as StdCell; + +use ntex::{channel::pool, util::Bytes}; +use ntex_amqp_codec::protocol::{ + DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Rejected, + Role, TransferBody, +}; +use ntex_amqp_codec::types::{Str, Symbol}; + +use crate::session::Session; +use crate::{cell::Cell, error::AmqpProtocolError, sndlink::SenderLinkInner}; + +bitflags::bitflags! { + #[derive(Copy, Clone, Debug)] + struct Flags: u8 { + const SENDER = 0b0000_0001; + const LOCAL_SETTLED = 0b0000_0100; + const REMOTE_SETTLED = 0b0000_1000; + } +} + +pub struct Delivery { + id: DeliveryNumber, + session: Session, + flags: StdCell, +} + +#[derive(Default, Debug)] +pub(crate) struct DeliveryInner { + settled: bool, + state: Option, + error: Option, + tx: Option>, +} + +impl Delivery { + pub fn remote_state(&self) -> Option { + if let Some(inner) = self + .session + .inner + .get_mut() + .unsettled_deliveries + .get_mut(&self.id) + { + inner.state.clone() + } else { + None + } + } + + pub fn is_remote_settled(&self) -> bool { + self.is_set(Flags::REMOTE_SETTLED) + } + + pub fn settle(&mut self, state: DeliveryState) { + // remote side is settled, not need to send disposition + if self.is_set(Flags::REMOTE_SETTLED) { + return; + } + + if !self.is_set(Flags::LOCAL_SETTLED) { + self.set_flag(Flags::LOCAL_SETTLED); + + let disp = Disposition(Box::new(DispositionInner { + role: if self.is_set(Flags::SENDER) { + Role::Sender + } else { + Role::Receiver + }, + first: self.id, + last: None, + settled: true, + state: Some(state), + batchable: false, + })); + self.session.inner.get_mut().post_frame(disp.into()); + } + } + + pub fn update_state(&mut self, state: DeliveryState) { + // remote side is settled, not need to send disposition + if self.is_set(Flags::REMOTE_SETTLED) || self.is_set(Flags::LOCAL_SETTLED) { + return; + } + + let disp = Disposition(Box::new(DispositionInner { + role: if self.is_set(Flags::SENDER) { + Role::Sender + } else { + Role::Receiver + }, + first: self.id, + last: None, + settled: false, + state: Some(state), + batchable: false, + })); + self.session.inner.get_mut().post_frame(disp.into()); + } + + fn is_set(&self, flag: Flags) -> bool { + self.flags.get().contains(flag) + } + + fn set_flag(&self, flag: Flags) { + let mut flags = self.flags.get(); + flags.insert(flag); + self.flags.set(flags); + } + + pub async fn wait(&self) -> Result, AmqpProtocolError> { + let rx = if let Some(inner) = self + .session + .inner + .get_mut() + .unsettled_deliveries + .get_mut(&self.id) + { + if let Some(st) = self.check_inner(inner) { + return st; + } + + let (tx, rx) = self.session.inner.get_ref().pool_notify.channel(); + inner.tx = Some(tx); + rx + } else { + return Ok(None); + }; + if rx.await.is_err() { + return Err(AmqpProtocolError::ConnectionDropped); + } + + if let Some(inner) = self + .session + .inner + .get_mut() + .unsettled_deliveries + .get_mut(&self.id) + { + if inner.settled { + self.set_flag(Flags::REMOTE_SETTLED); + } + if let Some(st) = self.check_inner(inner) { + return st; + } + } + Ok(None) + } + + fn check_inner( + &self, + inner: &mut DeliveryInner, + ) -> Option, AmqpProtocolError>> { + if let Some(ref st) = inner.state { + if matches!(st, DeliveryState::Modified(..)) { + // non terminal state + Some(Ok(Some(inner.state.take().unwrap()))) + } else { + // return clone of terminal state + Some(Ok(Some(st.clone()))) + } + } else if let Some(ref err) = inner.error { + Some(Err(err.clone())) + } else { + None + } + } +} + +impl Drop for Delivery { + fn drop(&mut self) { + let inner = self.session.inner.get_mut(); + + if inner.unsettled_deliveries.contains_key(&self.id) { + if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) { + let err = Error::build() + .condition(ErrorCondition::Custom(Symbol(Str::Static( + "Internal error", + )))) + .finish(); + + let disp = Disposition(Box::new(DispositionInner { + role: if self.is_set(Flags::SENDER) { + Role::Sender + } else { + Role::Receiver + }, + first: self.id, + last: None, + settled: true, + state: Some(DeliveryState::Rejected(Rejected { error: Some(err) })), + batchable: false, + })); + inner.post_frame(disp.into()); + } + + inner.unsettled_deliveries.remove(&self.id); + } + } +} + +impl DeliveryInner { + pub(crate) fn new() -> Self { + Self { + tx: None, + state: None, + error: None, + settled: false, + } + } + + pub(crate) fn set_error(&mut self, error: AmqpProtocolError) { + self.error = Some(error); + if let Some(tx) = self.tx.take() { + let _ = tx.send(()); + } + } + + pub(crate) fn handle_disposition(&mut self, disp: Disposition) { + if disp.settled() { + self.settled = true; + } + if let Some(state) = disp.state() { + self.state = Some(state.clone()); + } + if let Some(tx) = self.tx.take() { + let _ = tx.send(()); + } + } +} + +impl Drop for DeliveryInner { + fn drop(&mut self) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(()); + } + } +} + +pub struct DeliveryBuilder { + tag: Option, + settled: bool, + data: TransferBody, + sender: Cell, +} + +impl DeliveryBuilder { + pub(crate) fn new(data: TransferBody, sender: Cell) -> Self { + Self { + tag: None, + settled: false, + data, + sender, + } + } + + pub fn tag(mut self, tag: Bytes) -> Self { + self.tag = Some(tag); + self + } + + pub fn settled(mut self) -> Self { + self.settled = true; + self + } + + pub async fn send(self) -> Result { + let inner = self.sender.get_ref(); + + if let Some(ref err) = inner.error { + Err(err.clone()) + } else { + if inner + .max_message_size + .map(|l| self.data.len() > l as usize) + .unwrap_or_default() + { + Err(AmqpProtocolError::BodyTooLarge) + } else { + let id = self.sender.get_mut().send(self.data, self.tag).await?; + + Ok(Delivery { + id, + session: self.sender.get_ref().session.clone(), + flags: StdCell::new(if self.settled { + Flags::SENDER | Flags::LOCAL_SETTLED + } else { + Flags::SENDER + }), + }) + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 08f479b..8b5b324 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![deny(rust_2018_idioms, warnings, unreachable_pub)] +//#![deny(rust_2018_idioms, warnings, unreachable_pub)] #![allow(clippy::type_complexity, clippy::let_underscore_future)] #[macro_use] @@ -13,6 +13,7 @@ pub mod client; mod connection; mod control; mod default; +mod delivery; mod dispatcher; pub mod error; pub mod error_code; @@ -26,6 +27,7 @@ pub mod types; pub use self::connection::{Connection, ConnectionRef}; pub use self::control::{ControlFrame, ControlFrameKind}; +pub use self::delivery::{Delivery, DeliveryBuilder}; pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder}; pub use self::session::Session; pub use self::sndlink::{SenderLink, SenderLinkBuilder}; diff --git a/src/rcvlink.rs b/src/rcvlink.rs index acfa8ff..4b12eec 100644 --- a/src/rcvlink.rs +++ b/src/rcvlink.rs @@ -6,9 +6,8 @@ use std::{ use ntex::util::{ByteString, BytesMut, PoolRef, Stream}; use ntex::{channel::oneshot, task::LocalWaker}; use ntex_amqp_codec::protocol::{ - self as codec, Attach, DeliveryNumber, Disposition, Error, Handle, LinkError, - ReceiverSettleMode, Role, SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, - Transfer, TransferBody, + self as codec, Attach, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role, + SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer, TransferBody, }; use ntex_amqp_codec::types::{Symbol, Variant}; use ntex_amqp_codec::Encode; @@ -133,14 +132,6 @@ impl ReceiverLink { .post_frame(disp.into()); } - /// Wait for disposition with specified number - pub fn wait_disposition( - &self, - id: DeliveryNumber, - ) -> impl Future> { - self.inner.get_mut().session.wait_disposition(id) - } - pub fn close(&self) -> impl Future> { self.inner.get_mut().close(None) } diff --git a/src/session.rs b/src/session.rs index 0b14ed9..7a7269a 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, fmt, future::Future}; +use std::{collections::VecDeque, convert::TryFrom, fmt, future::Future}; use ntex::channel::{condition, oneshot, pool}; use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef, Ready}; @@ -6,18 +6,17 @@ use slab::Slab; use ntex_amqp_codec::protocol::{ self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End, - Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source, - Transfer, TransferBody, TransferNumber, + Error, Flow, Frame, Handle, ReceiverSettleMode, Role, SenderSettleMode, Source, Transfer, + TransferBody, TransferNumber, }; -use ntex_amqp_codec::AmqpFrame; +use ntex_amqp_codec::{AmqpFrame, Encode}; +use crate::delivery::DeliveryInner; use crate::error::AmqpProtocolError; use crate::rcvlink::{ EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner, }; -use crate::sndlink::{ - DeliveryPromise, EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner, -}; +use crate::sndlink::{EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner}; use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame}; const INITIAL_OUTGOING_ID: TransferNumber = 0; @@ -43,14 +42,13 @@ pub(crate) struct SessionInner { links_by_name: HashMap, remote_handles: HashMap, error: Option, + closed: condition::Condition, pending_transfers: VecDeque, - unsettled_deliveries: HashMap, - disposition_subscribers: HashMap>, + pub(crate) unsettled_deliveries: HashMap, - pub(crate) pool: pool::Pool>, - pool_disp: pool::Pool, - closed: condition::Condition, + pub(crate) pool_notify: pool::Pool<()>, + pub(crate) pool_credit: pool::Pool>, } impl fmt::Debug for Session { @@ -227,14 +225,6 @@ impl Session { } } } - - #[inline] - pub fn wait_disposition( - &self, - id: DeliveryNumber, - ) -> impl Future> { - self.inner.get_mut().wait_disposition(id) - } } #[derive(Debug)] @@ -281,25 +271,8 @@ bitflags::bitflags! { #[derive(Debug)] struct PendingTransfer { + tx: pool::Sender>, link_handle: Handle, - body: Option, - state: TransferState, - settled: Option, - message_format: Option, -} - -#[derive(Debug)] -pub(crate) enum TransferState { - First(DeliveryPromise, Bytes), - Continue, - Last, - Only(DeliveryPromise, Bytes), -} - -impl TransferState { - pub(super) fn more(&self) -> bool { - !matches!(self, TransferState::Only(_, _) | TransferState::Last) - } } impl SessionInner { @@ -326,10 +299,9 @@ impl SessionInner { links_by_name: HashMap::default(), remote_handles: HashMap::default(), pending_transfers: VecDeque::new(), - disposition_subscribers: HashMap::default(), error: None, - pool: pool::new(), - pool_disp: pool::new(), + pool_notify: pool::new(), + pool_credit: pool::new(), closed: condition::Condition::new(), } } @@ -357,18 +329,14 @@ impl SessionInner { // drop pending transfers for tr in self.pending_transfers.drain(..) { - if let TransferState::First(tx, _) | TransferState::Only(tx, _) = tr.state { - tx.ready(Err(err.clone())); - } + let _ = tr.tx.send(Err(err.clone())); } // drop unsettled deliveries - for (_, promise) in self.unsettled_deliveries.drain() { - promise.ready(Err(err.clone())); + for (_, mut promise) in self.unsettled_deliveries.drain() { + promise.set_error(err.clone()); } - self.disposition_subscribers.clear(); - // drop links self.links_by_name.clear(); for (_, st) in self.links.iter_mut() { @@ -420,15 +388,6 @@ impl SessionInner { .collect() } - fn wait_disposition( - &mut self, - id: DeliveryNumber, - ) -> impl Future> { - let (tx, rx) = self.pool_disp.channel(); - self.disposition_subscribers.insert(id, tx); - async move { rx.await.map_err(|_| AmqpProtocolError::Disconnected) } - } - pub(crate) fn max_frame_size(&self) -> u32 { self.sink.0.max_frame_size } @@ -825,11 +784,7 @@ impl SessionInner { Ok(Action::None) } Frame::Disposition(disp) => { - if let Some(sender) = self.disposition_subscribers.remove(&disp.first()) { - let _ = sender.send(disp); - } else { - self.settle_deliveries(disp); - } + self.settle_deliveries(disp); Ok(Action::None) } Frame::Transfer(transfer) => { @@ -921,7 +876,9 @@ impl SessionInner { attach.handle(), delivery_count, cell, - attach.max_message_size().map(|v| v as usize), + attach + .max_message_size() + .map(|v| u32::try_from(v).unwrap_or(u32::MAX)), )); let local_sender = std::mem::replace( item, @@ -1012,11 +969,7 @@ impl SessionInner { while idx < self.pending_transfers.len() { if self.pending_transfers[idx].link_handle == handle { let tr = self.pending_transfers.remove(idx).unwrap(); - if let TransferState::First(tx, _) | TransferState::Only(tx, _) = - tr.state - { - tx.ready(Err(err.clone())); - } + let _ = tr.tx.send(Err(err.clone())); } else { idx += 1; } @@ -1104,42 +1057,39 @@ impl SessionInner { action } - fn settle_deliveries(&mut self, disposition: Disposition) { - let from = disposition.first(); - let to = disposition.last().unwrap_or(from); + fn settle_deliveries(&mut self, disp: Disposition) { + let from = disp.first(); + let to = disp.last(); if cfg!(feature = "frame-trace") { - log::trace!("{}: Settle delivery: {:#?}", self.tag(), disposition); + log::trace!("{}: Settle delivery: {:#?}", self.tag(), disp); } else { log::trace!( - "{}: Settle delivery from {} - {}, state {:?} settled: {:?}", + "{}: Settle delivery from {} - {:?}, state {:?} settled: {:?}", self.tag(), from, to, - disposition.state(), - disposition.settled() + disp.state(), + disp.settled() ); } - if !disposition.settled() { - let mut disp = disposition.clone(); - disp.0.role = Role::Sender; - disp.0.settled = true; - disp.0.state = Some(DeliveryState::Accepted(Accepted {})); - self.post_frame(Frame::Disposition(disp)); - } - - for no in from..=to { - if let Some(val) = self.unsettled_deliveries.remove(&no) { - val.ready(Ok(disposition.clone())); - } else { - log::info!( - "{}: Could not find handler for {:?}, no: {:?}, unsettled: {:?}", - self.tag(), - disposition, - no, - self.unsettled_deliveries.len(), - ); + if let Some(to) = to { + for no in from..=to { + if let Some(delivery) = self.unsettled_deliveries.get_mut(&no) { + delivery.handle_disposition(disp.clone()); + } else { + log::trace!( + "{}: Unknown deliveryid: {:?} disp: {:?}", + self.tag(), + no, + disp + ); + } + } + } else { + if let Some(delivery) = self.unsettled_deliveries.get_mut(&from) { + delivery.handle_disposition(disp); } } } @@ -1163,12 +1113,8 @@ impl SessionInner { self.pending_transfers.len(), ); - while self.remote_incoming_window != 0 { - if let Some(t) = self.pending_transfers.pop_front() { - self.send_transfer(t.link_handle, t.body, t.state, t.settled, t.message_format); - continue; - } - break; + while let Some(tr) = self.pending_transfers.pop_front() { + let _ = tr.tx.send(Ok(())); } if flow.echo() { @@ -1219,75 +1165,80 @@ impl SessionInner { self.post_frame(flow.into()); } - pub(crate) fn send_transfer( + pub(crate) async fn send_transfer( &mut self, link_handle: Handle, - body: Option, - state: TransferState, - settled: Option, - message_format: Option, - ) { - if self.remote_incoming_window == 0 { - log::trace!( - "{}: Remote window is 0, push to pending queue, hnd:{:?}", - self.sink.tag(), - link_handle - ); - self.pending_transfers.push_back(PendingTransfer { - link_handle, - body, - state, - settled, - message_format, - }); - } else { - let more = state.more(); - if !more { - self.remote_incoming_window -= 1; + tag: Bytes, + body: TransferBody, + settled: bool, + max_frame_size: Option, + ) -> Result { + loop { + if self.remote_incoming_window == 0 { + log::trace!( + "{}: Remote window is 0, push to pending queue, hnd:{:?}", + self.sink.tag(), + link_handle + ); + let (tx, rx) = self.pool_credit.channel(); + self.pending_transfers + .push_back(PendingTransfer { tx, link_handle }); + + rx.await + .map_err(|_| AmqpProtocolError::ConnectionDropped) + .and_then(|v| v)?; + continue; } + break; + } - let settled2 = settled.unwrap_or(false); - let tr_settled = if settled2 { - Some(DeliveryState::Accepted(Accepted {})) - } else { - None - }; + self.remote_incoming_window -= 1; - let mut transfer = Transfer(Box::new(codec::TransferInner { - body, - settled, - message_format, - more: false, - handle: link_handle, - state: tr_settled, - delivery_id: None, - delivery_tag: None, - rcv_settle_mode: None, - resume: false, - aborted: false, - batchable: false, - })); + let delivery_id = self.next_outgoing_id; + self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1); - let more = state.more(); - match state { - TransferState::First(promise, delivery_tag) - | TransferState::Only(promise, delivery_tag) => { - let delivery_id = self.next_outgoing_id; - self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1); - - transfer.0.more = more; - transfer.0.batchable = more; - transfer.0.delivery_id = Some(delivery_id); - transfer.0.delivery_tag = Some(delivery_tag); - self.unsettled_deliveries.insert(delivery_id, promise); - } - TransferState::Continue => { - transfer.0.more = true; - transfer.0.batchable = true; - } - TransferState::Last => { - transfer.0.more = false; + let tr_settled = if settled { + Some(DeliveryState::Accepted(Accepted {})) + } else { + None + }; + let message_format = body.message_format(); + + let max_frame_size = max_frame_size.unwrap_or_else(|| self.max_frame_size()); + let max_frame_size = if max_frame_size > 2048 { + max_frame_size - 2048 + } else if max_frame_size == 0 { + u32::MAX + } else { + max_frame_size + } as usize; + + // body is larger than allowed frame size, send body as a set of transfers + if body.len() > max_frame_size { + let mut body = match body { + TransferBody::Data(data) => data, + TransferBody::Message(msg) => { + let mut buf = self.memory_pool().buf_with_capacity(msg.encoded_size()); + msg.encode(&mut buf); + buf.freeze() } + }; + + let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); + + let mut transfer = Transfer(Default::default()); + transfer.0.body = Some(TransferBody::Data(chunk)); + transfer.0.more = true; + transfer.0.settled = Some(settled); + transfer.0.state = tr_settled; + transfer.0.batchable = true; + transfer.0.delivery_id = Some(delivery_id); + transfer.0.delivery_tag = Some(tag.clone()); + transfer.0.message_format = message_format; + + if !settled { + self.unsettled_deliveries + .insert(delivery_id, DeliveryInner::new()); } log::trace!( @@ -1301,8 +1252,44 @@ impl SessionInner { transfer.settled(), ); + loop { + let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); + + // last chunk + if body.is_empty() { + log::trace!("{}: Sending last tranfer for {:?}", self.tag(), tag); + + let mut transfer = Transfer(Default::default()); + transfer.0.more = false; + self.post_frame(Frame::Transfer(transfer)); + break; + } + + log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag); + + let mut transfer = Transfer(Default::default()); + transfer.0.body = Some(TransferBody::Data(chunk)); + transfer.0.more = true; + transfer.0.batchable = true; + self.post_frame(Frame::Transfer(transfer)); + } + } else { + let mut transfer = Transfer(Default::default()); + transfer.0.body = Some(body); + transfer.0.settled = Some(settled); + transfer.0.state = tr_settled; + transfer.0.delivery_id = Some(delivery_id); + transfer.0.delivery_tag = Some(tag); + transfer.0.message_format = message_format; + + if !settled { + self.unsettled_deliveries + .insert(delivery_id, DeliveryInner::new()); + } self.post_frame(Frame::Transfer(transfer)); } + + Ok(delivery_id) } pub(crate) fn post_frame(&mut self, frame: Frame) { diff --git a/src/sndlink.rs b/src/sndlink.rs index a0bf3a1..baaf2a2 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -1,16 +1,15 @@ -use std::collections::VecDeque; -use std::{convert::TryFrom, future::Future, mem, pin::Pin, task::Context, task::Poll}; +use std::{collections::VecDeque, convert::TryFrom, future::Future}; use ntex::channel::{condition, oneshot, pool}; -use ntex::util::{ready, BufMut, ByteString, Bytes, Either, PoolRef, Ready}; +use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready}; use ntex_amqp_codec::protocol::{ - self as codec, Attach, DeliveryNumber, DeliveryState, Disposition, Error, Flow, MessageFormat, - ReceiverSettleMode, Role, SenderSettleMode, SequenceNo, Target, TerminusDurability, - TerminusExpiryPolicy, TransferBody, + self as codec, Attach, DeliveryNumber, Error, Flow, ReceiverSettleMode, Role, SenderSettleMode, + SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody, }; -use crate::session::{Session, SessionInner, TransferState}; -use crate::{cell::Cell, codec::Encode, error::AmqpProtocolError, Handle}; +use crate::delivery::DeliveryBuilder; +use crate::session::{Session, SessionInner}; +use crate::{cell::Cell, error::AmqpProtocolError, Handle}; #[derive(Clone)] pub struct SenderLink { @@ -20,28 +19,20 @@ pub struct SenderLink { pub(crate) struct SenderLinkInner { pub(crate) id: usize, name: ByteString, - session: Session, + pub(crate) session: Session, remote_handle: Handle, delivery_count: SequenceNo, delivery_tag: u32, link_credit: u32, - pending_transfers: VecDeque, - error: Option, - closed: bool, + pending_transfers: VecDeque>>, + pub(crate) error: Option, + pub(crate) closed: bool, + pub(crate) max_message_size: Option, on_close: condition::Condition, on_credit: condition::Condition, - on_disposition: Box)>, - max_message_size: Option, pool: PoolRef, } -struct PendingTransfer { - body: Option, - state: TransferState, - settle: Option, - message_format: Option, -} - impl std::fmt::Debug for SenderLink { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fmt.debug_tuple("SenderLink") @@ -129,46 +120,12 @@ impl SenderLink { self.inner.get_ref().error.as_ref() } - /// Send body - pub fn send(&self, body: T) -> impl Future> - where - T: Into, - { - self.inner.get_mut().send(body, None) - } - - /// Send body with specific delivery tag - pub fn send_with_tag( - &self, - body: T, - tag: Bytes, - ) -> impl Future> - where - T: Into, - { - self.inner.get_mut().send(body, Some(tag)) - } - - pub fn send_no_block(&self, body: T) -> Result + /// Start delivery process + pub fn delivery(&self, body: T) -> DeliveryBuilder where T: Into, { - self.inner - .get_mut() - .send_no_block(body, None, self.inner.clone()) - } - - pub fn send_no_block_with_tag(&self, body: T, tag: Bytes) -> Result - where - T: Into, - { - self.inner - .get_mut() - .send_no_block(body, Some(tag), self.inner.clone()) - } - - pub fn settle_message(&self, id: DeliveryNumber, state: DeliveryState) { - self.inner.get_mut().settle_message(id, state); + DeliveryBuilder::new(body.into(), self.inner.clone()) } /// Close sender link @@ -199,18 +156,11 @@ impl SenderLink { self.inner.get_ref().on_credit.wait() } - pub fn on_disposition(&self, f: F) - where - F: Fn(Bytes, Result) + 'static, - { - self.inner.get_mut().on_disposition = Box::new(f); + pub fn max_message_size(&self) -> Option { + self.inner.get_ref().max_message_size } - pub fn max_message_size(&self) -> Option { - self.inner.get_mut().max_message_size - } - - pub fn set_max_message_size(&self, value: usize) { + pub fn set_max_message_size(&self, value: u32) { self.inner.get_mut().max_message_size = Some(value) } } @@ -222,7 +172,7 @@ impl SenderLinkInner { handle: Handle, delivery_count: SequenceNo, session: Cell, - max_message_size: Option, + max_message_size: Option, ) -> SenderLinkInner { let pool = session.get_ref().memory_pool(); SenderLinkInner { @@ -240,7 +190,6 @@ impl SenderLinkInner { delivery_tag: 0, on_close: condition::Condition::new(), on_credit: condition::Condition::new(), - on_disposition: Box::new(|_, _| ()), } } @@ -251,30 +200,22 @@ impl SenderLinkInner { name = Some(addr.clone()); } } - let pool = session.get_ref().memory_pool(); - let delivery_count = frame.initial_delivery_count().unwrap_or(0); let mut name = name.unwrap_or_default(); name.trimdown(); - SenderLinkInner { + let delivery_count = frame.initial_delivery_count().unwrap_or(0); + let max_message_size = frame + .max_message_size() + .map(|size| u32::try_from(size).unwrap_or(u32::MAX)); + + SenderLinkInner::new( id, - pool, name, + frame.handle(), delivery_count, - session: Session::new(session), - remote_handle: frame.handle(), - link_credit: 0, - delivery_tag: 0, - pending_transfers: VecDeque::new(), - error: None, - closed: false, - on_close: condition::Condition::new(), - on_credit: condition::Condition::new(), - on_disposition: Box::new(|_, _| ()), - max_message_size: frame - .max_message_size() - .map(|size| usize::try_from(size).unwrap_or(usize::MAX)), - } + session, + max_message_size, + ) } pub(crate) fn id(&self) -> u32 { @@ -289,7 +230,7 @@ impl SenderLinkInner { &self.name } - pub(crate) fn max_message_size(&self) -> Option { + pub(crate) fn max_message_size(&self) -> Option { self.max_message_size } @@ -302,10 +243,8 @@ impl SenderLinkInner { ); // drop pending transfers - for tr in self.pending_transfers.drain(..) { - if let TransferState::First(tx, _) | TransferState::Only(tx, _) = tr.state { - tx.ready(Err(err.clone())); - } + for tx in self.pending_transfers.drain(..) { + let _ = tx.send(Err(err.clone())); } self.closed = true; @@ -364,25 +303,10 @@ impl SenderLinkInner { ); self.link_credit += delta; - let session = self.session.inner.get_mut(); // credit became available => drain pending_transfers - while self.link_credit > 0 { - if let Some(transfer) = self.pending_transfers.pop_front() { - if !transfer.state.more() { - self.link_credit -= 1; - } - self.delivery_count = self.delivery_count.wrapping_add(1); - session.send_transfer( - self.id as u32, - transfer.body, - transfer.state, - transfer.settle, - transfer.message_format, - ); - } else { - break; - } + while let Some(tx) = self.pending_transfers.pop_front() { + let _ = tx.send(Ok(())); } // notify available credit waiters @@ -392,205 +316,48 @@ impl SenderLinkInner { } } - fn send>(&mut self, body: T, tag: Option) -> Delivery { - if let Some(ref err) = self.error { - Delivery::Resolved(Err(err.clone())) - } else { - let body = body.into(); - let message_format = body.message_format(); - - if let Some(limit) = self.max_message_size { - if body.len() > limit { - return Delivery::Resolved(Err(AmqpProtocolError::BodyTooLarge)); - } - } - - let (delivery_tx, delivery_rx) = self.session.inner.get_ref().pool.channel(); - - let max_frame_size = self.session.inner.get_ref().max_frame_size(); - let max_frame_size = if max_frame_size > 2048 { - max_frame_size - 2048 - } else if max_frame_size == 0 { - u32::MAX - } else { - max_frame_size - } as usize; - - // body is larger than allowed frame size, send body as a set of transfers - if body.len() > max_frame_size { - let mut body = match body { - TransferBody::Data(data) => data, - TransferBody::Message(msg) => { - let mut buf = self.pool.buf_with_capacity(msg.encoded_size()); - msg.encode(&mut buf); - buf.freeze() - } - }; - - let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); - let tag = self.get_tag(tag); - - self.send_inner( - chunk.into(), - message_format, - TransferState::First(DeliveryPromise::new(delivery_tx), tag), - ); - - loop { - let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); - - // last chunk - if body.is_empty() { - self.send_inner(chunk.into(), message_format, TransferState::Last); - break; - } - - self.send_inner(chunk.into(), message_format, TransferState::Continue); - } - } else { - let st = TransferState::Only(DeliveryPromise::new(delivery_tx), self.get_tag(tag)); - self.send_inner(body, message_format, st); - } - - Delivery::Pending(delivery_rx) - } - } - - fn send_no_block>( + pub(crate) async fn send>( &mut self, body: T, tag: Option, - link: Cell, - ) -> Result { + ) -> Result { if let Some(ref err) = self.error { Err(err.clone()) } else { let body = body.into(); - let message_format = body.message_format(); - - if let Some(limit) = self.max_message_size { - if body.len() > limit { - return Err(AmqpProtocolError::BodyTooLarge); - } - } - - let max_frame_size = self.session.inner.get_ref().max_frame_size(); - let max_frame_size = if max_frame_size > 2048 { - max_frame_size - 2048 - } else if max_frame_size == 0 { - u32::MAX - } else { - max_frame_size - } as usize; - - // body is larger than allowed frame size, send body as a set of transfers - if body.len() > max_frame_size { - let mut body = match body { - TransferBody::Data(data) => data, - TransferBody::Message(msg) => { - let mut buf = self.pool.buf_with_capacity(msg.encoded_size()); - msg.encode(&mut buf); - buf.freeze() - } - }; - - let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); - let tag = self.get_tag(tag); - log::trace!( - "{}: Body size if larger than max size, sending multiple tranfers for {:?}", - self.session.tag(), - tag - ); - - self.send_inner( - chunk.into(), - message_format, - TransferState::First(DeliveryPromise::new_link(link, tag.clone()), tag.clone()), - ); - - loop { - let chunk = body.split_to(std::cmp::min(max_frame_size, body.len())); - - // last chunk - if body.is_empty() { - log::trace!("{}: Sending last tranfer for {:?}", self.session.tag(), tag); - self.send_inner(chunk.into(), message_format, TransferState::Last); - break; - } + let tag = self.get_tag(tag); + loop { + if self.link_credit == 0 || !self.pending_transfers.is_empty() { log::trace!( - "{}: Sending chunk tranfer for {:?}", - self.session.tag(), - tag + "{}: Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}), queue size: {}", self.session.tag(), + self.link_credit, + self.name, + self.id, + self.remote_handle, + self.pending_transfers.len() ); - self.send_inner(chunk.into(), message_format, TransferState::Continue); + let (tx, rx) = self.session.inner.get_ref().pool_credit.channel(); + self.pending_transfers.push_back(tx); + rx.await + .map_err(|_| AmqpProtocolError::ConnectionDropped) + .and_then(|v| v)?; + continue; } - Ok(tag) - } else { - let tag = self.get_tag(tag); - log::trace!( - "{}: Sending non-blocking tranfer for {:?}", - self.session.tag(), - tag - ); - let st = - TransferState::Only(DeliveryPromise::new_link(link, tag.clone()), tag.clone()); - self.send_inner(body, message_format, st); - Ok(tag) + break; } - } - } - fn send_inner( - &mut self, - body: TransferBody, - message_format: Option, - state: TransferState, - ) { - if self.link_credit == 0 || !self.pending_transfers.is_empty() { - log::trace!( - "{}: Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}) {:?}, queue size: {}", self.session.tag(), - self.link_credit, - self.name, - self.id, - self.remote_handle, - state, - self.pending_transfers.len() - ); - self.pending_transfers.push_back(PendingTransfer { - state, - message_format, - settle: Some(false), - body: Some(body), - }); - } else { - // reduce link credit only if transfer is last - if !state.more() { - self.link_credit -= 1; - } + // reduce link credit + self.link_credit -= 1; self.delivery_count = self.delivery_count.wrapping_add(1); - self.session.inner.get_mut().send_transfer( - self.id as u32, - Some(body), - state, - None, - message_format, - ); + self.session + .inner + .get_mut() + .send_transfer(self.id as u32, tag, body, false, self.max_message_size) + .await } } - pub(crate) fn settle_message(&mut self, id: DeliveryNumber, state: DeliveryState) { - let disp = Disposition(Box::new(codec::DispositionInner { - role: Role::Sender, - first: id, - last: None, - settled: true, - state: Some(state), - batchable: false, - })); - self.session.inner.get_mut().post_frame(disp.into()); - } - fn get_tag(&mut self, tag: Option) -> Bytes { tag.unwrap_or_else(|| { let delivery_tag = self.delivery_tag; @@ -697,58 +464,3 @@ impl SenderLinkBuilder { } } } - -enum Delivery { - Resolved(Result), - Pending(pool::Receiver>), - Gone, -} - -impl Future for Delivery { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Delivery::Pending(ref mut receiver) = *self { - return match ready!(Pin::new(receiver).poll(cx)) { - Ok(r) => Poll::Ready(r), - Err(e) => { - log::trace!("Delivery oneshot is gone: {:?}", e); - Poll::Ready(Err(AmqpProtocolError::Disconnected)) - } - }; - } - - let old_v = mem::replace(&mut *self, Delivery::Gone); - if let Delivery::Resolved(r) = old_v { - return match r { - Ok(state) => Poll::Ready(Ok(state)), - Err(e) => Poll::Ready(Err(e)), - }; - } - panic!("Polling Delivery after it was polled as ready is an error."); - } -} - -#[derive(Debug)] -pub(crate) struct DeliveryPromise( - Either>, (Cell, Bytes)>, -); - -impl DeliveryPromise { - fn new(tx: pool::Sender>) -> Self { - DeliveryPromise(Either::Left(tx)) - } - - fn new_link(link: Cell, tag: Bytes) -> Self { - DeliveryPromise(Either::Right((link, tag))) - } - - pub(crate) fn ready(self, result: Result) { - match self.0 { - Either::Left(tx) => { - let _r = tx.send(result); - } - Either::Right((inner, tag)) => (*inner.get_ref().on_disposition)(tag, result), - } - } -} diff --git a/tests/test_server.rs b/tests/test_server.rs index 32f5660..bb4ba1f 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,10 +1,12 @@ -use std::{cell::Cell, convert::TryFrom, rc::Rc, sync::Arc, sync::Mutex}; +use std::{convert::TryFrom, sync::Arc, sync::Mutex}; use ntex::server::test_server; use ntex::service::{boxed, boxed::BoxService, fn_factory_with_config, fn_service}; use ntex::util::{Bytes, Either, Ready}; use ntex::{http::Uri, rt, time::sleep, time::Millis}; -use ntex_amqp::{client, error::LinkError, server, types, ControlFrame, ControlFrameKind}; +use ntex_amqp::{ + client, codec::protocol, error::LinkError, server, types, ControlFrame, ControlFrameKind, +}; async fn server( link: types::Link<()>, @@ -50,19 +52,13 @@ async fn test_simple() -> std::io::Result<()> { .attach() .await .unwrap(); - link.send(Bytes::from(b"test".as_ref())).await.unwrap(); - - let res = Rc::new(Cell::new(false)); - let res2 = res.clone(); - - link.on_disposition(move |_tag, result| { - if result.is_ok() { - res2.set(true); - } - }); - link.send_no_block(Bytes::from(b"test".as_ref())).unwrap(); - sleep(Millis(500)).await; - assert!(res.get()); + let delivery = link + .delivery(Bytes::from(b"test".as_ref())) + .send() + .await + .unwrap(); + let st = delivery.wait().await.unwrap().unwrap(); + assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {})); Ok(()) } @@ -182,7 +178,11 @@ async fn test_session_end() -> std::io::Result<()> { .attach() .await .unwrap(); - link.send(Bytes::from(b"test".as_ref())).await.unwrap(); + let _delivery = link + .delivery(Bytes::from(b"test".as_ref())) + .send() + .await + .unwrap(); session.end().await.unwrap(); sleep(Millis(150)).await;