Skip to content

Commit

Permalink
Merge pull request #58 from jarikp/master
Browse files Browse the repository at this point in the history
incorrect implementation of RabbitMqQueueOptionsBuilder.SetAutoDelete() and ClientConnectionName(string connectionName) to RabbitMqOptionsBuilder to propogate the connection_name property
  • Loading branch information
mookid8000 authored Aug 27, 2019
2 parents 2e18d0d + be5772c commit 4865b4d
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 54 deletions.
125 changes: 87 additions & 38 deletions Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,125 @@
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rebus.RabbitMq.Tests
{
[TestFixture]
public class RabbitMqCreateQueueTest : FixtureBase
{
readonly string _testQueue1 = TestConfig.GetName("test_queue_1");

protected override void SetUp()
[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart_()
{
RabbitMqTransportFactory.DeleteQueue(_testQueue1);
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetAutoDelete(false))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});

using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}

Thread.Sleep(5000);
Assert.IsTrue(RabbitMqTransportFactory.QueueExists(testScope.QeueuName));
}
}

[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart()
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_THEN_BusCanStart()
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(false))
.AddClientProperties(new Dictionary<string, string> {
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetAutoDelete(true))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});
var bus = configurer.Start();
});
});

Assert.IsTrue(bus.Advanced.Workers.Count > 0);
using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}
}
}


[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_Positive_THEN_BusCanStart()
public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_0_THEN_ArgumentException()
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(true, 1))
.AddClientProperties(new Dictionary<string, string> {
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());

Assert.Throws<ArgumentException>(() =>
{
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetQueueTTL(0).SetDurable(false))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});
});

var bus = configurer.Start();

Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}
, "Time must be in milliseconds and greater than 0");
}
}

[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_0_THEN_ArgumentExceptionThrown()
public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueIsDeleted_WHEN_5000msAfterConnectionClosed()
{
var activator = Using(new BuiltinHandlerActivator());

Assert.Throws<ArgumentException>(() =>
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());

var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(true, 0))
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetQueueTTL(100))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});

using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}

Thread.Sleep(5000);
Assert.IsFalse(RabbitMqTransportFactory.QueueExists(testScope.QeueuName));
}
, "Time must be in milliseconds and greater than 0");
}

class QeueuNameTestScope : IDisposable
{
public string QeueuName { get; }

public QeueuNameTestScope()
{
QeueuName = Guid.NewGuid().ToString();
}

public void Dispose()
{
RabbitMqTransportFactory.DeleteQueue(QeueuName);
}
}

}
}
22 changes: 20 additions & 2 deletions Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ public static void DeleteQueue(string queueName)
}
}

public static bool QueueExists(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
try
{
model.QueueDeclarePassive(queueName);
return true;
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException)
{
return false;
}
}
}

public static void DeleteExchange(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
Expand Down Expand Up @@ -109,9 +127,9 @@ public static bool ExchangeExists(string exchangeName)
}
}
}

protected virtual RabbitMqTransport CreateRabbitMqTransport(string inputQueueAddress)
{
{
return new RabbitMqTransport(ConnectionString, inputQueueAddress, new ConsoleLoggerFactory(false));
}
}
Expand Down
142 changes: 136 additions & 6 deletions Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Rebus.RabbitMq;

Expand Down Expand Up @@ -147,7 +148,7 @@ public RabbitMqOptionsBuilder InputQueueOptions(Action<RabbitMqQueueOptionsBuild

return this;
}

/// <summary>
/// Configure input exchanges manually.
/// </summary>
Expand Down Expand Up @@ -177,7 +178,7 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings)
SslSettings = sslSettings;
return this;
}

/// <summary>
/// Enable the publisher confirms protocol.
/// This method is intended to use when publishers cannot afford message loss.
Expand All @@ -187,7 +188,17 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true)
PublisherConfirms = value;
return this;
}


/// <summary>
/// Set the connection_name property (user-friendly non-unique client connection name) of RabbitMQ connection, which is
/// shown in the connections overview list and in the client properites of a connection.
/// </summary>
/// <exception cref="InvalidOperationException">expcetion is thrown if another connection factory customizer is in use</exception>
public RabbitMqOptionsBuilder ClientConnectionName(string connectionName)
{
return CustomizeConnectionFactory(factory => new ConnectionFactoryClientNameDecorator(factory, connectionName));
}

internal bool? DeclareExchanges { get; private set; }
internal bool? DeclareInputQueue { get; private set; }
internal bool? BindInputQueue { get; private set; }
Expand All @@ -197,7 +208,7 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true)
internal string TopicExchangeName { get; private set; }

internal int? MaxNumberOfMessagesToPrefetch { get; private set; }

internal SslSettings SslSettings { get; private set; }

internal RabbitMqCallbackOptionsBuilder CallbackOptionsBuilder { get; } = new RabbitMqCallbackOptionsBuilder();
Expand Down Expand Up @@ -251,14 +262,133 @@ internal void Configure(RabbitMqTransport transport)
{
transport.SetCallbackOptions(CallbackOptionsBuilder);
}

if (PublisherConfirms.HasValue)
{
transport.EnablePublisherConfirms(PublisherConfirms.Value);
}

transport.SetInputQueueOptions(QueueOptions);
transport.SetExchangeOptions(ExchangeOptions);
}

/// This is temporary decorator-fix, until Rebus is upgraded to a version 6+ of RabbitMQ.Client wich has new signature:
/// IConnection CreateConnection(IList AmqpTcpEndpoint endpoints, string clientProvidedName)
/// so it is more correct to provide the name of client connection in ConnectionManager.GetConnection() method, when connections are created.
class ConnectionFactoryClientNameDecorator : IConnectionFactory
{
private readonly IConnectionFactory _decoratedFactory;
private readonly string _clientProvidedName;

public IDictionary<string, object> ClientProperties
{
get { return _decoratedFactory.ClientProperties; }
set { _decoratedFactory.ClientProperties = value; }
}

public TimeSpan ContinuationTimeout
{
get { return _decoratedFactory.ContinuationTimeout; }
set { _decoratedFactory.ContinuationTimeout = value; }
}

public TimeSpan HandshakeContinuationTimeout
{
get { return _decoratedFactory.HandshakeContinuationTimeout; }
set { _decoratedFactory.HandshakeContinuationTimeout = value; }
}

public string Password
{
get { return _decoratedFactory.Password; }
set { _decoratedFactory.Password = value; }
}

public ushort RequestedChannelMax
{
get { return _decoratedFactory.RequestedChannelMax; }
set { _decoratedFactory.RequestedChannelMax = value; }
}

public uint RequestedFrameMax
{
get { return _decoratedFactory.RequestedFrameMax; }
set { _decoratedFactory.RequestedFrameMax = value; }
}

public ushort RequestedHeartbeat
{
get { return _decoratedFactory.RequestedHeartbeat; }
set { _decoratedFactory.RequestedHeartbeat = value; }
}

public TaskScheduler TaskScheduler
{
#pragma warning disable CS0618 // Type or member is obsolete
get { return _decoratedFactory.TaskScheduler; }
set { _decoratedFactory.TaskScheduler = value; }
#pragma warning restore CS0618 // Type or member is obsolete
}

public Uri Uri
{
get { return _decoratedFactory.Uri; }
set { _decoratedFactory.Uri = value; }
}

public bool UseBackgroundThreadsForIO
{
get { return _decoratedFactory.UseBackgroundThreadsForIO; }
set { _decoratedFactory.UseBackgroundThreadsForIO = value; }
}

public string UserName
{
get { return _decoratedFactory.UserName; }
set { _decoratedFactory.UserName = value; }
}

public string VirtualHost
{
get { return _decoratedFactory.VirtualHost; }
set { _decoratedFactory.VirtualHost = value; }
}

public ConnectionFactoryClientNameDecorator(IConnectionFactory originalFacotry, string clientProvidedName)
{
_decoratedFactory = originalFacotry;
_clientProvidedName = clientProvidedName;
}

public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
{
return _decoratedFactory.AuthMechanismFactory(mechanismNames);
}

public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
{
return (_decoratedFactory as RabbitMQ.Client.ConnectionFactory).CreateConnection(new DefaultEndpointResolver(endpoints), _clientProvidedName);
}

public IConnection CreateConnection()
{
return _decoratedFactory.CreateConnection(_clientProvidedName);
}

public IConnection CreateConnection(string clientProvidedName)
{
return _decoratedFactory.CreateConnection(clientProvidedName);
}

public IConnection CreateConnection(IList<string> hostnames)
{
return _decoratedFactory.CreateConnection(hostnames, _clientProvidedName);
}

public IConnection CreateConnection(IList<string> hostnames, string clientProvidedName)
{
return _decoratedFactory.CreateConnection(hostnames, clientProvidedName);
}
}
}
}
Loading

0 comments on commit 4865b4d

Please sign in to comment.