Skip to content

Commit f5b1f4e

Browse files
olikasgkjnilsson
authored andcommitted
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: #7508 (cherry picked from commit 8e954ff)
1 parent 2679649 commit f5b1f4e

File tree

3 files changed

+238
-5
lines changed

3 files changed

+238
-5
lines changed

deps/amqp10_client/src/amqp10_msg.erl

+6
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
342342
Acc#'v1_0.properties'{message_id = utf8(V)};
343343
(user_id, V, Acc) when is_binary(V) ->
344344
Acc#'v1_0.properties'{user_id = {binary, V}};
345+
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
346+
Acc#'v1_0.properties'{user_id = binary(V)};
345347
(to, V, Acc) ->
346348
Acc#'v1_0.properties'{to = utf8(V)};
347349
(subject, V, Acc) ->
@@ -422,6 +424,8 @@ wrap_ap_value(true) ->
422424
{boolean, true};
423425
wrap_ap_value(false) ->
424426
{boolean, false};
427+
wrap_ap_value(F) when is_float(F) ->
428+
{double, F};
425429
wrap_ap_value(V) when is_binary(V) ->
426430
utf8(V);
427431
wrap_ap_value(V) when is_list(V) ->
@@ -472,6 +476,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
472476
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
473477
sym(B) when is_binary(B) -> {symbol, B}.
474478
uint(B) -> {uint, B}.
479+
binary(B) when is_binary(B) -> {binary, B};
480+
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.
475481

476482
has_value(undefined) -> false;
477483
has_value(_) -> true.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+67-4
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ dest_endpoint(#{shovel_type := dynamic,
182182
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
183183
Tag = amqp10_msg:delivery_id(Msg),
184184
Payload = amqp10_msg:body_bin(Msg),
185-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
185+
Props = props_to_map(Msg),
186+
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
186187
handle_source({amqp10_event, {connection, Conn, opened}},
187188
State = #{source := #{current := #{conn := Conn}}}) ->
188189
State;
@@ -380,7 +381,9 @@ add_forward_headers(_, Msg) -> Msg.
380381
set_message_properties(Props, Msg) ->
381382
%% this is effectively special handling properties from amqp 0.9.1
382383
maps:fold(
383-
fun(content_type, Ct, M) ->
384+
fun(_Key, undefined, M) ->
385+
M;
386+
(content_type, Ct, M) ->
384387
amqp10_msg:set_properties(
385388
#{content_type => to_binary(Ct)}, M);
386389
(content_encoding, Ct, M) ->
@@ -400,8 +403,8 @@ set_message_properties(Props, Msg) ->
400403
(message_id, Ct, M) ->
401404
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
402405
(timestamp, Ct, M) ->
403-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
404-
(user_id, Ct, M) ->
406+
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
407+
(user_id, Ct, M) when Ct =/= undefined ->
405408
amqp10_msg:set_properties(#{user_id => Ct}, M);
406409
(headers, Headers0, M) when is_list(Headers0) ->
407410
%% AMPQ 0.9.1 are added as applicatin properties
@@ -443,3 +446,63 @@ is_amqp10_compat(T) ->
443446
%% TODO: not all lists are compatible
444447
is_list(T) orelse
445448
is_boolean(T).
449+
450+
to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
451+
{Key, longstr, Value};
452+
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
453+
{Key, long, Value};
454+
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
455+
{Key, double, Value};
456+
to_amqp091_compatible_value(Key, true) ->
457+
{Key, bool, true};
458+
to_amqp091_compatible_value(Key, false) ->
459+
{Key, bool, false};
460+
to_amqp091_compatible_value(_Key, _Value) ->
461+
undefined.
462+
463+
delivery_mode(Headers) ->
464+
case maps:get(durable, Headers, undefined) of
465+
undefined -> undefined;
466+
true -> 2;
467+
false -> 1
468+
end.
469+
470+
timestamp_10_to_091(T) when is_integer(T) ->
471+
trunc(T / 1000);
472+
timestamp_10_to_091(_) ->
473+
undefined.
474+
475+
timestamp_091_to_10(T) when is_integer(T) ->
476+
T * 1000;
477+
timestamp_091_to_10(_Value) ->
478+
undefined.
479+
480+
ttl(T) when is_integer(T) ->
481+
erlang:integer_to_binary(T);
482+
ttl(_T) -> undefined.
483+
484+
props_to_map(Msg) ->
485+
AppProps = amqp10_msg:application_properties(Msg),
486+
AppProps091Headers = lists:filtermap(fun({K, V}) ->
487+
case to_amqp091_compatible_value(K, V) of
488+
undefined ->
489+
false;
490+
Value ->
491+
{true, Value}
492+
end
493+
end, maps:to_list(AppProps)),
494+
InProps = amqp10_msg:properties(Msg),
495+
Headers = amqp10_msg:headers(Msg),
496+
#{
497+
headers => AppProps091Headers,
498+
content_type => maps:get(content_type, InProps, undefined),
499+
content_encoding => maps:get(content_encoding, InProps, undefined),
500+
delivery_mode => delivery_mode(Headers),
501+
priority => maps:get(priority, Headers, undefined),
502+
correlation_id => maps:get(correlation_id, InProps, undefined),
503+
reply_to => maps:get(reply_to, InProps, undefined),
504+
expiration => ttl(maps:get(ttl, Headers, undefined)),
505+
message_id => maps:get(message_id, InProps, undefined),
506+
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
507+
user_id => maps:get(user_id, InProps, undefined)
508+
}.

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+165-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ groups() ->
2727
autodelete_amqp091_dest_on_confirm,
2828
autodelete_amqp091_dest_on_publish,
2929
simple_amqp10_dest,
30-
simple_amqp10_src
30+
simple_amqp10_src,
31+
message_prop_conversion,
32+
message_prop_conversion_no_props
3133
]},
3234
{with_map_config, [], [
3335
simple,
@@ -169,6 +171,168 @@ simple_amqp10_src(Config) ->
169171
ok
170172
end).
171173

174+
message_prop_conversion(Config) ->
175+
MapConfig = ?config(map_config, Config),
176+
Src = ?config(srcq, Config),
177+
Dest = ?config(destq, Config),
178+
with_session(Config,
179+
fun (Sess) ->
180+
shovel_test_utils:set_param(
181+
Config,
182+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
183+
{<<"src-address">>, Src},
184+
{<<"dest-protocol">>, <<"amqp091">>},
185+
{<<"dest-queue">>, Dest},
186+
{<<"add-forward-headers">>, true},
187+
{<<"dest-add-timestamp-header">>, true},
188+
{<<"publish-properties">>,
189+
case MapConfig of
190+
true -> #{<<"cluster_id">> => <<"x">>};
191+
_ -> [{<<"cluster_id">>, <<"x">>}]
192+
end}
193+
]),
194+
LinkName = <<"dynamic-sender-", Dest/binary>>,
195+
Tag = <<"tag1">>,
196+
Payload = <<"payload">>,
197+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
198+
unsettled, unsettled_state),
199+
ok = await_amqp10_event(link, Sender, attached),
200+
Headers = #{durable => true, priority => 3, ttl => 180000},
201+
Msg = amqp10_msg:set_headers(Headers,
202+
amqp10_msg:new(Tag, Payload, false)),
203+
Msg2 = amqp10_msg:set_properties(#{
204+
message_id => <<"message-id">>,
205+
user_id => <<"guest">>,
206+
to => <<"to">>,
207+
subject => <<"subject">>,
208+
reply_to => <<"reply-to">>,
209+
correlation_id => <<"correlation-id">>,
210+
content_type => <<"content-type">>,
211+
content_encoding => <<"content-encoding">>,
212+
%absolute_expiry_time => 123456789,
213+
creation_time => 123456789,
214+
group_id => <<"group-id">>,
215+
group_sequence => 123,
216+
reply_to_group_id => <<"reply-to-group-id">>
217+
}, Msg),
218+
Msg3 = amqp10_msg:set_application_properties(#{
219+
<<"x-binary">> => <<"binary">>,
220+
<<"x-int">> => 33,
221+
<<"x-negative-int">> => -33,
222+
<<"x-float">> => 1.3,
223+
<<"x-true">> => true,
224+
<<"x-false">> => false
225+
}, Msg2),
226+
ok = amqp10_client:send_msg(Sender, Msg3),
227+
receive
228+
{amqp10_disposition, {accepted, Tag}} -> ok
229+
after 3000 ->
230+
exit(publish_disposition_not_received)
231+
end,
232+
amqp10_client:detach_link(Sender),
233+
Channel = rabbit_ct_client_helpers:open_channel(Config),
234+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
235+
content_type = ReceivedContentType,
236+
content_encoding = ReceivedContentEncoding,
237+
headers = Headers2,
238+
delivery_mode = ReceivedDeliveryMode,
239+
priority = ReceivedPriority,
240+
correlation_id = ReceivedCorrelationId,
241+
reply_to = ReceivedReplyTo,
242+
expiration = ReceivedExpiration,
243+
message_id = ReceivedMessageId,
244+
timestamp = ReceivedTimestamp,
245+
type = _ReceivedType,
246+
user_id = ReceivedUserId,
247+
app_id = _ReceivedAppId,
248+
cluster_id = _ReceivedClusterId
249+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
250+
251+
?assertEqual(<<"payload">>, Payload),
252+
?assertEqual(2, ReceivedDeliveryMode),
253+
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
254+
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
255+
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
256+
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
257+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
258+
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),
259+
260+
?assertEqual(<<"content-type">>, ReceivedContentType),
261+
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),
262+
263+
?assertEqual(3, ReceivedPriority),
264+
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
265+
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
266+
?assertEqual(<<"180000">>, ReceivedExpiration),
267+
?assertEqual(<<"message-id">>, ReceivedMessageId),
268+
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
269+
?assertEqual(<<"guest">>, ReceivedUserId),
270+
ok
271+
end).
272+
273+
message_prop_conversion_no_props(Config) ->
274+
MapConfig = ?config(map_config, Config),
275+
Src = ?config(srcq, Config),
276+
Dest = ?config(destq, Config),
277+
with_session(Config,
278+
fun (Sess) ->
279+
shovel_test_utils:set_param(
280+
Config,
281+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
282+
{<<"src-address">>, Src},
283+
{<<"dest-protocol">>, <<"amqp091">>},
284+
{<<"dest-queue">>, Dest},
285+
{<<"add-forward-headers">>, true},
286+
{<<"dest-add-timestamp-header">>, true},
287+
{<<"publish-properties">>,
288+
case MapConfig of
289+
true -> #{<<"cluster_id">> => <<"x">>};
290+
_ -> [{<<"cluster_id">>, <<"x">>}]
291+
end}
292+
]),
293+
LinkName = <<"dynamic-sender-", Dest/binary>>,
294+
Tag = <<"tag1">>,
295+
Payload = <<"payload">>,
296+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
297+
unsettled, unsettled_state),
298+
ok = await_amqp10_event(link, Sender, attached),
299+
Msg = amqp10_msg:new(Tag, Payload, false),
300+
ok = amqp10_client:send_msg(Sender, Msg),
301+
receive
302+
{amqp10_disposition, {accepted, Tag}} -> ok
303+
after 3000 ->
304+
exit(publish_disposition_not_received)
305+
end,
306+
amqp10_client:detach_link(Sender),
307+
Channel = rabbit_ct_client_helpers:open_channel(Config),
308+
{#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{
309+
content_type = undefined,
310+
content_encoding = undefined,
311+
headers = ReceivedHeaders,
312+
delivery_mode = ReceivedDeliveryMode,
313+
priority = ReceivedPriority,
314+
correlation_id = undefined,
315+
reply_to = undefined,
316+
expiration = undefined,
317+
message_id = undefined,
318+
timestamp = undefined,
319+
type = undefined,
320+
user_id = undefined,
321+
app_id = undefined,
322+
cluster_id = ReceivedClusterId
323+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
324+
325+
?assertEqual(<<"payload">>, ReceivedPayload),
326+
?assertEqual(1, ReceivedDeliveryMode),
327+
?assertEqual(<<"x">>, ReceivedClusterId),
328+
?assertEqual(4, ReceivedPriority),
329+
330+
?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)),
331+
332+
ok
333+
end).
334+
335+
172336
change_definition(Config) ->
173337
Src = ?config(srcq, Config),
174338
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)