Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore broken "Automatically create input queue if it suddenly disappears" feature #49

Merged
merged 3 commits into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Rebus.RabbitMq.Tests/RabbitMqReceiveSubscriptionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Rebus.RabbitMq.Tests
{
[TestFixture]
public class RabbitMqReceiveSubscriptionTests : FixtureBase
{
readonly string _publisherQueueName = TestConfig.GetName("publisher-RabbitMqReceiveSubscriptionTests");
readonly string _subscriberQueueName = TestConfig.GetName("subscriber-RabbitMqReceiveSubscriptionTests");

protected override void TearDown()
{
base.TearDown();
RabbitMqTransportFactory.DeleteQueue(_publisherQueueName);
RabbitMqTransportFactory.DeleteQueue(_subscriberQueueName);
}

[Test]
public async Task Test_ReceieveOnSubscribe_WHEN_SubscriberQueueDeleted_THEN_ItRecreates_SubscirberQuere_AND_ReceivesPublishedData()
{
string message = "Test-Message-123";

using (var receivedEvent = new ManualResetEvent(false))
{
using (var publisher = StartBus(_publisherQueueName))
{
using (var subscriber = StartBus(_subscriberQueueName, data => Task.Run(() =>
{
if (string.Equals(data, message))
receivedEvent.Set();
})))
{

await subscriber.Subscribe<string>();

RabbitMqTransportFactory.DeleteQueue(_subscriberQueueName);
await Task.Delay(1000);

await publisher.Publish(message);

receivedEvent.WaitOrDie(TimeSpan.FromSeconds(2), "The event has not been receved by the subscriber within the expected time");
}
}
}
}


private IBus StartBus(string queueName, Func<string, Task> handlerMethod = null)
{
var activator = Using(new BuiltinHandlerActivator());
activator?.Handle(handlerMethod);

Configure.With(activator).Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, queueName)
.AddClientProperties(new Dictionary<string, string> { { "description", "Created for RabbitMqReceiveTests" } });
}).Start();

return activator.Bus;
}
}
}
13 changes: 13 additions & 0 deletions Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,19 @@ public async Task<TransportMessage> Receive(ITransactionContext context, Cancell
{
consumer = InitializeConsumer();
}
else
{
try
{
// When a consumer is dequeued from the the "consumers" pool, it might be bound to a queue, which does not exist anymore,
// eg. expired and deleted by RabittMQ server policy). In this case this calling QueueDeclarePassive will result in
// an OperationInterruptedException and "consumer.Model.IsOpen" will be set to false (this is handled later in the code by
// disposing this consumer). There is no need to handle this exception. The logic of InitializeConsumer() will make sure
// that the queue is recreated later based on assumption about how ReBus is handling null-result of ITransport.Receive().
consumer?.Model.QueueDeclarePassive(Address);
}
catch { }
}

if (consumer == null)
{
Expand Down