Skip to content

Commit d6123c7

Browse files
authored
Merge pull request #188 from emqx/dev/fix-rpc-target2
fix(rpc): Follow-up to the previous fix
2 parents e00eba5 + 5882d04 commit d6123c7

6 files changed

+68
-69
lines changed

src/mria.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ should_retry_rpc(_) ->
595595
find_upstream_node(Shard) ->
596596
?tp_span(find_upstream_node, #{shard => Shard},
597597
begin
598-
{ok, Node} = mria_status:upstream_node(Shard, infinity),
598+
{ok, Node} = mria_status:rpc_target(Shard, infinity),
599599
Node
600600
end).
601601

@@ -688,7 +688,7 @@ db_nodes_maybe_rpc() ->
688688
replicant ->
689689
case mria_status:shards_up() of
690690
[Shard|_] ->
691-
{ok, CoreNode} = mria_status:upstream_node(Shard, 5_000),
691+
{ok, CoreNode} = mria_status:rpc_target(Shard, 5_000),
692692
case mria_lib:rpc_call_nothrow(CoreNode, mnesia, system_info, [db_nodes]) of
693693
{badrpc, _} -> [];
694694
{badtcp, _} -> [];

src/mria_rlog_replica.erl

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -226,6 +226,7 @@ initiate_bootstrap(D) ->
226226
, remote_core_node = Remote
227227
, parent_sup = ParentSup
228228
} = D,
229+
mria_status:notify_rpc_target_up(Shard, Remote),
229230
_Pid = mria_replicant_shard_sup:start_bootstrap_client(ParentSup, Shard, Remote, self()),
230231
ReplayqMemOnly = application:get_env(mria, rlog_replayq_mem_only, true),
231232
ReplayqBaseDir = application:get_env(mria, rlog_replayq_dir, "/tmp/rlog"),
@@ -256,6 +257,7 @@ handle_agent_down(State, Reason, D) ->
256257
#{ reason => Reason
257258
, repl_state => State
258259
}),
260+
mria_status:notify_rpc_target_down(D#d.shard),
259261
case State of
260262
?normal ->
261263
{next_state, ?disconnected, D#d{agent = undefined}};
@@ -378,7 +380,7 @@ handle_reconnect(D0 = #d{shard = Shard, checkpoint = Checkpoint, parent_sup = Pa
378380
try_connect(Shard, Checkpoint) ->
379381
Timeout = 4_000, % Don't block FSM forever, allow it to process other messages.
380382
%% Get the best node according to the LB
381-
Nodes = case mria_status:get_core_node(Shard, Timeout) of
383+
Nodes = case mria_status:replica_get_core_node(Shard, Timeout) of
382384
{ok, N} -> [N];
383385
timeout -> []
384386
end,

src/mria_status.erl

+57-27
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@
2424

2525
%% API:
2626
-export([start_link/0,
27+
notify_rpc_target_up/2, notify_rpc_target_down/1, rpc_target/2,
2728
notify_shard_up/2, notify_shard_down/1, wait_for_shards/2,
28-
notify_core_node_up/2, notify_core_node_down/1, get_core_node/2,
29+
notify_core_node_up/2, notify_core_node_down/1, replica_get_core_node/2,
2930
notify_core_intercept_trans/2,
3031

31-
upstream/1, upstream_node/2,
32+
upstream/1, upstream_node/1,
3233
shards_status/0, shards_up/0, shards_syncing/0, shards_down/0,
3334
get_shard_stats/1, agents/0, agents/1, replicants/0, get_shard_lag/1,
3435

@@ -57,11 +58,13 @@
5758
-include("mria_rlog.hrl").
5859
-include_lib("snabbkaffe/include/trace.hrl").
5960

60-
%% Tables and table keys:
61+
%% Optvars:
6162
-define(optvar(KEY), {mria, KEY}).
6263
-define(upstream_pid, upstream_pid).
6364
-define(core_node, core_node).
65+
-define(rpc_target, rpc_target).
6466

67+
%% Tables and table keys:
6568
-define(stats_tab, mria_rlog_stats_tab).
6669
-define(core_intercept, core_intercept).
6770
-define(replicant_state, replicant_state).
@@ -80,29 +83,64 @@
8083
start_link() ->
8184
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
8285

83-
%% @doc Return name of the core node that is _currently serving_ the
84-
%% downstream shard. If shard is currently down, wait until started
85-
%% for at most `Timeout' millisecond. Note the difference in behavior
86-
%% as compared with `get_core_node'.
87-
-spec upstream_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
88-
upstream_node(Shard, Timeout) ->
89-
optvar:read(?optvar({?core_node, Shard}), Timeout).
86+
%% @doc Return name of the core node that can serve as the RPC target.
87+
%% It is the same node that serves the local replica, but this optvar
88+
%% is set before local replica goes up fully. WARNING: this `optvar'
89+
%% is set before local replica becomes consistent.
90+
-spec rpc_target(mria_rlog:shard(), timeout()) -> {ok, node()} | disconnected.
91+
rpc_target(Shard, Timeout) ->
92+
case optvar:read(?optvar({?rpc_target, Shard}), Timeout) of
93+
OK = {ok, _} ->
94+
OK;
95+
timeout ->
96+
disconnected
97+
end.
98+
99+
-spec notify_rpc_target_up(mria_rlog:shard(), node()) -> ok.
100+
notify_rpc_target_up(Shard, Upstream) ->
101+
do_notify_up(?rpc_target, Shard, Upstream).
102+
103+
-spec notify_rpc_target_down(mria_rlog:shard()) -> ok.
104+
notify_rpc_target_down(Shard) ->
105+
do_notify_down(?rpc_target, Shard).
106+
107+
%% @doc Return name of the core node that is currently serving the
108+
%% downstream shard. In contrast with `rpc_target', this optvar is set
109+
%% when the shard reaches `normal' state and local reads become
110+
%% consistent.
111+
-spec upstream_node(mria_rlog:shard()) -> {ok, node()} | disconnected.
112+
upstream_node(Shard) ->
113+
case upstream(Shard) of
114+
{ok, Pid} -> {ok, node(Pid)};
115+
disconnected -> disconnected
116+
end.
90117

91-
%% @doc Return pid of the core node agent that serves us.
118+
%% @doc Return pid of the core node agent that serves us (when shard
119+
%% is in `normal' state).
92120
-spec upstream(mria_rlog:shard()) -> {ok, pid()} | disconnected.
93121
upstream(Shard) ->
94122
case optvar:peek(?optvar({?upstream_pid, Shard})) of
95123
{ok, Pid} -> {ok, Pid};
96124
undefined -> disconnected
97125
end.
98126

99-
%% @deprecated Return a core node that _might_ be able to serve the
100-
%% specified shard. WARNING: use of this function leads to unbalanced
101-
%% load of the core nodes.
102-
-spec get_core_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
103-
get_core_node(Shard, Timeout) ->
127+
%% @doc WARNING: this optvar is used STRICTLY for interaction between
128+
%% `mria_lb' and `mria_replica' FSM. Its value is equal to core node
129+
%% that serves minimal number of replicants. As such, it must NOT be
130+
%% used for RPC targeting: all RPCs from the entire cluster will end
131+
%% up on a single node.
132+
-spec replica_get_core_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
133+
replica_get_core_node(Shard, Timeout) ->
104134
optvar:read(?optvar({?core_node, Shard}), Timeout).
105135

136+
-spec notify_core_node_up(mria_rlog:shard(), node()) -> ok.
137+
notify_core_node_up(Shard, Node) ->
138+
do_notify_up(?core_node, Shard, Node).
139+
140+
-spec notify_core_node_down(mria_rlog:shard()) -> ok.
141+
notify_core_node_down(Shard) ->
142+
do_notify_down(?core_node, Shard).
143+
106144
-spec notify_shard_up(mria_rlog:shard(), _AgentPid :: pid()) -> ok.
107145
notify_shard_up(Shard, Upstream) ->
108146
do_notify_up(?upstream_pid, Shard, Upstream).
@@ -120,14 +158,6 @@ notify_shard_down(Shard) ->
120158
?replicant_bootstrap_import
121159
]).
122160

123-
-spec notify_core_node_up(mria_rlog:shard(), node()) -> ok.
124-
notify_core_node_up(Shard, Node) ->
125-
do_notify_up(?core_node, Shard, Node).
126-
127-
-spec notify_core_node_down(mria_rlog:shard()) -> ok.
128-
notify_core_node_down(Shard) ->
129-
do_notify_down(?core_node, Shard).
130-
131161
-spec notify_core_intercept_trans(mria_rlog:shard(), mria_rlog:seqno()) -> ok.
132162
notify_core_intercept_trans(Shard, SeqNo) ->
133163
set_stat(Shard, ?core_intercept, SeqNo).
@@ -171,10 +201,10 @@ replicants() ->
171201

172202
-spec get_shard_lag(mria_rlog:shard()) -> non_neg_integer() | disconnected.
173203
get_shard_lag(Shard) ->
174-
case {mria_config:role(), upstream_node(Shard, 0)} of
204+
case {mria_config:role(), upstream_node(Shard)} of
175205
{core, _} ->
176206
0;
177-
{replicant, timeout} ->
207+
{replicant, disconnected} ->
178208
disconnected;
179209
{replicant, {ok, Upstream}} ->
180210
RemoteSeqNo = erpc:call(Upstream, ?MODULE, get_stat, [Shard, ?core_intercept], 1000),
@@ -242,7 +272,7 @@ get_shard_stats(Shard) ->
242272
, server_mql => get_mql(Shard)
243273
};
244274
replicant ->
245-
case upstream_node(Shard, 0) of
275+
case upstream_node(Shard) of
246276
{ok, Upstream} -> ok;
247277
_ -> Upstream = undefined
248278
end,

test/concuerror_tests.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -69,7 +69,7 @@ get_core_node_test() ->
6969
spawn(fun() ->
7070
catch mria_status:notify_core_node_up(foo, Node)
7171
end),
72-
?assertMatch({ok, Node}, mria_status:get_core_node(foo, infinity)),
72+
?assertMatch({ok, Node}, mria_status:replica_get_core_node(foo, infinity)),
7373
?assertMatch([], flush())
7474
after
7575
cleanup()

test/mria_SUITE.erl

+1-34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -365,36 +365,6 @@ t_transaction_on_replicant(_) ->
365365
mria_rlog_props:no_unexpected_events(Trace)
366366
end).
367367

368-
%% EMQX-11006
369-
%%
370-
%% Test retry of aborted R/O transaction RPCs
371-
t_ro_trans_retry(_) ->
372-
Cluster = mria_ct:cluster([core, replicant], mria_mnesia_test_util:common_env()),
373-
?check_trace(
374-
#{timetrap => 10000},
375-
try
376-
Nodes = [N1, N2] = mria_ct:start_cluster(mria, Cluster),
377-
mria_mnesia_test_util:wait_tables(Nodes),
378-
%% Delay restart of mria on the core until the replicant
379-
%% retries the transaction at least once:
380-
?force_ordering(
381-
#{?snk_kind := mria_retry_rpc_to_core},
382-
#{?snk_kind := "Starting mnesia", ?snk_meta := #{node := N1}}),
383-
%% Restart mria on the core.
384-
?assertMatch(ok, rpc:call(N1, mria, stop, [])),
385-
rpc:cast(N1, mria, start, []),
386-
%% Issue a R/O transaction on the replicant:
387-
TransFun = fun() -> 42 end,
388-
?assertMatch({atomic, 42},
389-
rpc:call(N2, mria, ro_transaction, [test_shard, TransFun]))
390-
after
391-
mria_ct:teardown_cluster(Cluster)
392-
end,
393-
fun(Trace) ->
394-
%% Ensure that the replicant retried RPC:
395-
?assertMatch([_|_], ?of_kind(mria_retry_rpc_to_core, Trace))
396-
end).
397-
398368
t_sync_transaction_on_replicant(_) ->
399369
Cluster = mria_ct:cluster([core, replicant, replicant], mria_mnesia_test_util:common_env()),
400370
?check_trace(
@@ -863,9 +833,6 @@ t_sum_verify(_) ->
863833
?check_trace(
864834
#{timetrap => 30000},
865835
try
866-
?force_ordering( #{?snk_kind := verify_trans_step, n := N} when N =:= NTrans div 4
867-
, #{?snk_kind := state_change, to := bootstrap, shard := test_shard}
868-
),
869836
?force_ordering( #{?snk_kind := verify_trans_step, n := N} when N =:= 2 * NTrans div 4
870837
, #{?snk_kind := state_change, to := local_replay, shard := test_shard}
871838
),

test/mria_lb_SUITE.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2021, 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -283,7 +283,7 @@ t_custom_compat_check(_Config) ->
283283
[_C1, _C2, C3, R1] = mria_ct:start_cluster(mria, Cluster),
284284
?assertEqual({ok, C3},
285285
erpc:call( R1
286-
, mria_status, get_core_node, [?mria_meta_shard, infinity]
286+
, mria_status, replica_get_core_node, [?mria_meta_shard, infinity]
287287
, infinity
288288
))
289289
after

0 commit comments

Comments
 (0)