Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEVEX-2399]: Make errored queues configurable #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,63 @@ and this project adheres to

---

## [7.0.0] - 2025-01-24

### Changed

- When you pass an `x-queue-type` to `Amqpx.Helper.declare/2` now it will be used also for the dead letter queue

This is a breaking change: if you are already using `Helper.declare` with an `x-queue-type` that is not the default type this will try to change the dead letter queue type.

In this case you can either remove the dead letter queue and recreate it with the correct type, or you can migrate to a new dead letter queue with a different name and remove the old one when it's fully drained:

```elixir
queue_spec = %{
queue: "test_1",
opts: [
durable: true,
arguments: [
{"x-queue-type", :longstr, "quorum"},
{"x-dead-letter-exchange", :longstr, "test_dle"},
{"x-dead-letter-routing-key", :longstr, "test_rk"}
]
],
exchanges: [
%{name: "test_exchange", type: :topic, routing_keys: ["test_rk"], opts: [durable: true]},
]
}

# As an example we'll take the following `Amqpx.Helper.declare`
# that creates a queue called `test_1` and a corresponding `test_1_errored`
:ok = Amqpx.Helper.declare(chan, queue_spec)

# Amqpx.Helper.setup_queue/2 takes the exact same queue_spec
# as declare/2 but it doesn't declare the dead letter queue
:ok = Amqpx.Helper.setup_queue(chan, queue_spec)

# Now we can create a new dead letter queue with type "quorum"
# by using a different name, we just need to make sure
# its routing key will match the `x-dead-letter-routing-key` argument
:ok = Amqpx.Helper.setup_dead_lettering(chan, %{
routing_key: "test_rk",
queue: "test_1_dlq",
exchange: "test_dle",
queue_opts: [durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}]]
})

# At this point dead-lettered messages should be delivered to both
# `test_1_errored` and `test_1_dlq`, in this way we can migrate everything
# to the new one and as soon as it empties we can remove the old one
```

- The `original_routing_keys` option accepted by `Amqpx.Helper.setup_dead_lettering/2` must be a `[String.t()]`, if you are passing a `[[String.t()]]` to this function you have to pipe trough `List.flatten` now

### Added

- `Amqpx.Helper.setup_dead_lettering/2` now accepts a `queue_opts` key which will be used as third argument for `Amqpx.Queue.declare/3`

---

## [6.1.3] - 2025-01-23

### Fixed
Expand Down Expand Up @@ -113,8 +170,8 @@ This is due to elixir rabbit not supporting the older versions of the libraries
- ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX
queues instead of wildcard


[Unreleased]: https://github.com/primait/amqpx/compare/6.1.3...HEAD
[Unreleased]: https://github.com/primait/amqpx/compare/7.0.0...HEAD
[7.0.0]: https://github.com/primait/amqpx/compare/6.1.3...7.0.0
[6.1.3]: https://github.com/primait/amqpx/compare/6.1.2...6.1.3
[6.1.2]: https://github.com/primait/amqpx/compare/6.1.1...6.1.2
[6.1.1]: https://github.com/primait/amqpx/compare/6.1.0...6.1.1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rest!
```elixir
def deps do
[
{:amqpx, "~> 6.1.3"}
{:amqpx, "~> 7.0.0"}
]
end
```
Expand Down
83 changes: 70 additions & 13 deletions lib/amqp/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,51 @@ defmodule Amqpx.Helper do

require Logger

alias Amqpx.{Exchange, Queue}

alias Amqpx.{Basic, Channel, Exchange, Queue}

@dead_letter_queue_defaults [durable: true]

# Supervisor.module_spec() has been introduced with elixir 1.16
# we can remove this when we update the minimum supported version
@type module_spec :: {module, arg :: any}

@type exchange_spec :: %{
name: Basic.exchange(),
type: atom,
routing_keys: [String.t()],
opts: Keyword.t()
}

@type queue_spec :: %{
:queue => Basic.queue(),
:exchanges => [exchange_spec],
optional(:opts) => Keyword.t()
}

@type dead_letter_queue_spec :: %{
:queue => Basic.queue(),
:exchange => Basic.exchange(),
:routing_key => String.t(),
optional(:original_routing_keys) => [String.t()],
optional(:queue_opts) => Keyword.t()
}

@spec manager_supervisor_configuration(Keyword.t()) :: module_spec
def manager_supervisor_configuration(config) do
{Amqpx.Gen.ConnectionManager, %{connection_params: encrypt_password(config)}}
end

@spec consumers_supervisor_configuration([handler_conf :: map]) :: [Supervisor.child_spec()]
def consumers_supervisor_configuration(handlers_conf) do
Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1()))
end

@spec producer_supervisor_configuration(producer_conf :: map) :: module_spec
def producer_supervisor_configuration(producer_conf) do
{Amqpx.Gen.Producer, producer_conf}
end

@spec encrypt_password(Keyword.t()) :: Keyword.t()
def encrypt_password(config) do
case Keyword.get(config, :obfuscate_password, true) do
true ->
Expand All @@ -29,6 +60,7 @@ defmodule Amqpx.Helper do
end
end

@spec get_password(Keyword.t(), Keyword.t() | nil) :: Keyword.value()
def get_password(config, nil) do
case Keyword.get(config, :obfuscate_password, true) do
true ->
Expand All @@ -49,6 +81,7 @@ defmodule Amqpx.Helper do
end
end

@spec declare(Channel.t(), queue_spec) :: :ok | no_return
def declare(
channel,
%{
Expand All @@ -57,22 +90,25 @@ defmodule Amqpx.Helper do
exchanges: exchanges
} = queue
) do
case Enum.find(opts[:arguments], &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do
arguments = Keyword.get(opts, :arguments, [])

case Enum.find(arguments, &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do
{_, _, dle} ->
{dlr_config_key, dlr_config_value} =
case Enum.find(opts[:arguments], &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do
case Enum.find(arguments, &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do
{_, _, dlrk} ->
{:routing_key, dlrk}

nil ->
original_routing_keys = Enum.map(exchanges, & &1.routing_keys)
original_routing_keys = Enum.flat_map(exchanges, & &1.routing_keys)
{:original_routing_keys, original_routing_keys}
end

setup_dead_lettering(channel, %{
dlr_config_key => dlr_config_value,
queue: "#{qname}_errored",
exchange: dle
exchange: dle,
queue_opts: set_dead_letter_queue_type(@dead_letter_queue_defaults, arguments)
})

nil ->
Expand All @@ -86,10 +122,11 @@ defmodule Amqpx.Helper do
setup_queue(channel, queue)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq}) do
@spec setup_dead_lettering(Channel.t(), dead_letter_queue_spec) :: :ok | {:ok, map} | Basic.error()
def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq} = spec) do
# DLX will work through [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default)
# since `x-dead-letter-routing-key` matches the queue name
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))
end

def setup_dead_lettering(_channel, %{queue: dlq, exchange: "", routing_key: bad_dlq}) do
Expand All @@ -103,24 +140,27 @@ defmodule Amqpx.Helper do
end
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key}) do
def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key} = spec) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))
Queue.bind(channel, dlq, exchange, routing_key: routing_key)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys}) do
def setup_dead_lettering(
channel,
%{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys} = spec
) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))

original_routing_keys
|> List.flatten()
|> Enum.uniq()
|> Enum.each(fn rk ->
:ok = Queue.bind(channel, dlq, exchange, routing_key: rk)
end)
end

@spec setup_queue(Channel.t(), queue_spec) :: :ok | no_return
def setup_queue(channel, %{
queue: queue,
exchanges: exchanges,
Expand All @@ -140,6 +180,7 @@ defmodule Amqpx.Helper do
Enum.each(exchanges, &setup_exchange(channel, queue, &1))
end

@spec setup_exchange(Channel.t(), Basic.queue(), exchange_spec) :: :ok | Basic.error() | no_return
def setup_exchange(channel, queue, %{
name: name,
type: type,
Expand Down Expand Up @@ -189,6 +230,22 @@ defmodule Amqpx.Helper do
Exchange.declare(channel, name, type)
end

@spec dead_letter_queue_opts(dead_letter_queue_spec) :: Keyword.t()
defp dead_letter_queue_opts(spec) do
Map.get(spec, :queue_opts, @dead_letter_queue_defaults)
end

@spec set_dead_letter_queue_type(Keyword.t(), [{String.t(), atom, any}]) :: Keyword.t()
defp set_dead_letter_queue_type(dlq_opts, queue_args) do
case Enum.find(queue_args, &match?({"x-queue-type", :longstr, _}, &1)) do
nil ->
dlq_opts

queue_type ->
Keyword.update(dlq_opts, :arguments, [queue_type], &[queue_type | &1])
end
end

defp skip_dead_letter_routing_key_check_for,
do: Application.get_env(:amqpx, :skip_dead_letter_routing_key_check_for, [])
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Amqpx.MixProject do
use Mix.Project

@version "6.1.3"
@version "7.0.0"

def project do
[
Expand Down
40 changes: 40 additions & 0 deletions test/helper_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,46 @@ defmodule HelperTest do
Application.put_env(:amqpx, :skip_dead_letter_routing_key_check_for, [])
end

test "declare/2 propagates x-queue-type to dead letter queue declaration",
meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()
dead_letter_queue = "#{queue_name}_errored"

assert :ok ==
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, dead_letter_queue},
{"x-queue-type", :longstr, "quorum"}
]
],
queue: queue_name
})

rabbit_manager = Application.get_env(:amqpx, :rabbit_manager_url).rabbit
amqp_conn = Application.get_env(:amqpx, :amqp_connection)
credentials = Base.encode64("#{amqp_conn[:username]}:#{amqp_conn[:password]}")
headers = [{~c"Authorization", "Basic #{credentials}"}]

assert {:ok, {{_, 200, ~c"OK"}, _headers, body}} =
:httpc.request(:get, {"http://#{rabbit_manager}/api/queues", headers}, [], [])

assert {:ok, queues} = Jason.decode(body)

assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} =
Enum.find(queues, fn q -> match?(%{"name" => ^queue_name}, q) end)

assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} =
Enum.find(queues, fn q -> match?(%{"name" => ^dead_letter_queue}, q) end)
end

defp rand_name do
:crypto.strong_rand_bytes(8) |> Base.encode64()
end
Expand Down
Loading