Skip to content

Commit

Permalink
Send Ack with Basic.Return in confirm mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasrichard committed Oct 4, 2024
1 parent 67268ed commit e120eb6
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 13 deletions.
29 changes: 27 additions & 2 deletions amqp-compliance/helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
__name__ = "helper"

from contextlib import contextmanager
from typing import Optional

import pika

def connect(username: str ="guest", password="guest", host="::1", port=5672, vhost="/"):
def connect(
username: str = "guest",
password: str = "guest",
host: str = "::1",
port: int = 5672,
vhost: str = "/"
) -> pika.BlockingConnection:
return pika.BlockingConnection(
pika.ConnectionParameters(
host=host,
Expand All @@ -14,7 +22,24 @@ def connect(username: str ="guest", password="guest", host="::1", port=5672, vho
)

@contextmanager
def channel(number=None):
def connection() -> pika.BlockingConnection:
"""
Open a connection with the default parameters.
"""
conn = connect()

try:
yield connect()
finally:
if conn.is_open:
conn.close()

@contextmanager
def channel(number: Optional[int] = None) -> pika.adapters.blocking_connection.BlockingChannel:
"""
Open a connection with the default parameters and open a channel with the specified channel
number.
"""
conn = connect()
ch = conn.channel(number)

Expand Down
12 changes: 6 additions & 6 deletions amqp-compliance/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ def test_reopen_the_same_channel():
"""
Open the same channel results in a channel error.
"""
conn = helper.connect()
conn.channel(17)

with pytest.raises(pika.exceptions.ConnectionClosedByBroker) as exp:
with helper.connection() as conn:
conn.channel(17)

assert 504 == exp.value.reply_code
assert str(exp.value.reply_text).startswith("CHANNEL_ERROR")
with pytest.raises(pika.exceptions.ConnectionClosedByBroker) as exp:
conn.channel(17)

assert 504 == exp.value.reply_code
assert str(exp.value.reply_text).startswith("CHANNEL_ERROR")
12 changes: 12 additions & 0 deletions metalmq/src/client/channel/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ impl Channel {
message::send_basic_return(returned_message, self.frame_size, &self.outgoing)
.await
.unwrap();

if let Some(dt) = &mut self.next_confirm_delivery_tag {
// We can keep this mut shorter, if it matters
self.outgoing
.send(Frame::Frame(
frame::BasicAckArgs::default().delivery_tag(*dt).frame(channel),
))
.await
.unwrap();

*dt += 1;
}
}
None => {
// TODO do we need to send ack if the message is mandatory or
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/client/channel/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl Channel {
let passive = args.flags.contains(frame::ExchangeDeclareFlags::PASSIVE);
let exchange_name = args.exchange_name.clone();

exchange::validate_exchange_type(&args.exchange_type)?;
exchange::validate_exchange_type(self.number, &args.exchange_type)?;
exchange::validate_exchange_name(self.number, &args.exchange_name)?;

let cmd = DeclareExchangeCommand {
Expand Down
2 changes: 1 addition & 1 deletion metalmq/src/client/connection/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Connection {
}
ChannelOpen => {
if ch_tx.is_some() {
ConnectionError::ChannelError.into_result(class_method, "Channel already exist")
ConnectionError::ChannelError.into_result(class_method, "CHANNEL_ERROR - Channel already exist")
} else {
self.handle_channel_open(channel).await?;
//self.start_channel(channel).await?;
Expand Down
9 changes: 6 additions & 3 deletions metalmq/src/exchange/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ pub fn validate_exchange_name(channel: Channel, exchange_name: &str) -> Result<(
Ok(())
}

pub fn validate_exchange_type(exchange_type: &str) -> Result<()> {
pub fn validate_exchange_type(channel: Channel, exchange_type: &str) -> Result<()> {
match ExchangeType::from_str(exchange_type) {
Ok(_) => Ok(()),
Err(_) => ConnectionError::CommandInvalid
.into_result(frame::EXCHANGE_DECLARE, "COMMAND_INVALID - Exchange type is invalid"),
Err(_) => ChannelError::PreconditionFailed.into_result(
channel,
frame::EXCHANGE_DECLARE,
"PRECONDITION_FAILED - Exchange type is invalid",
),
}
}

Expand Down

0 comments on commit e120eb6

Please sign in to comment.