Skip to content

Commit

Permalink
Tutorial 5: Topics and 6: RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
ahcantarim committed Jul 13, 2021
1 parent e682564 commit 12077af
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 4 deletions.
Binary file added .github/tutorial-5-01.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .github/tutorial-6-01.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 17 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Para obter uma cópia local atualizada e que possa ser executada corretamente, s

### Pré-requisitos

Garanta que o **RabbitMQ** está instalado e sendo executado em `localhost` na porta padrão `5672`.
Os tutoriais assumem que o **RabbitMQ** está instalado e sendo executado em `localhost` na porta padrão (`5672`).

- **Management:** http://localhost:15672
- **Username:** guest
Expand All @@ -114,11 +114,15 @@ dotnet restore
```


## Visão geral
## Introdução

O **RabbitMQ** - e outras ferramentas de mensagens no geral, usa alguns jargões:
**RabbitMQ** é um *message broker*: ele aceita e encaminha mensagens. Você pode pensar sobre isso como se fossem os *Correios*: quando você coloca a carta que você quer em uma caixa de postagem, você pode ter certeza de que eventualmente o carteiro irá entregar sua carta ao destinatário. Nesta analogia, o **RabbitMQ** é a caixa de postagem, a agência dos *Correios* e o carteiro.

- Um programa que envia mensagens é um `Producer`:
A maior diferença entre o **RabbitMQ** e uma agência dos *Correios* é que ele não lida com papel, ao invés disso aceita, armazena e encaminha blobs binários de dados ‒ *mensagens*.

O **RabbitMQ** ‒ e outras ferramentas de mensagens no geral ‒ usa alguns jargões:

- *Producing* significa nada mais do que *enviando*. Um programa que envia mensagens é um `Producer` (*produtor*):

![Producer](.github/producer.png)

Expand Down Expand Up @@ -380,6 +384,15 @@ dotnet run
No próximo tutorial iremos aprender como escutar um subconjunto de mensagens.


## Tutorial 4 » Routing

[Routing](https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html)

No tutorial anterior, criamos um sistema de *log* simples. Fomos capazes de transmitir mensagens para vários receptores.

Neste tutorial vamos adicionar uma funcionalidade à ele - vamos tornar possível se subscrever apenas a um subconjunto de mensagens. Por exemplo, teremos a possibilidade de direcionar apenas as mensagens de *erro crítico* para o arquivo em disco, enquanto ainda é possível exibir todas as mensagens de *log* em tela.


## Licença

Distribuído através da licença MIT. Veja `LICENSE` para mais informações.
Expand Down
41 changes: 41 additions & 0 deletions src/Tutorial.RabbitMQ.Console.EmitLogTopic/EmitLogTopic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Tutorial.RabbitMQ.Console.EmitLogTopic
{
class EmitLogTopic
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var exchangeName = "topic_logs";

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Topic);

var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";

var message = (args.Length > 1)
? string.Join(" ", args.Skip(1).ToArray())
: "Hello World!";

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body);

System.Console.WriteLine($"{DateTime.Now}: Sent '{routingKey}':'{message}'");
}

System.Console.WriteLine($"{DateTime.Now}: Press [enter] to exit.");
System.Console.ReadLine();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
82 changes: 82 additions & 0 deletions src/Tutorial.RabbitMQ.Console.RPCClient/RPCClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Concurrent;
using System.Text;

namespace Tutorial.RabbitMQ.Console.RPCClient
{
class RPCClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string queueName = "rpc_queue";
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
private readonly string CorrelationId;

public RPCClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };

connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);

props = channel.CreateBasicProperties();
CorrelationId = Guid.NewGuid().ToString();
props.CorrelationId = CorrelationId;
props.ReplyTo = replyQueueName;

consumer.Received += Consumer_Received;
}

private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var body = e.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
if (e.BasicProperties.CorrelationId == CorrelationId)
{
respQueue.Add(response);
}
}

public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: string.Empty,
routingKey: queueName,
basicProperties: props,
body: messageBytes);

channel.BasicConsume(consumer: consumer,
queue: replyQueueName,
autoAck: true);

return respQueue.Take();
}

public void Close()
{
connection.Close();
}
}

public class Rpc
{
static void Main(string[] args)
{
var rpcClient = new RPCClient();

System.Console.WriteLine($"{DateTime.Now}: Press Requesting fib(30).");
var response = rpcClient.Call("4");
System.Console.WriteLine($"{DateTime.Now}: Got '{response}'");

rpcClient.Close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
96 changes: 96 additions & 0 deletions src/Tutorial.RabbitMQ.Console.RPCServer/RPCServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Tutorial.RabbitMQ.Console.RPCServer
{
class RPCServer
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var queueName = "rpc_queue";

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

channel.BasicQos(0, 1, false);

var consumer = new EventingBasicConsumer(channel);

channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);

System.Console.WriteLine($"{DateTime.Now}: Awaiting RPC requests.");

consumer.Received += Consumer_Received;

System.Console.WriteLine($"{DateTime.Now}: Press [enter] to exit.");
System.Console.ReadLine();
}
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var channel = ((EventingBasicConsumer)sender).Model;
string response = null;

var body = e.Body.ToArray();
var props = e.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;

try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);

System.Console.WriteLine($"{DateTime.Now}: fib({message})");
response = fib(n).ToString();

}
catch (Exception ex)
{
System.Console.WriteLine($"{DateTime.Now}: ERROR {ex.Message}");
response = string.Empty;
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);

channel.BasicPublish(exchange: string.Empty,
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);

channel.BasicAck(deliveryTag: e.DeliveryTag,
multiple: false);
}
}

/// <summary>
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
/// </summary>
/// <param name="n"></param>
/// <returns></returns>
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}

return fib(n - 1) + fib(n - 2);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
64 changes: 64 additions & 0 deletions src/Tutorial.RabbitMQ.Console.ReceiveLogsTopic/ReceiveLogsTopic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Tutorial.RabbitMQ.Console.ReceiveLogsTopic
{
class ReceiveLogsTopic
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var exchangeName = "topic_logs";

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

if (args.Length < 1)
{
System.Console.Error.WriteLine($"{DateTime.Now}: Usage: {Environment.GetCommandLineArgs()[0]} [info] [warning] [error].");

System.Console.WriteLine($"{DateTime.Now}: Press [enter] to exit.");
System.Console.ReadLine();

Environment.ExitCode = 1;
return;
}

foreach (var bindingKey in args)
{
channel.QueueBind(queue: queueName,
exchange: exchangeName,
routingKey: bindingKey);
}

System.Console.WriteLine($"{DateTime.Now}: Waiting for messages.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;

channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

System.Console.WriteLine($"{DateTime.Now}: Press [enter] to exit.");
System.Console.ReadLine();
}
}

private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = e.RoutingKey;

System.Console.WriteLine($"{DateTime.Now}: Received '{routingKey}':'{message}'");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
</ItemGroup>

</Project>
Loading

0 comments on commit 12077af

Please sign in to comment.