Skip to content

Commit e00eba5

Browse files
authored
Merge pull request #186 from emqx/dev/fix-rpc-target
fix(rpc): RPC to the upstream node
2 parents 2331739 + 1cfddf5 commit e00eba5

File tree

2 files changed

+17
-19
lines changed

2 files changed

+17
-19
lines changed

src/mria.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2019-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -595,7 +595,7 @@ should_retry_rpc(_) ->
595595
find_upstream_node(Shard) ->
596596
?tp_span(find_upstream_node, #{shard => Shard},
597597
begin
598-
{ok, Node} = mria_status:get_core_node(Shard, infinity),
598+
{ok, Node} = mria_status:upstream_node(Shard, infinity),
599599
Node
600600
end).
601601

@@ -688,7 +688,7 @@ db_nodes_maybe_rpc() ->
688688
replicant ->
689689
case mria_status:shards_up() of
690690
[Shard|_] ->
691-
{ok, CoreNode} = mria_status:get_core_node(Shard, 5_000),
691+
{ok, CoreNode} = mria_status:upstream_node(Shard, 5_000),
692692
case mria_lib:rpc_call_nothrow(CoreNode, mnesia, system_info, [db_nodes]) of
693693
{badrpc, _} -> [];
694694
{badtcp, _} -> [];

src/mria_status.erl

+14-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%%--------------------------------------------------------------------
2-
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
2+
%% Copyright (c) 2021-2025 EMQ Technologies Co., Ltd. All Rights Reserved.
33
%%
44
%% Licensed under the Apache License, Version 2.0 (the "License");
55
%% you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@
2828
notify_core_node_up/2, notify_core_node_down/1, get_core_node/2,
2929
notify_core_intercept_trans/2,
3030

31-
upstream/1, upstream_node/1,
31+
upstream/1, upstream_node/2,
3232
shards_status/0, shards_up/0, shards_syncing/0, shards_down/0,
3333
get_shard_stats/1, agents/0, agents/1, replicants/0, get_shard_lag/1,
3434

@@ -81,15 +81,12 @@ start_link() ->
8181
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
8282

8383
%% @doc Return name of the core node that is _currently serving_ the
84-
%% downstream shard. Note the difference in behavior as compared with
85-
%% `get_core_node'. Returns `disconnected' if the local replica of the
86-
%% shard is down.
87-
-spec upstream_node(mria_rlog:shard()) -> {ok, node()} | disconnected.
88-
upstream_node(Shard) ->
89-
case upstream(Shard) of
90-
{ok, Pid} -> {ok, node(Pid)};
91-
disconnected -> disconnected
92-
end.
84+
%% downstream shard. If shard is currently down, wait until started
85+
%% for at most `Timeout' millisecond. Note the difference in behavior
86+
%% as compared with `get_core_node'.
87+
-spec upstream_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
88+
upstream_node(Shard, Timeout) ->
89+
optvar:read(?optvar({?core_node, Shard}), Timeout).
9390

9491
%% @doc Return pid of the core node agent that serves us.
9592
-spec upstream(mria_rlog:shard()) -> {ok, pid()} | disconnected.
@@ -99,8 +96,9 @@ upstream(Shard) ->
9996
undefined -> disconnected
10097
end.
10198

102-
%% @doc Return a core node that _might_ be able to serve the specified
103-
%% shard.
99+
%% @deprecated Return a core node that _might_ be able to serve the
100+
%% specified shard. WARNING: use of this function leads to unbalanced
101+
%% load of the core nodes.
104102
-spec get_core_node(mria_rlog:shard(), timeout()) -> {ok, node()} | timeout.
105103
get_core_node(Shard, Timeout) ->
106104
optvar:read(?optvar({?core_node, Shard}), Timeout).
@@ -173,10 +171,10 @@ replicants() ->
173171

174172
-spec get_shard_lag(mria_rlog:shard()) -> non_neg_integer() | disconnected.
175173
get_shard_lag(Shard) ->
176-
case {mria_config:role(), upstream_node(Shard)} of
174+
case {mria_config:role(), upstream_node(Shard, 0)} of
177175
{core, _} ->
178176
0;
179-
{replicant, disconnected} ->
177+
{replicant, timeout} ->
180178
disconnected;
181179
{replicant, {ok, Upstream}} ->
182180
RemoteSeqNo = erpc:call(Upstream, ?MODULE, get_stat, [Shard, ?core_intercept], 1000),
@@ -244,7 +242,7 @@ get_shard_stats(Shard) ->
244242
, server_mql => get_mql(Shard)
245243
};
246244
replicant ->
247-
case upstream_node(Shard) of
245+
case upstream_node(Shard, 0) of
248246
{ok, Upstream} -> ok;
249247
_ -> Upstream = undefined
250248
end,

0 commit comments

Comments
 (0)