Skip to content

Commit

Permalink
Merge pull request #13138 from rabbitmq/simplify-direct-reply-to
Browse files Browse the repository at this point in the history
Simplify Direct Reply-To
  • Loading branch information
michaelklishin authored Jan 24, 2025
2 parents c0bb5fb + 1267d59 commit 4627e7a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 99 deletions.
33 changes: 11 additions & 22 deletions deps/rabbit/src/pid_recomposition.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

-module(pid_recomposition).


%% API
-export([
to_binary/1,
Expand All @@ -19,40 +18,23 @@
-define(TTB_PREFIX, 131).

-define(NEW_PID_EXT, 88).
-define(PID_EXT, 103).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).

%%
%% API
%%

-spec decompose(pid()) -> #{atom() => any()}.
decompose(Pid) ->
from_binary(term_to_binary(Pid, [{minor_version, 2}])).

-spec from_binary(binary()) -> #{atom() => any()}.
from_binary(Bin) ->
PidData = case Bin of
%% Erlang 23+
<<?TTB_PREFIX, ?NEW_PID_EXT, Val0/binary>> -> Val0;
%% Erlang 22
<<?TTB_PREFIX, ?PID_EXT, Val1/binary>> -> Val1
end,
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
{Node, Rest2} = case PidData of
<<?ATOM_UTF8_EXT, AtomLen:16/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1};
<<?SMALL_ATOM_UTF8_EXT, AtomLen/integer, Node0:AtomLen/binary, Rest1/binary>> ->
{Node0, Rest1}
end,
{ID, Serial, Creation} = case Rest2 of
%% NEW_PID_EXT on Erlang 23+
<<ID0:32/integer, Serial0:32/integer, Creation0:32/integer>> ->
{ID0, Serial0, Creation0};
%% PID_EXT on Erlang 22
<<ID1:32/integer, Serial1:32/integer, Creation1:8/integer>> ->
{ID1, Serial1, Creation1}
end,
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest2,
#{
node => binary_to_atom(Node, utf8),
id => ID,
Expand All @@ -62,9 +44,16 @@ from_binary(Bin) ->

-spec to_binary(#{atom() => any()}) -> binary().
to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) ->
BinNode = atom_to_binary(Node, utf8),
BinNode = atom_to_binary(Node),
NodeLen = byte_size(BinNode),
<<?TTB_PREFIX:8/unsigned, ?NEW_PID_EXT:8/unsigned, ?ATOM_UTF8_EXT:8/unsigned, NodeLen:16/unsigned, BinNode/binary, ID:32, Serial:32, Creation:32>>.
<<?TTB_PREFIX:8/unsigned,
?NEW_PID_EXT:8/unsigned,
?ATOM_UTF8_EXT:8/unsigned,
NodeLen:16/unsigned,
BinNode/binary,
ID:32,
Serial:32,
Creation:32>>.

-spec recompose(#{atom() => any()}) -> pid().
recompose(M) ->
Expand Down
42 changes: 10 additions & 32 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,11 @@ send_command(Pid, Msg) ->

-spec deliver_reply(binary(), mc:state()) -> 'ok'.
deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin,
rabbit_nodes:all_running_with_hashes()) of
Nodes = rabbit_nodes:all_running_with_hashes(),
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
{ok, Pid, Key} ->
delegate:invoke_no_result(Pid, {?MODULE, deliver_reply_local,
[Key, Message]});
{error, _} ->
deliver_reply_v1(EncodedBin, Message)
end.

-spec deliver_reply_v1(binary(), mc:state()) -> 'ok'.
deliver_reply_v1(EncodedBin, Message) ->
%% the the original encoding function
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
{ok, V1Pid, V1Key} ->
delegate:invoke_no_result(V1Pid,
{?MODULE, deliver_reply_local, [V1Key, Message]});
delegate:invoke_no_result(
Pid, {?MODULE, deliver_reply_local, [Key, Message]});
{error, _} ->
ok
end.
Expand All @@ -331,30 +320,19 @@ deliver_reply_local(Pid, Key, Message) ->
declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
exists;
declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of
{error, _} ->
declare_fast_reply_to_v1(EncodedBin);
Nodes = rabbit_nodes:all_running_with_hashes(),
case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of
{ok, Pid, Key} ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(Pid, Msg, infinity) end)
fun() -> gen_server2:call(Pid, Msg, infinity) end);
{error, _} ->
not_found
end;
declare_fast_reply_to(_) ->
not_found.

declare_fast_reply_to_v1(EncodedBin) ->
%% the the original encoding function
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
{ok, V1Pid, V1Key} ->
Msg = {declare_fast_reply_to, V1Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(V1Pid, Msg, infinity) end);
{error, _} ->
not_found
end.

-spec list() -> [pid()].

list() ->
Expand Down Expand Up @@ -1319,7 +1297,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
Other -> Other
end,
%% Precalculate both suffix and key
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()),
{Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix(self()),
Consumer = {CTag, Suffix, Key},
State1 = State#ch{reply_consumer = Consumer},
case NoWait of
Expand Down
52 changes: 12 additions & 40 deletions deps/rabbit/src/rabbit_direct_reply_to.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,14 @@

-module(rabbit_direct_reply_to).

%% API
-export([
%% Original amq.rabbitmq.reply-to target channel encoding
compute_key_and_suffix_v1/1,
decode_reply_to_v1/1,
-export([compute_key_and_suffix/1,
decode_reply_to/2]).

%% v2 amq.rabbitmq.reply-to target channel encoding
compute_key_and_suffix_v2/1,
decode_reply_to_v2/2
]).

%%
%% API
%%

-type decoded_pid_and_key() :: {ok, pid(), binary()}.

-spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}.
%% This original pid encoding function produces values that exceed routing key length limit
%% on nodes with long (say, 130+ characters) node names.
compute_key_and_suffix_v1(Pid) ->
Key = base64:encode(rabbit_guid:gen()),
PidEnc = base64:encode(term_to_binary(Pid)),
Suffix = <<PidEnc/binary, ".", Key/binary>>,
{Key, Suffix}.

-spec decode_reply_to_v1(binary()) -> decoded_pid_and_key() | {error, any()}.
decode_reply_to_v1(Bin) ->
case string:lexemes(Bin, ".") of
[PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
{ok, Pid, unicode:characters_to_binary(Key)};
_ -> {error, unrecognized_format}
end.


-spec compute_key_and_suffix_v2(pid()) -> {binary(), binary()}.
%% This pid encoding function produces values that are of mostly fixed size
%% regardless of the node name length.
compute_key_and_suffix_v2(Pid) ->
-spec compute_key_and_suffix(pid()) ->
{binary(), binary()}.
compute_key_and_suffix(Pid) ->
Key = base64:encode(rabbit_guid:gen()),

PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid),
Expand All @@ -61,19 +30,22 @@ compute_key_and_suffix_v2(Pid) ->
Suffix = <<RecomposedEncoded/binary, ".", Key/binary>>,
{Key, Suffix}.

-spec decode_reply_to_v2(binary(), #{non_neg_integer() => node()}) -> decoded_pid_and_key() | {error, any()}.
decode_reply_to_v2(Bin, CandidateNodes) ->
-spec decode_reply_to(binary(), #{non_neg_integer() => node()}) ->
{ok, pid(), binary()} | {error, any()}.
decode_reply_to(Bin, CandidateNodes) ->
try
[PidEnc, Key] = binary:split(Bin, <<".">>),
RawPidBin = base64:decode(PidEnc),
PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin),
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
undefined -> {error, target_node_not_found};
undefined ->
{error, target_node_not_found};
Candidate ->
PidParts = maps:update(node, Candidate, PidParts0),
{ok, pid_recomposition:recompose(PidParts), Key}
end
catch
error:_ -> {error, unrecognized_format}
error:_ ->
{error, unrecognized_format}
end.
10 changes: 5 additions & 5 deletions deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

all() ->
[
decode_reply_to_v2
decode_reply_to
].

init_per_suite(Config) ->
Expand All @@ -32,7 +32,7 @@ end_per_testcase(_TestCase, _Config) ->
%%% Tests %%%


decode_reply_to_v2(Config) ->
decode_reply_to(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_decode_reply_to(Config) end,
[],
Expand Down Expand Up @@ -61,9 +61,9 @@ prop_decode_reply_to(_) ->
NonB64 = <<0, Random/binary>>,

{ok, pid_recomposition:recompose(PidParts), Key} =:=
rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NodeMap)
rabbit_direct_reply_to:decode_reply_to(IxBin, NodeMap)
andalso {error, target_node_not_found} =:=
rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NoNodeMap)
rabbit_direct_reply_to:decode_reply_to(IxBin, NoNodeMap)
andalso {error, unrecognized_format} =:=
rabbit_direct_reply_to:decode_reply_to_v2(NonB64, NodeMap)
rabbit_direct_reply_to:decode_reply_to(NonB64, NodeMap)
end).

0 comments on commit 4627e7a

Please sign in to comment.