Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shovel: convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers #10022

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
new/3,
set_handle/2,
set_settled/2,
set_delivery_tag/2,
set_message_format/2,
set_headers/2,
set_properties/2,
Expand Down Expand Up @@ -298,6 +299,10 @@ set_handle(Handle, #amqp10_msg{transfer = T} = Msg) ->
set_settled(Settled, #amqp10_msg{transfer = T} = Msg) ->
Msg#amqp10_msg{transfer = T#'v1_0.transfer'{settled = Settled}}.

-spec set_delivery_tag(binary(), amqp10_msg()) -> amqp10_msg().
set_delivery_tag(Tag, #amqp10_msg{transfer = T} = Msg) when is_binary(Tag) ->
Msg#amqp10_msg{transfer = T#'v1_0.transfer'{delivery_tag = {binary, Tag}}}.

%% @doc Set amqp message headers.
-spec set_headers(#{atom() => any()}, amqp10_msg()) -> amqp10_msg().
set_headers(Headers, #amqp10_msg{header = undefined} = Msg) ->
Expand All @@ -306,7 +311,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) ->
H = maps:fold(fun(durable, V, Acc) ->
Acc#'v1_0.header'{durable = V};
(priority, V, Acc) ->
Acc#'v1_0.header'{priority = {uint, V}};
Acc#'v1_0.header'{priority = {ubyte, V}};
(first_acquirer, V, Acc) ->
Acc#'v1_0.header'{first_acquirer = V};
(ttl, V, Acc) ->
Expand All @@ -325,8 +330,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
% message_id can be any type but we restrict it here
Acc#'v1_0.properties'{message_id = utf8(V)};
(user_id, V, Acc) ->
Acc#'v1_0.properties'{user_id = utf8(V)};
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
Acc#'v1_0.properties'{user_id = binary(V)};
(to, V, Acc) ->
Acc#'v1_0.properties'{to = utf8(V)};
(subject, V, Acc) ->
Expand Down Expand Up @@ -407,8 +412,12 @@ wrap_ap_value(true) ->
{boolean, true};
wrap_ap_value(false) ->
{boolean, false};
wrap_ap_value(V) when is_integer(V) ->
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
{uint, V};
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
{int, V};
wrap_ap_value(F) when is_float(F) ->
{double, F};
wrap_ap_value(V) when is_binary(V) ->
utf8(V);
wrap_ap_value(V) when is_list(V) ->
Expand Down Expand Up @@ -449,6 +458,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
sym(B) when is_binary(B) -> {symbol, B}.
uint(B) -> {uint, B}.
binary(B) when is_binary(B) -> {binary, B};
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.

has_value(undefined) -> false;
has_value(_) -> true.
4 changes: 4 additions & 0 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
init(Sections) when is_list(Sections) ->
Msg = decode(Sections, #msg{}),
init(Msg);
init(Amqp10Msg) when element(1, Amqp10Msg) =:= amqp10_msg ->
[#'v1_0.transfer'{}| AmqpRecords] = amqp10_msg:to_amqp_records(Amqp10Msg),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conversions seems a little bit hacky, though we could not come up with a better way.

By adding the v1_0.transfer record match at the beginning of the list, it effectively drops that list item.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduce an implicit dependency on the AMQP 1.0 client to RabbitMQ core. We plan on making AMQP 1.0 support a core feature later this year (after #9022 is merged) but we cannot accept a dependency on an AMQP 1.0 client.

Just like at some point rabbit_core had to be introduced to avoid a dependency on amqp_client for AMQP 0-9-1.

Msg = decode(AmqpRecords, #msg{}),
init(Msg);
init(#msg{} = Msg) ->
%% TODO: as the essential annotations, durable, priority, ttl and delivery_count
%% is all we are interested in it isn't necessary to keep hold of the
Expand Down
89 changes: 25 additions & 64 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_shovel.hrl").


-export([
parse/2,
source_uri/1,
Expand All @@ -31,7 +32,7 @@
ack/3,
nack/3,
status/1,
forward/4
forward/3
]).

%% Function references should not be stored on the metadata store.
Expand Down Expand Up @@ -167,8 +168,8 @@ forward_pending(State) ->
case pop_pending(State) of
empty ->
State;
{{Tag, Props, Payload}, S} ->
S2 = do_forward(Tag, Props, Payload, S),
{{Tag, Msg}, S} ->
S2 = do_forward(Tag, Msg, S),
S3 = control_throttle(S2),
case is_blocked(S3) of
true ->
Expand All @@ -181,91 +182,51 @@ forward_pending(State) ->
end
end.

forward(IncomingTag, Props, Payload, State) ->
forward(IncomingTag, Msg, State) ->
case is_blocked(State) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
PendingEntry = {IncomingTag, Msg},
add_pending(PendingEntry, State);
false ->
State1 = do_forward(IncomingTag, Props, Payload, State),
State1 = do_forward(IncomingTag, Msg, State),
control_throttle(State1)
end.

do_forward(IncomingTag, Props, Payload,
do_forward(IncomingTag, InMsg,
State0 = #{dest := #{props_fun := {M, F, Args},
current := {_, _, DstUri},
fields_fun := {Mf, Ff, Argsf}}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
Exchange = maps:get(exchange, Props, undefined),
RoutingKey = maps:get(routing_key, Props, undefined),
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},

Mc = mc:convert(mc_amqpl, InMsg),

Exchange = mc:get_annotation(exchange, Mc),
RoutingKey = case mc:get_annotation(routing_keys, Mc) of
[Rk | _] -> Rk;
_ -> undefined
end,
{Props, Payload} = rabbit_basic:from_content(mc:protocol_state(Mc)),

Method = #'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
payload = Payload},
publish(IncomingTag, Method1, Msg1, State0).

props_from_map(Map) ->
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
content_encoding = maps:get(content_encoding, Map, undefined),
headers = maps:get(headers, Map, undefined),
delivery_mode = maps:get(delivery_mode, Map, undefined),
priority = maps:get(priority, Map, undefined),
correlation_id = maps:get(correlation_id, Map, undefined),
reply_to = maps:get(reply_to, Map, undefined),
expiration = maps:get(expiration, Map, undefined),
message_id = maps:get(message_id, Map, undefined),
timestamp = maps:get(timestamp, Map, undefined),
type = maps:get(type, Map, undefined),
user_id = maps:get(user_id, Map, undefined),
app_id = maps:get(app_id, Map, undefined),
cluster_id = maps:get(cluster_id, Map, undefined)}.

map_from_props(#'P_basic'{content_type = Content_type,
content_encoding = Content_encoding,
headers = Headers,
delivery_mode = Delivery_mode,
priority = Priority,
correlation_id = Correlation_id,
reply_to = Reply_to,
expiration = Expiration,
message_id = Message_id,
timestamp = Timestamp,
type = Type,
user_id = User_id,
app_id = App_id,
cluster_id = Cluster_id}) ->
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
({K, V}, Acc) -> Acc#{K => V}
end, #{}, [{content_type, Content_type},
{content_encoding, Content_encoding},
{headers, Headers},
{delivery_mode, Delivery_mode},
{priority, Priority},
{correlation_id, Correlation_id},
{reply_to, Reply_to},
{expiration, Expiration},
{message_id, Message_id},
{timestamp, Timestamp},
{type, Type},
{user_id, User_id},
{app_id, App_id},
{cluster_id, Cluster_id}
]).

handle_source(#'basic.consume_ok'{}, State) ->
State;
handle_source({#'basic.deliver'{delivery_tag = Tag,
exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{props = Props0, payload = Payload}}, State) ->
Props = (map_from_props(Props0))#{exchange => Exchange,
routing_key => RoutingKey},
% forward to destination
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
Anns = #{exchange => Exchange, routing_keys => [RoutingKey]},
Content = rabbit_basic:build_content(Props0, [Payload]),
Mc = mc:init(mc_amqpl, Content, Anns),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conversion as well looks a bit hacky. Let us know if you already have some better way to implement these.

rabbit_shovel_behaviour:forward(Tag, Mc, State);

handle_source({'EXIT', Conn, Reason},
#{source := #{current := {Conn, _, _}}}) ->
Expand Down
111 changes: 33 additions & 78 deletions deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-behaviour(rabbit_shovel_behaviour).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include("rabbit_shovel.hrl").

-export([
Expand All @@ -31,7 +32,7 @@
ack/3,
nack/3,
status/1,
forward/4
forward/3
]).

-import(rabbit_misc, [pget/2, pget/3]).
Expand Down Expand Up @@ -175,10 +176,14 @@ dest_endpoint(#{shovel_type := dynamic,

-spec handle_source(Msg :: any(), state()) ->
not_handled | state() | {stop, any()}.
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
Tag = amqp10_msg:delivery_id(Msg),
Payload = amqp10_msg:body_bin(Msg),
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
handle_source({amqp10_msg, _LinkRef, Amqp10Msg}, State) ->
Tag = amqp10_msg:delivery_id(Amqp10Msg),
Mc = mc:init(mc_amqp, Amqp10Msg, #{
% these are required values, however I don't think they make sense for AMQP 1.0
exchange => <<>>,
routing_keys => [<<>>]
}),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exchange and routing_keys seem to be required by the mc implementation. If it is not set, then some crashes can be expected.

rabbit_shovel_behaviour:forward(Tag, Mc, State);
handle_source({amqp10_event, {connection, Conn, opened}},
State = #{source := #{current := #{conn := Conn}}}) ->
State;
Expand Down Expand Up @@ -251,8 +256,8 @@ handle_dest({amqp10_event, {link, Link, credited}},
%% we have credit so can begin to forward
State = State0#{dest => Dst#{link_state => credited,
pending => []}},
lists:foldl(fun ({A, B, C}, S) ->
forward(A, B, C, S)
lists:foldl(fun ({A, M}, S) ->
forward(A, M, S)
end, State, lists:reverse(Pend));
handle_dest({amqp10_event, {link, Link, _Evt}},
State= #{dest := #{current := #{link := Link}}}) ->
Expand Down Expand Up @@ -312,28 +317,27 @@ status(_) ->
%% Destination not yet connected
ignore.

-spec forward(Tag :: tag(), Props :: #{atom() => any()},
Payload :: binary(), state()) ->
-spec forward(Tag :: tag(), mc:state(), state()) ->
state() | {stop, any()}.
forward(_Tag, _Props, _Payload,
forward(_Tag, _Msg,
#{source := #{remaining_unacked := 0}} = State) ->
State;
forward(Tag, Props, Payload,
forward(Tag, Msg,
#{dest := #{current := #{link_state := attached},
pending := Pend0} = Dst} = State) ->
%% simply cache the forward oo
Pend = [{Tag, Props, Payload} | Pend0],
Pend = [{Tag, Msg} | Pend0],
State#{dest => Dst#{pending => {Pend}}};
forward(Tag, Props, Payload,
forward(Tag, InMsg,
#{dest := #{current := #{link := Link},
unacked := Unacked} = Dst,
ack_mode := AckMode} = State) ->
OutTag = rabbit_data_coercion:to_binary(Tag),
Msg0 = new_message(OutTag, Payload, State),
Msg = add_timestamp_header(
State, set_message_properties(
Props, add_forward_headers(State, Msg0))),
case send_msg(Link, Msg) of
MsgCont = mc:convert(mc_amqp, InMsg),
AmqpMessage = mc:protocol_state(MsgCont),
NewMsg = amqp10_msg:from_amqp_records([#'v1_0.transfer'{} | AmqpMessage]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the other side of that conversion, we have to prepend the v1_0.transfer record to have a valid amqp10_msg.

NewMsg2 = with_shovel_info(OutTag, NewMsg, State),
case send_msg(Link, NewMsg2) of
ok ->
rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
Expand All @@ -349,6 +353,17 @@ forward(Tag, Props, Payload,
Stop
end.

with_shovel_info(OutTag, Message, #{ack_mode := AckMode,
dest := #{properties := Props,
application_properties := AppProps,
message_annotations := MsgAnns}} = State) ->
Msg0 = amqp10_msg:set_delivery_tag(OutTag, Message),
Msg1 = amqp10_msg:set_settled(AckMode =/= on_confirm, Msg0),
Msg2 = amqp10_msg:set_properties(Props, Msg1),
Msg3 = amqp10_msg:set_message_annotations(MsgAnns, Msg2),
Msg4 = amqp10_msg:set_application_properties(AppProps, Msg3),
add_timestamp_header(State, add_forward_headers(State, Msg4)).

send_msg(Link, Msg) ->
case amqp10_client:send_msg(Link, Msg) of
ok ->
Expand All @@ -361,15 +376,6 @@ send_msg(Link, Msg) ->
end
end.

new_message(Tag, Payload, #{ack_mode := AckMode,
dest := #{properties := Props,
application_properties := AppProps,
message_annotations := MsgAnns}}) ->
Msg0 = amqp10_msg:new(Tag, Payload, AckMode =/= on_confirm),
Msg1 = amqp10_msg:set_properties(Props, Msg0),
Msg = amqp10_msg:set_message_annotations(MsgAnns, Msg1),
amqp10_msg:set_application_properties(AppProps, Msg).

add_timestamp_header(#{dest := #{add_timestamp_header := true}}, Msg) ->
P =#{creation_time => os:system_time(milli_seconds)},
amqp10_msg:set_properties(P, Msg);
Expand All @@ -379,50 +385,6 @@ add_forward_headers(#{dest := #{cached_forward_headers := Props}}, Msg) ->
amqp10_msg:set_application_properties(Props, Msg);
add_forward_headers(_, Msg) -> Msg.

set_message_properties(Props, Msg) ->
%% this is effectively special handling properties from amqp 0.9.1
maps:fold(
fun(content_type, Ct, M) ->
amqp10_msg:set_properties(
#{content_type => to_binary(Ct)}, M);
(content_encoding, Ct, M) ->
amqp10_msg:set_properties(
#{content_encoding => to_binary(Ct)}, M);
(delivery_mode, 2, M) ->
amqp10_msg:set_headers(#{durable => true}, M);
(correlation_id, Ct, M) ->
amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M);
(reply_to, Ct, M) ->
amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M);
(message_id, Ct, M) ->
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
(timestamp, Ct, M) ->
amqp10_msg:set_properties(#{creation_time => Ct}, M);
(user_id, Ct, M) ->
amqp10_msg:set_properties(#{user_id => Ct}, M);
(headers, Headers0, M) when is_list(Headers0) ->
%% AMPQ 0.9.1 are added as applicatin properties
%% TODO: filter headers to make safe
Headers = lists:foldl(
fun ({K, _T, V}, Acc) ->
case is_amqp10_compat(V) of
true ->
Acc#{to_binary(K) => V};
false ->
Acc
end
end, #{}, Headers0),
amqp10_msg:set_application_properties(Headers, M);
(Key, Value, M) ->
case is_amqp10_compat(Value) of
true ->
amqp10_msg:set_application_properties(
#{to_binary(Key) => Value}, M);
false ->
M
end
end, Msg, Props).

gen_unique_name(Pre0, Post0) ->
Pre = to_binary(Pre0),
Post = to_binary(Post0),
Expand All @@ -433,10 +395,3 @@ bin_to_hex(Bin) ->
<<<<if N >= 10 -> N -10 + $a;
true -> N + $0 end>>
|| <<N:4>> <= Bin>>.

is_amqp10_compat(T) ->
is_binary(T) orelse
is_number(T) orelse
%% TODO: not all lists are compatible
is_list(T) orelse
is_boolean(T).
Loading
Loading