RabbitMQ Elixir library to handle all consuming, producing, and exchanging of RabbitMQ messages.
This lightweight library makes use of the AMQP 0-9-1 protocol. Use WhiteRabbit to help with scaling and processing of your RabbitMQ message queues.
Whether you need just connection and channel recovery with running processes, or want to take advantage of the RPC call architecture across independent distributed OTP applications with the benefit of having your messages being sent to your RabbitMQ's queues; this library will build upon whatever is required for your application.
Features:
- RPC call architecture between apps using RabbitMQ and reply_to key routing.
- Auto recovery and restarts of connections, their channels, and the consumers using them
- Startup and runtime support for Consumer GenServer processes. Start them at runtime using Elixir's DynamicSupervisors.
- Telemetry event emission on publishes, acks, and rejects
If this isn't exactly what you are looking for, I recommend the well-known elixir library broadway. Even though that is intended for multi-stage data ingest using GenStages and has more batch-level processing in mind, it has similar features that could fit your needs.
defp deps do
[
{:white_rabbit, "~> 0.2.0"},
]
end
# In MyApp
defmodule MyApp.WhiteRabbit do
use WhiteRabbit
def start_link(_opts) do
WhiteRabbit.start_link(__MODULE__, name: __MODULE__)
end
# Callbacks below
end
# In Main Application Supervisor
children = [
# other fun stuff...
{MyApp.WhiteRabbit, []}
]
Supervisor.start_link(children, opts)
WhiteRabbit.Processor
Behaviour is used to define how a WhiteRabbit.Consumer
Genserver will process the messages that it receives once it is registered to a RabbitMQ queue.
The consume_payload/2
callback is called immediately after a message is received. It should return a {:ok, AMQP.Basic.delivery_tag()}
tuple if successful or a {:error, {AMQP.Basic.delivery_tag(), Keyword.t()}}
if not successful so the Consumer can properly send an 'ack' or 'reject' for the message and then let RabbitMQ broker can handle it appropriately (requeue, send to dead-letter-queue, etc.)
See WhiteRabbit.Processor
for more on the behaviour used.
Example:
defmodule AppOne.TestJsonProcessor do
@behaviour WhiteRabbit.Processor
require Logger
@impl true
def consume_payload(
payload,
%{delivery_tag: tag, redelivered: redelivered, content_type: content_type} = meta
) do
Logger.debug("Processing message #{tag} with #{__MODULE__}")
case content_type do
"application/json" ->
{:ok, _json} = Jason.decode(payload)
# If successful, send back `{:ok, AMQP.Basic.delivery_tag()}`
{:ok, tag}
_ ->
{:error, {tag, [requeue: false]}}
end
rescue
exception ->
# Requeue unless it's a redelivered message.
# This means we will retry consuming a message once in case of exception
# before we give up and have it moved to the error queue
Logger.warn("Error consuming message: #{tag} #{inspect(exception)}")
{:error, {tag, [requeue: not redelivered]}}
end
end
See WhiteRabbit.Consumer
for more info.
@impl true
def get_startup_consumers do
[
{WhiteRabbit.Consumer, %WhiteRabbit.Consumer{
connection_name: :appone_connection,
name: "AppOne.JsonConsumer",
exchange: "json_test_exchange",
queue: "json_test_queue",
processor: %WhiteRabbit.Processor.Config{module: AppOne.TestJsonProcessor}
}
}
]
end
defmodule AppOne.WhiteRabbit do
use WhiteRabbit
alias WhiteRabbit.{Connection, Core}
# Define list of connections for this WhiteRabbit instance
def get_connections do
[
%Connection{
connection_name: :appone_connection,
conn_opts: [url: "amqp://user:pass@localhost:5673/dev"],
channels: [
%{
name: :appone_consumer_channel
},
%{
name: :appone_producer_channel
}
]
},
%Connection{
connection_name: :appone_rpc_connection,
conn_opts: [url: "amqp://user:pass@localhost:5673/dev"],
channels: [
%{
name: :appone_rpc_consumer_channel_1
},
%{
name: :appone_rpc_consumer_channel_2
}
]
}
]
end
# Use callback spec to return %WhiteRabbit.RPC.Config{} struct
@impl true
def get_rpc_config do
%WhiteRabbit.RPC.Config{
service_name: :appone,
connection_name: :appone_rpc_connection
}
end
# Set optional Consumer process to start consuming at app start-up
@impl true
def get_startup_consumers do
[
{WhiteRabbit.Consumer, %WhiteRabbit.Consumer{
connection_name: :appone_connection,
name: "AppOne.JsonConsumer",
exchange: "json_test_exchange",
queue: "json_test_queue",
processor: %WhiteRabbit.Processor.Config{module: AppOne.TestJsonProcessor}
}
}
]
end
def start_link(_opts) do
rpc_enabled = if Mix.env() !== :test, do: true, else: false
# Start the WhiteRabbit Hole
WhiteRabbit.start_link(__MODULE__,
name: __MODULE__,
connections: connections(),
rpc_enabled: rpc_enabled,
rpc_config: get_rpc_config()
)
end
end
The processor module needs to exist before trying to register a consumer for it.
See WhiteRabbit.Processor
module for more info on that behaviour
appone_config = %WhiteRabbit.Consumer{
owner_module: AppOne.WhiteRabbit,
connection_name: :appone_connection,
name: "AppOne.TestJsonProcessor",
exchange: "json_test_exchange",
queue: "json_test_queue",
processor: %WhiteRabbit.Processor.Config{module: AppOne.TestJsonProcessor}
}
apptwo_config = %WhiteRabbit.Consumer{
owner_module: AppTwo.WhiteRabbit,
connection_name: :apptwo_connection,
name: "AppTwo.TestJsonProcessor",
exchange: "apptwo_json_test_exchange",
queue: "apptwo_json_test_queue",
processor: %WhiteRabbit.Processor.Config{module: AppTwo.TestJsonProcessor}
}
AppOne.WhiteRabbit.start_dynamic_consumers(appone_config, 3)
AppTwo.WhiteRabbit.start_dynamic_consumers(apptwo_config, 3)
# or specifying the owner module
WhiteRabbit.start_dynamic_consumers(appone_config, 3, AppOne.WhiteRabbit)
WhiteRabbit.start_dynamic_consumers(apptwo_config, 3, AppTwo.WhiteRabbit)
See WhiteRabbit.Producer
If using the Producer as a behavior there is a providing publish/5
that you can just provide the connection_name so it will use the correct connection pool and use a channel under that already opened connection.
This method will make sure the channel and connection used are from the calling application which will be under the correct supervision tree.
This is the prefered method especially if there are serveral applications with WhiteRabbit
modules under an umbrella application.
defmodule AppOne.Producer.Json do
use WhiteRabbit.Producer
def send_json(payload) do
publish(:appone_connection, "json_test_exchange", "test_json", payload,
content_type: "application/json",
persistent: true
)
end
end
If you know the channel registry name and the connection name of the pool to use then you can use the publish/5
function with a tuple as the first argument.
WhiteRabbit.Producer.publish({:appone_connection, AppOne.WhiteRabbit.ChannelRegistry}, "test_exchange", "test_route", "hello there", persistent: true)
If there is an already connected channel you can use the publish/5
.
Example:
channel = %AMQP.Channel{}
WhiteRabbit.Producer.publish(channel, "test_exchange", "test_route", "hello there", persistent: true)
WhiteRabbit.Core.test_publish(100, "json_test_exchange", "test_json", %{hello: "there"})
WhiteRabbit.Core.test_publish(100, "apptwo_json_test_exchange", "test_json", %{hello: "there"})
See WhiteRabbit.RPC
for more information on RPC calls and message handling.
Using the RPC config shown above, a process can call the function WhiteRabbit.RPC.call/4
to make an RPC call to the service and get a response back from the server and have it map back correctly to the calling process.
If using as a behavior, then a rpc_call/3
function will be provided that accepts the service to call, mfa tuple, and options.
Example:
iex> AppFour.WhiteRabbit.rpc_call(:appone, {AppOne.Utils, :get_versions, []})
{:ok,
[
%{"name" => "appone", "version" => "3.5.0"},
%{"name" => "custom_lib_1", "version" => "1.3.0"},
%{"name" => "processing_lib", "version" => "2.6.0"}
]}
$ mix docs
Copyright (c) 2021, Phillip Langland
WhiteRabbit source code is licensed under the MIT License.