diff --git a/metalmq/src/client/channel/basic.rs b/metalmq/src/client/channel/basic.rs index 8868ba5..faf6324 100644 --- a/metalmq/src/client/channel/basic.rs +++ b/metalmq/src/client/channel/basic.rs @@ -405,7 +405,7 @@ mod tests { use tokio::sync::mpsc; use crate::exchange; - use crate::tests::recv_timeout; + use crate::tests::recv::recv_with_timeout; use super::*; @@ -456,7 +456,7 @@ mod tests { channel.handle_content_body(body).await?; - let cmd = recv_timeout(&mut ex_rx).await.expect("No frame received"); + let cmd = recv_with_timeout(&mut ex_rx).await.expect("No frame received"); assert!(matches!( cmd, diff --git a/metalmq/src/client/channel/router.rs b/metalmq/src/client/channel/router.rs index 63a5133..2bc5cdc 100644 --- a/metalmq/src/client/channel/router.rs +++ b/metalmq/src/client/channel/router.rs @@ -11,7 +11,7 @@ impl Channel { while let Some(m) = rx.recv().await { match m { - MethodFrame(ch, cm, ma) => { + MethodFrame(_ch, _cm, ma) => { let result = match ma { metalmq_codec::frame::MethodFrameArgs::ChannelClose(args) => { self.handle_channel_close( diff --git a/metalmq/src/client/connection.rs b/metalmq/src/client/connection.rs index 9a5a414..eaa8501 100644 --- a/metalmq/src/client/connection.rs +++ b/metalmq/src/client/connection.rs @@ -1,3 +1,6 @@ pub mod open_close; pub mod router; pub mod types; + +#[cfg(test)] +mod tests; diff --git a/metalmq/src/client/connection/tests.rs b/metalmq/src/client/connection/tests.rs new file mode 100644 index 0000000..a4c8a0e --- /dev/null +++ b/metalmq/src/client/connection/tests.rs @@ -0,0 +1,102 @@ +// Test cases +// - here we need to test if specific events made the necessary state changes in Connection +// struct +// +// connection tune with good and bad numbers +// +// connection open with bad virtual host +// - probably I have more tests when there will be multiple virtual hosts from config +// +// connection close closes all the channels + +use metalmq_codec::{codec::Frame, frame}; +use tokio::sync::mpsc; + +use crate::{exchange, queue, tests::recv, Context}; + +use super::types::Connection; + +fn new_context() -> Context { + let em = exchange::manager::start(); + let qm = queue::manager::start(em.clone()); + + Context { + exchange_manager: em, + queue_manager: qm, + } +} + +#[tokio::test] +async fn connection_close_clean_up_channels() { + use metalmq_codec::frame::{AMQPFrame, MethodFrameArgs}; + + let ctx = new_context(); + let (tx, mut rx) = mpsc::channel(1); + let mut connection = Connection::new(ctx, tx); + + connection.handle_client_frame(frame::AMQPFrame::Header).await.unwrap(); + + let connection_start = recv::recv_single_frame(&mut rx).await; + + assert!(matches!( + connection_start, + AMQPFrame::Method(0, _, MethodFrameArgs::ConnectionStart(_)) + )); + + connection + .handle_connection_start_ok(frame::ConnectionStartOkArgs::new("guest", "guest")) + .await + .unwrap(); + + let connection_tune = recv::recv_single_frame(&mut rx).await; + + connection + .handle_connection_tune_ok(frame::ConnectionTuneOkArgs::default()) + .await + .unwrap(); + + connection + .handle_connection_open(frame::ConnectionOpenArgs::default().virtual_host("/")) + .await + .unwrap(); + + let connection_open_ok = recv::recv_single_frame(&mut rx).await; + + // start a channel + // start a consumer + // + // test should cancel consumer, close channel, stop channel spawned task, and close connection + + connection.handle_channel_open(11).await.unwrap(); + + let channel_open_ok = recv::recv_single_frame(&mut rx).await; + + assert!(connection.channel_receivers.contains_key(&11)); + + connection + .handle_connection_close(frame::ConnectionCloseArgs::default()) + .await + .unwrap(); + + //assert!(!connection.channel_handlers.contains_key(&11)); + + // TODO this is a deadlock situation, here we need to have a blocking call - probably we don't + // need - which emits channel close and other messages, but it will get channel close ok + // message from client, then it can go and close the other channel or other resources, and so + // on. + // The problem however is that we block the incoming loop of the connection and if a channel + // close ok message arrives, we cannot route to the appropriate channel. So maybe connection + // close should be an implicit event, we need to + // - if we are asked to close the connection, we need to check if there are channels open + // - if not we can send back the connection close ok message and we are done + // - if not we need to start closing all the channels when all the channel close ok + // messages coming back we can close the connection. But what if the client doesn't send + // back all the close messages? We will leak connections... okay we have heartbeat, so it + // will close connection, but... + + let channel_close = recv::recv_single_frame(&mut rx).await; + + connection.handle_channel_close_ok(11).await.unwrap(); + + //let connection_close_ok = recv::recv_single_frame(&mut rx).await; +} diff --git a/metalmq/src/exchange/binding.rs b/metalmq/src/exchange/binding.rs index 176d66f..e54a511 100644 --- a/metalmq/src/exchange/binding.rs +++ b/metalmq/src/exchange/binding.rs @@ -1,3 +1,7 @@ +//! Handling the exchange-queue bindings. +//! +//! To see the list of exchange bindings and their semantic, see [AMQP +//! exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts#exchanges). use crate::{ logerr, message::Message, @@ -11,6 +15,8 @@ use tokio::sync::oneshot; use super::ExchangeType; +/// A direct exchange routes the message if the routing key of the message is exactly the same +/// as the routing key of the binding. It is legal to have two bindings with the same routing key. #[derive(Debug)] pub struct DirectBinding { pub routing_key: String, @@ -18,12 +24,21 @@ pub struct DirectBinding { pub queue: QueueCommandSink, } +/// A fanout exchange routes the message to all bound queues, disregarding the routing key of the +/// message. #[derive(Debug)] pub struct FanoutBinding { pub queue_name: String, pub queue: QueueCommandSink, } +/// Topic exchange routes the message to the queues whose routing key matches to the routing key of +/// the message. +/// +/// A binding routing key can be 'price.nyse.*' or 'price.nikkei.*' so the message with routing key +/// 'price.nyse.goog' will match to the first pattern. The dot has special meaning, it separates +/// the topic path, and the '*' matches a arbitrary topic path segment, and the '#' matches one or +/// more arbitrary path segments. So 'price.#' matches all price messages. #[derive(Debug)] pub struct TopicBinding { pub routing_key: String, @@ -31,6 +46,16 @@ pub struct TopicBinding { pub queue: QueueCommandSink, } +/// Headers exchange routes messages by message header matching. +/// +/// Headers exchange ignores the routing key of the message, and it matches the headers of the +/// message with the headers of the binding. +/// +/// If there are more header values specified one can decide if all or any of them need to be +/// matched. This described by the 'x-match' header and the value can be 'any' or 'all'. In these +/// cases the headers starting with 'x-' are not taken into account. But if the 'x-match' is +/// 'any-with-x' or 'all-with-x', all the headers including the ones starting with 'x-' are +/// considered. #[derive(Debug)] pub struct HeadersBinding { pub headers: HashMap, @@ -51,6 +76,7 @@ pub enum Bindings { } impl Bindings { + /// Create a new exchange binding. pub fn new(exchange_type: ExchangeType) -> Self { match exchange_type { ExchangeType::Direct => Bindings::Direct(vec![]), @@ -403,9 +429,31 @@ pub fn match_header( #[cfg(test)] mod tests { + use crate::queue::handler::QueueCommandSource; + use super::*; + use tokio::sync::mpsc; + fn direct_bind_queue(bindings: &mut Bindings, routing_key: &str, queue_name: &str) -> QueueCommandSource { + let (tx, rx) = mpsc::channel(1); + + let result = bindings.add_direct_binding(routing_key.to_string(), queue_name.to_string(), tx); + + assert!(result); + + rx + } + + fn new_message(exchange: &str, routing_key: &str) -> Message { + let mut message = Message::default(); + + message.exchange = exchange.to_string(); + message.routing_key = routing_key.to_string(); + + message + } + #[test] fn test_match_routing_key() { assert!(match_routing_key("stocks.nwse.goog", "stocks.nwse.goog")); @@ -416,15 +464,12 @@ mod tests { } #[tokio::test] - async fn test_direct_binding() { - let (tx, mut rx) = mpsc::channel(1); - + async fn direct_binding_one_routing_key() { let mut bindings = Bindings::new(ExchangeType::Direct); - bindings.add_direct_binding("extension.png".to_string(), "png-images".to_string(), tx); - let mut message = Message::default(); - message.exchange = "images".to_string(); - message.routing_key = "extension.png".to_string(); + let mut rx = direct_bind_queue(&mut bindings, "extension.png", "png-images"); + + let message = new_message("images", "extension.png"); let result = bindings.route_message(message).await.unwrap(); @@ -434,4 +479,21 @@ mod tests { let delivered = rx.recv().await.unwrap(); assert!(matches!(delivered, QueueCommand::PublishMessage(_))); } + + #[tokio::test] + async fn direct_binding_multiple_queues_same_routing_key() { + let mut bindings = Bindings::new(ExchangeType::Direct); + + let mut jpg = direct_bind_queue(&mut bindings, "jpg-images", "extension.jpg"); + let mut jpeg = direct_bind_queue(&mut bindings, "jpg-images", "extension.jpeg"); + + let message = new_message("images", "jpg-images"); + + let result = bindings.route_message(message).await.unwrap(); + + assert!(result.is_none()); + + assert!(matches!(jpg.recv().await.unwrap(), QueueCommand::PublishMessage(_))); + assert!(matches!(jpeg.recv().await.unwrap(), QueueCommand::PublishMessage(_))); + } } diff --git a/metalmq/src/queue/handler/mod.rs b/metalmq/src/queue/handler/mod.rs index 5e4e75c..3272bf6 100644 --- a/metalmq/src/queue/handler/mod.rs +++ b/metalmq/src/queue/handler/mod.rs @@ -29,6 +29,7 @@ use tokio::sync::{mpsc, oneshot}; use self::outbox::{Outbox, OutgoingMessage}; pub type QueueCommandSink = mpsc::Sender; +pub type QueueCommandSource = mpsc::Receiver; /// Delivery tag of a message #[derive(Clone, Debug)] diff --git a/metalmq/src/queue/handler/tests.rs b/metalmq/src/queue/handler/tests.rs index 8ecad59..e82843c 100644 --- a/metalmq/src/queue/handler/tests.rs +++ b/metalmq/src/queue/handler/tests.rs @@ -2,7 +2,7 @@ use super::*; use crate::{ error::{to_runtime_error, ErrorScope}, message::{Message, MessageContent}, - tests::recv_timeout, + tests::recv::recv_with_timeout, }; use metalmq_codec::{codec::Frame, frame::AMQPFrame}; use std::sync::Arc; @@ -344,7 +344,7 @@ async fn publish_to_queue_with_one_consumer() { let result = tester.state.handle_command(cmd).await; assert!(result.is_ok()); - let frame = recv_timeout(&mut msg_rx).await.unwrap(); + let frame = recv_with_timeout(&mut msg_rx).await.unwrap(); let message = parse_message(frame).unwrap(); assert_eq!(message.message.exchange, tester.exchange_name); @@ -389,14 +389,14 @@ async fn unacked_messages_should_be_put_back_in_the_queue() { .handle_command(tester.command_publish("1st")) .await .unwrap(); - let _msg_res = recv_timeout(&mut frx).await.unwrap(); + let _msg_res = recv_with_timeout(&mut frx).await.unwrap(); tester .state .handle_command(tester.command_publish("2nd")) .await .unwrap(); - let _msg_res = recv_timeout(&mut frx).await.unwrap(); + let _msg_res = recv_with_timeout(&mut frx).await.unwrap(); let (rtx, rrx) = oneshot::channel(); tester @@ -430,7 +430,7 @@ async fn consume_unacked_removes_messages_from_the_queue_after_send() { .await .unwrap(); - recv_timeout(&mut frx).await; + recv_with_timeout(&mut frx).await; assert!(tester.state.messages.is_empty()); } @@ -447,7 +447,7 @@ async fn basic_get_then_basic_ack_deletes_the_message_from_the_queue() { let mut frx = tester.passive_consume().await; - let fr = recv_timeout(&mut frx).await.unwrap(); + let fr = recv_with_timeout(&mut frx).await.unwrap(); let msg = parse_message(fr).unwrap(); assert_eq!(msg.consumer_tag, ""); @@ -481,7 +481,7 @@ async fn basic_get_and_consume_without_ack_and_get_should_redeliver() { let mut frx = tester.passive_consume().await; - let _frame = recv_timeout(&mut frx).await.unwrap(); + let _frame = recv_with_timeout(&mut frx).await.unwrap(); tester .state @@ -490,7 +490,7 @@ async fn basic_get_and_consume_without_ack_and_get_should_redeliver() { .unwrap(); let mut frx = tester.passive_consume().await; - let fr = recv_timeout(&mut frx).await.unwrap(); + let fr = recv_with_timeout(&mut frx).await.unwrap(); println!("{:?}", fr); diff --git a/metalmq/src/tests/consume.rs b/metalmq/src/tests/consume.rs index 0659c31..3e47ed9 100644 --- a/metalmq/src/tests/consume.rs +++ b/metalmq/src/tests/consume.rs @@ -12,10 +12,10 @@ async fn one_consumer() { .basic_consume(1, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag")) .await; - let consume_ok = test_client.recv_frames().await; + let consume_ok = test_client.recv_single_frame().await; assert!(matches!( - dbg!(consume_ok.get(0)).unwrap(), + dbg!(consume_ok), frame::AMQPFrame::Method( 1, _, @@ -63,7 +63,7 @@ async fn one_consumer_redeliver() { test_client .basic_consume(1, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag")) .await; - test_client.recv_frames().await; + test_client.recv_single_frame().await; // Publish a message test_client @@ -82,7 +82,7 @@ async fn one_consumer_redeliver() { test_client .basic_consume(3, BasicConsumeArgs::default().queue("q-direct").consumer_tag("ctag2")) .await; - test_client.recv_frames().await; + test_client.recv_single_frame().await; // Receive the message again let mut deliver = test_client.recv_frames().await; diff --git a/metalmq/src/tests/mod.rs b/metalmq/src/tests/mod.rs index 96cde3d..2fe4b45 100644 --- a/metalmq/src/tests/mod.rs +++ b/metalmq/src/tests/mod.rs @@ -2,6 +2,7 @@ mod connect; mod consume; mod publish; mod queue; +pub mod recv; mod test_client; use std::collections::HashMap; @@ -213,18 +214,3 @@ impl TestCase { .unwrap(); } } - -/// Receiving with timeout -pub async fn recv_timeout(rx: &mut mpsc::Receiver) -> Option { - let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(1)); - tokio::pin!(sleep); - - tokio::select! { - frame = rx.recv() => { - frame - } - _ = &mut sleep => { - None - } - } -} diff --git a/metalmq/src/tests/publish.rs b/metalmq/src/tests/publish.rs index 635ab71..b26af5f 100644 --- a/metalmq/src/tests/publish.rs +++ b/metalmq/src/tests/publish.rs @@ -1,10 +1,14 @@ use metalmq_codec::frame::{self, BasicPublishArgs}; use crate::tests::{ - test_client::{basic_deliver_args, sleep, unpack_frames}, + test_client::{basic_deliver_args, sleep}, TestCase, }; +/// Publish a mandatory message to an exchange which doesn't have binding on the given routing key. +/// The message should be returned since it is mandatory and the routing cannot be done. +/// Then publish another mandatory message with the correct routing key and check if there is no +/// Basic.Return is got by the publisher. #[tokio::test] async fn basic_publish_mandatory_message() { let test_case = TestCase::new().await; @@ -47,6 +51,8 @@ async fn basic_publish_mandatory_message() { test_client.close().await; } +/// Perform a basic get on an empty queue and check if the consumer get the empty get message. Then +/// publish a message and check if the consumer gets the message with Basic.Get. #[tokio::test] async fn basic_get_empty_and_ok() { let test_case = TestCase::new().await; @@ -56,10 +62,10 @@ async fn basic_get_empty_and_ok() { .basic_get(2, frame::BasicGetArgs::new("q-fanout").no_ack(false)) .await; - let get_empty_frames = test_client.recv_frames().await; + let get_empty_frames = test_client.recv_single_frame().await; assert!(matches!( - dbg!(get_empty_frames).get(0).unwrap(), + dbg!(get_empty_frames), frame::AMQPFrame::Method(2, _, frame::MethodFrameArgs::BasicGetEmpty) )); @@ -86,6 +92,7 @@ async fn basic_get_empty_and_ok() { test_case.teardown().await; } +/// Consumer sends back a multi ack message after getting a couple of messages. #[tokio::test] async fn basic_ack_multiple() { let test_case = TestCase::new().await; @@ -180,13 +187,13 @@ async fn basic_ack_multiple() { )); } +/// Publish a topic exchange with the correct routing key, and consume in another channel to see if +/// the message is correctly routed to the queue. #[tokio::test] async fn publish_to_topic_exchange() { let test_case = TestCase::new().await; let mut test_client = test_case.new_client_with_channels(&[1, 2]).await; - test_client.open_channel(2).await; - test_client .publish_content(1, "x-topic", "topic.key", b"Topic test") .await; @@ -198,13 +205,16 @@ async fn publish_to_topic_exchange() { ) .await; - let _consume_ok = unpack_frames(test_client.recv_timeout().await.unwrap()); + let _consume_ok = test_client.recv_single_frame().await; - let delivery = unpack_frames(test_client.recv_timeout().await.unwrap()); + let delivery = test_client.recv_frames().await; let mut frames = delivery.into_iter(); let basic_deliver = basic_deliver_args(frames.next().unwrap()); assert_eq!("x-topic", basic_deliver.exchange_name); + assert_eq!("ctag", basic_deliver.consumer_tag); + assert_eq!(false, basic_deliver.redelivered); + // ... if let frame::AMQPFrame::ContentHeader(header) = frames.next().unwrap() { @@ -217,3 +227,35 @@ async fn publish_to_topic_exchange() { test_case.teardown().await; } + +/// Switch the channel to confirm mode and publish a message. And Basic.Ack should come back from +/// the server. +#[tokio::test] +async fn channel_in_confirm_mode_acks_publish() { + let test_case = TestCase::new().await; + let mut client = test_case.new_client_with_channel(1).await; + + client.confirm_select(1).await; + + let _confirm_select_ok = client.recv_single_frame().await; + + client + .publish_content(1, "x-topic", "topic.key", b"Confirmed message") + .await; + + let ack = client.recv_single_frame().await; + + assert!(matches!( + ack, + frame::AMQPFrame::Method( + 1, + _, + frame::MethodFrameArgs::BasicAck(frame::BasicAckArgs { + multiple: false, + delivery_tag: 1u64 + }) + ) + )); + + test_case.teardown().await; +} diff --git a/metalmq/src/tests/queue.rs b/metalmq/src/tests/queue.rs index de5c505..c0d560d 100644 --- a/metalmq/src/tests/queue.rs +++ b/metalmq/src/tests/queue.rs @@ -1,4 +1,4 @@ -use crate::tests::{test_client::unpack_frames, TestCase}; +use crate::tests::{test_client::sleep, TestCase}; use metalmq_codec::frame::{self, ExchangeDeclareArgs}; #[tokio::test] @@ -46,7 +46,7 @@ async fn bind_queue_with_validation() { //assert_eq!(channel_error.code, ConnectionError::CommandInvalid as u16); } -//#[tokio::test] +#[tokio::test] async fn queue_purge_clean_the_queue() { let test_case = TestCase::new().await; let mut client = test_case.new_client_with_channels(&[1, 2]).await; @@ -57,15 +57,17 @@ async fn queue_purge_clean_the_queue() { .await; } + sleep(100).await; + client .connection .handle_client_frame(frame::QueuePurgeArgs::default().queue_name("q-fanout").frame(2)) .await .unwrap(); - let purge_ok = unpack_frames(client.recv_timeout().await.unwrap()); + let purge_ok = client.recv_single_frame().await; assert!(matches!( - dbg!(purge_ok).get(0).unwrap(), + dbg!(purge_ok), frame::AMQPFrame::Method( _, _, @@ -83,7 +85,7 @@ async fn queue_delete_unbind_and_cancel_consume() { client .queue_declare(1, frame::QueueDeclareArgs::default().name("queue-delete-test")) .await; - client.recv_frames().await; + client.recv_single_frame().await; client .exchange_declare( @@ -93,7 +95,7 @@ async fn queue_delete_unbind_and_cancel_consume() { .exchange_type("direct"), ) .await; - client.recv_frames().await; + client.recv_single_frame().await; client .connection @@ -104,7 +106,7 @@ async fn queue_delete_unbind_and_cancel_consume() { ) .await .unwrap(); - client.recv_frames().await; + client.recv_single_frame().await; //let mut consumer = tc.new_client(); diff --git a/metalmq/src/tests/recv.rs b/metalmq/src/tests/recv.rs new file mode 100644 index 0000000..51bee14 --- /dev/null +++ b/metalmq/src/tests/recv.rs @@ -0,0 +1,35 @@ +use metalmq_codec::{codec::Frame, frame::AMQPFrame}; +use tokio::sync::mpsc; + +/// Receiving with timeout +pub async fn recv_with_timeout(rx: &mut mpsc::Receiver) -> Option { + let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(1)); + tokio::pin!(sleep); + + tokio::select! { + frame = rx.recv() => { + frame + } + _ = &mut sleep => { + None + } + } +} + +pub async fn recv_single_frame(rx: &mut mpsc::Receiver) -> AMQPFrame { + let f = recv_with_timeout(rx).await.expect("No response is received"); + + match f { + Frame::Frame(sf) => sf, + Frame::Frames(_) => panic!("Multiple frames are received"), + } +} + +pub async fn recv_multiple_frames(rx: &mut mpsc::Receiver) -> Vec { + let f = recv_with_timeout(rx).await.expect("No response is received"); + + match f { + Frame::Frame(_) => panic!("A single frame is received"), + Frame::Frames(mf) => mf, + } +} diff --git a/metalmq/src/tests/test_client.rs b/metalmq/src/tests/test_client.rs index d4e5967..ef1b9ed 100644 --- a/metalmq/src/tests/test_client.rs +++ b/metalmq/src/tests/test_client.rs @@ -6,6 +6,8 @@ use tokio::sync::mpsc; use crate::client::connection::types::Connection; +use super::recv; + /// The test client used by test cases. It makes available the sender part of the input channel, so /// one can control the connection by sending frames in the `conn_tx`. pub struct TestClient { @@ -93,6 +95,13 @@ impl TestClient { self.connection.handle_client_frame(f).await.unwrap(); } + pub async fn confirm_select(&mut self, channel: u16) { + self.connection + .handle_client_frame(frame::confirm_select(channel)) + .await + .unwrap(); + } + pub async fn publish_content(&mut self, channel: u16, exchange: &str, routing_key: &str, message: &[u8]) { let f = frame::BasicPublishArgs::new(exchange) .routing_key(routing_key) @@ -129,25 +138,15 @@ impl TestClient { /// Receiving with timeout pub async fn recv_timeout(&mut self) -> Option { - let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(1)); - tokio::pin!(sleep); - - tokio::select! { - frame = self.conn_rx.recv() => { - frame - } - _ = &mut sleep => { - None - } - } + recv::recv_with_timeout(&mut self.conn_rx).await } pub async fn recv_frames(&mut self) -> Vec { - unpack_frames(self.recv_timeout().await.expect("At least one frame is expected")) + recv::recv_multiple_frames(&mut self.conn_rx).await } pub async fn recv_single_frame(&mut self) -> frame::AMQPFrame { - unpack_single_frame(self.recv_timeout().await.expect("A frame is expected")) + recv::recv_single_frame(&mut self.conn_rx).await } pub async fn close(&mut self) { @@ -163,21 +162,6 @@ pub async fn sleep(ms: u32) { tokio::time::sleep(std::time::Duration::from_millis(ms.into())).await; } -pub fn unpack_single_frame(f: Frame) -> frame::AMQPFrame { - if let Frame::Frame(single_frame) = f { - single_frame - } else { - panic!("Frame {f:?} is not a single frame"); - } -} - -pub fn unpack_frames(f: Frame) -> Vec { - match f { - Frame::Frame(sf) => vec![sf], - Frame::Frames(mf) => mf, - } -} - // It should be an assert pub fn basic_deliver_args(f: frame::AMQPFrame) -> frame::BasicDeliverArgs { if let frame::AMQPFrame::Method(_, _, frame::MethodFrameArgs::BasicDeliver(args)) = f {