Skip to content

Commit 9df76ec

Browse files
olikasgmergify[bot]
authored andcommitted
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: #7508 (cherry picked from commit 8e954ff) # Conflicts: # deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
1 parent 8ddd40f commit 9df76ec

File tree

3 files changed

+245
-9
lines changed

3 files changed

+245
-9
lines changed

deps/amqp10_client/src/amqp10_msg.erl

+10-4
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) ->
306306
H = maps:fold(fun(durable, V, Acc) ->
307307
Acc#'v1_0.header'{durable = V};
308308
(priority, V, Acc) ->
309-
Acc#'v1_0.header'{priority = {uint, V}};
309+
Acc#'v1_0.header'{priority = {ubyte, V}};
310310
(first_acquirer, V, Acc) ->
311311
Acc#'v1_0.header'{first_acquirer = V};
312312
(ttl, V, Acc) ->
@@ -325,8 +325,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
325325
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
326326
% message_id can be any type but we restrict it here
327327
Acc#'v1_0.properties'{message_id = utf8(V)};
328-
(user_id, V, Acc) ->
329-
Acc#'v1_0.properties'{user_id = utf8(V)};
328+
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
329+
Acc#'v1_0.properties'{user_id = binary(V)};
330330
(to, V, Acc) ->
331331
Acc#'v1_0.properties'{to = utf8(V)};
332332
(subject, V, Acc) ->
@@ -407,8 +407,12 @@ wrap_ap_value(true) ->
407407
{boolean, true};
408408
wrap_ap_value(false) ->
409409
{boolean, false};
410-
wrap_ap_value(V) when is_integer(V) ->
410+
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
411411
{uint, V};
412+
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
413+
{int, V};
414+
wrap_ap_value(F) when is_float(F) ->
415+
{double, F};
412416
wrap_ap_value(V) when is_binary(V) ->
413417
utf8(V);
414418
wrap_ap_value(V) when is_list(V) ->
@@ -449,6 +453,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
449453
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
450454
sym(B) when is_binary(B) -> {symbol, B}.
451455
uint(B) -> {uint, B}.
456+
binary(B) when is_binary(B) -> {binary, B};
457+
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.
452458

453459
has_value(undefined) -> false;
454460
has_value(_) -> true.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+70-4
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ dest_endpoint(#{shovel_type := dynamic,
178178
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
179179
Tag = amqp10_msg:delivery_id(Msg),
180180
Payload = amqp10_msg:body_bin(Msg),
181-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
181+
Props = props_to_map(Msg),
182+
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
182183
handle_source({amqp10_event, {connection, Conn, opened}},
183184
State = #{source := #{current := #{conn := Conn}}}) ->
184185
State;
@@ -382,17 +383,22 @@ add_forward_headers(_, Msg) -> Msg.
382383
set_message_properties(Props, Msg) ->
383384
%% this is effectively special handling properties from amqp 0.9.1
384385
maps:fold(
385-
fun(content_type, Ct, M) ->
386+
fun(_Key, undefined, M) ->
387+
M;
388+
(content_type, Ct, M) ->
386389
amqp10_msg:set_properties(
387390
#{content_type => to_binary(Ct)}, M);
388391
(content_encoding, Ct, M) ->
389392
amqp10_msg:set_properties(
390393
#{content_encoding => to_binary(Ct)}, M);
391394
(delivery_mode, 2, M) ->
392395
amqp10_msg:set_headers(#{durable => true}, M);
396+
<<<<<<< HEAD
393397
(delivery_mode, 1, M) ->
394398
% by default the durable flag is false
395399
M;
400+
=======
401+
>>>>>>> 8e954ff366 (Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers)
396402
(priority, P, M) when is_integer(P) ->
397403
amqp10_msg:set_headers(#{priority => P}, M);
398404
(correlation_id, Ct, M) ->
@@ -402,8 +408,8 @@ set_message_properties(Props, Msg) ->
402408
(message_id, Ct, M) ->
403409
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
404410
(timestamp, Ct, M) ->
405-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
406-
(user_id, Ct, M) ->
411+
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
412+
(user_id, Ct, M) when Ct =/= undefined ->
407413
amqp10_msg:set_properties(#{user_id => Ct}, M);
408414
(headers, Headers0, M) when is_list(Headers0) ->
409415
%% AMPQ 0.9.1 are added as applicatin properties
@@ -445,3 +451,63 @@ is_amqp10_compat(T) ->
445451
%% TODO: not all lists are compatible
446452
is_list(T) orelse
447453
is_boolean(T).
454+
455+
to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
456+
{Key, longstr, Value};
457+
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
458+
{Key, long, Value};
459+
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
460+
{Key, double, Value};
461+
to_amqp091_compatible_value(Key, true) ->
462+
{Key, bool, true};
463+
to_amqp091_compatible_value(Key, false) ->
464+
{Key, bool, false};
465+
to_amqp091_compatible_value(_Key, _Value) ->
466+
undefined.
467+
468+
delivery_mode(Headers) ->
469+
case maps:get(durable, Headers, undefined) of
470+
undefined -> undefined;
471+
true -> 2;
472+
false -> 1
473+
end.
474+
475+
timestamp_10_to_091(T) when is_integer(T) ->
476+
trunc(T / 1000);
477+
timestamp_10_to_091(_) ->
478+
undefined.
479+
480+
timestamp_091_to_10(T) when is_integer(T) ->
481+
T * 1000;
482+
timestamp_091_to_10(_Value) ->
483+
undefined.
484+
485+
ttl(T) when is_integer(T) ->
486+
erlang:integer_to_binary(T);
487+
ttl(_T) -> undefined.
488+
489+
props_to_map(Msg) ->
490+
AppProps = amqp10_msg:application_properties(Msg),
491+
AppProps091Headers = lists:filtermap(fun({K, V}) ->
492+
case to_amqp091_compatible_value(K, V) of
493+
undefined ->
494+
false;
495+
Value ->
496+
{true, Value}
497+
end
498+
end, maps:to_list(AppProps)),
499+
InProps = amqp10_msg:properties(Msg),
500+
Headers = amqp10_msg:headers(Msg),
501+
#{
502+
headers => AppProps091Headers,
503+
content_type => maps:get(content_type, InProps, undefined),
504+
content_encoding => maps:get(content_encoding, InProps, undefined),
505+
delivery_mode => delivery_mode(Headers),
506+
priority => maps:get(priority, Headers, undefined),
507+
correlation_id => maps:get(correlation_id, InProps, undefined),
508+
reply_to => maps:get(reply_to, InProps, undefined),
509+
expiration => ttl(maps:get(ttl, Headers, undefined)),
510+
message_id => maps:get(message_id, InProps, undefined),
511+
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
512+
user_id => maps:get(user_id, InProps, undefined)
513+
}.

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+165-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ groups() ->
2929
autodelete_amqp091_dest_on_confirm,
3030
autodelete_amqp091_dest_on_publish,
3131
simple_amqp10_dest,
32-
simple_amqp10_src
32+
simple_amqp10_src,
33+
message_prop_conversion,
34+
message_prop_conversion_no_props
3335
]},
3436
{with_map_config, [], [
3537
simple,
@@ -171,6 +173,168 @@ simple_amqp10_src(Config) ->
171173
ok
172174
end).
173175

176+
message_prop_conversion(Config) ->
177+
MapConfig = ?config(map_config, Config),
178+
Src = ?config(srcq, Config),
179+
Dest = ?config(destq, Config),
180+
with_session(Config,
181+
fun (Sess) ->
182+
shovel_test_utils:set_param(
183+
Config,
184+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
185+
{<<"src-address">>, Src},
186+
{<<"dest-protocol">>, <<"amqp091">>},
187+
{<<"dest-queue">>, Dest},
188+
{<<"add-forward-headers">>, true},
189+
{<<"dest-add-timestamp-header">>, true},
190+
{<<"publish-properties">>,
191+
case MapConfig of
192+
true -> #{<<"cluster_id">> => <<"x">>};
193+
_ -> [{<<"cluster_id">>, <<"x">>}]
194+
end}
195+
]),
196+
LinkName = <<"dynamic-sender-", Dest/binary>>,
197+
Tag = <<"tag1">>,
198+
Payload = <<"payload">>,
199+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
200+
unsettled, unsettled_state),
201+
ok = await_amqp10_event(link, Sender, attached),
202+
Headers = #{durable => true, priority => 3, ttl => 180000},
203+
Msg = amqp10_msg:set_headers(Headers,
204+
amqp10_msg:new(Tag, Payload, false)),
205+
Msg2 = amqp10_msg:set_properties(#{
206+
message_id => <<"message-id">>,
207+
user_id => <<"guest">>,
208+
to => <<"to">>,
209+
subject => <<"subject">>,
210+
reply_to => <<"reply-to">>,
211+
correlation_id => <<"correlation-id">>,
212+
content_type => <<"content-type">>,
213+
content_encoding => <<"content-encoding">>,
214+
%absolute_expiry_time => 123456789,
215+
creation_time => 123456789,
216+
group_id => <<"group-id">>,
217+
group_sequence => 123,
218+
reply_to_group_id => <<"reply-to-group-id">>
219+
}, Msg),
220+
Msg3 = amqp10_msg:set_application_properties(#{
221+
<<"x-binary">> => <<"binary">>,
222+
<<"x-int">> => 33,
223+
<<"x-negative-int">> => -33,
224+
<<"x-float">> => 1.3,
225+
<<"x-true">> => true,
226+
<<"x-false">> => false
227+
}, Msg2),
228+
ok = amqp10_client:send_msg(Sender, Msg3),
229+
receive
230+
{amqp10_disposition, {accepted, Tag}} -> ok
231+
after 3000 ->
232+
exit(publish_disposition_not_received)
233+
end,
234+
amqp10_client:detach_link(Sender),
235+
Channel = rabbit_ct_client_helpers:open_channel(Config),
236+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
237+
content_type = ReceivedContentType,
238+
content_encoding = ReceivedContentEncoding,
239+
headers = Headers2,
240+
delivery_mode = ReceivedDeliveryMode,
241+
priority = ReceivedPriority,
242+
correlation_id = ReceivedCorrelationId,
243+
reply_to = ReceivedReplyTo,
244+
expiration = ReceivedExpiration,
245+
message_id = ReceivedMessageId,
246+
timestamp = ReceivedTimestamp,
247+
type = _ReceivedType,
248+
user_id = ReceivedUserId,
249+
app_id = _ReceivedAppId,
250+
cluster_id = _ReceivedClusterId
251+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
252+
253+
?assertEqual(<<"payload">>, Payload),
254+
?assertEqual(2, ReceivedDeliveryMode),
255+
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
256+
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
257+
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
258+
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
259+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
260+
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),
261+
262+
?assertEqual(<<"content-type">>, ReceivedContentType),
263+
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),
264+
265+
?assertEqual(3, ReceivedPriority),
266+
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
267+
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
268+
?assertEqual(<<"180000">>, ReceivedExpiration),
269+
?assertEqual(<<"message-id">>, ReceivedMessageId),
270+
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
271+
?assertEqual(<<"guest">>, ReceivedUserId),
272+
ok
273+
end).
274+
275+
message_prop_conversion_no_props(Config) ->
276+
MapConfig = ?config(map_config, Config),
277+
Src = ?config(srcq, Config),
278+
Dest = ?config(destq, Config),
279+
with_session(Config,
280+
fun (Sess) ->
281+
shovel_test_utils:set_param(
282+
Config,
283+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
284+
{<<"src-address">>, Src},
285+
{<<"dest-protocol">>, <<"amqp091">>},
286+
{<<"dest-queue">>, Dest},
287+
{<<"add-forward-headers">>, true},
288+
{<<"dest-add-timestamp-header">>, true},
289+
{<<"publish-properties">>,
290+
case MapConfig of
291+
true -> #{<<"cluster_id">> => <<"x">>};
292+
_ -> [{<<"cluster_id">>, <<"x">>}]
293+
end}
294+
]),
295+
LinkName = <<"dynamic-sender-", Dest/binary>>,
296+
Tag = <<"tag1">>,
297+
Payload = <<"payload">>,
298+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
299+
unsettled, unsettled_state),
300+
ok = await_amqp10_event(link, Sender, attached),
301+
Msg = amqp10_msg:new(Tag, Payload, false),
302+
ok = amqp10_client:send_msg(Sender, Msg),
303+
receive
304+
{amqp10_disposition, {accepted, Tag}} -> ok
305+
after 3000 ->
306+
exit(publish_disposition_not_received)
307+
end,
308+
amqp10_client:detach_link(Sender),
309+
Channel = rabbit_ct_client_helpers:open_channel(Config),
310+
{#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{
311+
content_type = undefined,
312+
content_encoding = undefined,
313+
headers = ReceivedHeaders,
314+
delivery_mode = ReceivedDeliveryMode,
315+
priority = ReceivedPriority,
316+
correlation_id = undefined,
317+
reply_to = undefined,
318+
expiration = undefined,
319+
message_id = undefined,
320+
timestamp = undefined,
321+
type = undefined,
322+
user_id = undefined,
323+
app_id = undefined,
324+
cluster_id = ReceivedClusterId
325+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
326+
327+
?assertEqual(<<"payload">>, ReceivedPayload),
328+
?assertEqual(1, ReceivedDeliveryMode),
329+
?assertEqual(<<"x">>, ReceivedClusterId),
330+
?assertEqual(4, ReceivedPriority),
331+
332+
?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)),
333+
334+
ok
335+
end).
336+
337+
174338
change_definition(Config) ->
175339
Src = ?config(srcq, Config),
176340
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)