Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 9, 2024
1 parent 1c89eed commit 641c74e
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 65 deletions.
49 changes: 40 additions & 9 deletions metalmq/src/client/channel/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,14 @@ use tokio::sync::oneshot;
use crate::{
error::{ChannelError, ConnectionError, Result},
exchange::{self, handler::ExchangeCommand, manager::GetExchangeSinkQuery},
message::Message,
message::{self, Message},
queue,
};

use super::types::{ActivelyConsumedQueue, Channel, PassivelyConsumedQueue, PublishedContent};

impl Channel {
pub async fn handle_basic_publish(&mut self, args: frame::BasicPublishArgs) -> Result<()> {
//if let Some(exchange) = self.exchanges.get(&args.exchange_name) {
//let cmd = ExchangeCommand::Message { message: todo!(), returned: todo!() };

//exchange.send(cmd).await.unwrap();

if self.in_flight_content.is_some() {
return ConnectionError::UnexpectedFrame.to_result(frame::BASIC_PUBLISH, "Already publish message arrived");
}
Expand Down Expand Up @@ -234,7 +229,7 @@ impl Channel {
}

pub async fn handle_confirm_select(&mut self, _args: frame::ConfirmSelectArgs) -> Result<()> {
self.next_confirm_delivery_tag = 1u64;
self.next_confirm_delivery_tag = Some(1u64);

self.send_frame(Frame::Frame(frame::confirm_select_ok(self.number)))
.await?;
Expand Down Expand Up @@ -275,12 +270,48 @@ impl Channel {
let msg: Message = pc.into();

if let Some(ex_tx) = self.exchanges.get(&msg.exchange) {
// In confirm mode or if message is mandatory we need to be sure that the message
// was router correctly from the exchange to the queue.
let (rtx, rrx) = match msg.mandatory || self.next_confirm_delivery_tag.is_some() {
false => (None, None),
true => {
let (tx, rx) = oneshot::channel();

(Some(tx), Some(rx))
}
};

let cmd = ExchangeCommand::Message {
message: msg,
returned: None,
returned: rtx,
};

ex_tx.send(cmd).await.unwrap();

if let Some(rx) = rrx {
match rx.await.unwrap() {
Some(returned_message) => {
message::send_basic_return(returned_message, self.frame_size, &self.outgoing)
.await
.unwrap();
}
None => {
// TODO do we need to send ack if the message is mandatory or
// immediate?
if let Some(dt) = &mut self.next_confirm_delivery_tag {
// We can keep this mut shorter, if it matters
self.outgoing
.send(Frame::Frame(
frame::BasicAckArgs::default().delivery_tag(*dt).frame(channel),
))
.await
.unwrap();

*dt += 1;
}
}
}
}
}
}

Expand Down Expand Up @@ -394,7 +425,7 @@ mod tests {
passively_consumed_queue: None,
in_flight_content: None,
confirm_mode: false,
next_confirm_delivery_tag: 1u64,
next_confirm_delivery_tag: None,
frame_size: 65535,
outgoing: tx,
exchanges: HashMap::new(),
Expand Down
2 changes: 2 additions & 0 deletions metalmq/src/client/channel/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl Channel {
metalmq_codec::frame::MethodFrameArgs::ConfirmSelectOk => unreachable!(),
_ => unreachable!(),
};

result?;
}
Close(reason, cm, text) => self.handle_channel_close(reason, cm, text).await?,
ContentHeader(header) => self.handle_content_header(header).await?,
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/client/channel/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct Channel {
pub passively_consumed_queue: Option<PassivelyConsumedQueue>,
pub in_flight_content: Option<PublishedContent>,
pub confirm_mode: bool,
pub next_confirm_delivery_tag: u64,
pub next_confirm_delivery_tag: Option<u64>,
pub frame_size: usize,
pub outgoing: mpsc::Sender<Frame>,
pub exchanges: HashMap<String, exchange::handler::ExchangeCommandSink>,
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/client/connection/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Connection {
passively_consumed_queue: None,
in_flight_content: None,
confirm_mode: false,
next_confirm_delivery_tag: 1u64,
next_confirm_delivery_tag: None,
frame_size: self.frame_max,
outgoing: self.outgoing.clone(),
exchanges: HashMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/exchange/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use metalmq_codec::{codec::Frame, frame};
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

use super::handler::QueueBindCmd;
use super::{handler::QueueBindCmd, ExchangeType};

pub struct ExchangeState {
pub exchange: Exchange,
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/tests/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn connect_with_username_password() -> Result<()> {
#[tokio::test]
async fn connect_and_open_channel() -> Result<()> {
let test_case = TestCase::new().await;
let mut client = test_case.new_connected_client(1).await;
let mut client = test_case.new_client_with_channel(1).await;

client
.connection
Expand Down
21 changes: 11 additions & 10 deletions metalmq/src/tests/consume.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use metalmq_codec::frame::{self, AMQPFrame::Method, BasicCancelArgs, BasicConsumeArgs, MethodFrameArgs};
use metalmq_codec::frame::{self, BasicCancelArgs, BasicConsumeArgs};
use test_client::basic_deliver_args;

use crate::tests::*;

#[tokio::test]
async fn one_consumer() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_connected_client(1).await;
let mut test_client = test_case.new_client_with_channel(1).await;

test_client
.basic_consume(1, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag"))
Expand Down Expand Up @@ -42,21 +42,22 @@ async fn one_consumer() {

let cancel_ok = test_client.recv_single_frame().await;

let args = if let Method(_, _, MethodFrameArgs::BasicCancelOk(args)) = cancel_ok {
args
} else {
panic!("Not a Basic.CancelOk frame");
};

assert_eq!(args.consumer_tag, "ctag");
assert!(matches!(
cancel_ok,
frame::AMQPFrame::Method(
1,
_,
frame::MethodFrameArgs::BasicCancelOk(frame::BasicCancelOkArgs { .. })
)
));

test_case.teardown().await;
}

#[tokio::test]
async fn one_consumer_redeliver() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_connected_client(1).await;
let mut test_client = test_case.new_client_with_channels(&[1, 2, 3]).await;

// Consume the queue
test_client
Expand Down
14 changes: 13 additions & 1 deletion metalmq/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl TestCase {
}
}

async fn new_connected_client(&self, channel: u16) -> TestClient {
async fn new_client_with_channel(&self, channel: u16) -> TestClient {
let mut client = self.new_client();

client.connect().await;
Expand All @@ -74,6 +74,18 @@ impl TestCase {
client
}

async fn new_client_with_channels(&self, channels: &[u16]) -> TestClient {
let mut client = self.new_client();

client.connect().await;

for channel in channels {
client.open_channel(*channel).await;
}

client
}

async fn setup(mut self) -> Self {
self.exchange_declare("x-direct", ExchangeType::Direct).await;
self.exchange_declare("x-fanout", ExchangeType::Fanout).await;
Expand Down
46 changes: 25 additions & 21 deletions metalmq/src/tests/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ use crate::tests::{
#[tokio::test]
async fn basic_publish_mandatory_message() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_client();
let mut test_client = test_case.new_client_with_channel(1).await;

// Publish message to an exchange which doesn't route to queues -> channel error
test_client
.publish_content(1u16, "x-direct", "invalid-key", b"A simple message")
.await;
let mandatory_message = frame::BasicPublishArgs::new("x-direct")
.routing_key("invalid-key")
.mandatory(true)
.frame(1);

test_client.send_frame(mandatory_message).await;

// Since the routing key is not matching and message is mandatory, server sends back the message
// with a Basic.Return frame
let return_frames = unpack_frames(test_client.recv_timeout().await.unwrap());
let return_frames = test_client.recv_frames().await;

assert!(matches!(
dbg!(return_frames).get(0).unwrap(),
dbg!(return_frames).first().unwrap(),
frame::AMQPFrame::Method(1u16, _, frame::MethodFrameArgs::BasicReturn(_))
));

Expand All @@ -46,13 +49,13 @@ async fn basic_publish_mandatory_message() {
#[tokio::test]
async fn basic_get_empty_and_ok() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_client();
let mut test_client = test_case.new_client_with_channels(&[1, 2]).await;

test_client
.basic_get(2, frame::BasicGetArgs::new("q-fanout").no_ack(false))
.await;

let get_empty_frames = unpack_frames(test_client.recv_timeout().await.unwrap());
let get_empty_frames = test_client.recv_frames().await;

assert!(matches!(
dbg!(get_empty_frames).get(0).unwrap(),
Expand All @@ -68,25 +71,24 @@ async fn basic_get_empty_and_ok() {

test_client.basic_get(2, frame::BasicGetArgs::new("q-fanout")).await;

let get_frames = unpack_frames(test_client.recv_timeout().await.unwrap());
let get_frames = test_client.recv_frames().await;

assert!(matches!(
dbg!(get_frames).get(0).unwrap(),
dbg!(get_frames).first().unwrap(),
frame::AMQPFrame::Method(
2,
_,
frame::MethodFrameArgs::BasicGetOk(frame::BasicGetOkArgs { redelivered: false, .. })
)
));

test_client.close_channel(1).await;
test_client.close().await;
test_case.teardown().await;
}

#[tokio::test]
async fn basic_ack_multiple() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_client();
let mut test_client = test_case.new_client_with_channel(3).await;

// Send 10 messages
for i in 0..10 {
Expand All @@ -104,7 +106,7 @@ async fn basic_ack_multiple() {
sleep(100).await;

// Consume 10 messages with another client
let mut consumer = test_case.new_client();
let mut consumer = test_case.new_client_with_channel(4).await;

consumer
.basic_consume(
Expand All @@ -115,16 +117,16 @@ async fn basic_ack_multiple() {
)
.await;

let consumer_ok = unpack_frames(test_client.recv_timeout().await.unwrap());
let consumer_ok = consumer.recv_single_frame().await;
assert!(matches!(
dbg!(consumer_ok).get(0).unwrap(),
dbg!(consumer_ok),
frame::AMQPFrame::Method(4, _, frame::MethodFrameArgs::BasicConsumeOk(_))
));

let mut last_delivery_tag = 0u64;

for _ in 0..10 {
let message_frames = unpack_frames(test_client.recv_timeout().await.unwrap());
let message_frames = consumer.recv_frames().await;

dbg!(&message_frames);

Expand All @@ -150,16 +152,16 @@ async fn basic_ack_multiple() {
.await;

// Check if queue is empty by deleting and if it is empty
test_client
consumer
.queue_delete(
4,
frame::QueueDeleteArgs::default().queue_name("q-direct").if_empty(true),
)
.await;

let delete_ok = unpack_frames(test_client.recv_timeout().await.unwrap());
let delete_ok = consumer.recv_single_frame().await;
assert!(matches!(
dbg!(delete_ok).get(0).unwrap(),
dbg!(delete_ok),
frame::AMQPFrame::Method(
4,
_,
Expand All @@ -171,7 +173,9 @@ async fn basic_ack_multiple() {
#[tokio::test]
async fn publish_to_topic_exchange() {
let test_case = TestCase::new().await;
let mut test_client = test_case.new_client();
let mut test_client = test_case.new_client_with_channels(&[1, 2]).await;

test_client.open_channel(2).await;

test_client
.publish_content(1, "x-topic", "topic.key", b"Topic test")
Expand Down
Loading

0 comments on commit 641c74e

Please sign in to comment.