Skip to content

Commit a04b092

Browse files
Merge pull request #10742 from rabbitmq/md-fix-vhost-recovery-queue-update
Update durable queues outside of Khepri transactions
2 parents f5be782 + 8a03b28 commit a04b092

File tree

3 files changed

+78
-15
lines changed

3 files changed

+78
-15
lines changed

deps/rabbit/src/rabbit_db_queue.erl

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -651,21 +651,61 @@ update_durable_in_mnesia(UpdateFun, FilterFun) ->
651651
ok.
652652

653653
update_durable_in_khepri(UpdateFun, FilterFun) ->
654-
Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()],
655-
rabbit_khepri:transaction(
656-
fun() ->
657-
khepri_tx:foreach(Path,
658-
fun(Path0, #{data := Q}) ->
659-
DoUpdate = amqqueue:is_durable(Q)
660-
andalso FilterFun(Q),
661-
case DoUpdate of
662-
true ->
663-
khepri_tx:put(Path0, UpdateFun(Q));
664-
false ->
665-
ok
666-
end
667-
end)
668-
end).
654+
PathPattern = khepri_queues_path() ++
655+
[?KHEPRI_WILDCARD_STAR,
656+
#if_data_matches{
657+
pattern = amqqueue:pattern_match_on_durable(true)}],
658+
%% The `FilterFun' or `UpdateFun' might attempt to do something
659+
%% incompatible with Khepri transactions (such as dynamic apply, sending
660+
%% a message, etc.), so this function cannot be written as a regular
661+
%% transaction. Instead we can get all queues and track their versions,
662+
%% update them, then apply the updates in a transaction, failing if any
663+
%% queue has changed since reading the queue record.
664+
case rabbit_khepri:adv_get_many(PathPattern) of
665+
{ok, Props} ->
666+
Updates = maps:fold(
667+
fun(Path0, #{data := Q0, payload_version := Vsn}, Acc)
668+
when ?is_amqqueue(Q0) ->
669+
case FilterFun(Q0) of
670+
true ->
671+
Path = khepri_path:combine_with_conditions(
672+
Path0,
673+
[#if_payload_version{version = Vsn}]),
674+
Q = UpdateFun(Q0),
675+
[{Path, Q} | Acc];
676+
false ->
677+
Acc
678+
end
679+
end, [], Props),
680+
Res = rabbit_khepri:transaction(
681+
fun() ->
682+
for_each_while_ok(
683+
fun({Path, Q}) -> khepri_tx:put(Path, Q) end,
684+
Updates)
685+
end),
686+
case Res of
687+
ok ->
688+
ok;
689+
{error, {khepri, mismatching_node, _}} ->
690+
%% One of the queues changed while attempting to update
691+
%% all queues. Retry the operation.
692+
update_durable_in_khepri(UpdateFun, FilterFun);
693+
{error, _} = Error ->
694+
Error
695+
end;
696+
{error, _} = Error ->
697+
Error
698+
end.
699+
700+
for_each_while_ok(Fun, [Elem | Rest]) ->
701+
case Fun(Elem) of
702+
ok ->
703+
for_each_while_ok(Fun, Rest);
704+
{error, _} = Error ->
705+
Error
706+
end;
707+
for_each_while_ok(_, []) ->
708+
ok.
669709

670710
%% -------------------------------------------------------------------
671711
%% exists().

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
get/2,
121121
get_many/1,
122122
adv_get/1,
123+
adv_get_many/1,
123124
match/1,
124125
match/2,
125126
exists/1,
@@ -883,6 +884,9 @@ get_many(PathPattern) ->
883884
adv_get(Path) ->
884885
khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}).
885886

887+
adv_get_many(PathPattern) ->
888+
khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}).
889+
886890
match(Path) ->
887891
match(Path, #{}).
888892

deps/rabbit/test/rabbit_db_queue_SUITE.erl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ all_tests() ->
5151
get_durable,
5252
get_many_durable,
5353
update_durable,
54+
mark_local_durable_queues_stopped,
5455
foreach_durable,
5556
internal_delete
5657
].
@@ -463,6 +464,24 @@ update_durable1(_Config) ->
463464
?assertMatch(my_policy, amqqueue:get_policy(Q0)),
464465
passed.
465466

467+
mark_local_durable_queues_stopped(Config) ->
468+
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
469+
?MODULE, mark_local_durable_queues_stopped1, [Config]).
470+
471+
mark_local_durable_queues_stopped1(_Config) ->
472+
DurableQName = rabbit_misc:r(?VHOST, queue, <<"test-queue1">>),
473+
TransientQName = rabbit_misc:r(?VHOST, queue, <<"test-queue2">>),
474+
DurableQ = new_queue(DurableQName, rabbit_classic_queue),
475+
TransientQ = new_queue(TransientQName, rabbit_classic_queue),
476+
%% Set Q1's pid to a dead process
477+
RecoverableQ = amqqueue:set_pid(DurableQ, spawn(fun() -> ok end)),
478+
?assertEqual(ok, rabbit_db_queue:set(RecoverableQ)),
479+
?assertEqual(ok, rabbit_db_queue:set_dirty(TransientQ)),
480+
?assertEqual(ok, rabbit_amqqueue:mark_local_durable_queues_stopped(?VHOST)),
481+
{ok, StoppedQ} = rabbit_db_queue:get_durable(DurableQName),
482+
?assertEqual(stopped, amqqueue:get_state(StoppedQ)),
483+
passed.
484+
466485
foreach_durable(Config) ->
467486
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, foreach_durable1, [Config]).
468487

0 commit comments

Comments
 (0)