Skip to content

Commit 2dcb10d

Browse files
dcorbachomergify[bot]
authored andcommitted
Cancel AMQP stream consumer when local stream member is deleted
The consumer reader process is gone and there is no way to recover it as the node does not have a member of the stream anymore, so it should be cancelled/detached. (cherry picked from commit 0061944) (cherry picked from commit 66d1294)
1 parent ac09ea5 commit 2dcb10d

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ rabbitmq_integration_suite(
861861
additional_beam = [
862862
":test_queue_utils_beam",
863863
],
864-
shard_count = 19,
864+
shard_count = 20,
865865
deps = [
866866
"@proper//:erlang_app",
867867
],

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
17571757
{queue_event, QRef,
17581758
{stream_local_member_change, MemberPid}},
17591759
cast} | Efs]};
1760+
(_MNode, #member{state = {running, _, MemberPid},
1761+
role = {replica, _},
1762+
target = deleted}, {_, Efs}) ->
1763+
{MemberPid, [{send_msg, P,
1764+
{queue_event, QRef, deleted_replica},
1765+
cast} | Efs]};
17601766
(_N, _M, Acc) ->
17611767
%% not a replica, nothing to do
17621768
Acc

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
607607
end, #{}, Readers0),
608608
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
609609
handle_event(_QName, eol, #stream_client{name = Name}) ->
610-
{eol, [{unblock, Name}]}.
610+
{eol, [{unblock, Name}]};
611+
handle_event(QName, deleted_replica, State) ->
612+
{ok, State, [{queue_down, QName}]}.
611613

612614
is_recoverable(Q) ->
613615
Node = node(),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all() ->
3434
{group, cluster_size_3},
3535
{group, cluster_size_3_1},
3636
{group, cluster_size_3_2},
37+
{group, cluster_size_3_3},
3738
{group, cluster_size_3_parallel_1},
3839
{group, cluster_size_3_parallel_2},
3940
{group, cluster_size_3_parallel_3},
@@ -79,6 +80,7 @@ groups() ->
7980
{cluster_size_3_2, [], [recover,
8081
declare_with_node_down_1,
8182
declare_with_node_down_2]},
83+
{cluster_size_3_3, [], [consume_while_deleting_replica]},
8284
{cluster_size_3_parallel_1, [parallel], [
8385
delete_replica,
8486
delete_last_replica,
@@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
207209
cluster_size_3_parallel_5 -> 3;
208210
cluster_size_3_1 -> 3;
209211
cluster_size_3_2 -> 3;
212+
cluster_size_3_3 -> 3;
210213
unclustered_size_3_1 -> 3;
211214
unclustered_size_3_2 -> 3;
212215
unclustered_size_3_3 -> 3;
@@ -1671,6 +1674,45 @@ consume_from_replica(Config) ->
16711674
receive_batch(Ch2, 0, 99),
16721675
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
16731676

1677+
consume_while_deleting_replica(Config) ->
1678+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1679+
1680+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1681+
Q = ?config(queue_name, Config),
1682+
1683+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1684+
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1685+
1686+
rabbit_ct_helpers:await_condition(
1687+
fun () ->
1688+
Info = find_queue_info(Config, 1, [online]),
1689+
length(proplists:get_value(online, Info)) == 3
1690+
end),
1691+
1692+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
1693+
qos(Ch2, 10, false),
1694+
1695+
CTag = atom_to_binary(?FUNCTION_NAME),
1696+
subscribe(Ch2, Q, false, 0, CTag),
1697+
1698+
%% Delete replica in node 3
1699+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
1700+
delete_replica, [<<"/">>, Q, Server3]),
1701+
1702+
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
1703+
1704+
%% no messages should be received
1705+
receive
1706+
#'basic.cancel'{consumer_tag = CTag} ->
1707+
ok;
1708+
{_, #amqp_msg{}} ->
1709+
exit(unexpected_message)
1710+
after 30000 ->
1711+
exit(missing_consumer_cancel)
1712+
end,
1713+
1714+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1715+
16741716
consume_credit(Config) ->
16751717
%% Because osiris provides one chunk on every read and we don't want to buffer
16761718
%% messages in the broker to avoid memory penalties, the credit value won't

0 commit comments

Comments
 (0)