Skip to content

Commit 068cbdc

Browse files
authored
Merge pull request #12196 from rabbitmq/mergify/bp/v3.13.x/pr-12195
Cancel AMQP stream consumer when local stream member is deleted (backport #12074) (backport #12195)
2 parents ac09ea5 + 2dcb10d commit 068cbdc

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

deps/rabbit/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ rabbitmq_integration_suite(
861861
additional_beam = [
862862
":test_queue_utils_beam",
863863
],
864-
shard_count = 19,
864+
shard_count = 20,
865865
deps = [
866866
"@proper//:erlang_app",
867867
],

deps/rabbit/src/rabbit_stream_coordinator.erl

+6
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
17571757
{queue_event, QRef,
17581758
{stream_local_member_change, MemberPid}},
17591759
cast} | Efs]};
1760+
(_MNode, #member{state = {running, _, MemberPid},
1761+
role = {replica, _},
1762+
target = deleted}, {_, Efs}) ->
1763+
{MemberPid, [{send_msg, P,
1764+
{queue_event, QRef, deleted_replica},
1765+
cast} | Efs]};
17601766
(_N, _M, Acc) ->
17611767
%% not a replica, nothing to do
17621768
Acc

deps/rabbit/src/rabbit_stream_queue.erl

+3-1
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
607607
end, #{}, Readers0),
608608
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
609609
handle_event(_QName, eol, #stream_client{name = Name}) ->
610-
{eol, [{unblock, Name}]}.
610+
{eol, [{unblock, Name}]};
611+
handle_event(QName, deleted_replica, State) ->
612+
{ok, State, [{queue_down, QName}]}.
611613

612614
is_recoverable(Q) ->
613615
Node = node(),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

+42
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all() ->
3434
{group, cluster_size_3},
3535
{group, cluster_size_3_1},
3636
{group, cluster_size_3_2},
37+
{group, cluster_size_3_3},
3738
{group, cluster_size_3_parallel_1},
3839
{group, cluster_size_3_parallel_2},
3940
{group, cluster_size_3_parallel_3},
@@ -79,6 +80,7 @@ groups() ->
7980
{cluster_size_3_2, [], [recover,
8081
declare_with_node_down_1,
8182
declare_with_node_down_2]},
83+
{cluster_size_3_3, [], [consume_while_deleting_replica]},
8284
{cluster_size_3_parallel_1, [parallel], [
8385
delete_replica,
8486
delete_last_replica,
@@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
207209
cluster_size_3_parallel_5 -> 3;
208210
cluster_size_3_1 -> 3;
209211
cluster_size_3_2 -> 3;
212+
cluster_size_3_3 -> 3;
210213
unclustered_size_3_1 -> 3;
211214
unclustered_size_3_2 -> 3;
212215
unclustered_size_3_3 -> 3;
@@ -1671,6 +1674,45 @@ consume_from_replica(Config) ->
16711674
receive_batch(Ch2, 0, 99),
16721675
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
16731676

1677+
consume_while_deleting_replica(Config) ->
1678+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1679+
1680+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1681+
Q = ?config(queue_name, Config),
1682+
1683+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1684+
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1685+
1686+
rabbit_ct_helpers:await_condition(
1687+
fun () ->
1688+
Info = find_queue_info(Config, 1, [online]),
1689+
length(proplists:get_value(online, Info)) == 3
1690+
end),
1691+
1692+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
1693+
qos(Ch2, 10, false),
1694+
1695+
CTag = atom_to_binary(?FUNCTION_NAME),
1696+
subscribe(Ch2, Q, false, 0, CTag),
1697+
1698+
%% Delete replica in node 3
1699+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
1700+
delete_replica, [<<"/">>, Q, Server3]),
1701+
1702+
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
1703+
1704+
%% no messages should be received
1705+
receive
1706+
#'basic.cancel'{consumer_tag = CTag} ->
1707+
ok;
1708+
{_, #amqp_msg{}} ->
1709+
exit(unexpected_message)
1710+
after 30000 ->
1711+
exit(missing_consumer_cancel)
1712+
end,
1713+
1714+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1715+
16741716
consume_credit(Config) ->
16751717
%% Because osiris provides one chunk on every read and we don't want to buffer
16761718
%% messages in the broker to avoid memory penalties, the credit value won't

0 commit comments

Comments
 (0)