-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Shovel: convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers #10022
Conversation
Message containers were introduced in part to make this kind of conversions easier to pull off. |
@luos why was the priority value moved to a header in AMQP 1.0? I doubt we can ship this in 3.12.x because of this change. |
Hi, I think we can definitely remove the priority change, I changed this because there is already a place for it in the AMQP 1.0 Headers and it gets placed into Application Properties. This is not critical for us. Regarding the MC, I'd be happy to make this more MC compatible, though I am quite unsure how as shovels are a level above that if I understand it correctly, or if somehow we can make this in a way that in the future it can be easily changed to use MC? |
Ah, so it is a header in AMQP 1.0 (section |
@luos for Shovels specifically this can remain where it is. Larger changes can come in Shovel v2 for which we have a lot of other internal changes. |
@luos we can drop the configuration key and treat this as a bug fix for Shovels with AMQP 1.0 destinations. |
We prepared the backporting here: #10037 We will work on the MC version at the end of this week. |
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: rabbitmq#7508
Hi @michaelklishin , We've started looking at the conversion with MC. We are planning to remove the We see that there is a somewhat related PR already which contains some AMQP 1.0 MC conversion functions, we may depend on these changes. What would be your preference, should we base our changes on that PR (#9022) by @ansd ? |
- remove old code converinting message properties to map and any related conversion function - use mc to hold the message - implement some helper functions to convert amqp10 to mc - use some reasonable default for AMQP1.0 exchange and routing keys as they are mandatory for mc to work - the differences in behaviour are the following compared to 3.12.x - default priority is undefined instead of 4 - some long tagged amqp values are now signedint / unsignedint
2c5b2ff
to
2695dd9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @michaelklishin , we've updated the changes already merged into 3.12.x with the conversion to mc
. Let us know what you think.
@@ -63,6 +63,10 @@ | |||
init(Sections) when is_list(Sections) -> | |||
Msg = decode(Sections, #msg{}), | |||
init(Msg); | |||
init(Amqp10Msg) when element(1, Amqp10Msg) =:= amqp10_msg -> | |||
[#'v1_0.transfer'{}| AmqpRecords] = amqp10_msg:to_amqp_records(Amqp10Msg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This conversions seems a little bit hacky, though we could not come up with a better way.
By adding the v1_0.transfer
record match at the beginning of the list, it effectively drops that list item.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This introduce an implicit dependency on the AMQP 1.0 client to RabbitMQ core. We plan on making AMQP 1.0 support a core feature later this year (after #9022 is merged) but we cannot accept a dependency on an AMQP 1.0 client.
Just like at some point rabbit_core
had to be introduced to avoid a dependency on amqp_client
for AMQP 0-9-1.
case send_msg(Link, Msg) of | ||
MsgCont = mc:convert(mc_amqp, InMsg), | ||
AmqpMessage = mc:protocol_state(MsgCont), | ||
NewMsg = amqp10_msg:from_amqp_records([#'v1_0.transfer'{} | AmqpMessage]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the other side of that conversion, we have to prepend the v1_0.transfer
record to have a valid amqp10_msg
.
% these are required values, however I don't think they make sense for AMQP 1.0 | ||
exchange => <<>>, | ||
routing_keys => [<<>>] | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exchange
and routing_keys
seem to be required by the mc
implementation. If it is not set, then some crashes can be expected.
@@ -168,6 +170,169 @@ simple_amqp10_src(Config) -> | |||
ok | |||
end). | |||
|
|||
message_prop_conversion(Config) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the same tests as on 3.12.x. With minimal changes, they produce the same output.
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State); | ||
Anns = #{exchange => Exchange, routing_keys => [RoutingKey]}, | ||
Content = rabbit_basic:build_content(Props0, [Payload]), | ||
Mc = mc:init(mc_amqpl, Content, Anns), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This conversion as well looks a bit hacky. Let us know if you already have some better way to implement these.
@kjnilsson @ansd this must be the first use of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have decided to postpone this until after 3.13 ships and then #9022 is merged.
@@ -63,6 +63,10 @@ | |||
init(Sections) when is_list(Sections) -> | |||
Msg = decode(Sections, #msg{}), | |||
init(Msg); | |||
init(Amqp10Msg) when element(1, Amqp10Msg) =:= amqp10_msg -> | |||
[#'v1_0.transfer'{}| AmqpRecords] = amqp10_msg:to_amqp_records(Amqp10Msg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This introduce an implicit dependency on the AMQP 1.0 client to RabbitMQ core. We plan on making AMQP 1.0 support a core feature later this year (after #9022 is merged) but we cannot accept a dependency on an AMQP 1.0 client.
Just like at some point rabbit_core
had to be introduced to avoid a dependency on amqp_client
for AMQP 0-9-1.
Hi,
We've implemented Application Property conversion to AMQP 0.9.1 headers for shovel. We wanted this to work on 3.12 and we were not sure if this is appropriate for these message container changes.
Let us know what you think.
GitHub issue: #7508.
A
v3.12.x
version of this PR: #10037.Proposed Changes
Implement conversion of Application Properties to Headers.
Configurable by using the following key:
Types of Changes
What types of changes does your code introduce to this project?
Put an
x
in the boxes that applyChecklist
Put an
x
in the boxes that apply.You can also fill these out after creating the PR.
If you're unsure about any of them, don't hesitate to ask on the mailing list.
We're here to help!
This is simply a reminder of what we are going to look for before merging your code.
CONTRIBUTING.md
document