Skip to content

Commit b149d7e

Browse files
committed
feat(rebalance): Implement core node rebalancing
1 parent d6123c7 commit b149d7e

File tree

4 files changed

+354
-13
lines changed

4 files changed

+354
-13
lines changed

src/mria.erl

+13-10
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,13 @@
8080

8181
-type info_key() :: members | running_nodes | stopped_nodes | partitions | rlog.
8282

83-
-type infos() :: #{members := list(member()),
84-
running_nodes := list(node()),
85-
stopped_nodes := list(node()),
86-
partitions := list(node()),
87-
rlog := map()
83+
-type infos() :: #{members := list(member()),
84+
running_nodes := list(node()),
85+
stopped_nodes := list(node()),
86+
partitions := list(node()),
87+
rlog := map(),
88+
rebalance_plan := mria_rebalance:plan(),
89+
rebalance_status := _
8890
}.
8991

9092
-type storage() :: ram_copies | disc_copies | disc_only_copies | null_copies | atom().
@@ -153,11 +155,12 @@ info(Key) ->
153155

154156
-spec info() -> infos().
155157
info() ->
156-
#{ running_nodes => cluster_nodes(running)
157-
, stopped_nodes => cluster_nodes(stopped)
158-
, members => mria_membership:members()
159-
, partitions => mria_node_monitor:partitions()
160-
, rlog => mria_rlog:status()
158+
#{ running_nodes => cluster_nodes(running)
159+
, stopped_nodes => cluster_nodes(stopped)
160+
, members => mria_membership:members()
161+
, partitions => mria_node_monitor:partitions()
162+
, rlog => mria_rlog:status()
163+
, rebalance_status => catch mria_rebalance:status()
161164
}.
162165

163166
-spec rocksdb_backend_available() -> boolean().

src/mria_core_shard_sup.erl

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -26,6 +26,8 @@
2626
, start_bootstrapper_sup/2
2727
, restart_agent_sup/1
2828
, restart_bootstrapper_sup/1
29+
30+
, list_agents/0
2931
]).
3032

3133
%% supervisor callbacks & external exports:
@@ -52,6 +54,17 @@ restart_agent_sup(SupPid) ->
5254
restart_bootstrapper_sup(SupPid) ->
5355
restart_sibling(SupPid, bootstrapper).
5456

57+
%% @doc Return the list of agents for all shards
58+
-spec list_agents() -> [{mria_rlog:shard(), [pid()]}].
59+
list_agents() ->
60+
lists:map(
61+
fun({Shard, Pid, _, _}) ->
62+
[AgentSupPid] = [P || {agent, P, supervisor, _} <- supervisor:which_children(Pid)],
63+
Agents = [P || {_, P, _, _} <- supervisor:which_children(AgentSupPid)],
64+
{Shard, Agents}
65+
end,
66+
supervisor:which_children(mria_shards_sup)).
67+
5568
%%================================================================================
5669
%% Supervisor callbacks
5770
%%================================================================================

src/mria_rebalance.erl

+324
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
%%--------------------------------------------------------------------
2+
%% Copyright (c) 2025 EMQ Technologies Co., Ltd. All Rights Reserved.
3+
%%
4+
%% Licensed under the Apache License, Version 2.0 (the "License");
5+
%% you may not use this file except in compliance with the License.
6+
%% You may obtain a copy of the License at
7+
%%
8+
%% http://www.apache.org/licenses/LICENSE-2.0
9+
%%
10+
%% Unless required by applicable law or agreed to in writing, software
11+
%% distributed under the License is distributed on an "AS IS" BASIS,
12+
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
%% See the License for the specific language governing permissions and
14+
%% limitations under the License.
15+
%%--------------------------------------------------------------------
16+
17+
%% @doc This module implements a semi-manual procedure for rebalancing
18+
%% replicants among core nodes.
19+
%%
20+
%% Since bootstrapping of replicants can be relatively expensive,
21+
%% rebalance must be triggered manually. But the rest of the procedure
22+
%% is automatic.
23+
%%
24+
%% How to use it:
25+
%%
26+
%% 1. mria_rebalance:start(). -- plan the rebalance. Should be
27+
%% executed on a core node.
28+
%%
29+
%% 2. mria_rebalance:status(). -- get information about the rebalance.
30+
%%
31+
%% 3. mria_rebalance:confirm(). -- start executing the plan.
32+
%%
33+
%% 4. mria_rebalance:abort(). -- abort the rebalance.
34+
-module(mria_rebalance).
35+
36+
%% API:
37+
-export([start/0, abort/0, confirm/0, status/0]).
38+
39+
%% gen_statem callbacks:
40+
-export([init/1, callback_mode/0, handle_event/4]).
41+
42+
%% Internal exports:
43+
-export([list_agents/0, kick/2, collect/0, plan/1]).
44+
45+
-export_type([input/0, plan/0]).
46+
47+
-include_lib("snabbkaffe/include/trace.hrl").
48+
-include("mria_rlog.hrl").
49+
50+
-ifdef(TEST).
51+
-include_lib("eunit/include/eunit.hrl").
52+
-endif.
53+
54+
%%================================================================================
55+
%% Type declarations
56+
%%================================================================================
57+
58+
-type input() :: #{mria_rlog:shard() =>
59+
[{_Core :: node(), _Agents :: [pid()]}]}.
60+
61+
-record(kick, {shard :: mria_rlog:shard(), core :: node(), agents :: [pid()]}).
62+
63+
-type plan() :: [#kick{}].
64+
65+
-ifndef(TEST).
66+
-define(second, 1000).
67+
-else.
68+
-define(second, 100).
69+
-endif.
70+
71+
-define(n, {global, ?MODULE}).
72+
73+
-record(d, {plan = [] :: plan()}).
74+
75+
-define(wait_confirmation, wait_confirmation).
76+
-define(idle_timeout, idle_timeout).
77+
-define(running, running).
78+
-define(exec_timeout, exec_timeout).
79+
-define(complete, complete).
80+
81+
-define(execute, execute).
82+
-define(get_status, get_status).
83+
84+
%%================================================================================
85+
%% API functions
86+
%%================================================================================
87+
88+
abort() ->
89+
try gen_statem:stop(?n)
90+
catch
91+
exit:noproc -> not_started
92+
end.
93+
94+
start() ->
95+
_ = abort(),
96+
gen_statem:start(?n, ?MODULE, [], []).
97+
98+
confirm() ->
99+
gen_statem:call(?n, ?execute).
100+
101+
status() ->
102+
try gen_statem:call(?n, ?get_status)
103+
catch
104+
exit:{noproc, _} -> not_stated
105+
end.
106+
107+
%%================================================================================
108+
%% Behaviour callbacks
109+
%%================================================================================
110+
111+
callback_mode() -> [handle_event_function, state_enter].
112+
113+
init(_) ->
114+
Plan = plan(collect()),
115+
D = #d{plan = Plan},
116+
case Plan of
117+
[] ->
118+
{ok, ?complete, D};
119+
_ ->
120+
{ok, ?wait_confirmation, D}
121+
end.
122+
123+
%% Wait confirmation state:
124+
handle_event(enter, _, ?wait_confirmation, _D) ->
125+
%% Shut down automatically if plan is not confirmed in 60 seconds:
126+
Timeout = 60 * ?second,
127+
{keep_state_and_data, [{state_timeout, Timeout, ?idle_timeout}]};
128+
handle_event({call, From}, ?execute, ?wait_confirmation, D) ->
129+
Reply = {reply, From, ok},
130+
{next_state, ?running, D, [Reply]};
131+
%% Running state:
132+
handle_event(enter, _, ?running, _D) ->
133+
{keep_state_and_data, [{state_timeout, 0, ?exec_timeout}]};
134+
handle_event(state_timeout, ?exec_timeout, ?running, D = #d{plan = P0}) ->
135+
case pop_task(P0) of
136+
{{Shard, Core, Agent}, P} ->
137+
erpc:call(Core, ?MODULE, kick, [Shard, Agent]),
138+
%% TODO: Make it configurable?
139+
Timeout = 5 * ?second,
140+
{keep_state, D#d{plan = P}, [{state_timeout, Timeout, ?exec_timeout}]};
141+
undefined ->
142+
{next_state, ?complete, D#d{plan = []}}
143+
end;
144+
%% Complete state:
145+
handle_event(enter, _, ?complete, _D) ->
146+
Timeout = 60 * ?second,
147+
{keep_state_and_data, [{state_timeout, Timeout, ?idle_timeout}]};
148+
%% Common:
149+
handle_event({call, From}, ?get_status, State, D) ->
150+
Reply = {reply, From, {State, D#d.plan}},
151+
{keep_state_and_data, [Reply]};
152+
handle_event(state_timeout, ?idle_timeout, _, _D) ->
153+
{stop, normal};
154+
handle_event(EventType, Event, State, Data) ->
155+
?unexpected_event_tp(#{ event_type => EventType
156+
, event => Event
157+
, state => State
158+
, data => Data
159+
}),
160+
keep_state_and_data.
161+
162+
%%================================================================================
163+
%% Internal exports
164+
%%================================================================================
165+
166+
%% @doc Given the current status of the core cluster, derive the
167+
%% rebalance plan:
168+
-spec plan(input()) -> plan().
169+
plan(Status) ->
170+
L = maps:fold(
171+
fun(Shard, Input, Acc) ->
172+
plan(Shard, Input) ++ Acc
173+
end,
174+
[],
175+
Status),
176+
%% Prioritize the most unbalanced nodes/shards:
177+
lists:sort(fun(A, B) ->
178+
length(A#kick.agents) =< length(B#kick.agents)
179+
end,
180+
L).
181+
182+
%% @doc Collect information about agents from the core nodes. Export
183+
%% for debugging/testing.
184+
-spec collect() -> input().
185+
collect() ->
186+
core = mria_rlog:role(),
187+
Cores = mria_mnesia:db_nodes(),
188+
Return = erpc:multicall(Cores, ?MODULE, list_agents, []),
189+
L = [{Shard, Node, Agents} ||
190+
{Node, {ok, L}} <- lists:zip(Cores, Return),
191+
{Shard, Agents} <- L],
192+
maps:groups_from_list(
193+
fun({Shard, _, _}) ->
194+
Shard
195+
end,
196+
fun({_, Node, Agents}) ->
197+
{Node, Agents}
198+
end,
199+
L).
200+
201+
%% RPC target: kick the replicant from the given core node by stopping
202+
%% the agent process. Replicant will automatically reconnect to the
203+
%% core node that is currently the least loaded, hence approaching the
204+
%% balance.
205+
-spec kick(mria_rlog:shard(), pid()) -> ok.
206+
kick(Shard, AgentPid) ->
207+
?tp(notice, "Kicking agent due to rebalance", #{agent => AgentPid, shard => Shard}),
208+
mria_rlog_agent:stop(AgentPid).
209+
210+
%% RPC target:
211+
list_agents() ->
212+
mria_core_shard_sup:list_agents().
213+
214+
%%================================================================================
215+
%% Internal functions
216+
%%================================================================================
217+
218+
pop_task([]) ->
219+
undefined;
220+
pop_task([#kick{agents = []} | Rest]) ->
221+
pop_task(Rest);
222+
pop_task([K = #kick{shard = Shard, core = Core, agents = [A | AL]} | Rest]) ->
223+
{{Shard, Core, A}, [K#kick{agents = AL} | Rest]}.
224+
225+
-spec plan(mria_rlog:shard(), [{node(), [pid()]}]) -> [#kick{}].
226+
plan(Shard, L) ->
227+
NAgents = lists:foldl(
228+
fun({_Node, Agents}, Acc) ->
229+
Acc + length(Agents)
230+
end,
231+
0,
232+
L),
233+
NNodes = length(L),
234+
Avg = ceil(NAgents / NNodes),
235+
lists:filtermap(
236+
fun({Node, Agents}) when length(Agents) > Avg ->
237+
{_, Excess} = lists:split(Avg, Agents),
238+
{true, #kick{shard = Shard, core = Node, agents = Excess}};
239+
(_) ->
240+
false
241+
end,
242+
L).
243+
244+
%%================================================================================
245+
%% Tests
246+
%%================================================================================
247+
248+
-ifdef(TEST).
249+
250+
plan0_test() ->
251+
?assertMatch(
252+
[],
253+
plan(#{})).
254+
255+
%% No rebalance is needed when there is only one core node:
256+
plan_single_node_test() ->
257+
?assertMatch(
258+
[],
259+
plan(#{foo => [{n1, [1, 2, 3]}],
260+
bar => [{n1, [1, 2, 3]}]
261+
})).
262+
263+
%% No further rebalance is needed:
264+
plan_balanced1_test() ->
265+
?assertMatch(
266+
[],
267+
plan(#{foo => [ {n1, [1, 2]}
268+
, {n2, [3]}
269+
, {n3, [4]}
270+
]})).
271+
272+
plan_balanced2_test() ->
273+
?assertMatch(
274+
[],
275+
plan(#{foo => [ {n1, [1, 2]}
276+
, {n2, [3, 4]}
277+
, {n3, [5]}
278+
]})).
279+
280+
plan_balanced3_test() ->
281+
?assertMatch(
282+
[],
283+
plan(#{foo => [ {n1, [1, 2]}
284+
, {n2, [3]}
285+
, {n3, [4]}
286+
, {n4, [5, 6]}
287+
]})).
288+
289+
%% Rebalance is needed:
290+
plan_unbalanced1_test() ->
291+
?assertMatch(
292+
[#kick{shard = foo, core = n1, agents = [2]}],
293+
plan(#{foo => [ {n1, [1, 2]}
294+
, {n2, []}
295+
]})).
296+
297+
plan_unbalanced2_test() ->
298+
?assertMatch(
299+
[#kick{shard = foo, core = n1, agents = [3]}],
300+
plan(#{foo => [ {n1, [1, 2, 3]}
301+
, {n2, [4]}
302+
, {n3, []}
303+
]})).
304+
305+
plan_unbalanced3_test() ->
306+
?assertMatch(
307+
[#kick{shard = foo, core = n1, agents = [2, 3]}],
308+
plan(#{foo => [ {n1, [1, 2, 3]}
309+
, {n2, [4]}
310+
, {n3, []}
311+
, {n4, []}
312+
]})).
313+
314+
plan_unbalanced4_test() ->
315+
?assertMatch(
316+
[ #kick{shard = foo, core = n1, agents = [3]}
317+
, #kick{shard = foo, core = n2, agents = [6]}
318+
],
319+
plan(#{foo => [ {n1, [1, 2, 3]}
320+
, {n2, [4, 5, 6]}
321+
, {n3, []}
322+
]})).
323+
324+
-endif. % TEST

0 commit comments

Comments
 (0)