Skip to content

Commit

Permalink
Merge pull request #71 from mathiasnohall/rabbitMqClient6.0
Browse files Browse the repository at this point in the history
RabbitMQ client 6.0 support
  • Loading branch information
mookid8000 authored Oct 31, 2020
2 parents f34cf6b + aa5a9b0 commit 57623fe
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 24 deletions.
4 changes: 2 additions & 2 deletions Rebus.RabbitMq.Tests/Rebus.RabbitMq.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Rebus.RabbitMq.Tests</RootNamespace>
<AssemblyName>Rebus.RabbitMq.Tests</AssemblyName>
<TargetFrameworks>net452;netcoreapp2.1</TargetFrameworks>
<TargetFrameworks>net461;netcoreapp2.1</TargetFrameworks>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
Expand Down Expand Up @@ -36,7 +36,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="rabbitmq.client" Version="5.1.0" />
<PackageReference Include="rabbitmq.client" Version="6.2.1" />
<PackageReference Include="rebus" Version="6.0.0" />
<PackageReference Include="rebus.tests.contracts" Version="6.0.0" />
</ItemGroup>
Expand Down
23 changes: 13 additions & 10 deletions Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,12 @@ public uint RequestedFrameMax
set { _decoratedFactory.RequestedFrameMax = value; }
}

public ushort RequestedHeartbeat
public TimeSpan RequestedHeartbeat
{
get { return _decoratedFactory.RequestedHeartbeat; }
set { _decoratedFactory.RequestedHeartbeat = value; }
}

public TaskScheduler TaskScheduler
{
#pragma warning disable CS0618 // Type or member is obsolete
get { return _decoratedFactory.TaskScheduler; }
set { _decoratedFactory.TaskScheduler = value; }
#pragma warning restore CS0618 // Type or member is obsolete
}

public Uri Uri
{
get { return _decoratedFactory.Uri; }
Expand All @@ -371,13 +363,19 @@ public string VirtualHost
set { _decoratedFactory.VirtualHost = value; }
}

public string ClientProvidedName
{
get { return _decoratedFactory.ClientProvidedName; }
set { _decoratedFactory.ClientProvidedName = value; }
}

public ConnectionFactoryClientNameDecorator(IConnectionFactory originalFacotry, string clientProvidedName)
{
_decoratedFactory = originalFacotry;
_clientProvidedName = clientProvidedName;
}

public AuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
public IAuthMechanismFactory AuthMechanismFactory(IList<string> mechanismNames)
{
return _decoratedFactory.AuthMechanismFactory(mechanismNames);
}
Expand Down Expand Up @@ -406,6 +404,11 @@ public IConnection CreateConnection(IList<string> hostnames, string clientProvid
{
return _decoratedFactory.CreateConnection(hostnames, clientProvidedName);
}

public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints, string clientProvidedName)
{
return (_decoratedFactory as RabbitMQ.Client.ConnectionFactory).CreateConnection(new DefaultEndpointResolver(endpoints), clientProvidedName);
}
}
}
}
15 changes: 8 additions & 7 deletions Rebus.RabbitMq/Internals/CustomQueueingConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Rebus.Internals
{
class CustomQueueingConsumer : DefaultBasicConsumer, IQueueingBasicConsumer
class CustomQueueingConsumer : DefaultBasicConsumer
{
public SharedQueue<BasicDeliverEventArgs> Queue { get; } = new SharedQueue<BasicDeliverEventArgs>();

public CustomQueueingConsumer(IModel model) : base(model)
{
}

public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
Queue.Enqueue(new BasicDeliverEventArgs
{
Expand All @@ -22,13 +23,13 @@ public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, b
Exchange = exchange,
RoutingKey = routingKey,
BasicProperties = properties,
Body = body
Body = body.ToArray()
});
}

public override void OnCancel()
public override void OnCancel(params string[] consumerTags)
{
base.OnCancel();
base.OnCancel(consumerTags);
Queue.Close();
}

Expand Down
Loading

0 comments on commit 57623fe

Please sign in to comment.