Skip to content

Commit

Permalink
Rename DeliveryBuilder to TransferBuilder (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jan 29, 2025
1 parent 009407d commit 051c2d8
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.5.0] - 2025-01-29

* Rename DeliveryBuilder to TransferBuilder

## [3.4.0] - 2025-01-08

* Allow to set Transfer format
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "3.4.0"
version = "3.5.0"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
20 changes: 9 additions & 11 deletions src/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,10 @@ where
{
/// Connect to amqp server
pub async fn connect(&self, address: A) -> Result<Client, ConnectError> {
let fut = timeout_checked(self.config.handshake_timeout, self._connect(address));
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
timeout_checked(self.config.handshake_timeout, self._connect(address))
.await
.map_err(|_| ConnectError::HandshakeTimeout)
.and_then(|res| res)
}

/// Negotiate amqp protocol over opened socket
Expand All @@ -168,14 +167,13 @@ where

/// Connect to amqp server
pub async fn connect_sasl(&self, addr: A, auth: SaslAuth) -> Result<Client, ConnectError> {
let fut = timeout_checked(
timeout_checked(
self.config.handshake_timeout,
self._connect_sasl(addr, auth),
);
match fut.await {
Ok(res) => res.map_err(From::from),
Err(_) => Err(ConnectError::HandshakeTimeout),
}
)
.await
.map_err(|_| ConnectError::HandshakeTimeout)
.and_then(|res| res)
}

/// Negotiate amqp sasl protocol over opened socket
Expand Down
4 changes: 2 additions & 2 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,15 @@ impl Drop for DeliveryInner {
}
}

pub struct DeliveryBuilder {
pub struct TransferBuilder {
tag: Option<Bytes>,
settled: bool,
data: TransferBody,
format: Option<MessageFormat>,
sender: Cell<SenderLinkInner>,
}

impl DeliveryBuilder {
impl TransferBuilder {
pub(crate) fn new(data: TransferBody, sender: Cell<SenderLinkInner>) -> Self {
Self {
tag: None,
Expand Down
20 changes: 9 additions & 11 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,14 @@ where
}

// handle idle timeout
if self.idle_timeout.non_zero() {
if self.idle_sleep.poll_elapsed(cx).is_ready() {
log::trace!(
"{}: Send keep-alive ping, timeout: {:?} secs",
self.sink.tag(),
self.idle_timeout
);
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
self.idle_sleep.reset(self.idle_timeout);
}
if self.idle_timeout.non_zero() && self.idle_sleep.poll_elapsed(cx).is_ready() {
log::trace!(
"{}: Send keep-alive ping, timeout: {:?} secs",
self.sink.tag(),
self.idle_timeout
);
self.sink.post_frame(AmqpFrame::new(0, Frame::Empty));
self.idle_sleep.reset(self.idle_timeout);
}

Ok(())
Expand Down Expand Up @@ -391,7 +389,7 @@ pin_project_lite::pin_project! {
}
}

impl<'f, F, E> Future for ServiceResult<'f, F, E>
impl<F, E> Future for ServiceResult<'_, F, E>
where
F: Future<Output = Result<(), E>>,
E: Into<Error>,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub mod types;

pub use self::connection::{Connection, ConnectionRef};
pub use self::control::{ControlFrame, ControlFrameKind};
pub use self::delivery::{Delivery, DeliveryBuilder};
pub use self::delivery::{Delivery, TransferBuilder};
pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
pub use self::session::Session;
pub use self::sndlink::{SenderLink, SenderLinkBuilder};
Expand Down
16 changes: 13 additions & 3 deletions src/sndlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ntex_amqp_codec::protocol::{
SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
};

use crate::delivery::DeliveryBuilder;
use crate::delivery::TransferBuilder;
use crate::session::{Session, SessionInner};
use crate::{cell::Cell, error::AmqpProtocolError, Handle};

Expand Down Expand Up @@ -120,12 +120,22 @@ impl SenderLink {
self.inner.get_ref().error.as_ref()
}

#[doc(hidden)]
#[deprecated]
/// Start delivery process
pub fn delivery<T>(&self, body: T) -> DeliveryBuilder
pub fn delivery<T>(&self, body: T) -> TransferBuilder
where
T: Into<TransferBody>,
{
DeliveryBuilder::new(body.into(), self.inner.clone())
self.transfer(body)
}

/// Start delivery process
pub fn transfer<T>(&self, body: T) -> TransferBuilder
where
T: Into<TransferBody>,
{
TransferBuilder::new(body.into(), self.inner.clone())
}

/// Close sender link
Expand Down
11 changes: 6 additions & 5 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ async fn test_simple() -> std::io::Result<()> {
.await
.unwrap();
let delivery = link
.delivery(Bytes::from(b"test".as_ref()))
.transfer(Bytes::from(b"test".as_ref()))
.send()
.await
.unwrap();
let st = delivery.wait().await.unwrap().unwrap();
assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {}));

let delivery = link
.delivery(Bytes::from(b"test".as_ref()))
.transfer(Bytes::from(b"test".as_ref()))
.settled()
.send()
.await
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn test_large_transfer() -> std::io::Result<()> {
.unwrap();

let delivery = link
.delivery(Bytes::from(data.clone()))
.transfer(Bytes::from(data.clone()))
.send()
.await
.unwrap();
Expand Down Expand Up @@ -274,7 +274,7 @@ async fn test_session_end() -> std::io::Result<()> {
.await
.unwrap();
let _delivery = link
.delivery(Bytes::from(b"test".as_ref()))
.transfer(Bytes::from(b"test".as_ref()))
.send()
.await
.unwrap();
Expand Down Expand Up @@ -512,7 +512,8 @@ async fn test_drop_delivery_on_link_detach() -> std::io::Result<()> {
.unwrap();

let delivery = link
.delivery(Bytes::from(b"test".as_ref()))
.transfer(Bytes::from(b"test".as_ref()))
.format(1)
.send()
.await
.unwrap();
Expand Down

0 comments on commit 051c2d8

Please sign in to comment.