Skip to content

Commit 0b48c76

Browse files
committed
fix(rpc): Introduce a separate optvar for RPC targeting
1 parent baa7aae commit 0b48c76

File tree

6 files changed

+60
-33
lines changed

6 files changed

+60
-33
lines changed

src/mria.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -595,7 +595,7 @@ should_retry_rpc(_) ->
595595
find_upstream_node(Shard) ->
596596
?tp_span(find_upstream_node, #{shard => Shard},
597597
begin
598-
{ok, Node} = mria_status:get_core_node(Shard, infinity),
598+
{ok, Node} = mria_status:rpc_target(Shard, infinity),
599599
Node
600600
end).
601601

@@ -688,7 +688,7 @@ db_nodes_maybe_rpc() ->
688688
replicant ->
689689
case mria_status:shards_up() of
690690
[Shard|_] ->
691-
{ok, CoreNode} = mria_status:get_core_node(Shard, 5_000),
691+
{ok, CoreNode} = mria_status:rpc_target(Shard, 5_000),
692692
case mria_lib:rpc_call_nothrow(CoreNode, mnesia, system_info, [db_nodes]) of
693693
{badrpc, _} -> [];
694694
{badtcp, _} -> [];

src/mria_rlog_replica.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -226,6 +226,7 @@ initiate_bootstrap(D) ->
226226
, remote_core_node = Remote
227227
, parent_sup = ParentSup
228228
} = D,
229+
mria_status:notify_rpc_target_up(Shard, Remote),
229230
_Pid = mria_replicant_shard_sup:start_bootstrap_client(ParentSup, Shard, Remote, self()),
230231
ReplayqMemOnly = application:get_env(mria, rlog_replayq_mem_only, true),
231232
ReplayqBaseDir = application:get_env(mria, rlog_replayq_dir, "/tmp/rlog"),
@@ -256,6 +257,7 @@ handle_agent_down(State, Reason, D) ->
256257
#{ reason => Reason
257258
, repl_state => State
258259
}),
260+
mria_status:notify_rpc_target_down(D#d.shard),
259261
case State of
260262
?normal ->
261263
{next_state, ?disconnected, D#d{agent = undefined}};
@@ -378,7 +380,7 @@ handle_reconnect(D0 = #d{shard = Shard, checkpoint = Checkpoint, parent_sup = Pa
378380
try_connect(Shard, Checkpoint) ->
379381
Timeout = 4_000, % Don't block FSM forever, allow it to process other messages.
380382
%% Get the best node according to the LB
381-
Nodes = case mria_status:get_core_node(Shard, Timeout) of
383+
Nodes = case mria_status:replica_get_core_node(Shard, Timeout) of
382384
{ok, N} -> [N];
383385
timeout -> []
384386
end,

src/mria_status.erl

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -24,8 +24,9 @@
2424

2525
%% API:
2626
-export([start_link/0,
27+
notify_rpc_target_up/2, notify_rpc_target_down/1, rpc_target/2,
2728
notify_shard_up/2, notify_shard_down/1, wait_for_shards/2,
28-
notify_core_node_up/2, notify_core_node_down/1, get_core_node/2,
29+
notify_core_node_up/2, notify_core_node_down/1, replica_get_core_node/2,
2930
notify_core_intercept_trans/2,
3031

3132
upstream/1, upstream_node/1,
@@ -57,11 +58,13 @@
5758
-include("mria_rlog.hrl").
5859
-include_lib("snabbkaffe/include/trace.hrl").
5960

60-
%% Tables and table keys:
61+
%% Optvars:
6162
-define(optvar(KEY), {mria, KEY}).
6263
-define(upstream_pid, upstream_pid).
6364
-define(core_node, core_node).
65+
-define(rpc_target, rpc_target).
6466

67+
%% Tables and table keys:
6568
-define(stats_tab, mria_rlog_stats_tab).
6669
-define(core_intercept, core_intercept).
6770
-define(replicant_state, replicant_state).
@@ -80,31 +83,64 @@
8083
start_link() ->
8184
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
8285

83-
%% @doc Return name of the core node that is _currently serving_ the
84-
%% downstream shard. Note the difference in behavior as compared with
85-
%% `get_core_node'. Returns `disconnected' if the local replica of the
86-
%% shard is down.
86+
%% @doc Return name of the core node that can serve as the RPC target.
87+
%% It is the same node that serves the local replica, but this optvar
88+
%% is set before the local replica goes up fully. WARNING: as a result,
89+
%% the local replica may not be consistent yet when this `optvar' is set.
90+
-spec rpc_target(mria_rlog:shard(), timeout()) -> {ok, node()} | disconnected.
91+
rpc_target(Shard, Timeout) ->
92+
case optvar:read(?optvar({?rpc_target, Shard}), Timeout) of
93+
OK = {ok, _} ->
94+
OK;
95+
timeout ->
96+
disconnected
97+
end.
98+
99+
-spec notify_rpc_target_up(mria_rlog:shard(), node()) -> ok.
100+
notify_rpc_target_up(Shard, Upstream) ->
101+
do_notify_up(?rpc_target, Shard, Upstream).
102+
103+
-spec notify_rpc_target_down(mria_rlog:shard()) -> ok.
104+
notify_rpc_target_down(Shard) ->
105+
do_notify_down(?rpc_target, Shard).
106+
107+
%% @doc Return name of the core node that is currently serving the
108+
%% downstream shard. In contrast with `rpc_target', this optvar is set
109+
%% when the shard reaches `normal' state and local reads become
110+
%% consistent. Returns `disconnected' while that optvar is not set.
87111
-spec upstream_node(mria_rlog:shard()) -> {ok, node()} | disconnected.
88112
upstream_node(Shard) ->
89113
case upstream(Shard) of
90114
{ok, Pid} -> {ok, node(Pid)};
91115
disconnected -> disconnected
92116
end.
93117

94-
%% @doc Return pid of the core node agent that serves us.
118+
%% @doc Return pid of the core node agent that serves us (when shard
119+
%% is in `normal' state).
95120
-spec upstream(mria_rlog:shard()) -> {ok, pid()} | disconnected.
96121
upstream(Shard) ->
97122
case optvar:peek(?optvar({?upstream_pid, Shard})) of
98123
{ok, Pid} -> {ok, Pid};
99124
undefined -> disconnected
100125
end.
101126

102-
%% @doc Return a core node that _might_ be able to serve the specified
103-
%% shard.
104-
-spec get_core_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
105-
get_core_node(Shard, Timeout) ->
127+
%% @doc WARNING: this optvar is used STRICTLY for interaction between
128+
%% `mria_lb' and the `mria_rlog_replica' FSM. Its value is the core node
129+
%% that serves the minimal number of replicants. As such, it must NOT be
130+
%% used for RPC targeting: otherwise all RPCs from the entire cluster
131+
%% would end up on a single node.
132+
-spec replica_get_core_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
133+
replica_get_core_node(Shard, Timeout) ->
106134
optvar:read(?optvar({?core_node, Shard}), Timeout).
107135

136+
-spec notify_core_node_up(mria_rlog:shard(), node()) -> ok.
137+
notify_core_node_up(Shard, Node) ->
138+
do_notify_up(?core_node, Shard, Node).
139+
140+
-spec notify_core_node_down(mria_rlog:shard()) -> ok.
141+
notify_core_node_down(Shard) ->
142+
do_notify_down(?core_node, Shard).
143+
108144
-spec notify_shard_up(mria_rlog:shard(), _AgentPid :: pid()) -> ok.
109145
notify_shard_up(Shard, Upstream) ->
110146
do_notify_up(?upstream_pid, Shard, Upstream).
@@ -122,14 +158,6 @@ notify_shard_down(Shard) ->
122158
?replicant_bootstrap_import
123159
]).
124160

125-
-spec notify_core_node_up(mria_rlog:shard(), node()) -> ok.
126-
notify_core_node_up(Shard, Node) ->
127-
do_notify_up(?core_node, Shard, Node).
128-
129-
-spec notify_core_node_down(mria_rlog:shard()) -> ok.
130-
notify_core_node_down(Shard) ->
131-
do_notify_down(?core_node, Shard).
132-
133161
-spec notify_core_intercept_trans(mria_rlog:shard(), mria_rlog:seqno()) -> ok.
134162
notify_core_intercept_trans(Shard, SeqNo) ->
135163
set_stat(Shard, ?core_intercept, SeqNo).

test/concuerror_tests.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -69,7 +69,7 @@ get_core_node_test() ->
6969
spawn(fun() ->
7070
catch mria_status:notify_core_node_up(foo, Node)
7171
end),
72-
?assertMatch({ok, Node}, mria_status:get_core_node(foo, infinity)),
72+
?assertMatch({ok, Node}, mria_status:replica_get_core_node(foo, infinity)),
7373
?assertMatch([], flush())
7474
after
7575
cleanup()

test/mria_SUITE.erl

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -863,9 +863,6 @@ t_sum_verify(_) ->
863863
?check_trace(
864864
#{timetrap => 30000},
865865
try
866-
?force_ordering( #{?snk_kind := verify_trans_step, n := N} when N =:= NTrans div 4
867-
, #{?snk_kind := state_change, to := bootstrap, shard := test_shard}
868-
),
869866
?force_ordering( #{?snk_kind := verify_trans_step, n := N} when N =:= 2 * NTrans div 4
870867
, #{?snk_kind := state_change, to := local_replay, shard := test_shard}
871868
),

test/mria_lb_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2021, 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -283,7 +283,7 @@ t_custom_compat_check(_Config) ->
283283
[_C1, _C2, C3, R1] = mria_ct:start_cluster(mria, Cluster),
284284
?assertEqual({ok, C3},
285285
erpc:call( R1
286-
, mria_status, get_core_node, [?mria_meta_shard, infinity]
286+
, mria_status, replica_get_core_node, [?mria_meta_shard, infinity]
287287
, infinity
288288
))
289289
after

0 commit comments

Comments
 (0)