Skip to content

Commit a784c2f

Browse files
committed
Deal with noconnection in stream SAC coordinator
1 parent 4261c01 commit a784c2f

File tree

5 files changed

+513
-83
lines changed

5 files changed

+513
-83
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 87 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
apply/3,
1616
state_enter/2,
1717
init_aux/1,
18-
handle_aux/6,
18+
handle_aux/5,
1919
tick/2,
2020
version/0,
2121
which_module/1,
@@ -629,11 +629,19 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
629629
return(Meta, State#?MODULE{streams = Streams0,
630630
monitors = Monitors1}, ok, Effects0)
631631
end;
632-
{sac, Monitors1} ->
632+
{sac, Monitors1} when MachineVersion < 5 orelse Reason =/= noconnection ->
633+
%% A connection went down, v5+ treats noconnection differently but
634+
%% v4- does not.
633635
Mod = sac_module(Meta),
634636
{SacState1, Effects} = Mod:handle_connection_down(Pid, SacState0),
635637
return(Meta, State#?MODULE{single_active_consumer = SacState1,
636-
monitors = Monitors1}, ok, Effects);
638+
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
639+
{sac, Monitors1} when Reason =:= noconnection ->
640+
%% the node of a connection got disconnected
641+
Mod = sac_module(Meta),
642+
{SacState1, Effects} = Mod:handle_connection_node_disconnected(Pid, SacState0),
643+
return(Meta, State#?MODULE{single_active_consumer = SacState1,
644+
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
637645
error ->
638646
return(Meta, State, ok, Effects0)
639647
end;
@@ -687,9 +695,11 @@ apply(#{machine_version := MachineVersion} = Meta,
687695
_ ->
688696
return(Meta, State0, stream_not_found, [])
689697
end;
690-
apply(Meta, {nodeup, Node} = Cmd,
698+
apply(#{machine_version := MachineVersion} = Meta,
699+
{nodeup, Node} = Cmd,
691700
#?MODULE{monitors = Monitors0,
692-
streams = Streams0} = State) ->
701+
streams = Streams0,
702+
single_active_consumer = Sac0} = State) ->
693703
%% reissue monitors for all disconnected members
694704
{Effects0, Monitors} =
695705
maps:fold(
@@ -703,14 +713,23 @@ apply(Meta, {nodeup, Node} = Cmd,
703713
{Acc, Mon}
704714
end
705715
end, {[], Monitors0}, Streams0),
706-
{Streams, Effects} =
716+
{Streams, Effects1} =
707717
maps:fold(fun (Id, S0, {Ss, E0}) ->
708718
S1 = update_stream(Meta, Cmd, S0),
709719
{S, E} = evaluate_stream(Meta, S1, E0),
710720
{Ss#{Id => S}, E}
711721
end, {Streams0, Effects0}, Streams0),
722+
723+
{Sac1, Effects2} = case MachineVersion > 5 of
724+
true ->
725+
SacMod = sac_module(Meta),
726+
SacMod:handle_node_reconnected(Sac0, Effects1);
727+
false ->
728+
{Sac0, Effects1}
729+
end,
712730
return(Meta, State#?MODULE{monitors = Monitors,
713-
streams = Streams}, ok, Effects);
731+
streams = Streams,
732+
single_active_consumer = Sac1}, ok, Effects2);
714733
apply(Meta, {machine_version, From, To}, State0) ->
715734
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, "
716735
++ "applying incremental upgrade.", [From, To]),
@@ -721,6 +740,12 @@ apply(Meta, {machine_version, From, To}, State0) ->
721740
{S1, Eff0 ++ Eff1}
722741
end, {State0, []}, lists:seq(From, To - 1)),
723742
return(Meta, State1, ok, Effects);
743+
apply(Meta, {timeout, {sac, node_disconnected, #{connection_pid := Pid}}},
744+
#?MODULE{single_active_consumer = SacState0} = State0) ->
745+
Mod = sac_module(Meta),
746+
{SacState1, Effects} = Mod:forget_connection(Pid, SacState0),
747+
return(Meta, State0#?MODULE{single_active_consumer = SacState1}, ok,
748+
Effects);
724749
apply(Meta, UnkCmd, State) ->
725750
rabbit_log:debug("~ts: unknown command ~W",
726751
[?MODULE, UnkCmd, 10]),
@@ -787,7 +812,7 @@ members() ->
787812
end
788813
end.
789814

790-
maybe_resize_coordinator_cluster() ->
815+
maybe_resize_coordinator_cluster(MachineVersion) ->
791816
spawn(fun() ->
792817
RabbitIsRunning = rabbit:is_running(),
793818
case members() of
@@ -813,19 +838,38 @@ maybe_resize_coordinator_cluster() ->
813838
case MemberNodes -- RabbitNodes of
814839
[] ->
815840
ok;
816-
[Old | _] ->
841+
[Old | _] when length(RabbitNodes) > 0 ->
817842
%% this ought to be rather rare as the stream
818843
%% coordinator member is now removed as part
819844
%% of the forget_cluster_node command
820845
rabbit_log:info("~ts: Rabbit node(s) removed from the cluster, "
821846
"deleting: ~w", [?MODULE, Old]),
822847
remove_member(Leader, Members, Old)
823-
end;
824-
_ ->
848+
end,
849+
maybe_handle_stale_nodes(MemberNodes, RabbitNodes,
850+
MachineVersion);
851+
_ ->
825852
ok
826853
end
827854
end).
828855

856+
maybe_handle_stale_nodes(MemberNodes, ExpectedNodes,
857+
MachineVersion) when MachineVersion > 4 ->
858+
case MemberNodes -- ExpectedNodes of
859+
[] ->
860+
ok;
861+
Stale when length(ExpectedNodes) > 0 ->
862+
rabbit_log:debug("Stale nodes detected in stream SAC "
863+
"coordinator: ~w. Purging state.",
864+
[Stale]),
865+
%% TODO SAC pipeline command to purge state from stale nodes
866+
ok;
867+
_ ->
868+
ok
869+
end;
870+
maybe_handle_stale_nodes(_, _, _) ->
871+
ok.
872+
829873
add_member(Members, Node) ->
830874
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
831875
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
@@ -899,80 +943,78 @@ init_aux(_Name) ->
899943

900944
%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
901945
handle_aux(leader, _, maybe_resize_coordinator_cluster,
902-
#aux{resizer = undefined} = Aux, LogState, _) ->
903-
Pid = maybe_resize_coordinator_cluster(),
904-
{no_reply, Aux#aux{resizer = Pid}, LogState, [{monitor, process, aux, Pid}]};
946+
#aux{resizer = undefined} = Aux, RaAux) ->
947+
MachineVersion = ra_aux:effective_machine_version(RaAux),
948+
Pid = maybe_resize_coordinator_cluster(MachineVersion),
949+
{no_reply, Aux#aux{resizer = Pid}, RaAux, [{monitor, process, aux, Pid}]};
905950
handle_aux(leader, _, maybe_resize_coordinator_cluster,
906-
AuxState, LogState, _) ->
951+
AuxState, RaAux) ->
907952
%% Coordinator resizing is still happening, let's ignore this tick event
908-
{no_reply, AuxState, LogState};
953+
{no_reply, AuxState, RaAux};
909954
handle_aux(leader, _, {down, Pid, _},
910-
#aux{resizer = Pid} = Aux, LogState, _) ->
955+
#aux{resizer = Pid} = Aux, RaAux) ->
911956
%% Coordinator resizing has finished
912-
{no_reply, Aux#aux{resizer = undefined}, LogState};
957+
{no_reply, Aux#aux{resizer = undefined}, RaAux};
913958
handle_aux(leader, _, {start_writer, StreamId,
914959
#{epoch := Epoch, node := Node} = Args, Conf},
915-
Aux, LogState, _) ->
960+
Aux, RaAux) ->
916961
rabbit_log:debug("~ts: running action: 'start_writer'"
917962
" for ~ts on node ~w in epoch ~b",
918963
[?MODULE, StreamId, Node, Epoch]),
919964
ActionFun = phase_start_writer(StreamId, Args, Conf),
920-
run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
965+
run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
921966
handle_aux(leader, _, {start_replica, StreamId,
922967
#{epoch := Epoch, node := Node} = Args, Conf},
923-
Aux, LogState, _) ->
968+
Aux, RaAux) ->
924969
rabbit_log:debug("~ts: running action: 'start_replica'"
925970
" for ~ts on node ~w in epoch ~b",
926971
[?MODULE, StreamId, Node, Epoch]),
927972
ActionFun = phase_start_replica(StreamId, Args, Conf),
928-
run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
973+
run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
929974
handle_aux(leader, _, {stop, StreamId, #{node := Node,
930975
epoch := Epoch} = Args, Conf},
931-
Aux, LogState, _) ->
976+
Aux, RaAux) ->
932977
rabbit_log:debug("~ts: running action: 'stop'"
933978
" for ~ts on node ~w in epoch ~b",
934979
[?MODULE, StreamId, Node, Epoch]),
935980
ActionFun = phase_stop_member(StreamId, Args, Conf),
936-
run_action(stopping, StreamId, Args, ActionFun, Aux, LogState);
981+
run_action(stopping, StreamId, Args, ActionFun, Aux, RaAux);
937982
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
938-
#aux{actions = _Monitors} = Aux, LogState,
939-
#?MODULE{streams = _Streams}) ->
983+
#aux{actions = _Monitors} = Aux, RaAux) ->
940984
rabbit_log:debug("~ts: running action: 'update_mnesia'"
941985
" for ~ts", [?MODULE, StreamId]),
942986
ActionFun = phase_update_mnesia(StreamId, Args, Conf),
943-
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState);
987+
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, RaAux);
944988
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
945-
#aux{actions = _Monitors} = Aux, LogState,
946-
#?MODULE{streams = _Streams}) ->
989+
#aux{actions = _Monitors} = Aux, RaAux) ->
947990
rabbit_log:debug("~ts: running action: 'update_retention'"
948991
" for ~ts", [?MODULE, StreamId]),
949992
ActionFun = phase_update_retention(StreamId, Args),
950-
run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState);
993+
run_action(update_retention, StreamId, Args, ActionFun, Aux, RaAux);
951994
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
952-
#aux{actions = _Monitors} = Aux, LogState,
953-
#?MODULE{streams = _Streams}) ->
995+
#aux{actions = _Monitors} = Aux, RaAux) ->
954996
rabbit_log:debug("~ts: running action: 'delete_member'"
955997
" for ~ts ~ts", [?MODULE, StreamId, Node]),
956998
ActionFun = phase_delete_member(StreamId, Args, Conf),
957-
run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState);
999+
run_action(delete_member, StreamId, Args, ActionFun, Aux, RaAux);
9581000
handle_aux(leader, _, fail_active_actions,
959-
#aux{actions = Actions} = Aux, LogState,
960-
#?MODULE{streams = Streams}) ->
1001+
#aux{actions = Actions} = Aux, RaAux) ->
9611002
%% this bit of code just creates an exclude map of currently running
9621003
%% tasks to avoid failing them, this could only really happen during
9631004
%% a leader flipflap
9641005
Exclude = maps:from_list([{S, ok}
9651006
|| {P, {S, _, _}} <- maps_to_list(Actions),
9661007
is_process_alive(P)]),
9671008
rabbit_log:debug("~ts: failing actions: ~w", [?MODULE, Exclude]),
1009+
#?MODULE{streams = Streams} = ra_aux:machine_state(RaAux),
9681010
fail_active_actions(Streams, Exclude),
969-
{no_reply, Aux, LogState, []};
1011+
{no_reply, Aux, RaAux, []};
9701012
handle_aux(leader, _, {down, Pid, normal},
971-
#aux{actions = Monitors} = Aux, LogState, _) ->
1013+
#aux{actions = Monitors} = Aux, RaAux) ->
9721014
%% action process finished normally, just remove from actions map
973-
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, LogState, []};
1015+
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, RaAux, []};
9741016
handle_aux(leader, _, {down, Pid, Reason},
975-
#aux{actions = Monitors0} = Aux, LogState, _) ->
1017+
#aux{actions = Monitors0} = Aux, RaAux) ->
9761018
%% An action has failed - report back to the state machine
9771019
case maps:get(Pid, Monitors0, undefined) of
9781020
{StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
@@ -983,13 +1025,13 @@ handle_aux(leader, _, {down, Pid, Reason},
9831025
Cmd = {action_failed, StreamId, Args#{action => Action}},
9841026
send_self_command(Cmd),
9851027
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)},
986-
LogState, []};
1028+
RaAux, []};
9871029
undefined ->
9881030
%% should this ever happen?
989-
{no_reply, Aux, LogState, []}
1031+
{no_reply, Aux, RaAux, []}
9901032
end;
991-
handle_aux(_, _, _, AuxState, LogState, _) ->
992-
{no_reply, AuxState, LogState}.
1033+
handle_aux(_, _, _, AuxState, RaAux) ->
1034+
{no_reply, AuxState, RaAux}.
9931035

9941036
overview(#?MODULE{streams = Streams,
9951037
monitors = Monitors,
@@ -1025,15 +1067,15 @@ stream_overview0(#stream{epoch = Epoch,
10251067

10261068
run_action(Action, StreamId, #{node := _Node,
10271069
epoch := _Epoch} = Args,
1028-
ActionFun, #aux{actions = Actions0} = Aux, Log) ->
1070+
ActionFun, #aux{actions = Actions0} = Aux, RaAux) ->
10291071
Coordinator = self(),
10301072
Pid = spawn_link(fun() ->
10311073
ActionFun(),
10321074
unlink(Coordinator)
10331075
end),
10341076
Effects = [{monitor, process, aux, Pid}],
10351077
Actions = Actions0#{Pid => {StreamId, Action, Args}},
1036-
{no_reply, Aux#aux{actions = Actions}, Log, Effects}.
1078+
{no_reply, Aux#aux{actions = Actions}, RaAux, Effects}.
10371079

10381080
wrap_reply(From, Reply) ->
10391081
[{reply, From, {wrap_reply, Reply}}].

0 commit comments

Comments
 (0)