Skip to content

Commit

Permalink
Merge pull request #125 from mksergiy/master
Browse files Browse the repository at this point in the history
fix potential race conditions in ModelObjectPool
  • Loading branch information
mookid8000 authored Dec 18, 2024
2 parents 2b07403 + 12c15c3 commit 6d44441
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public async Task TryParallelism(int maxParallelism)
.Transport(t =>
{
t.UseRabbitMq(RabbitMqTransportFactory.ConnectionString, queueName)
.Prefetch(maxNumberOfMessagesToPrefetch: 3 * maxParallelism);
.Prefetch(maxNumberOfMessagesToPrefetch: 3 * maxParallelism)
.SetPublisherConfirms(false);
})
.Options(o =>
{
Expand Down
16 changes: 14 additions & 2 deletions Rebus.RabbitMq/Internals/ModelObjectPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using RabbitMQ.Client;

namespace Rebus.Internals;
Expand All @@ -14,6 +15,7 @@ class ModelObjectPool : IDisposable
readonly WriterModelPoolPolicy _policy;

int _maxEntries;
int _currentCount = 0;

public ModelObjectPool(WriterModelPoolPolicy policy, int maxEntries)
{
Expand All @@ -26,12 +28,22 @@ public void SetMaxEntries(int maxEntries)
_maxEntries = maxEntries;
}

public IModel Get() => _availableObjects.TryTake(out var model) ? model : _policy.Create();
public IModel Get()
{
if (_availableObjects.TryTake(out var model))
{
Interlocked.Decrement(ref _currentCount);
return model;
}

return _policy.Create();
}

public void Return(IModel model)
{
if (_availableObjects.Count >= _maxEntries)
if (Interlocked.Increment(ref _currentCount) > _maxEntries)
{
Interlocked.Decrement(ref _currentCount);
model.SafeDrop();
}
else
Expand Down

0 comments on commit 6d44441

Please sign in to comment.