Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

incorrect implementation of RabbitMqQueueOptionsBuilder.SetAutoDelete() and ClientConnectionName(string connectionName) to RabbitMqOptionsBuilder to propogate the connection_name property #58

Merged
merged 5 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 87 additions & 38 deletions Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,125 @@
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rebus.RabbitMq.Tests
{
[TestFixture]
public class RabbitMqCreateQueueTest : FixtureBase
{
readonly string _testQueue1 = TestConfig.GetName("test_queue_1");

protected override void SetUp()
[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart_()
{
RabbitMqTransportFactory.DeleteQueue(_testQueue1);
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetAutoDelete(false))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});

using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}

Thread.Sleep(5000);
Assert.IsTrue(RabbitMqTransportFactory.QueueExists(testScope.QeueuName));
}
}

[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart()
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_THEN_BusCanStart()
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(false))
.AddClientProperties(new Dictionary<string, string> {
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetAutoDelete(true))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});
var bus = configurer.Start();
});
});

Assert.IsTrue(bus.Advanced.Workers.Count > 0);
using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}
}
}


[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_Positive_THEN_BusCanStart()
public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_0_THEN_ArgumentException()
{
var activator = Using(new BuiltinHandlerActivator());
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(true, 1))
.AddClientProperties(new Dictionary<string, string> {
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());

Assert.Throws<ArgumentException>(() =>
{
var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetQueueTTL(0).SetDurable(false))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});
});

var bus = configurer.Start();

Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}
, "Time must be in milliseconds and greater than 0");
}
}

[Test]
public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_0_THEN_ArgumentExceptionThrown()
public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueIsDeleted_WHEN_5000msAfterConnectionClosed()
{
var activator = Using(new BuiltinHandlerActivator());

Assert.Throws<ArgumentException>(() =>
using (var testScope = new QeueuNameTestScope())
{
var activator = Using(new BuiltinHandlerActivator());

var configurer = Configure.With(activator)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1)
.InputQueueOptions(o => o.SetAutoDelete(true, 0))
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName)
.InputQueueOptions(o => o.SetQueueTTL(100))
.AddClientProperties(new Dictionary<string, string> {
{ "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" }
});
});

using (var bus = configurer.Start())
{
Assert.IsTrue(bus.Advanced.Workers.Count > 0);
}

Thread.Sleep(5000);
Assert.IsFalse(RabbitMqTransportFactory.QueueExists(testScope.QeueuName));
}
, "Time must be in milliseconds and greater than 0");
}

class QeueuNameTestScope : IDisposable
{
public string QeueuName { get; }

public QeueuNameTestScope()
{
QeueuName = Guid.NewGuid().ToString();
}

public void Dispose()
{
RabbitMqTransportFactory.DeleteQueue(QeueuName);
}
}

}
}
22 changes: 20 additions & 2 deletions Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ public static void DeleteQueue(string queueName)
}
}

public static bool QueueExists(string queueName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
try
{
model.QueueDeclarePassive(queueName);
return true;
}
catch (RabbitMQ.Client.Exceptions.OperationInterruptedException)
{
return false;
}
}
}

public static void DeleteExchange(string exchangeName)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };
Expand Down Expand Up @@ -109,9 +127,9 @@ public static bool ExchangeExists(string exchangeName)
}
}
}

protected virtual RabbitMqTransport CreateRabbitMqTransport(string inputQueueAddress)
{
{
return new RabbitMqTransport(ConnectionString, inputQueueAddress, new ConsoleLoggerFactory(false));
}
}
Expand Down
142 changes: 136 additions & 6 deletions Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client;
using Rebus.RabbitMq;

Expand Down Expand Up @@ -147,7 +148,7 @@ public RabbitMqOptionsBuilder InputQueueOptions(Action<RabbitMqQueueOptionsBuild

return this;
}

/// <summary>
/// Configure input exchanges manually.
/// </summary>
Expand Down Expand Up @@ -177,7 +178,7 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings)
SslSettings = sslSettings;
return this;
}

/// <summary>
/// Enable the publisher confirms protocol.
/// This method is intended to use when publishers cannot afford message loss.
Expand All @@ -187,7 +188,17 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true)
PublisherConfirms = value;
return this;
}


/// <summary>
/// Set the connection_name property (user-friendly non-unique client connection name) of RabbitMQ connection, which is
/// shown in the connections overview list and in the client properites of a connection.
/// </summary>
/// <exception cref="InvalidOperationException">expcetion is thrown if another connection factory customizer is in use</exception>
public RabbitMqOptionsBuilder ClientConnectionName(string connectionName)
{
return CustomizeConnectionFactory(factory => new ConnectionFactoryClientNameDecorator(factory, connectionName));
}

internal bool? DeclareExchanges { get; private set; }
internal bool? DeclareInputQueue { get; private set; }
internal bool? BindInputQueue { get; private set; }
Expand All @@ -197,7 +208,7 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true)
internal string TopicExchangeName { get; private set; }

internal int? MaxNumberOfMessagesToPrefetch { get; private set; }

internal SslSettings SslSettings { get; private set; }

internal RabbitMqCallbackOptionsBuilder CallbackOptionsBuilder { get; } = new RabbitMqCallbackOptionsBuilder();
Expand Down Expand Up @@ -251,14 +262,133 @@ internal void Configure(RabbitMqTransport transport)
{
transport.SetCallbackOptions(CallbackOptionsBuilder);
}

if (PublisherConfirms.HasValue)
{
transport.EnablePublisherConfirms(PublisherConfirms.Value);
}

transport.SetInputQueueOptions(QueueOptions);
transport.SetExchangeOptions(ExchangeOptions);
}

/// This is temporary decorator-fix, until Rebus is upgraded to a version 6+ of RabbitMQ.Client wich has new signature:
/// IConnection CreateConnection(IList AmqpTcpEndpoint endpoints, string clientProvidedName)
/// so it is more correct to provide the name of client connection in ConnectionManager.GetConnection() method, when connections are created.
class ConnectionFactoryClientNameDecorator : IConnectionFactory
{
private readonly IConnectionFactory _decoratedFactory;
private readonly string _clientProvidedName;

public IDictionary<string, object> ClientProperties
{
get { return _decoratedFactory.ClientProperties; }
set { _decoratedFactory.ClientProperties = value; }
}

public TimeSpan ContinuationTimeout
{
get { return _decoratedFactory.ContinuationTimeout; }
set { _decoratedFactory.ContinuationTimeout = value; }
}

public TimeSpan HandshakeContinuationTimeout
{
get { return _decoratedFactory.HandshakeContinuationTimeout; }
set { _decoratedFactory.HandshakeContinuationTimeout = value; }
}

public string Password
{
get { return _decoratedFactory.Password; }
set { _decoratedFactory.Password = value; }
}

public ushort RequestedChannelMax
{
get { return _decoratedFactory.RequestedChannelMax; }
set { _decoratedFactory.RequestedChannelMax = value; }
}

public uint RequestedFrameMax
{
get { return _decoratedFactory.RequestedFrameMax; }
set { _decoratedFactory.RequestedFrameMax = value; }
}

public ushort RequestedHeartbeat
{
get { return _decoratedFactory.RequestedHeartbeat; }
set { _decoratedFactory.RequestedHeartbeat = value; }
}

public TaskScheduler TaskScheduler
{
#pragma warning disable CS0618 // Type or member is obsolete
get { return _decoratedFactory.TaskScheduler; }
set { _decoratedFactory.TaskScheduler = value; }
#pragma warning restore CS0618 // Type or member is obsolete
}

public Uri Uri
{
get { return _decoratedFactory.Uri; }
set { _decoratedFactory.Uri = value; }
}

public bool UseBackgroundThreadsForIO
{
get { return _decoratedFactory.UseBackgroundThreadsForIO; }
set { _decoratedFactory.UseBackgroundThreadsForIO = value; }
}

public string UserName
{
get { return _decoratedFactory.UserName; }
set { _decoratedFactory.UserName = value; }
}

public string VirtualHost
{
get { return _decoratedFactory.VirtualHost; }
set { _decoratedFactory.VirtualHost = value; }
}

public ConnectionFactoryClientNameDecorator(IConnectionFactory originalFacotry, string clientProvidedName)
{
_decoratedFactory = originalFacotry;
_clientProvidedName = clientProvidedName;
}

public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
{
return _decoratedFactory.AuthMechanismFactory(mechanismNames);
}

public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints)
{
return (_decoratedFactory as RabbitMQ.Client.ConnectionFactory).CreateConnection(new DefaultEndpointResolver(endpoints), _clientProvidedName);
}

public IConnection CreateConnection()
{
return _decoratedFactory.CreateConnection(_clientProvidedName);
}

public IConnection CreateConnection(string clientProvidedName)
{
return _decoratedFactory.CreateConnection(clientProvidedName);
}

public IConnection CreateConnection(IList<string> hostnames)
{
return _decoratedFactory.CreateConnection(hostnames, _clientProvidedName);
}

public IConnection CreateConnection(IList<string> hostnames, string clientProvidedName)
{
return _decoratedFactory.CreateConnection(hostnames, clientProvidedName);
}
}
}
}
Loading