Skip to content

Commit

Permalink
Add proper delivery handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Mar 4, 2024
1 parent e15dbc5 commit d54b6ac
Show file tree
Hide file tree
Showing 8 changed files with 528 additions and 538 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.1.0] - 2024-03-04

* Add proper delivery handling

## [codec-0.9.2] - 2024-02-01

* Add more buffer length checks
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "1.0.2"
version = "1.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
294 changes: 294 additions & 0 deletions src/delivery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
use std::cell::Cell as StdCell;

use ntex::{channel::pool, util::Bytes};
use ntex_amqp_codec::protocol::{
DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Rejected,
Role, TransferBody,
};
use ntex_amqp_codec::types::{Str, Symbol};

use crate::session::Session;
use crate::{cell::Cell, error::AmqpProtocolError, sndlink::SenderLinkInner};

bitflags::bitflags! {
#[derive(Copy, Clone, Debug)]

Check warning on line 14 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L14

Added line #L14 was not covered by tests
struct Flags: u8 {
const SENDER = 0b0000_0001;
const LOCAL_SETTLED = 0b0000_0100;
const REMOTE_SETTLED = 0b0000_1000;
}
}

pub struct Delivery {
id: DeliveryNumber,
session: Session,
flags: StdCell<Flags>,
}

#[derive(Default, Debug)]
pub(crate) struct DeliveryInner {
settled: bool,
state: Option<DeliveryState>,
error: Option<AmqpProtocolError>,
tx: Option<pool::Sender<()>>,
}

impl Delivery {
pub fn remote_state(&self) -> Option<DeliveryState> {
if let Some(inner) = self

Check warning on line 38 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L37-L38

Added lines #L37 - L38 were not covered by tests
.session
.inner
.get_mut()
.unsettled_deliveries
.get_mut(&self.id)

Check warning on line 43 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L43

Added line #L43 was not covered by tests
{
inner.state.clone()

Check warning on line 45 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L45

Added line #L45 was not covered by tests
} else {
None

Check warning on line 47 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L47

Added line #L47 was not covered by tests
}
}

pub fn is_remote_settled(&self) -> bool {
self.is_set(Flags::REMOTE_SETTLED)

Check warning on line 52 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
}

pub fn settle(&mut self, state: DeliveryState) {

Check warning on line 55 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L55

Added line #L55 was not covered by tests
// remote side is settled, not need to send disposition
if self.is_set(Flags::REMOTE_SETTLED) {

Check warning on line 57 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L57

Added line #L57 was not covered by tests
return;
}

if !self.is_set(Flags::LOCAL_SETTLED) {
self.set_flag(Flags::LOCAL_SETTLED);

Check warning on line 62 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L61-L62

Added lines #L61 - L62 were not covered by tests

let disp = Disposition(Box::new(DispositionInner {
role: if self.is_set(Flags::SENDER) {
Role::Sender

Check warning on line 66 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L64-L66

Added lines #L64 - L66 were not covered by tests
} else {
Role::Receiver

Check warning on line 68 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L68

Added line #L68 was not covered by tests
},
first: self.id,
last: None,

Check warning on line 71 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L70-L71

Added lines #L70 - L71 were not covered by tests
settled: true,
state: Some(state),

Check warning on line 73 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L73

Added line #L73 was not covered by tests
batchable: false,
}));
self.session.inner.get_mut().post_frame(disp.into());

Check warning on line 76 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L76

Added line #L76 was not covered by tests
}
}

pub fn update_state(&mut self, state: DeliveryState) {

Check warning on line 80 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L80

Added line #L80 was not covered by tests
// remote side is settled, not need to send disposition
if self.is_set(Flags::REMOTE_SETTLED) || self.is_set(Flags::LOCAL_SETTLED) {

Check warning on line 82 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L82

Added line #L82 was not covered by tests
return;
}

let disp = Disposition(Box::new(DispositionInner {
role: if self.is_set(Flags::SENDER) {
Role::Sender

Check warning on line 88 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L86-L88

Added lines #L86 - L88 were not covered by tests
} else {
Role::Receiver

Check warning on line 90 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L90

Added line #L90 was not covered by tests
},
first: self.id,
last: None,

Check warning on line 93 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L92-L93

Added lines #L92 - L93 were not covered by tests
settled: false,
state: Some(state),

Check warning on line 95 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L95

Added line #L95 was not covered by tests
batchable: false,
}));
self.session.inner.get_mut().post_frame(disp.into());

Check warning on line 98 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L98

Added line #L98 was not covered by tests
}

fn is_set(&self, flag: Flags) -> bool {
self.flags.get().contains(flag)
}

fn set_flag(&self, flag: Flags) {
let mut flags = self.flags.get();
flags.insert(flag);
self.flags.set(flags);
}

pub async fn wait(&self) -> Result<Option<DeliveryState>, AmqpProtocolError> {
let rx = if let Some(inner) = self
.session
.inner
.get_mut()
.unsettled_deliveries
.get_mut(&self.id)
{
if let Some(st) = self.check_inner(inner) {
return st;

Check warning on line 120 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L120

Added line #L120 was not covered by tests
}

let (tx, rx) = self.session.inner.get_ref().pool_notify.channel();
inner.tx = Some(tx);
rx
} else {
return Ok(None);

Check warning on line 127 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L127

Added line #L127 was not covered by tests
};
if rx.await.is_err() {
return Err(AmqpProtocolError::ConnectionDropped);

Check warning on line 130 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L130

Added line #L130 was not covered by tests
}

if let Some(inner) = self
.session
.inner
.get_mut()
.unsettled_deliveries
.get_mut(&self.id)
{
if inner.settled {
self.set_flag(Flags::REMOTE_SETTLED);
}
if let Some(st) = self.check_inner(inner) {
return st;
}
}
Ok(None)

Check warning on line 147 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L147

Added line #L147 was not covered by tests
}

fn check_inner(
&self,
inner: &mut DeliveryInner,
) -> Option<Result<Option<DeliveryState>, AmqpProtocolError>> {
if let Some(ref st) = inner.state {
if matches!(st, DeliveryState::Modified(..)) {
// non terminal state
Some(Ok(Some(inner.state.take().unwrap())))

Check warning on line 157 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L157

Added line #L157 was not covered by tests
} else {
// return clone of terminal state
Some(Ok(Some(st.clone())))
}
} else if let Some(ref err) = inner.error {
Some(Err(err.clone()))

Check warning on line 163 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L163

Added line #L163 was not covered by tests
} else {
None
}

Check warning on line 166 in src/delivery.rs

View workflow job for this annotation

GitHub Actions / clippy

manual implementation of `Option::map`

warning: manual implementation of `Option::map` --> src/delivery.rs:162:16 | 162 | } else if let Some(ref err) = inner.error { | ________________^ 163 | | Some(Err(err.clone())) 164 | | } else { 165 | | None 166 | | } | |_________^ help: try: `{ inner.error.as_ref().map(|err| Err(err.clone())) }` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_map = note: `#[warn(clippy::manual_map)]` on by default
}
}

impl Drop for Delivery {
fn drop(&mut self) {
let inner = self.session.inner.get_mut();

if inner.unsettled_deliveries.contains_key(&self.id) {
if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) {
let err = Error::build()
.condition(ErrorCondition::Custom(Symbol(Str::Static(

Check warning on line 177 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L176-L177

Added lines #L176 - L177 were not covered by tests
"Internal error",
))))
.finish();

let disp = Disposition(Box::new(DispositionInner {
role: if self.is_set(Flags::SENDER) {
Role::Sender

Check warning on line 184 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L182-L184

Added lines #L182 - L184 were not covered by tests
} else {
Role::Receiver

Check warning on line 186 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L186

Added line #L186 was not covered by tests
},
first: self.id,
last: None,

Check warning on line 189 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L188-L189

Added lines #L188 - L189 were not covered by tests
settled: true,
state: Some(DeliveryState::Rejected(Rejected { error: Some(err) })),

Check warning on line 191 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L191

Added line #L191 was not covered by tests
batchable: false,
}));
inner.post_frame(disp.into());

Check warning on line 194 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L194

Added line #L194 was not covered by tests
}

inner.unsettled_deliveries.remove(&self.id);
}
}
}

impl DeliveryInner {
pub(crate) fn new() -> Self {
Self {
tx: None,
state: None,
error: None,
settled: false,
}
}

pub(crate) fn set_error(&mut self, error: AmqpProtocolError) {
self.error = Some(error);
if let Some(tx) = self.tx.take() {
let _ = tx.send(());

Check warning on line 215 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L215

Added line #L215 was not covered by tests
}
}

pub(crate) fn handle_disposition(&mut self, disp: Disposition) {
if disp.settled() {
self.settled = true;
}
if let Some(state) = disp.state() {
self.state = Some(state.clone());
}
if let Some(tx) = self.tx.take() {
let _ = tx.send(());
}
}
}

impl Drop for DeliveryInner {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(());

Check warning on line 235 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L235

Added line #L235 was not covered by tests
}
}
}

pub struct DeliveryBuilder {
tag: Option<Bytes>,
settled: bool,
data: TransferBody,
sender: Cell<SenderLinkInner>,
}

impl DeliveryBuilder {
pub(crate) fn new(data: TransferBody, sender: Cell<SenderLinkInner>) -> Self {
Self {
tag: None,
settled: false,
data,
sender,
}
}

pub fn tag(mut self, tag: Bytes) -> Self {
self.tag = Some(tag);
self

Check warning on line 259 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L257-L259

Added lines #L257 - L259 were not covered by tests
}

pub fn settled(mut self) -> Self {
self.settled = true;
self

Check warning on line 264 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L262-L264

Added lines #L262 - L264 were not covered by tests
}

pub async fn send(self) -> Result<Delivery, AmqpProtocolError> {
let inner = self.sender.get_ref();

if let Some(ref err) = inner.error {
Err(err.clone())

Check warning on line 271 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L271

Added line #L271 was not covered by tests
} else {
if inner
.max_message_size
.map(|l| self.data.len() > l as usize)
.unwrap_or_default()
{
Err(AmqpProtocolError::BodyTooLarge)

Check warning on line 278 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L278

Added line #L278 was not covered by tests
} else {
let id = self.sender.get_mut().send(self.data, self.tag).await?;

Ok(Delivery {
id,
session: self.sender.get_ref().session.clone(),
flags: StdCell::new(if self.settled {
Flags::SENDER | Flags::LOCAL_SETTLED

Check warning on line 286 in src/delivery.rs

View check run for this annotation

Codecov / codecov/patch

src/delivery.rs#L286

Added line #L286 was not covered by tests
} else {
Flags::SENDER
}),
})
}
}

Check warning on line 292 in src/delivery.rs

View workflow job for this annotation

GitHub Actions / clippy

this `else { if .. }` block can be collapsed

warning: this `else { if .. }` block can be collapsed --> src/delivery.rs:272:16 | 272 | } else { | ________________^ 273 | | if inner 274 | | .max_message_size 275 | | .map(|l| self.data.len() > l as usize) ... | 291 | | } 292 | | } | |_________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_else_if = note: `#[warn(clippy::collapsible_else_if)]` on by default help: collapse nested if block | 272 ~ } else if inner 273 + .max_message_size 274 + .map(|l| self.data.len() > l as usize) 275 + .unwrap_or_default() 276 + { 277 + Err(AmqpProtocolError::BodyTooLarge) 278 + } else { 279 + let id = self.sender.get_mut().send(self.data, self.tag).await?; 280 + 281 + Ok(Delivery { 282 + id, 283 + session: self.sender.get_ref().session.clone(), 284 + flags: StdCell::new(if self.settled { 285 + Flags::SENDER | Flags::LOCAL_SETTLED 286 + } else { 287 + Flags::SENDER 288 + }), 289 + }) 290 + } |
}
}
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![deny(rust_2018_idioms, warnings, unreachable_pub)]
//#![deny(rust_2018_idioms, warnings, unreachable_pub)]
#![allow(clippy::type_complexity, clippy::let_underscore_future)]

#[macro_use]
Expand All @@ -13,6 +13,7 @@ pub mod client;
mod connection;
mod control;
mod default;
mod delivery;
mod dispatcher;
pub mod error;
pub mod error_code;
Expand All @@ -26,6 +27,7 @@ pub mod types;

pub use self::connection::{Connection, ConnectionRef};
pub use self::control::{ControlFrame, ControlFrameKind};
pub use self::delivery::{Delivery, DeliveryBuilder};
pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
pub use self::session::Session;
pub use self::sndlink::{SenderLink, SenderLinkBuilder};
Expand Down
13 changes: 2 additions & 11 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use std::{
use ntex::util::{ByteString, BytesMut, PoolRef, Stream};
use ntex::{channel::oneshot, task::LocalWaker};
use ntex_amqp_codec::protocol::{
self as codec, Attach, DeliveryNumber, Disposition, Error, Handle, LinkError,
ReceiverSettleMode, Role, SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy,
Transfer, TransferBody,
self as codec, Attach, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role,
SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer, TransferBody,
};
use ntex_amqp_codec::types::{Symbol, Variant};
use ntex_amqp_codec::Encode;
Expand Down Expand Up @@ -133,14 +132,6 @@ impl ReceiverLink {
.post_frame(disp.into());
}

/// Wait for disposition with specified number
pub fn wait_disposition(
&self,
id: DeliveryNumber,
) -> impl Future<Output = Result<Disposition, AmqpProtocolError>> {
self.inner.get_mut().session.wait_disposition(id)
}

pub fn close(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
self.inner.get_mut().close(None)
}
Expand Down
Loading

0 comments on commit d54b6ac

Please sign in to comment.