Skip to content

Commit 21c37b3

Browse files
olikasgluos
authored andcommitted
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: rabbitmq#7508
1 parent 741fd43 commit 21c37b3

File tree

3 files changed

+244
-9
lines changed

3 files changed

+244
-9
lines changed

deps/amqp10_client/src/amqp10_msg.erl

+10-4
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) ->
306306
H = maps:fold(fun(durable, V, Acc) ->
307307
Acc#'v1_0.header'{durable = V};
308308
(priority, V, Acc) ->
309-
Acc#'v1_0.header'{priority = {uint, V}};
309+
Acc#'v1_0.header'{priority = {ubyte, V}};
310310
(first_acquirer, V, Acc) ->
311311
Acc#'v1_0.header'{first_acquirer = V};
312312
(ttl, V, Acc) ->
@@ -325,8 +325,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
325325
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
326326
% message_id can be any type but we restrict it here
327327
Acc#'v1_0.properties'{message_id = utf8(V)};
328-
(user_id, V, Acc) ->
329-
Acc#'v1_0.properties'{user_id = utf8(V)};
328+
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
329+
Acc#'v1_0.properties'{user_id = binary(V)};
330330
(to, V, Acc) ->
331331
Acc#'v1_0.properties'{to = utf8(V)};
332332
(subject, V, Acc) ->
@@ -407,8 +407,12 @@ wrap_ap_value(true) ->
407407
{boolean, true};
408408
wrap_ap_value(false) ->
409409
{boolean, false};
410-
wrap_ap_value(V) when is_integer(V) ->
410+
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
411411
{uint, V};
412+
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
413+
{int, V};
414+
wrap_ap_value(F) when is_float(F) ->
415+
{double, F};
412416
wrap_ap_value(V) when is_binary(V) ->
413417
utf8(V);
414418
wrap_ap_value(V) when is_list(V) ->
@@ -449,6 +453,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
449453
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
450454
sym(B) when is_binary(B) -> {symbol, B}.
451455
uint(B) -> {uint, B}.
456+
binary(B) when is_binary(B) -> {binary, B};
457+
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.
452458

453459
has_value(undefined) -> false;
454460
has_value(_) -> true.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+69-4
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ dest_endpoint(#{shovel_type := dynamic,
178178
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
179179
Tag = amqp10_msg:delivery_id(Msg),
180180
Payload = amqp10_msg:body_bin(Msg),
181-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
181+
Props = props_to_map(Msg),
182+
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
182183
handle_source({amqp10_event, {connection, Conn, opened}},
183184
State = #{source := #{current := #{conn := Conn}}}) ->
184185
State;
@@ -382,23 +383,27 @@ add_forward_headers(_, Msg) -> Msg.
382383
set_message_properties(Props, Msg) ->
383384
%% this is effectively special handling properties from amqp 0.9.1
384385
maps:fold(
385-
fun(content_type, Ct, M) ->
386+
fun(_Key, undefined, M) ->
387+
M;
388+
(content_type, Ct, M) ->
386389
amqp10_msg:set_properties(
387390
#{content_type => to_binary(Ct)}, M);
388391
(content_encoding, Ct, M) ->
389392
amqp10_msg:set_properties(
390393
#{content_encoding => to_binary(Ct)}, M);
391394
(delivery_mode, 2, M) ->
392395
amqp10_msg:set_headers(#{durable => true}, M);
396+
(priority, P, M) when is_integer(P) ->
397+
amqp10_msg:set_headers(#{priority => P}, M);
393398
(correlation_id, Ct, M) ->
394399
amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M);
395400
(reply_to, Ct, M) ->
396401
amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M);
397402
(message_id, Ct, M) ->
398403
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
399404
(timestamp, Ct, M) ->
400-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
401-
(user_id, Ct, M) ->
405+
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
406+
(user_id, Ct, M) when Ct =/= undefined ->
402407
amqp10_msg:set_properties(#{user_id => Ct}, M);
403408
(headers, Headers0, M) when is_list(Headers0) ->
404409
%% AMPQ 0.9.1 are added as applicatin properties
@@ -440,3 +445,63 @@ is_amqp10_compat(T) ->
440445
%% TODO: not all lists are compatible
441446
is_list(T) orelse
442447
is_boolean(T).
448+
449+
to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
450+
{Key, longstr, Value};
451+
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
452+
{Key, long, Value};
453+
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
454+
{Key, double, Value};
455+
to_amqp091_compatible_value(Key, true) ->
456+
{Key, bool, true};
457+
to_amqp091_compatible_value(Key, false) ->
458+
{Key, bool, false};
459+
to_amqp091_compatible_value(_Key, _Value) ->
460+
undefined.
461+
462+
delivery_mode(Headers) ->
463+
case maps:get(durable, Headers, undefined) of
464+
undefined -> undefined;
465+
true -> 2;
466+
false -> 1
467+
end.
468+
469+
timestamp_10_to_091(undefined) ->
470+
undefined;
471+
timestamp_10_to_091(T) ->
472+
trunc(T / 1000).
473+
474+
timestamp_091_to_10(T) when is_integer(T) ->
475+
T * 1000;
476+
timestamp_091_to_10(_Value) ->
477+
undefined.
478+
479+
ttl(T) when is_integer(T) ->
480+
erlang:integer_to_binary(T);
481+
ttl(_T) -> undefined.
482+
483+
props_to_map(Msg) ->
484+
AppProps = amqp10_msg:application_properties(Msg),
485+
AppProps091Headers = lists:filtermap(fun({K, V}) ->
486+
case to_amqp091_compatible_value(K, V) of
487+
undefined ->
488+
false;
489+
Value ->
490+
{true, Value}
491+
end
492+
end, maps:to_list(AppProps)),
493+
InProps = amqp10_msg:properties(Msg),
494+
Headers = amqp10_msg:headers(Msg),
495+
#{
496+
headers => AppProps091Headers,
497+
content_type => maps:get(content_type, InProps, undefined),
498+
content_encoding => maps:get(content_encoding, InProps, undefined),
499+
delivery_mode => delivery_mode(Headers),
500+
priority => maps:get(priority, Headers, undefined),
501+
correlation_id => maps:get(correlation_id, InProps, undefined),
502+
reply_to => maps:get(reply_to, InProps, undefined),
503+
expiration => ttl(maps:get(ttl, Headers, undefined)),
504+
message_id => maps:get(message_id, InProps, undefined),
505+
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
506+
user_id => maps:get(user_id, InProps, undefined)
507+
}.

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+165-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ groups() ->
2929
autodelete_amqp091_dest_on_confirm,
3030
autodelete_amqp091_dest_on_publish,
3131
simple_amqp10_dest,
32-
simple_amqp10_src
32+
simple_amqp10_src,
33+
message_prop_conversion,
34+
message_prop_conversion_no_props
3335
]},
3436
{with_map_config, [], [
3537
simple,
@@ -168,6 +170,168 @@ simple_amqp10_src(Config) ->
168170
ok
169171
end).
170172

173+
message_prop_conversion(Config) ->
174+
MapConfig = ?config(map_config, Config),
175+
Src = ?config(srcq, Config),
176+
Dest = ?config(destq, Config),
177+
with_session(Config,
178+
fun (Sess) ->
179+
shovel_test_utils:set_param(
180+
Config,
181+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
182+
{<<"src-address">>, Src},
183+
{<<"dest-protocol">>, <<"amqp091">>},
184+
{<<"dest-queue">>, Dest},
185+
{<<"add-forward-headers">>, true},
186+
{<<"dest-add-timestamp-header">>, true},
187+
{<<"publish-properties">>,
188+
case MapConfig of
189+
true -> #{<<"cluster_id">> => <<"x">>};
190+
_ -> [{<<"cluster_id">>, <<"x">>}]
191+
end}
192+
]),
193+
LinkName = <<"dynamic-sender-", Dest/binary>>,
194+
Tag = <<"tag1">>,
195+
Payload = <<"payload">>,
196+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
197+
unsettled, unsettled_state),
198+
ok = await_amqp10_event(link, Sender, attached),
199+
Headers = #{durable => true, priority => 3, ttl => 180000},
200+
Msg = amqp10_msg:set_headers(Headers,
201+
amqp10_msg:new(Tag, Payload, false)),
202+
Msg2 = amqp10_msg:set_properties(#{
203+
message_id => <<"message-id">>,
204+
user_id => <<"guest">>,
205+
to => <<"to">>,
206+
subject => <<"subject">>,
207+
reply_to => <<"reply-to">>,
208+
correlation_id => <<"correlation-id">>,
209+
content_type => <<"content-type">>,
210+
content_encoding => <<"content-encoding">>,
211+
%absolute_expiry_time => 123456789,
212+
creation_time => 123456789,
213+
group_id => <<"group-id">>,
214+
group_sequence => 123,
215+
reply_to_group_id => <<"reply-to-group-id">>
216+
}, Msg),
217+
Msg3 = amqp10_msg:set_application_properties(#{
218+
<<"x-binary">> => <<"binary">>,
219+
<<"x-int">> => 33,
220+
<<"x-negative-int">> => -33,
221+
<<"x-float">> => 1.3,
222+
<<"x-true">> => true,
223+
<<"x-false">> => false
224+
}, Msg2),
225+
ok = amqp10_client:send_msg(Sender, Msg3),
226+
receive
227+
{amqp10_disposition, {accepted, Tag}} -> ok
228+
after 3000 ->
229+
exit(publish_disposition_not_received)
230+
end,
231+
amqp10_client:detach_link(Sender),
232+
Channel = rabbit_ct_client_helpers:open_channel(Config),
233+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
234+
content_type = ReceivedContentType,
235+
content_encoding = ReceivedContentEncoding,
236+
headers = Headers2,
237+
delivery_mode = ReceivedDeliveryMode,
238+
priority = ReceivedPriority,
239+
correlation_id = ReceivedCorrelationId,
240+
reply_to = ReceivedReplyTo,
241+
expiration = ReceivedExpiration,
242+
message_id = ReceivedMessageId,
243+
timestamp = ReceivedTimestamp,
244+
type = _ReceivedType,
245+
user_id = ReceivedUserId,
246+
app_id = _ReceivedAppId,
247+
cluster_id = _ReceivedClusterId
248+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
249+
250+
?assertEqual(<<"payload">>, Payload),
251+
?assertEqual(2, ReceivedDeliveryMode),
252+
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
253+
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
254+
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
255+
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
256+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
257+
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),
258+
259+
?assertEqual(<<"content-type">>, ReceivedContentType),
260+
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),
261+
262+
?assertEqual(3, ReceivedPriority),
263+
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
264+
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
265+
?assertEqual(<<"180000">>, ReceivedExpiration),
266+
?assertEqual(<<"message-id">>, ReceivedMessageId),
267+
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
268+
?assertEqual(<<"guest">>, ReceivedUserId),
269+
ok
270+
end).
271+
272+
message_prop_conversion_no_props(Config) ->
273+
MapConfig = ?config(map_config, Config),
274+
Src = ?config(srcq, Config),
275+
Dest = ?config(destq, Config),
276+
with_session(Config,
277+
fun (Sess) ->
278+
shovel_test_utils:set_param(
279+
Config,
280+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
281+
{<<"src-address">>, Src},
282+
{<<"dest-protocol">>, <<"amqp091">>},
283+
{<<"dest-queue">>, Dest},
284+
{<<"add-forward-headers">>, true},
285+
{<<"dest-add-timestamp-header">>, true},
286+
{<<"publish-properties">>,
287+
case MapConfig of
288+
true -> #{<<"cluster_id">> => <<"x">>};
289+
_ -> [{<<"cluster_id">>, <<"x">>}]
290+
end}
291+
]),
292+
LinkName = <<"dynamic-sender-", Dest/binary>>,
293+
Tag = <<"tag1">>,
294+
Payload = <<"payload">>,
295+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
296+
unsettled, unsettled_state),
297+
ok = await_amqp10_event(link, Sender, attached),
298+
Msg = amqp10_msg:new(Tag, Payload, false),
299+
ok = amqp10_client:send_msg(Sender, Msg),
300+
receive
301+
{amqp10_disposition, {accepted, Tag}} -> ok
302+
after 3000 ->
303+
exit(publish_disposition_not_received)
304+
end,
305+
amqp10_client:detach_link(Sender),
306+
Channel = rabbit_ct_client_helpers:open_channel(Config),
307+
{#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{
308+
content_type = undefined,
309+
content_encoding = undefined,
310+
headers = ReceivedHeaders,
311+
delivery_mode = ReceivedDeliveryMode,
312+
priority = ReceivedPriority,
313+
correlation_id = undefined,
314+
reply_to = undefined,
315+
expiration = undefined,
316+
message_id = undefined,
317+
timestamp = undefined,
318+
type = undefined,
319+
user_id = undefined,
320+
app_id = undefined,
321+
cluster_id = ReceivedClusterId
322+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
323+
324+
?assertEqual(<<"payload">>, ReceivedPayload),
325+
?assertEqual(1, ReceivedDeliveryMode),
326+
?assertEqual(<<"x">>, ReceivedClusterId),
327+
?assertEqual(4, ReceivedPriority),
328+
329+
?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)),
330+
331+
ok
332+
end).
333+
334+
171335
change_definition(Config) ->
172336
Src = ?config(srcq, Config),
173337
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)