diff --git a/deps/rabbit/src/pid_recomposition.erl b/deps/rabbit/src/pid_recomposition.erl index ae927016a571..95f49e51be21 100644 --- a/deps/rabbit/src/pid_recomposition.erl +++ b/deps/rabbit/src/pid_recomposition.erl @@ -7,7 +7,6 @@ -module(pid_recomposition). - %% API -export([ to_binary/1, @@ -19,40 +18,23 @@ -define(TTB_PREFIX, 131). -define(NEW_PID_EXT, 88). --define(PID_EXT, 103). -define(ATOM_UTF8_EXT, 118). -define(SMALL_ATOM_UTF8_EXT, 119). -%% -%% API -%% - -spec decompose(pid()) -> #{atom() => any()}. decompose(Pid) -> from_binary(term_to_binary(Pid, [{minor_version, 2}])). -spec from_binary(binary()) -> #{atom() => any()}. from_binary(Bin) -> - PidData = case Bin of - %% Erlang 23+ - <> -> Val0; - %% Erlang 22 - <> -> Val1 - end, + <> = Bin, {Node, Rest2} = case PidData of <> -> {Node0, Rest1}; <> -> {Node0, Rest1} end, - {ID, Serial, Creation} = case Rest2 of - %% NEW_PID_EXT on Erlang 23+ - <> -> - {ID0, Serial0, Creation0}; - %% PID_EXT on Erlang 22 - <> -> - {ID1, Serial1, Creation1} - end, + <> = Rest2, #{ node => binary_to_atom(Node, utf8), id => ID, @@ -62,9 +44,16 @@ from_binary(Bin) -> -spec to_binary(#{atom() => any()}) -> binary(). to_binary(#{node := Node, id := ID, serial := Serial, creation := Creation}) -> - BinNode = atom_to_binary(Node, utf8), + BinNode = atom_to_binary(Node), NodeLen = byte_size(BinNode), - <>. + <>. -spec recompose(#{atom() => any()}) -> pid(). recompose(M) -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 57b26c187bce..c98326837075 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -297,22 +297,11 @@ send_command(Pid, Msg) -> -spec deliver_reply(binary(), mc:state()) -> 'ok'. deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) -> - case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, - rabbit_nodes:all_running_with_hashes()) of + Nodes = rabbit_nodes:all_running_with_hashes(), + case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of {ok, Pid, Key} -> - delegate:invoke_no_result(Pid, {?MODULE, deliver_reply_local, - [Key, Message]}); - {error, _} -> - deliver_reply_v1(EncodedBin, Message) - end. - --spec deliver_reply_v1(binary(), mc:state()) -> 'ok'. -deliver_reply_v1(EncodedBin, Message) -> - %% the the original encoding function - case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of - {ok, V1Pid, V1Key} -> - delegate:invoke_no_result(V1Pid, - {?MODULE, deliver_reply_local, [V1Key, Message]}); + delegate:invoke_no_result( + Pid, {?MODULE, deliver_reply_local, [Key, Message]}); {error, _} -> ok end. @@ -331,30 +320,19 @@ deliver_reply_local(Pid, Key, Message) -> declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> exists; declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> - case rabbit_direct_reply_to:decode_reply_to_v2(EncodedBin, rabbit_nodes:all_running_with_hashes()) of - {error, _} -> - declare_fast_reply_to_v1(EncodedBin); + Nodes = rabbit_nodes:all_running_with_hashes(), + case rabbit_direct_reply_to:decode_reply_to(EncodedBin, Nodes) of {ok, Pid, Key} -> Msg = {declare_fast_reply_to, Key}, rabbit_misc:with_exit_handler( rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end) + fun() -> gen_server2:call(Pid, Msg, infinity) end); + {error, _} -> + not_found end; declare_fast_reply_to(_) -> not_found. -declare_fast_reply_to_v1(EncodedBin) -> - %% the the original encoding function - case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of - {ok, V1Pid, V1Key} -> - Msg = {declare_fast_reply_to, V1Key}, - rabbit_misc:with_exit_handler( - rabbit_misc:const(not_found), - fun() -> gen_server2:call(V1Pid, Msg, infinity) end); - {error, _} -> - not_found - end. - -spec list() -> [pid()]. list() -> @@ -1319,7 +1297,7 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, Other -> Other end, %% Precalculate both suffix and key - {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix_v2(self()), + {Key, Suffix} = rabbit_direct_reply_to:compute_key_and_suffix(self()), Consumer = {CTag, Suffix, Key}, State1 = State#ch{reply_consumer = Consumer}, case NoWait of diff --git a/deps/rabbit/src/rabbit_direct_reply_to.erl b/deps/rabbit/src/rabbit_direct_reply_to.erl index e1080c6544e1..377ceeb6fcbd 100644 --- a/deps/rabbit/src/rabbit_direct_reply_to.erl +++ b/deps/rabbit/src/rabbit_direct_reply_to.erl @@ -7,45 +7,14 @@ -module(rabbit_direct_reply_to). -%% API --export([ - %% Original amq.rabbitmq.reply-to target channel encoding - compute_key_and_suffix_v1/1, - decode_reply_to_v1/1, +-export([compute_key_and_suffix/1, + decode_reply_to/2]). - %% v2 amq.rabbitmq.reply-to target channel encoding - compute_key_and_suffix_v2/1, - decode_reply_to_v2/2 -]). - -%% -%% API -%% - --type decoded_pid_and_key() :: {ok, pid(), binary()}. - --spec compute_key_and_suffix_v1(pid()) -> {binary(), binary()}. -%% This original pid encoding function produces values that exceed routing key length limit -%% on nodes with long (say, 130+ characters) node names. -compute_key_and_suffix_v1(Pid) -> - Key = base64:encode(rabbit_guid:gen()), - PidEnc = base64:encode(term_to_binary(Pid)), - Suffix = <>, - {Key, Suffix}. - --spec decode_reply_to_v1(binary()) -> decoded_pid_and_key() | {error, any()}. -decode_reply_to_v1(Bin) -> - case string:lexemes(Bin, ".") of - [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), - {ok, Pid, unicode:characters_to_binary(Key)}; - _ -> {error, unrecognized_format} - end. - - --spec compute_key_and_suffix_v2(pid()) -> {binary(), binary()}. %% This pid encoding function produces values that are of mostly fixed size %% regardless of the node name length. -compute_key_and_suffix_v2(Pid) -> +-spec compute_key_and_suffix(pid()) -> + {binary(), binary()}. +compute_key_and_suffix(Pid) -> Key = base64:encode(rabbit_guid:gen()), PidParts0 = #{node := Node} = pid_recomposition:decompose(Pid), @@ -61,19 +30,22 @@ compute_key_and_suffix_v2(Pid) -> Suffix = <>, {Key, Suffix}. --spec decode_reply_to_v2(binary(), #{non_neg_integer() => node()}) -> decoded_pid_and_key() | {error, any()}. -decode_reply_to_v2(Bin, CandidateNodes) -> +-spec decode_reply_to(binary(), #{non_neg_integer() => node()}) -> + {ok, pid(), binary()} | {error, any()}. +decode_reply_to(Bin, CandidateNodes) -> try [PidEnc, Key] = binary:split(Bin, <<".">>), RawPidBin = base64:decode(PidEnc), PidParts0 = #{node := ShortenedNodename} = pid_recomposition:from_binary(RawPidBin), {_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename), case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of - undefined -> {error, target_node_not_found}; + undefined -> + {error, target_node_not_found}; Candidate -> PidParts = maps:update(node, Candidate, PidParts0), {ok, pid_recomposition:recompose(PidParts), Key} end catch - error:_ -> {error, unrecognized_format} + error:_ -> + {error, unrecognized_format} end. diff --git a/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl b/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl index 59451186ce94..7ae0c4d568ab 100644 --- a/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl +++ b/deps/rabbit/test/rabbit_direct_reply_to_prop_SUITE.erl @@ -8,7 +8,7 @@ all() -> [ - decode_reply_to_v2 + decode_reply_to ]. init_per_suite(Config) -> @@ -32,7 +32,7 @@ end_per_testcase(_TestCase, _Config) -> %%% Tests %%% -decode_reply_to_v2(Config) -> +decode_reply_to(Config) -> rabbit_ct_proper_helpers:run_proper( fun() -> prop_decode_reply_to(Config) end, [], @@ -61,9 +61,9 @@ prop_decode_reply_to(_) -> NonB64 = <<0, Random/binary>>, {ok, pid_recomposition:recompose(PidParts), Key} =:= - rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NodeMap) + rabbit_direct_reply_to:decode_reply_to(IxBin, NodeMap) andalso {error, target_node_not_found} =:= - rabbit_direct_reply_to:decode_reply_to_v2(IxBin, NoNodeMap) + rabbit_direct_reply_to:decode_reply_to(IxBin, NoNodeMap) andalso {error, unrecognized_format} =:= - rabbit_direct_reply_to:decode_reply_to_v2(NonB64, NodeMap) + rabbit_direct_reply_to:decode_reply_to(NonB64, NodeMap) end).