Commit 31017f9

Merge pull request #240 from keynslug/fix/EEC-112/autoheal-asymm
fix(autoheal): attempt healing complex asymmetric partitions
2 parents: 81cff1c + 5b2846d

5 files changed: +244 -63 lines


.github/workflows/run_test_case.yaml (+4 -4)

@@ -9,10 +9,10 @@ jobs:
     runs-on: ubuntu-latest
 
     container:
-      image: erlang:22.1
+      image: erlang:24
 
     steps:
-    - uses: actions/checkout@v1
+    - uses: actions/checkout@v4
     - name: Run tests
       run: |
         make eunit
@@ -23,12 +23,12 @@ jobs:
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      run: |
        make coveralls
-    - uses: actions/upload-artifact@v1
+    - uses: actions/upload-artifact@v4
      if: always()
      with:
        name: logs
        path: _build/test/logs
-    - uses: actions/upload-artifact@v1
+    - uses: actions/upload-artifact@v4
      with:
        name: cover
        path: _build/test/cover

src/ekka.appup.src (+8 -2)

@@ -1,7 +1,10 @@
 %% -*- mode: erlang -*-
 %% Unless you know what you are doing, DO NOT edit manually!!
 {VSN,
- [{"0.8.1.11",
+ [{"0.8.1.12",
+   [{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
+    {load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
+  {"0.8.1.11",
    [{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
     {load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
   {"0.8.1.10",
@@ -67,7 +70,10 @@
    {load_module,ekka_httpc,brutal_purge,soft_purge,[]},
    {load_module,ekka_mnesia,brutal_purge,soft_purge,[]},
    {load_module,ekka_dist,brutal_purge,soft_purge,[]}]}],
- [{"0.8.1.11",
+ [{"0.8.1.12",
+   [{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
+    {load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
+  {"0.8.1.11",
    [{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
     {load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
   {"0.8.1.10",
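
How these instructions are consumed: an OTP appup term has the shape {Vsn, UpFromInstructions, DownToInstructions}, pairing each source version with the release_handler instructions to run when hot-upgrading from it (first list) or downgrading back to it (second list). A minimal sketch of inspecting the new "0.8.1.12" entry from an Erlang shell; the ebin/ekka.appup path is an assumption, valid only once the build tooling has substituted VSN:

    %% Read the built appup and pick out the instructions that apply when
    %% upgrading from 0.8.1.12 (path is hypothetical; adjust to your build).
    1> {ok, [{_Vsn, UpFrom, _DownTo}]} = file:consult("ebin/ekka.appup").
    2> {_, Instrs} = lists:keyfind("0.8.1.12", 1, UpFrom).
    %% Instrs = [{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
    %%           {load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]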

src/ekka_autoheal.erl (+95 -36)

@@ -16,6 +16,8 @@
 
 -module(ekka_autoheal).
 
+-include_lib("snabbkaffe/include/trace.hrl").
+
 -export([ init/0
         , enabled/0
         , proc/1
@@ -24,7 +26,7 @@
 
 -record(autoheal, {delay, role, proc, timer}).
 
--type(autoheal() :: #autoheal{}).
+-type autoheal() :: #autoheal{}.
 
 -export_type([autoheal/0]).
 
@@ -47,8 +49,7 @@ enabled() ->
     end.
 
 proc(undefined) -> undefined;
-proc(#autoheal{proc = Proc}) ->
-    Proc.
+proc(#autoheal{proc = Proc}) -> Proc.
 
 handle_msg(Msg, undefined) ->
     ?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;
@@ -76,12 +77,24 @@ handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, t
             Nodes = ekka_mnesia:cluster_nodes(all),
             case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
                 {Views, []} ->
-                    SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
-                    ekka_node_monitor:cast(coordinator(SplitView), {heal_partition, SplitView});
+                    SplitView = find_split_view(Nodes, Views),
+                    HealPlan = find_heal_plan(SplitView),
+                    case HealPlan of
+                        {Candidates = [_ | _], Minority} ->
+                            %% Non-empty list of candidates, choose a coordinator.
+                            CoordNode = pick_coordinator(Candidates),
+                            ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
+                        {[], Cluster} ->
+                            %% It's very unlikely but possible to have an empty list of candidates.
+                            ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
+                        {} ->
+                            ignore
+                    end,
+                    Autoheal#autoheal{timer = undefined};
                 {_Views, BadNodes} ->
-                    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes])
-            end,
-            Autoheal#autoheal{timer = undefined};
+                    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
+                    Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
+            end;
         false ->
             Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
     end;
@@ -91,51 +104,98 @@ handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
     Autoheal;
 
 handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
+    %% NOTE: Backward compatibility.
+    case SplitView of
+        %% No partitions.
+        [] -> Autoheal;
+        [{_, []}] -> Autoheal;
+        %% Partitions.
+        SplitView ->
+            Proc = spawn_link(fun() -> heal_partition(SplitView) end),
+            Autoheal#autoheal{role = coordinator, proc = Proc}
+    end;
+
+handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
     Proc = spawn_link(fun() ->
-                ?LOG(info, "Healing partition: ~p", [SplitView]),
-                _ = heal_partition(SplitView)
-            end),
+        ?tp(notice, "Healing cluster partition", #{
+            need_reboot => Minority,
+            split_view => SplitView
+        }),
+        reboot_minority(Minority -- [node()])
+    end),
     Autoheal#autoheal{role = coordinator, proc = Proc};
 
-handle_msg({heal_partition, SplitView}, Autoheal= #autoheal{proc = _Proc}) ->
+handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
     ?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
     Autoheal;
 
 handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
     Autoheal#autoheal{proc = undefined};
 handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
-    ?LOG(critical, "Autoheal process crashed: ~s", [Reason]),
+    ?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
+    _Retry = ekka_node_monitor:run_after(1000, confirm_partition),
     Autoheal#autoheal{proc = undefined};
 
 handle_msg(Msg, Autoheal) ->
     ?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]),
     Autoheal.
 
-compare_view({Running1, _} , {Running2, _}) ->
-    Len1 = length(Running1), Len2 = length(Running2),
-    if
-        Len1 > Len2 -> true;
-        Len1 == Len2 -> lists:member(node(), Running1);
-        true -> false
+find_split_view(Nodes, Views) ->
+    ClusterView = lists:zipwith(
+        fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
+        Nodes,
+        Views
+    ),
+    MajorityView = lists:sort(fun compare_node_views/2, ClusterView),
+    find_split_view(MajorityView).
+
+compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
+    Len1 = length(Running1),
+    Len2 = length(Running2),
+    case Len1 of
+        %% Prefer partitions with a higher number of surviving nodes.
+        L when L > Len2 -> true;
+        %% If the number of nodes is the same, sort by the list of running nodes.
+        Len2 -> Running1 < Running2;
+        L when L < Len2 -> false
     end.
 
-coordinator([{Nodes, _} | _]) ->
-    ekka_membership:coordinator(Nodes).
-
--spec heal_partition(list()) -> list(node()).
-heal_partition([]) ->
-    [];
-%% All nodes connected.
-heal_partition([{_, []}]) ->
-    [];
-%% Partial partitions happened.
-heal_partition([{Nodes, []}|_]) ->
+find_split_view([{_Node, _Running, []} | Views]) ->
+    %% Node observes no partitions, ignore.
+    find_split_view(Views);
+find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
+    %% Node observes some nodes as partitioned from it.
+    %% These nodes need to be rebooted, and as such they should not be part of the split view.
+    Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
+    [View | find_split_view(Rest)];
+find_split_view([]) ->
+    [].
+
+find_heal_plan([{_Node, R0, P0} | Rest]) ->
+    %% If we have more than one partition in the split view, we need to reboot _all_ of
+    %% the nodes in each view's partition (i.e. ⋃(Partitions)) for better safety. But then
+    %% we need to find candidates to do it, as ⋃(Running) ∖ ⋃(Partitions).
+    {_Nodes, Rs, Ps} = lists:unzip3(Rest),
+    URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
+    UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
+    {ordsets:subtract(URunning, UPartitions), UPartitions};
+find_heal_plan([]) ->
+    {}.
+
+pick_coordinator(Candidates) ->
+    case lists:member(node(), Candidates) of
+        true -> node();
+        false -> ekka_membership:coordinator(Candidates)
+    end.
+
+heal_partition([{Nodes, []} | _] = SplitView) ->
+    %% Symmetric partition.
+    ?LOG(info, "Healing partition: ~p", [SplitView]),
     reboot_minority(Nodes -- [node()]);
-heal_partition([{Majority, Minority}, {Minority, Majority}]) ->
-    reboot_minority(Minority);
-heal_partition(SplitView) ->
-    ?LOG(critical, "Cannot heal the partitions: ~p", [SplitView]),
-    error({unknown_splitview, SplitView}).
+heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
+    %% Symmetric partition.
+    ?LOG(info, "Healing partition: ~p", [SplitView]),
+    reboot_minority(Minority).
 
 reboot_minority(Minority) ->
     lists:foreach(fun shutdown/1, Minority),
@@ -155,4 +215,3 @@ ensure_cancel_timer(undefined) ->
     ok;
 ensure_cancel_timer(TRef) ->
     catch erlang:cancel_timer(TRef).
-
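
The heart of the change is find_heal_plan/1: after find_split_view/2 reduces the per-node views to the distinct partitions, the coordinator candidates are ⋃(Running) ∖ ⋃(Partitions), and every node in ⋃(Partitions) is scheduled for reboot. A self-contained sketch of that set arithmetic on an asymmetric split; the module name and node atoms are hypothetical, and find_heal_plan/1 is copied from the diff above so the example runs without the rest of ekka:

    -module(heal_plan_example).
    -export([demo/0]).

    %% Same set arithmetic as find_heal_plan/1 in src/ekka_autoheal.erl above.
    find_heal_plan([{_Node, R0, P0} | Rest]) ->
        {_Nodes, Rs, Ps} = lists:unzip3(Rest),
        URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
        UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
        {ordsets:subtract(URunning, UPartitions), UPartitions};
    find_heal_plan([]) ->
        {}.

    demo() ->
        %% Asymmetric split: node c reports d as unreachable, while a and b
        %% still see every node. find_split_view/1 already dropped a's and b's
        %% clean views, and removed d's entry because d is listed as
        %% partitioned in c's view.
        SplitView = [{c, [a, b, c], [d]}],
        %% Candidates [a, b, c] may coordinate; minority [d] gets rebooted.
        {[a, b, c], [d]} = find_heal_plan(SplitView),
        ok.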

src/ekka_node_monitor.erl (+8 -1)

@@ -110,9 +110,10 @@ handle_cast({confirm, TargetNode, Status}, State) ->
 
 handle_cast(Msg = {report_partition, _Node}, State) ->
     {noreply, autoheal_handle_msg(Msg, State)};
-
 handle_cast(Msg = {heal_partition, _SplitView}, State) ->
     {noreply, autoheal_handle_msg(Msg, State)};
+handle_cast(Msg = {heal_cluster, _, _}, State) ->
+    {noreply, autoheal_handle_msg(Msg, State)};
 
 handle_cast(Msg, State) ->
     ?LOG(error, "Unexpected cast: ~p", [Msg]),
@@ -142,6 +143,12 @@ handle_info({mnesia_system_event, {mnesia_up, Node}},
         false -> ok;
         true -> ekka_membership:partition_healed(Node)
    end,
+    %% If there was an asymmetric cluster partition, we might need more
+    %% autoheal iterations to completely bring the cluster back to normal.
+    case ekka_autoheal:enabled() of
+        {true, _} -> run_after(3000, confirm_partition);
+        false -> ignore
+    end,
     {noreply, State#state{partitions = lists:delete(Node, Partitions)}};
 
 handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
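
Taken together, the new clauses close an autoheal retry loop: when a previously partitioned node comes back (mnesia_up), another confirm_partition round is scheduled 3 seconds later, and a crashed healer process (the 'EXIT' clause in ekka_autoheal above) re-arms the same message after 1 second, so leftover asymmetric partitions keep being retried until the cluster view is clean. A plausible sketch of the run_after/2 helper relied on here; this is an assumption for illustration, and the actual ekka_node_monitor implementation may differ:

    %% Arm a timer that delivers Msg back to the calling (monitor) process.
    %% The returned reference is cancellable with erlang:cancel_timer/1,
    %% which is what ensure_cancel_timer/1 in ekka_autoheal expects.
    run_after(Delay, Msg) ->
        erlang:send_after(Delay, self(), Msg).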
