Skip to content

Commit 2331739

Browse files
authored
Merge pull request #184 from keynslug/fix/EMQX-13588/stricter-lock
fix: make global lock guarding join operations stricter
2 parents 4cab8da + fee7a61 commit 2331739

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

include/mria.hrl

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@
1717

1818
-type(member() :: #member{}).
1919

20-
-define(JOIN_LOCK_ID, {mria_sync_join, node()}).
20+
-define(JOIN_LOCK_ID(REQUESTER), {mria_sync_join, REQUESTER}).

rebar.config

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
{minimum_otp_vsn, "21.0"}.
33

44
{deps,
5-
[{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.7"}}},
6-
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}},
5+
[{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.10"}}},
6+
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.1"}}},
77
{replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.6"}}},
88
{mnesia_rocksdb, {git, "https://github.com/emqx/mnesia_rocksdb", {tag, "0.1.16"}}},
99
{optvar, {git, "https://github.com/emqx/optvar", {tag, "1.0.5"}}}

src/mria.erl

+12-1
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,22 @@ join(Node) ->
235235
join(Node, _) when Node =:= node() ->
236236
ignore;
237237
join(Node, Reason) when is_atom(Node) ->
238+
%% NOTE
239+
%%
238240
%% If two nodes are trying to join each other simultaneously,
239241
%% one of them must be blocked waiting for a lock.
240242
%% Once the lock is released, the blocked node is expected to already be in the
241243
%% cluster (if the other node joined it successfully).
242-
global:trans(?JOIN_LOCK_ID, fun() -> join1(Node, Reason) end, [node(), Node]).
244+
%%
245+
%% Additionally, avoid conducting concurrent join operations
246+
%% by specifying the current process PID as the lock requester.
247+
%% Otherwise, concurrent joins can ruin each other's lives and
248+
%% make any further cluster operations impossible.
249+
%% This can happen, for example, when a concurrent join stops the
250+
%% entire `mnesia` system while another join is running schema
251+
%% transactions.
252+
LockId = ?JOIN_LOCK_ID(self()),
253+
global:trans(LockId, fun() -> join1(Node, Reason) end, [node(), Node]).
243254

244255
%% @doc Leave the cluster
245256
-spec leave() -> ok | {error, term()}.

src/mria_mnesia.erl

+2
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ ensure_stopped() ->
130130
%% @doc Cluster with node.
131131
-spec(connect(node()) -> ok | {error, any()}).
132132
connect(Node) ->
133+
?tp(mria_mnesia_connect, #{to => Node}),
133134
case mnesia:change_config(extra_db_nodes, [Node]) of
134135
{ok, [Node]} -> ok;
135136
{ok, []} -> {error, {failed_to_connect_node, Node}};
@@ -237,6 +238,7 @@ is_node_in_cluster(Node) ->
237238

238239
%% @doc Copy schema.
239240
copy_schema(Node) ->
241+
?tp(mria_mnesia_copy_schema, #{}),
240242
case mnesia:change_table_copy_type(schema, Node, disc_copies) of
241243
{atomic, ok} -> ok;
242244
{aborted, {already_exists, schema, Node, disc_copies}} ->

test/mria_SUITE.erl

+37
Original file line numberDiff line numberDiff line change
@@ -1317,6 +1317,43 @@ t_join_another_node_simultaneously(_) ->
13171317
end,
13181318
[]).
13191319

1320+
t_join_many_nodes_simultaneously(_) ->
1321+
% Self = self(),
1322+
CommonEnv = mria_mnesia_test_util:common_env(),
1323+
Cluster = [maps:remove(join_to, Spec)
1324+
|| Spec <- mria_ct:cluster([core, core, core, core], CommonEnv)],
1325+
?check_trace(
1326+
#{timetrap => 15_000},
1327+
try
1328+
%% Spin the cluster up.
1329+
[N1, N2, N3, N4] = Nodes = mria_ct:start_cluster(mria, Cluster),
1330+
%% Connect only N2, N3, N4 together.
1331+
ok = rpc:call(N2, mria, join, [N4]),
1332+
ok = rpc:call(N3, mria, join, [N4]),
1333+
%% Subscribe to an event emitted right before schema transactions take place.
1334+
{ok, SRef} = snabbkaffe:subscribe(?match_event(#{?snk_kind := mria_mnesia_connect})),
1335+
%% Ask N1 to join the cluster (using N2 as a seed).
1336+
K1 = rpc:async_call(N1, mria, join, [N2]),
1337+
%% Wait for the event, and ask (concurrently) N1 to join the cluster (using
1338+
%% the other 2 nodes as seeds).
1339+
{ok, _} = snabbkaffe:receive_events(SRef),
1340+
K2 = rpc:async_call(N1, mria, join, [N3]),
1341+
K3 = rpc:async_call(N1, mria, join, [N4]),
1342+
?assertMatch([ok,
1343+
{error, {already_in_cluster, _}},
1344+
{error, {already_in_cluster, _}}],
1345+
lists:sort([rpc:yield(K) || K <- [K1, K2, K3]])),
1346+
timer:sleep(3000),
1347+
?assertEqual({[true, true, true, true], []},
1348+
rpc:multicall(Nodes, mria_sup, is_running, [])),
1349+
{Results, []} = rpc:multicall(Nodes, mria_mnesia, running_nodes, []),
1350+
?assertEqual([Nodes, Nodes, Nodes, Nodes],
1351+
lists:map(fun lists:sort/1, Results))
1352+
after
1353+
ok = mria_ct:teardown_cluster(Cluster)
1354+
end,
1355+
[]).
1356+
13201357
cluster_benchmark(_) ->
13211358
NReplicas = 6,
13221359
Config = #{ trans_size => 10

0 commit comments

Comments
 (0)