Skip to content

Commit 091d74c

Browse files
committed
rabbit_db_queue: Update durable queues outside of Khepri transactions
`rabbit_db_queue:update_durable/2`'s caller (`rabbit_amqqueue:mark_local_durable_queues_stopped`/1) passes a filter function that performs some operations that aren't allowed within Khepri transactions like looking up and using the current node and executing an RPC. Calling `rabbit_amqqueue:mark_local_durable_queues_stopped/1` on a Rabbit with the `khepri_db` feature flag enabled will result in an error. We can safely update a number of queues by using Khepri's `khepri_adv:get_many/3` advanced API which returns the internal version number of each queue. We can filter and update the queues outside of a transaction function and then perform all updates at once, failing if any queue has changed since the `khepri_adv:get_many/3` query. So we get the main benefits of a transaction but we can still execute any update or filter function.
1 parent 919ed46 commit 091d74c

File tree

2 files changed

+59
-15
lines changed

2 files changed

+59
-15
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -651,21 +651,61 @@ update_durable_in_mnesia(UpdateFun, FilterFun) ->
651651
ok.
652652

653653
update_durable_in_khepri(UpdateFun, FilterFun) ->
654-
Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()],
655-
rabbit_khepri:transaction(
656-
fun() ->
657-
khepri_tx:foreach(Path,
658-
fun(Path0, #{data := Q}) ->
659-
DoUpdate = amqqueue:is_durable(Q)
660-
andalso FilterFun(Q),
661-
case DoUpdate of
662-
true ->
663-
khepri_tx:put(Path0, UpdateFun(Q));
664-
false ->
665-
ok
666-
end
667-
end)
668-
end).
654+
PathPattern = khepri_queues_path() ++
655+
[?KHEPRI_WILDCARD_STAR,
656+
#if_data_matches{
657+
pattern = amqqueue:pattern_match_on_durable(true)}],
658+
%% The `FilterFun' or `UpdateFun' might attempt to do something
659+
%% incompatible with Khepri transactions (such as dynamic apply, sending
660+
%% a message, etc.), so this function cannot be written as a regular
661+
%% transaction. Instead we can get all queues and track their versions,
662+
%% update them, then apply the updates in a transaction, failing if any
663+
%% queue has changed since reading the queue record.
664+
case rabbit_khepri:adv_get_many(PathPattern) of
665+
{ok, Props} ->
666+
Updates = maps:fold(
667+
fun(Path0, #{data := Q0, payload_version := Vsn}, Acc)
668+
when ?is_amqqueue(Q0) ->
669+
case FilterFun(Q0) of
670+
true ->
671+
Path = khepri_path:combine_with_conditions(
672+
Path0,
673+
[#if_payload_version{version = Vsn}]),
674+
Q = UpdateFun(Q0),
675+
[{Path, Q} | Acc];
676+
false ->
677+
Acc
678+
end
679+
end, [], Props),
680+
Res = rabbit_khepri:transaction(
681+
fun() ->
682+
for_each_while_ok(
683+
fun({Path, Q}) -> khepri_tx:put(Path, Q) end,
684+
Updates)
685+
end),
686+
case Res of
687+
ok ->
688+
ok;
689+
{error, {khepri, mismatching_node, _}} ->
690+
%% One of the queues changed while attempting to update
691+
%% all queues. Retry the operation.
692+
update_durable_in_khepri(UpdateFun, FilterFun);
693+
{error, _} = Error ->
694+
Error
695+
end;
696+
{error, _} = Error ->
697+
Error
698+
end.
699+
700+
for_each_while_ok(Fun, [Elem | Rest]) ->
701+
case Fun(Elem) of
702+
ok ->
703+
for_each_while_ok(Fun, Rest);
704+
{error, _} = Error ->
705+
Error
706+
end;
707+
for_each_while_ok(_, []) ->
708+
ok.
669709

670710
%% -------------------------------------------------------------------
671711
%% exists().

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
get/2,
121121
get_many/1,
122122
adv_get/1,
123+
adv_get_many/1,
123124
match/1,
124125
match/2,
125126
exists/1,
@@ -883,6 +884,9 @@ get_many(PathPattern) ->
883884
adv_get(Path) ->
884885
khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}).
885886

887+
adv_get_many(PathPattern) ->
888+
khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
889+
886890
match(Path) ->
887891
match(Path, #{}).
888892

0 commit comments

Comments
 (0)