Skip to content

Commit

Permalink
Document modules
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Sep 19, 2024
1 parent ae49890 commit d6f6ab8
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 230 deletions.
7 changes: 7 additions & 0 deletions metalmq/src/client/channel.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! Represents the channel opened in a connection by a client.
//!
//! It maintains the channel state, the consumings the passive consume processes by basic get
//! commands.
//!
//! Each channel is a spawned tokio process, so a longer handling of a message doesn't block the
//! whole connection or another channel.
pub mod basic;
pub mod content;
pub mod exchange;
Expand Down
10 changes: 10 additions & 0 deletions metalmq/src/client/conn.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
//! Handles the TCP socket level connection.
//!
//! When a client connects a new tokio process is spawned which handles the incoming frames. The
//! frames are already decoded by [`AMQPCodec`]. To avoid blocking the outgoing frames are passed
//! by a different `mpsc::channel` usually named as `outgoing`. In that way we avoided the circular
//! blocking of channel senders and receivers.
//!
//! Incoming and outgoing loop handles the heartbeats which we expect and send in a timely manner.
//!
//! The [`handle_in_stream_data`] function forwards the frames to the connection.
use crate::{
client::connection::types::Connection,
error::{to_runtime_error, Result},
Expand Down
4 changes: 4 additions & 0 deletions metalmq/src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Represent the connection when a client connects to the server.
//!
//! It maintains the connection state and also servers as a forwarder of the frames to the open
//! channels. It handles the outgoing messages, too.
pub mod open_close;
pub mod router;
pub mod types;
Expand Down
4 changes: 4 additions & 0 deletions metalmq/src/client/connection/open_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl Connection {
Ok(())
}

// TODO really we need to release the resources here, and then we need to close the connection.
// Now we close the connection right after connection.close which is not good since clients
// want to send the connection.close-ok and they fail that connection has been closed
// unexpectedly.
pub async fn handle_channel_close_ok(&mut self, channel: u16) -> Result<()> {
// TODO not sure if we need to send out basic cancel here
self.channel_handlers.remove(&channel);
Expand Down
13 changes: 13 additions & 0 deletions metalmq/src/client/connection/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct ExclusiveQueue {
pub queue_name: String,
}

/// Represent a client connection to a server.
pub struct Connection {
/// Unique ID of the connection.
pub id: String,
Expand All @@ -26,15 +27,27 @@ pub struct Connection {
pub auto_delete_exchanges: Vec<String>,
/// Exclusive queues created by the connection.
pub exclusive_queues: Vec<ExclusiveQueue>,
/// Sender part of the queue command channel to the queue manager
pub qm: queue::manager::QueueManagerSink,
/// Sender part of the exchange command channel to the exchange manager
pub em: exchange::manager::ExchangeManagerSink,
/// [`JoinHandle`] of the channels.
///
/// To be sure that a channel is stopped one need to `await` on that handler.
pub channel_handlers: HashMap<u16, JoinHandle<Result<()>>>,
/// The incoming frame channel of a [`crate::client::channel::types::Channel`].
///
/// Through this channel the AMQP channel gets the incoming frames, so it can handle them.
pub channel_receivers: HashMap<u16, mpsc::Sender<Command>>,
/// Sink for AMQP frames toward the client
///
/// This is cloned by each channel in order that they can send back response frames.
pub outgoing: mpsc::Sender<Frame>,
}

impl Connection {
/// Creates a new connection with the queue and exchange managers and the outgoing frame
/// channel.
pub fn new(context: Context, outgoing: mpsc::Sender<Frame>) -> Self {
let conn_id = Uuid::new_v4().as_hyphenated().to_string();

Expand Down
2 changes: 2 additions & 0 deletions metalmq/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Handling client connections, receiving frames, forwarding to the connection representation of
//! the client connection, forwarding frames to channels which are owned by the connection.
pub mod channel;
pub mod conn;
pub mod connection;
10 changes: 8 additions & 2 deletions metalmq/src/tests/connect.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use metalmq_codec::frame::{self, AMQPFrame};

use crate::error::{ErrorScope, Result, RuntimeError};
use crate::tests::{recv, TestCase};
use crate::tests::{recv, test_case::TestCase};

/// Frame level test of a connection start, tune and open.
#[tokio::test]
async fn connect_with_username_password() -> Result<()> {
let test_case = TestCase::new().await;
Expand Down Expand Up @@ -66,6 +67,7 @@ async fn connect_with_username_password() -> Result<()> {
Ok(())
}

/// Test a connection open, channel open, channel close and connection close positive flow.
#[tokio::test]
async fn connect_and_open_channel() -> Result<()> {
let test_case = TestCase::new().await;
Expand Down Expand Up @@ -93,9 +95,12 @@ async fn connect_and_open_channel() -> Result<()> {
AMQPFrame::Method(_, frame::CONNECTION_CLOSE_OK, frame::MethodFrameArgs::ConnectionCloseOk)
));

assert!(client.connection.await.is_ok());

Ok(())
}

/// Connect with bad password should end up in connection error.
#[tokio::test]
async fn connect_with_bad_password() -> Result<()> {
let test_case = TestCase::new().await;
Expand All @@ -117,6 +122,7 @@ async fn connect_with_bad_password() -> Result<()> {
Ok(())
}

/// Opening two channels with the same number should end up in connection error.
#[tokio::test]
async fn channel_reopen_with_same_number() -> Result<()> {
let test_case = TestCase::new().await;
Expand All @@ -135,7 +141,7 @@ async fn channel_reopen_with_same_number() -> Result<()> {
},
));

dbg!(client.connection.await);
assert!(dbg!(client.connection.await).is_ok());

Ok(())
}
3 changes: 1 addition & 2 deletions metalmq/src/tests/consume.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use metalmq_codec::frame::{self, BasicCancelArgs, BasicConsumeArgs};
use test_client::basic_deliver_args;

use crate::tests::*;
use crate::tests::{test_case::TestCase, test_client::basic_deliver_args};

#[tokio::test]
async fn one_consumer() {
Expand Down
225 changes: 1 addition & 224 deletions metalmq/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,228 +3,5 @@ mod consume;
mod publish;
mod queue;
pub mod recv;
mod test_case;
mod test_client;

use std::collections::HashMap;

use tokio::sync::mpsc;

use crate::{
client::{conn, connection::types::Connection},
exchange::{
manager::{self as em, ExchangeManagerSink},
Exchange, ExchangeType,
},
queue::{
manager::{self as qm, QueueManagerSink},
Queue,
},
Context, Result,
};
use metalmq_codec::{codec::Frame, frame};
use test_client::TestClient;

/// TestCase for System Under Test which spawns an exchange manager and a queue manager and can
/// test more integrated features like forwarding messages from exchanges to queues.
struct TestCase {
em: ExchangeManagerSink,
qm: QueueManagerSink,
setup_tx: mpsc::Sender<Frame>,
setup_rx: mpsc::Receiver<Frame>,
}

impl TestCase {
/// Create the internal engine of MetalMQ, the exchange and the queue manager, it connects them
/// and also declare all types of exchanges for tests.
async fn new() -> Self {
let em = crate::exchange::manager::start();
let qm = crate::queue::manager::start(em.clone());
let (setup_tx, setup_rx) = mpsc::channel(128);

Self {
em,
qm,
setup_tx,
setup_rx,
}
.setup()
.await
}

/// Create a new client and return the outgoing channel part as well.
async fn new_client(&self) -> TestClient {
let ctx = Context {
exchange_manager: self.em.clone(),
queue_manager: self.qm.clone(),
};
let (incoming_tx, incoming_rx) = mpsc::channel(16);
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let connection = Connection::new(ctx, outgoing_tx);

let jh = tokio::spawn(async move { dbg!(TestCase::client_loop(connection, incoming_rx).await) });

TestClient {
connection: jh,
conn_tx: incoming_tx,
conn_rx: outgoing_rx,
}
}

async fn client_loop(mut connection: Connection, mut incoming_rx: mpsc::Receiver<Frame>) -> Result<()> {
while let Some(f) = incoming_rx.recv().await {
if !conn::handle_in_stream_data(&mut connection, f).await? {
break;
}
}

Ok(())
}

async fn new_client_with_channel(&self, channel: u16) -> TestClient {
let mut client = self.new_client().await;

client.connect().await;
client.open_channel(channel).await;

client
}

async fn new_client_with_channels(&self, channels: &[u16]) -> TestClient {
let mut client = self.new_client().await;

client.connect().await;

for channel in channels {
client.open_channel(*channel).await;
}

client
}

async fn setup(mut self) -> Self {
self.exchange_declare("x-direct", ExchangeType::Direct).await;
self.exchange_declare("x-fanout", ExchangeType::Fanout).await;
self.exchange_declare("x-topic", ExchangeType::Topic).await;
self.exchange_declare("x-headers", ExchangeType::Headers).await;

self.queue_declare("q-direct").await;
self.queue_declare("q-fanout").await;
self.queue_declare("q-topic").await;
self.queue_declare("q-headers").await;

self.queue_bind("q-direct", "x-direct", "magic-key", None).await;
self.queue_bind("q-fanout", "x-fanout", "", None).await;
self.queue_bind("q-topic", "x-topic", "topic.#", None).await;

let mut args: HashMap<String, frame::AMQPFieldValue> = HashMap::new();
args.insert("x-match".into(), frame::AMQPFieldValue::LongString("any".into()));
args.insert(
"message.type".into(),
frame::AMQPFieldValue::LongString("string".into()),
);

self.queue_bind("q-headers", "x-headers", "", Some(args)).await;

while self.setup_rx.try_recv().is_ok() {}

self
}

async fn teardown(self) {
self.queue_delete("q-direct").await;
self.exchange_delete("x-direct").await;
self.queue_delete("q-fanout").await;
self.exchange_delete("x-fanout").await;
self.queue_delete("q-topic").await;
self.exchange_delete("x-topic").await;
}

async fn exchange_declare(&self, name: &str, exchange_type: ExchangeType) {
em::declare_exchange(
&self.em,
em::DeclareExchangeCommand {
channel: 1,
exchange: Exchange::default().name(name).exchange_type(exchange_type),
passive: false,
outgoing: self.setup_tx.clone(),
},
)
.await
.unwrap();
}

async fn exchange_delete(&self, name: &str) {
em::delete_exchange(
&self.em,
em::DeleteExchangeCommand {
channel: 1,
if_unused: false,
exchange_name: name.to_string(),
},
)
.await
.unwrap();
}

async fn queue_declare(&self, name: &str) {
qm::declare_queue(
&self.qm,
qm::QueueDeclareCommand {
queue: Queue::default().name(name),
conn_id: "does-not-matter".to_string(),
channel: 1,
passive: false,
},
)
.await
.unwrap();
}

async fn queue_bind(
&self,
queue_name: &str,
exchange_name: &str,
routing_key: &str,
args: Option<frame::FieldTable>,
) {
let sink = qm::get_command_sink(
&self.qm,
qm::GetQueueSinkQuery {
channel: 1,
queue_name: queue_name.to_string(),
},
)
.await
.unwrap();

em::bind_queue(
&self.em,
em::BindQueueCommand {
conn_id: "does-not-matter".to_string(),
channel: 1,
exchange_name: exchange_name.to_string(),
queue_name: queue_name.to_string(),
routing_key: routing_key.to_string(),
args,
queue_sink: sink,
},
)
.await
.unwrap();
}

async fn queue_delete(&self, queue_name: &str) {
qm::delete_queue(
&self.qm,
qm::QueueDeleteCommand {
conn_id: "does-not-matter".to_string(),
channel: 1,
queue_name: queue_name.to_string(),
if_unused: false,
if_empty: false,
},
)
.await
.unwrap();
}
}
2 changes: 1 addition & 1 deletion metalmq/src/tests/publish.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use metalmq_codec::frame::{self, BasicPublishArgs};

use crate::tests::{
test_case::TestCase,
test_client::{basic_deliver_args, sleep},
TestCase,
};

/// Publish a mandatory message to an exchange which doesn't have binding on the given routing key.
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/tests/queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::tests::{test_client::sleep, TestCase};
use crate::tests::{test_case::TestCase, test_client::sleep};
use metalmq_codec::frame::{self, ExchangeDeclareArgs};

#[tokio::test]
Expand Down
Loading

0 comments on commit d6f6ab8

Please sign in to comment.