diff --git a/Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs b/Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs index 1bc969c..868e201 100644 --- a/Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs +++ b/Rebus.RabbitMq.Tests/RabbitMqCreateQueueTest.cs @@ -7,76 +7,125 @@ using System.Linq; using System.Text; using System.Threading; -using System.Threading.Tasks; namespace Rebus.RabbitMq.Tests { [TestFixture] public class RabbitMqCreateQueueTest : FixtureBase { - readonly string _testQueue1 = TestConfig.GetName("test_queue_1"); - - protected override void SetUp() + [Test] + public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart_() { - RabbitMqTransportFactory.DeleteQueue(_testQueue1); + using (var testScope = new QeueuNameTestScope()) + { + var activator = Using(new BuiltinHandlerActivator()); + var configurer = Configure.With(activator) + .Transport(t => + { + t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName) + .InputQueueOptions(o => o.SetAutoDelete(false)) + .AddClientProperties(new Dictionary { + { "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" } + }); + }); + + using (var bus = configurer.Start()) + { + Assert.IsTrue(bus.Advanced.Workers.Count > 0); + } + + Thread.Sleep(5000); + Assert.IsTrue(RabbitMqTransportFactory.QueueExists(testScope.QeueuName)); + } } [Test] - public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_False_AND_TTL0_THEN_BusCanStart() + public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_THEN_BusCanStart() { - var activator = Using(new BuiltinHandlerActivator()); - var configurer = Configure.With(activator) - .Transport(t => - { - t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1) - .InputQueueOptions(o => o.SetAutoDelete(false)) - .AddClientProperties(new Dictionary { + using (var testScope = new QeueuNameTestScope()) + { + var activator = Using(new BuiltinHandlerActivator()); + var configurer = Configure.With(activator) + .Transport(t => + { + t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName) + .InputQueueOptions(o => o.SetAutoDelete(true)) + .AddClientProperties(new Dictionary { { "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" } - }); - }); - var bus = configurer.Start(); + }); + }); - Assert.IsTrue(bus.Advanced.Workers.Count > 0); + using (var bus = configurer.Start()) + { + Assert.IsTrue(bus.Advanced.Workers.Count > 0); + } + } } - [Test] - public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_Positive_THEN_BusCanStart() + public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_0_THEN_ArgumentException() { - var activator = Using(new BuiltinHandlerActivator()); - var configurer = Configure.With(activator) - .Transport(t => - { - t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1) - .InputQueueOptions(o => o.SetAutoDelete(true, 1)) - .AddClientProperties(new Dictionary { + using (var testScope = new QeueuNameTestScope()) + { + var activator = Using(new BuiltinHandlerActivator()); + + Assert.Throws(() => + { + var configurer = Configure.With(activator) + .Transport(t => + { + t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName) + .InputQueueOptions(o => o.SetQueueTTL(0).SetDurable(false)) + .AddClientProperties(new Dictionary { { "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" } + }); }); - }); - - var bus = configurer.Start(); - - Assert.IsTrue(bus.Advanced.Workers.Count > 0); + } + , "Time must be in milliseconds and greater than 0"); + } } [Test] - public void Test_CreateQueue_WHEN_InputQueueOptions_AutoDelete_True_AND_TTL_0_THEN_ArgumentExceptionThrown() + public void Test_CreateQueue_WHEN_InputQueueOptions_SetQueueTTL_5000_THEN_QueueIsDeleted_WHEN_5000msAfterConnectionClosed() { - var activator = Using(new BuiltinHandlerActivator()); - - Assert.Throws(() => + using (var testScope = new QeueuNameTestScope()) { + var activator = Using(new BuiltinHandlerActivator()); + var configurer = Configure.With(activator) .Transport(t => { - t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, _testQueue1) - .InputQueueOptions(o => o.SetAutoDelete(true, 0)) + t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, testScope.QeueuName) + .InputQueueOptions(o => o.SetQueueTTL(100)) .AddClientProperties(new Dictionary { { "description", "CreateQueue_With_AutoDelete test in RabbitMqCreateQueueTest.cs" } }); }); + + using (var bus = configurer.Start()) + { + Assert.IsTrue(bus.Advanced.Workers.Count > 0); + } + + Thread.Sleep(5000); + Assert.IsFalse(RabbitMqTransportFactory.QueueExists(testScope.QeueuName)); } - , "Time must be in milliseconds and greater than 0"); } + + class QeueuNameTestScope : IDisposable + { + public string QeueuName { get; } + + public QeueuNameTestScope() + { + QeueuName = Guid.NewGuid().ToString(); + } + + public void Dispose() + { + RabbitMqTransportFactory.DeleteQueue(QeueuName); + } + } + } } \ No newline at end of file diff --git a/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs b/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs index 0752928..f4e145c 100644 --- a/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs +++ b/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs @@ -61,6 +61,24 @@ public static void DeleteQueue(string queueName) } } + public static bool QueueExists(string queueName) + { + var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) }; + using (var connection = connectionFactory.CreateConnection()) + using (var model = connection.CreateModel()) + { + try + { + model.QueueDeclarePassive(queueName); + return true; + } + catch (RabbitMQ.Client.Exceptions.OperationInterruptedException) + { + return false; + } + } + } + public static void DeleteExchange(string exchangeName) { var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) }; @@ -109,9 +127,9 @@ public static bool ExchangeExists(string exchangeName) } } } - + protected virtual RabbitMqTransport CreateRabbitMqTransport(string inputQueueAddress) - { + { return new RabbitMqTransport(ConnectionString, inputQueueAddress, new ConsoleLoggerFactory(false)); } } diff --git a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs index 1174ce5..47febff 100644 --- a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs +++ b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using RabbitMQ.Client; using Rebus.RabbitMq; @@ -147,7 +148,7 @@ public RabbitMqOptionsBuilder InputQueueOptions(Action /// Configure input exchanges manually. /// @@ -177,7 +178,7 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings) SslSettings = sslSettings; return this; } - + /// /// Enable the publisher confirms protocol. /// This method is intended to use when publishers cannot afford message loss. @@ -187,7 +188,17 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true) PublisherConfirms = value; return this; } - + + /// + /// Set the connection_name property (user-friendly non-unique client connection name) of RabbitMQ connection, which is + /// shown in the connections overview list and in the client properites of a connection. + /// + /// expcetion is thrown if another connection factory customizer is in use + public RabbitMqOptionsBuilder ClientConnectionName(string connectionName) + { + return CustomizeConnectionFactory(factory => new ConnectionFactoryClientNameDecorator(factory, connectionName)); + } + internal bool? DeclareExchanges { get; private set; } internal bool? DeclareInputQueue { get; private set; } internal bool? BindInputQueue { get; private set; } @@ -197,7 +208,7 @@ public RabbitMqOptionsBuilder EnablePublisherConfirms(bool value = true) internal string TopicExchangeName { get; private set; } internal int? MaxNumberOfMessagesToPrefetch { get; private set; } - + internal SslSettings SslSettings { get; private set; } internal RabbitMqCallbackOptionsBuilder CallbackOptionsBuilder { get; } = new RabbitMqCallbackOptionsBuilder(); @@ -251,14 +262,133 @@ internal void Configure(RabbitMqTransport transport) { transport.SetCallbackOptions(CallbackOptionsBuilder); } - + if (PublisherConfirms.HasValue) { transport.EnablePublisherConfirms(PublisherConfirms.Value); } - + transport.SetInputQueueOptions(QueueOptions); transport.SetExchangeOptions(ExchangeOptions); } + + /// This is temporary decorator-fix, until Rebus is upgraded to a version 6+ of RabbitMQ.Client wich has new signature: + /// IConnection CreateConnection(IList AmqpTcpEndpoint endpoints, string clientProvidedName) + /// so it is more correct to provide the name of client connection in ConnectionManager.GetConnection() method, when connections are created. + class ConnectionFactoryClientNameDecorator : IConnectionFactory + { + private readonly IConnectionFactory _decoratedFactory; + private readonly string _clientProvidedName; + + public IDictionary ClientProperties + { + get { return _decoratedFactory.ClientProperties; } + set { _decoratedFactory.ClientProperties = value; } + } + + public TimeSpan ContinuationTimeout + { + get { return _decoratedFactory.ContinuationTimeout; } + set { _decoratedFactory.ContinuationTimeout = value; } + } + + public TimeSpan HandshakeContinuationTimeout + { + get { return _decoratedFactory.HandshakeContinuationTimeout; } + set { _decoratedFactory.HandshakeContinuationTimeout = value; } + } + + public string Password + { + get { return _decoratedFactory.Password; } + set { _decoratedFactory.Password = value; } + } + + public ushort RequestedChannelMax + { + get { return _decoratedFactory.RequestedChannelMax; } + set { _decoratedFactory.RequestedChannelMax = value; } + } + + public uint RequestedFrameMax + { + get { return _decoratedFactory.RequestedFrameMax; } + set { _decoratedFactory.RequestedFrameMax = value; } + } + + public ushort RequestedHeartbeat + { + get { return _decoratedFactory.RequestedHeartbeat; } + set { _decoratedFactory.RequestedHeartbeat = value; } + } + + public TaskScheduler TaskScheduler + { +#pragma warning disable CS0618 // Type or member is obsolete + get { return _decoratedFactory.TaskScheduler; } + set { _decoratedFactory.TaskScheduler = value; } +#pragma warning restore CS0618 // Type or member is obsolete + } + + public Uri Uri + { + get { return _decoratedFactory.Uri; } + set { _decoratedFactory.Uri = value; } + } + + public bool UseBackgroundThreadsForIO + { + get { return _decoratedFactory.UseBackgroundThreadsForIO; } + set { _decoratedFactory.UseBackgroundThreadsForIO = value; } + } + + public string UserName + { + get { return _decoratedFactory.UserName; } + set { _decoratedFactory.UserName = value; } + } + + public string VirtualHost + { + get { return _decoratedFactory.VirtualHost; } + set { _decoratedFactory.VirtualHost = value; } + } + + public ConnectionFactoryClientNameDecorator(IConnectionFactory originalFacotry, string clientProvidedName) + { + _decoratedFactory = originalFacotry; + _clientProvidedName = clientProvidedName; + } + + public AuthMechanismFactory AuthMechanismFactory(IList mechanismNames) + { + return _decoratedFactory.AuthMechanismFactory(mechanismNames); + } + + public IConnection CreateConnection(IList endpoints) + { + return (_decoratedFactory as RabbitMQ.Client.ConnectionFactory).CreateConnection(new DefaultEndpointResolver(endpoints), _clientProvidedName); + } + + public IConnection CreateConnection() + { + return _decoratedFactory.CreateConnection(_clientProvidedName); + } + + public IConnection CreateConnection(string clientProvidedName) + { + return _decoratedFactory.CreateConnection(clientProvidedName); + } + + public IConnection CreateConnection(IList hostnames) + { + return _decoratedFactory.CreateConnection(hostnames, _clientProvidedName); + } + + public IConnection CreateConnection(IList hostnames, string clientProvidedName) + { + return _decoratedFactory.CreateConnection(hostnames, clientProvidedName); + } + } } } \ No newline at end of file diff --git a/Rebus.RabbitMq/Config/RabbitMqQueueOptionsBuilder.cs b/Rebus.RabbitMq/Config/RabbitMqQueueOptionsBuilder.cs index ac0cf80..e6b4c1f 100644 --- a/Rebus.RabbitMq/Config/RabbitMqQueueOptionsBuilder.cs +++ b/Rebus.RabbitMq/Config/RabbitMqQueueOptionsBuilder.cs @@ -27,19 +27,43 @@ public RabbitMqQueueOptionsBuilder SetExclusive(bool exclusive) } /// - /// Set auto delete, when last consumer disconnects - /// Whether queue should be deleted - /// Time to live (in milliseconds) after last subscriber disconnects + /// Set auto-delete propery when declaring the queue + /// Whether queue should be deleted when the last consumer unsubscribes /// - public RabbitMqQueueOptionsBuilder SetAutoDelete(bool autoDelete, long ttlInMs = 0) + public RabbitMqQueueOptionsBuilder SetAutoDelete(bool autoDelete) { AutoDelete = autoDelete; + return this; + } - if (AutoDelete && ttlInMs <= 0) + /// + /// Configure for how long a queue can be unused before it is automatically deleted by setting x-expires argument + /// + /// expiration period in milliseconds, + /// if the argumnet value is 0 or less + public RabbitMqQueueOptionsBuilder SetQueueTTL(long ttlInMs) + { + if (ttlInMs <= 0) throw new ArgumentException("Time must be in milliseconds and greater than 0", nameof(ttlInMs)); - if (AutoDelete) - Arguments.Add("x-expires", ttlInMs); + Arguments.Add("x-expires", ttlInMs); + + return this; + } + + + /// + /// Set auto delete, when last consumer disconnects and/or how long queue can stay unused until it is deleted as expired. + /// Zero or negative values of ttlInMs are ignored (no queue expiration). + /// Whether queue should be deleted + /// Time to live (in milliseconds) after last subscriber disconnects + /// + public RabbitMqQueueOptionsBuilder SetAutoDelete(bool autoDelete, long ttlInMs = 0) + { + SetAutoDelete(autoDelete); + + if (ttlInMs > 0) + SetQueueTTL(ttlInMs); return this; } @@ -73,4 +97,4 @@ public RabbitMqQueueOptionsBuilder AddArgument(string key, object val) {"x-ha-policy", "all"} }; } -} +} \ No newline at end of file