Skip to content

Commit

Permalink
Tutorial 7: Publisher Confirms & Load Test
Browse files Browse the repository at this point in the history
  • Loading branch information
ahcantarim committed Jul 20, 2021
1 parent 3d94c6c commit 0bca6e6
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 6 deletions.
60 changes: 60 additions & 0 deletions src/Tutorial.RabbitMQ.Console.LoadTest.Client/Client.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Tutorial.RabbitMQ.Console.LoadTest.Client
{
class Client
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
var queueName = "load_test";

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
connection.ConnectionBlocked += Connection_ConnectionBlocked;
connection.ConnectionUnblocked += Connection_ConnectionUnblocked;

channel.QueueDeclare(queueName, true, false, false, null);

string message = string.Empty;
byte[] body = null;
var rnd = new Random();

for (int i = 0; i <= 1_000_000; i++)
{
var dotsCount = rnd.Next(1, 6);
var dotsSequence = Enumerable.Repeat(".", dotsCount);

message = $"Mensagem '{i}' vai demorar {dotsCount} segundos{string.Join("", dotsSequence)}";
body = Encoding.UTF8.GetBytes(message);

var props = channel.CreateBasicProperties();
props.Persistent = true;

channel.BasicPublish(string.Empty, queueName, props, body);

System.Console.WriteLine($"[x] {DateTime.Now} - Enviando mensagem '{message}'");
}
}

System.Console.WriteLine($"[x] {DateTime.Now} - Pressione [ENTER] para sair.");
System.Console.ReadLine();
}

private static void Connection_ConnectionUnblocked(object sender, EventArgs e)
{
System.Console.WriteLine($"[|] {DateTime.Now} - A CONEXÃO ESTÁ BLOQUEADA!");
System.Console.ReadLine();
}

private static void Connection_ConnectionBlocked(object sender, global::RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
{
System.Console.WriteLine($"[-] {DateTime.Now} - A CONEXÃO ESTÁ desBLOQUEADA!");
System.Console.ReadLine();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
45 changes: 45 additions & 0 deletions src/Tutorial.RabbitMQ.Console.LoadTest.Server/Server.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Tutorial.RabbitMQ.Console.LoadTest.Server
{
class Server
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
var queueName = "load_test";

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
channel.BasicQos(0, 10000, false);

System.Console.WriteLine($"[x] {DateTime.Now} - Aguardando mensagens...");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);

System.Console.WriteLine($"[.] {DateTime.Now} - Mensagem recebida '{message}'");
int dots = message.Split('.').Length - 1;
//Thread.Sleep(dots * 1000);
System.Console.WriteLine($"[x] {DateTime.Now} - Mensagem processada '{message}'");

channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queueName, false, consumer);

System.Console.WriteLine($"[x] {DateTime.Now} - Pressione [ENTER] para sair.");
System.Console.ReadLine();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
152 changes: 152 additions & 0 deletions src/Tutorial.RabbitMQ.Console.PublisherConfirms/PublisherConfirms.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace Tutorial.RabbitMQ.Console.PublisherConfirms
{
class PublisherConfirms
{
private const int MESSAGE_COUNT = 50_000;

static void Main(string[] args)
{
PublishMessagesIndividually();
//PublishMessagesInBatch();
HandlePublishConfirmsAsynchronously();
}

private static IConnection CreateConnection()
{
var factory = new ConnectionFactory { HostName = "localhost" };
return factory.CreateConnection();
}

private static void PublishMessagesIndividually()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
// Declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var timer = new Stopwatch();
timer.Start();

for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(string.Empty, queueName, null, body);
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}

timer.Stop();
System.Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void PublishMessagesInBatch()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
// Declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var batchSize = 100;
var outstandingMessageCount = 0;

var timer = new Stopwatch();
timer.Start();

for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(string.Empty, queueName, null, body);
outstandingMessageCount++;

if (outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
outstandingMessageCount = 0;
}
}

if (outstandingMessageCount > 0)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

timer.Stop();
System.Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void HandlePublishConfirmsAsynchronously()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
// Declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
outstandingConfirms.TryRemove(entry.Key, out _);
}
else
outstandingConfirms.TryRemove(sequenceNumber, out _);
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);

channel.BasicNacks += (sender, ea) =>
{
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
System.Console.WriteLine($"Message with body '{body}' has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

var timer = new Stopwatch();
timer.Start();

for (int i = 0; i < MESSAGE_COUNT; i++)
{
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, i.ToString());

var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(string.Empty, queueName, null, body);
}

if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
throw new Exception("All messages could not be confirmed in 60 seconds");

timer.Stop();
System.Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static bool WaitUntil(int numberOfSeconds, Func<bool> condition)
{
int waited = 0;

while (!condition() && waited < numberOfSeconds * 1000)
{
Thread.Sleep(100);
waited += 100;
}

return condition();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
Loading

0 comments on commit 0bca6e6

Please sign in to comment.