Skip to content

Commit 2c5b2ff

Browse files
committed
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: rabbitmq#7508
1 parent 7dfebb9 commit 2c5b2ff

File tree

5 files changed

+200
-9
lines changed

5 files changed

+200
-9
lines changed

deps/amqp10_client/src/amqp10_msg.erl

+10-4
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ set_headers(Headers, #amqp10_msg{header = Current} = Msg) ->
306306
H = maps:fold(fun(durable, V, Acc) ->
307307
Acc#'v1_0.header'{durable = V};
308308
(priority, V, Acc) ->
309-
Acc#'v1_0.header'{priority = {uint, V}};
309+
Acc#'v1_0.header'{priority = {ubyte, V}};
310310
(first_acquirer, V, Acc) ->
311311
Acc#'v1_0.header'{first_acquirer = V};
312312
(ttl, V, Acc) ->
@@ -325,8 +325,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
325325
P = maps:fold(fun(message_id, V, Acc) when is_binary(V) ->
326326
% message_id can be any type but we restrict it here
327327
Acc#'v1_0.properties'{message_id = utf8(V)};
328-
(user_id, V, Acc) ->
329-
Acc#'v1_0.properties'{user_id = utf8(V)};
328+
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
329+
Acc#'v1_0.properties'{user_id = binary(V)};
330330
(to, V, Acc) ->
331331
Acc#'v1_0.properties'{to = utf8(V)};
332332
(subject, V, Acc) ->
@@ -407,8 +407,12 @@ wrap_ap_value(true) ->
407407
{boolean, true};
408408
wrap_ap_value(false) ->
409409
{boolean, false};
410-
wrap_ap_value(V) when is_integer(V) ->
410+
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
411411
{uint, V};
412+
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
413+
{int, V};
414+
wrap_ap_value(F) when is_float(F) ->
415+
{double, F};
412416
wrap_ap_value(V) when is_binary(V) ->
413417
utf8(V);
414418
wrap_ap_value(V) when is_list(V) ->
@@ -449,6 +453,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
449453
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
450454
sym(B) when is_binary(B) -> {symbol, B}.
451455
uint(B) -> {uint, B}.
456+
binary(B) when is_binary(B) -> {binary, B};
457+
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.
452458

453459
has_value(undefined) -> false;
454460
has_value(_) -> true.

deps/rabbitmq_shovel/app.bzl

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def all_srcs(name = "all_srcs"):
110110

111111
filegroup(
112112
name = "priv",
113+
srcs = ["priv/schema/rabbitmq_shovel.schema"],
113114
)
114115

115116
filegroup(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
{mapping, "shovel.convert_amqp10_props_to_amqp091", "rabbitmq_shovel.convert_amqp10_props_to_amqp091",
3+
[{datatype, {enum, [true, false]}}]}.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+74-4
Original file line numberDiff line numberDiff line change
@@ -172,13 +172,13 @@ dest_endpoint(#{shovel_type := static}) ->
172172
dest_endpoint(#{shovel_type := dynamic,
173173
dest := #{target_address := Addr}}) ->
174174
[{dest_address, Addr}].
175-
176175
-spec handle_source(Msg :: any(), state()) ->
177176
not_handled | state() | {stop, any()}.
178177
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
179178
Tag = amqp10_msg:delivery_id(Msg),
180179
Payload = amqp10_msg:body_bin(Msg),
181-
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
180+
Props = props_to_map(Msg),
181+
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
182182
handle_source({amqp10_event, {connection, Conn, opened}},
183183
State = #{source := #{current := #{conn := Conn}}}) ->
184184
State;
@@ -390,15 +390,17 @@ set_message_properties(Props, Msg) ->
390390
#{content_encoding => to_binary(Ct)}, M);
391391
(delivery_mode, 2, M) ->
392392
amqp10_msg:set_headers(#{durable => true}, M);
393+
(priority, P, M) when is_integer(P) ->
394+
amqp10_msg:set_headers(#{priority => P}, M);
393395
(correlation_id, Ct, M) ->
394396
amqp10_msg:set_properties(#{correlation_id => to_binary(Ct)}, M);
395397
(reply_to, Ct, M) ->
396398
amqp10_msg:set_properties(#{reply_to => to_binary(Ct)}, M);
397399
(message_id, Ct, M) ->
398400
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
399401
(timestamp, Ct, M) ->
400-
amqp10_msg:set_properties(#{creation_time => Ct}, M);
401-
(user_id, Ct, M) ->
402+
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
403+
(user_id, Ct, M) when Ct =/= undefined ->
402404
amqp10_msg:set_properties(#{user_id => Ct}, M);
403405
(headers, Headers0, M) when is_list(Headers0) ->
404406
%% AMPQ 0.9.1 are added as applicatin properties
@@ -440,3 +442,71 @@ is_amqp10_compat(T) ->
440442
%% TODO: not all lists are compatible
441443
is_list(T) orelse
442444
is_boolean(T).
445+
446+
to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
447+
{Key, longstr, Value};
448+
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
449+
{Key, long, Value};
450+
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
451+
{Key, double, Value};
452+
to_amqp091_compatible_value(Key, true) ->
453+
{Key, bool, true};
454+
to_amqp091_compatible_value(Key, false) ->
455+
{Key, bool, false};
456+
to_amqp091_compatible_value(_Key, _Value) ->
457+
undefined.
458+
459+
delivery_mode(Headers) ->
460+
case maps:get(durable, Headers, undefined) of
461+
undefined -> undefined;
462+
true -> 2;
463+
false -> 1
464+
end.
465+
466+
timestamp_10_to_091(undefined) ->
467+
undefined;
468+
timestamp_10_to_091(T) ->
469+
trunc(T / 1000).
470+
471+
timestamp_091_to_10(T) when is_integer(T) ->
472+
T * 1000;
473+
timestamp_091_to_10(_) ->
474+
undefined.
475+
476+
ttl(T) when is_integer(T) ->
477+
erlang:integer_to_binary(T);
478+
ttl(_T) -> undefined.
479+
480+
conversion_enabled() ->
481+
application:get_env(rabbitmq_shovel, convert_amqp10_props_to_amqp091, false).
482+
483+
props_to_map(Msg) ->
484+
case conversion_enabled() of
485+
true ->
486+
AppProps = amqp10_msg:application_properties(Msg),
487+
AppProps091Headers = lists:filtermap(fun({K, V}) ->
488+
case to_amqp091_compatible_value(K, V) of
489+
undefined ->
490+
false;
491+
Value ->
492+
{true, Value}
493+
end
494+
end, maps:to_list(AppProps)),
495+
InProps = amqp10_msg:properties(Msg),
496+
Headers = amqp10_msg:headers(Msg),
497+
#{
498+
headers => AppProps091Headers,
499+
content_type => maps:get(content_type, InProps, undefined),
500+
content_encoding => maps:get(content_encoding, InProps, undefined),
501+
delivery_mode => delivery_mode(Headers),
502+
priority => maps:get(priority, Headers, undefined),
503+
correlation_id => maps:get(correlation_id, InProps, undefined),
504+
reply_to => maps:get(reply_to, InProps, undefined),
505+
expiration => ttl(maps:get(ttl, Headers, undefined)),
506+
message_id => maps:get(message_id, InProps, undefined),
507+
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
508+
user_id => maps:get(user_id, InProps, undefined)
509+
};
510+
_ ->
511+
#{}
512+
end.

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+112-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ groups() ->
2929
autodelete_amqp091_dest_on_confirm,
3030
autodelete_amqp091_dest_on_publish,
3131
simple_amqp10_dest,
32-
simple_amqp10_src
32+
simple_amqp10_src,
33+
message_prop_conversion
3334
]},
3435
{with_map_config, [], [
3536
simple,
@@ -168,6 +169,116 @@ simple_amqp10_src(Config) ->
168169
ok
169170
end).
170171

172+
message_prop_conversion(Config) ->
173+
MapConfig = ?config(map_config, Config),
174+
Src = ?config(srcq, Config),
175+
Dest = ?config(destq, Config),
176+
ok = rabbit_ct_broker_helpers:rpc(Config,
177+
0,
178+
application,
179+
set_env,
180+
[rabbitmq_shovel, convert_amqp10_props_to_amqp091, true]),
181+
with_session(Config,
182+
fun (Sess) ->
183+
shovel_test_utils:set_param(
184+
Config,
185+
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
186+
{<<"src-address">>, Src},
187+
{<<"dest-protocol">>, <<"amqp091">>},
188+
{<<"dest-queue">>, Dest},
189+
{<<"add-forward-headers">>, true},
190+
{<<"dest-add-timestamp-header">>, true},
191+
{<<"publish-properties">>,
192+
case MapConfig of
193+
true -> #{<<"cluster_id">> => <<"x">>};
194+
_ -> [{<<"cluster_id">>, <<"x">>}]
195+
end}
196+
]),
197+
LinkName = <<"dynamic-sender-", Dest/binary>>,
198+
Tag = <<"tag1">>,
199+
Payload = <<"payload">>,
200+
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
201+
unsettled, unsettled_state),
202+
ok = await_amqp10_event(link, Sender, attached),
203+
Headers = #{durable => true, priority => 3, ttl => 180000},
204+
Msg = amqp10_msg:set_headers(Headers,
205+
amqp10_msg:new(Tag, Payload, false)),
206+
Msg2 = amqp10_msg:set_properties(#{
207+
message_id => <<"message-id">>,
208+
user_id => <<"guest">>,
209+
to => <<"to">>,
210+
subject => <<"subject">>,
211+
reply_to => <<"reply-to">>,
212+
correlation_id => <<"correlation-id">>,
213+
content_type => <<"content-type">>,
214+
content_encoding => <<"content-encoding">>,
215+
%absolute_expiry_time => 123456789,
216+
creation_time => 123456789,
217+
group_id => <<"group-id">>,
218+
group_sequence => 123,
219+
reply_to_group_id => <<"reply-to-group-id">>
220+
}, Msg),
221+
Msg3 = amqp10_msg:set_application_properties(#{
222+
<<"x-binary">> => <<"binary">>,
223+
<<"x-int">> => 33,
224+
<<"x-negative-int">> => -33,
225+
<<"x-float">> => 1.3,
226+
<<"x-true">> => true,
227+
<<"x-false">> => false
228+
}, Msg2),
229+
ok = amqp10_client:send_msg(Sender, Msg3),
230+
receive
231+
{amqp10_disposition, {accepted, Tag}} -> ok
232+
after 3000 ->
233+
exit(publish_disposition_not_received)
234+
end,
235+
amqp10_client:detach_link(Sender),
236+
Channel = rabbit_ct_client_helpers:open_channel(Config),
237+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
238+
content_type = ReceivedContentType,
239+
content_encoding = ReceivedContentEncoding,
240+
headers = Headers2,
241+
delivery_mode = ReceivedDeliveryMode,
242+
priority = ReceivedPriority,
243+
correlation_id = ReceivedCorrelationId,
244+
reply_to = ReceivedReplyTo,
245+
expiration = ReceivedExpiration,
246+
message_id = ReceivedMessageId,
247+
timestamp = ReceivedTimestamp,
248+
type = _ReceivedType,
249+
user_id = ReceivedUserId,
250+
app_id = _ReceivedAppId,
251+
cluster_id = _ReceivedClusterId
252+
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),
253+
254+
?assertEqual(<<"payload">>, Payload),
255+
?assertEqual(2, ReceivedDeliveryMode),
256+
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
257+
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
258+
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
259+
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
260+
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
261+
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),
262+
263+
?assertEqual(<<"content-type">>, ReceivedContentType),
264+
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),
265+
266+
?assertEqual(3, ReceivedPriority),
267+
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
268+
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
269+
?assertEqual(<<"180000">>, ReceivedExpiration),
270+
?assertEqual(<<"message-id">>, ReceivedMessageId),
271+
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
272+
?assertEqual(<<"guest">>, ReceivedUserId),
273+
ok
274+
end),
275+
ok = rabbit_ct_broker_helpers:rpc(Config,
276+
0,
277+
application,
278+
set_env,
279+
[rabbitmq_shovel, convert_amqp10_props_to_amqp091, false]).
280+
281+
171282
change_definition(Config) ->
172283
Src = ?config(srcq, Config),
173284
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)