Skip to content

Commit

Permalink
Merge pull request #20 from g-andrade/feature/synchronous-worker-init…
Browse files Browse the repository at this point in the history
…ialization

Do not allow workers to be used before they're initialized
  • Loading branch information
g-andrade authored Aug 3, 2021
2 parents 0ef4152 + eb32d60 commit af523d9
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 73 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `rebar3_hank [Paulo Oliveira]
- `rebar3_hank` [Paulo Oliveira]

### Changed

- CI container approach to `setup-beam` with cache [Paulo Oliveira]
- worker initialization as to make it synchronous with the pool's [Guilherme Andrade]

## [2.1.0] - 2021-03-04

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ code_change(_OldVsn, State, _Extra) ->
- `worker_module`: the module that represents the workers
- `size`: maximum pool size

## Authors
## Original Author

- Luis Rascão (lrascao) <[email protected]>

Expand Down
78 changes: 31 additions & 47 deletions src/poolgirl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

-record(state, {
name,
parent = undefined :: undefined | pid(),
internal_directory :: poolgirl_internal_directory:t(),
supervisor = undefined :: undefined | pid(),
workers = [] :: [{reference(), pid()}],
worker_module = undefined :: atom(),
Expand Down Expand Up @@ -82,16 +82,16 @@ start_link(PoolArgs, WorkerArgs) ->
poolgirl_app_sup:start_child(PoolArgs, WorkerArgs).

%% @private
-spec start_link2(Parent :: pid(),
-spec start_link2(InternalDirectory :: poolgirl_internal_directory:t(),
PoolArgs :: proplists:proplist(),
WorkerArgs :: proplists:proplist())
-> start_ret().
start_link2(Parent, PoolArgs, WorkerArgs) ->
start_link2(InternalDirectory, PoolArgs, WorkerArgs) ->
case proplists:get_value(name, PoolArgs) of
undefined ->
gen_server:start_link(?MODULE, {Parent, PoolArgs, WorkerArgs}, []);
gen_server:start_link(?MODULE, {InternalDirectory, PoolArgs, WorkerArgs}, []);
PoolName ->
gen_server:start_link(PoolName, ?MODULE, {Parent, PoolArgs, WorkerArgs}, [])
gen_server:start_link(PoolName, ?MODULE, {InternalDirectory, PoolArgs, WorkerArgs}, [])
end.

%% @doc Fetch a worker from a pool
Expand Down Expand Up @@ -136,7 +136,8 @@ spin(down, PoolId, HowMany) ->
%% @param PoolId The unique pool id
-spec stop(PoolId :: pool()) -> ok.
stop(PoolId) ->
gen_server:call(PoolId, stop).
{ok, TopSupPid} = gen_server:call(PoolId, get_pid_of_top_pool_supervisor),
sys:terminate(TopSupPid, _Reason = shutdown).

%% @doc List your pool's workers
%% @param PoolId The unique pool id
Expand All @@ -145,8 +146,9 @@ get_workers(PoolId) ->
poolgirl_pg:get_members(PoolId).

%% @private
init({Parent, PoolArgs, WorkerArgs}) ->
init(PoolArgs, WorkerArgs, #state{parent = Parent}).
init({InternalDirectory, PoolArgs, WorkerArgs}) ->
_ = process_flag(trap_exit, true), % Always call `:terminate/2' (unless killed)
init(PoolArgs, WorkerArgs, #state{internal_directory = InternalDirectory}).

%% @private
init([{name, {local, PoolName}} | Rest], WorkerArgs, State) ->
Expand All @@ -170,8 +172,23 @@ init([{workers, Workers0} | Rest], WorkerArgs, State) when is_list(Workers0) ->
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, State) ->
gen_server:cast(self(), init),
{ok, State}.
% create the pg group
ok = poolgirl_pg:create(State#state.name),
%% check if we were supplied with pre-started workers
case State#state.workers of
[] ->
{ok, Sup} = poolgirl_internal_directory:find(State#state.internal_directory,
poolgirl_worker_sup),
Workers = populate(State#state.size, Sup, State#state.name),
{ok, State#state{supervisor = Sup,
workers = Workers}};
Workers ->
lists:foreach(fun({_Ref, Pid}) ->
% join the worker to the pg group
ok = poolgirl_pg:join(State#state.name, Pid)
end, Workers),
{ok, State#state{supervisor = undefined}}
end.

%% @private
handle_call(status, _From, #state{supervisor = Sup} = State) ->
Expand All @@ -193,48 +210,15 @@ handle_call({spin_down, N}, _From, #state{name = PoolName,
poolgirl_pg:leave(PoolName, WorkerPid)
end, Victims),
{reply, ok, State#state{workers = Workers -- Victims}};
handle_call(stop, _From, #state{name = PoolName,
supervisor = undefined} = State) ->
poolgirl_pg:delete(PoolName),
{stop, normal, ok, State};
handle_call(stop, _From, State) ->
gen_server:cast(self(), stop),
{reply, ok, State};
handle_call(get_pid_of_top_pool_supervisor, _From, State) ->
#state{internal_directory = InternalDirectory} = State,
Reply = {ok, _Pid} = poolgirl_internal_directory:find(InternalDirectory, poolgirl_sup),
{reply, Reply, State};
handle_call(_Msg, _From, State) ->
Reply = {error, invalid_message},
{reply, Reply, State}.

%% @private
handle_cast(init, #state{name = PoolName,
parent = Parent,
size = Size,
workers = Workers0} = State) ->
% create the pg group
ok = poolgirl_pg:create(PoolName),
%% check if we were supplied with pre-started workers
case Workers0 of
[] ->
%% get the chilren of our supervisor
%% our brother is the worker supervisor
Children = supervisor:which_children(Parent),
{poolgirl_worker_sup, Sup, supervisor, [poolgirl_worker_sup]} =
lists:keyfind(poolgirl_worker_sup, 1, Children),
Workers = populate(Size, Sup, PoolName),
{noreply, State#state{supervisor = Sup,
workers = Workers}};
_ ->
lists:foreach(fun({_Ref, Pid}) ->
% join the worker to the pg group
ok = poolgirl_pg:join(PoolName, Pid)
end, Workers0),
{noreply, State#state{supervisor = undefined,
workers = Workers0}}
end;
handle_cast(stop, #state{name = PoolName,
parent = Parent} = State) ->
poolgirl_pg:delete(PoolName),
ok = supervisor:terminate_child(poolgirl_app_sup, Parent),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand Down
66 changes: 66 additions & 0 deletions src/poolgirl_internal_directory.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
%% @private
%% Poolgirl - A sexy Erlang worker pool factory

-module(poolgirl_internal_directory).

-include_lib("stdlib/include/ms_transform.hrl").

-export([new/0,
register/2,
find/2]).

%%%
%%% Types
%%%

-opaque t() :: ets:tab().
-export_type([t/0]).

%%%
%%% Exported functions
%%%

-spec new() -> t().
new() ->
ets:new('poolgirl.internal_directory', [public]).

-spec register(t(), atom()) -> boolean().
register(Directory, Name) ->
case find(Directory, Name) of
{ok, ExistingPid} ->
maybe_over_register(Directory, Name, ExistingPid);
error ->
ets:insert_new(Directory, {Name, self()})
end.

-spec find(t(), atom()) -> {ok, pid()} | error.
find(Directory, Name) ->
case ets:lookup(Directory, Name) of
[{Name, Pid}] ->
{ok, Pid};
[] ->
error
end.

%%%
%%% Local functions
%%%

maybe_over_register(Directory, Name, ExistingPid) ->
(ExistingPid =:= self() orelse not is_process_alive(ExistingPid))
andalso over_register(Directory, Name, ExistingPid).

over_register(Directory, Name, ExistingPid) ->
MatchSpec = over_registration_match_spec(Name, ExistingPid),
case _AmountReplaced = ets:select_replace(Directory, MatchSpec) of
1 -> true;
0 -> false
end.

over_registration_match_spec(Name, ExistingPid) ->
% Atomic replacement of the entry we've just looked up
ets:fun2ms(
fun ({EntryName, EntryPid}) when EntryName =:= Name,
EntryPid =:= ExistingPid ->
{EntryName, self()}
end).
7 changes: 5 additions & 2 deletions src/poolgirl_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ start_link(PoolArgs, WorkerArgs) ->
supervisor:start_link(?MODULE, {PoolArgs, WorkerArgs}).

init({PoolArgs, WorkerArgs}) ->
InternalDirectory = poolgirl_internal_directory:new(),
true = poolgirl_internal_directory:register(InternalDirectory, ?MODULE),

WorkerModule = proplists:get_value(worker_module, PoolArgs),
Flags = {rest_for_one, 0, 5},
Workers = [{poolgirl_worker_sup,
{poolgirl_worker_sup, start_link, [WorkerModule, WorkerArgs]},
{poolgirl_worker_sup, start_link, [InternalDirectory, WorkerModule, WorkerArgs]},
permanent, infinity, supervisor, [poolgirl_worker_sup]},
{poolgirl, {poolgirl, start_link2, [self(), PoolArgs, WorkerArgs]},
{poolgirl, {poolgirl, start_link2, [InternalDirectory, PoolArgs, WorkerArgs]},
permanent, 5000, worker, [poolgirl]}],
{ok, {Flags, Workers}}.
15 changes: 8 additions & 7 deletions src/poolgirl_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
-module(poolgirl_worker_sup).
-behaviour(supervisor).

-export([start_link/2]).
-export([start_link/3]).

%% supervisor.
-export([init/1]).

-ignore_xref([start_link/2]).
-ignore_xref([start_link/3]).

start_link(Mod, Args) ->
supervisor:start_link(?MODULE, {Mod, Args}).
start_link(InternalDirectory, WorkerMod, WorkerArgs) ->
supervisor:start_link(?MODULE, {InternalDirectory, WorkerMod, WorkerArgs}).

init({Mod, Args}) ->
init({InternalDirectory, WorkerMod, WorkerArgs}) ->
true = poolgirl_internal_directory:register(InternalDirectory, ?MODULE),
{ok, {{simple_one_for_one, 0, 1},
[{Mod, {Mod, start_link, [Args]},
temporary, 5000, worker, [Mod]}]}}.
[{WorkerMod, {WorkerMod, start_link, [WorkerArgs]},
temporary, 5000, worker, [WorkerMod]}]}}.
57 changes: 42 additions & 15 deletions test/poolgirl_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pool_test_() ->
},
{<<"Childspec'd pool isn't supervised by poolgirl">>,
fun independent_childspec_pool/0
},
{<<"Internal directory handles race conditions gracefully">>,
fun graceful_internal_directory_concurrency/0
}
]
}.
Expand All @@ -75,7 +78,7 @@ pool_startup() ->
?assertEqual(10, length(poolgirl:get_workers(Pid))),
_ = poolgirl:checkout(Pid),
?assertEqual(10, length(poolgirl:get_workers(Pid))),
ok = pool_call(Pid, stop).
ok = poolgirl:stop(Pid).

worker_death() ->
{ok, Pid} = new_pool(5),
Expand All @@ -85,30 +88,30 @@ worker_death() ->
%% a little pause to allow the dust to settle after a death
timer:sleep(1000),
?assertEqual(5, length(poolgirl:get_workers(Pid))),
ok = pool_call(Pid, stop).
ok = poolgirl:stop(Pid).

pool_returns_status() ->
{ok, Pool} = new_pool(2),
?assertEqual({ready, 2}, poolgirl:status(Pool)),
ok = pool_call(Pool, stop).
ok = poolgirl:stop(Pool).

pool_worker_spin_up() ->
{ok, Pool} = new_pool(2),
poolgirl:spin(up, Pool, 2),
?assertEqual({ready, 4}, poolgirl:status(Pool)),
ok = pool_call(Pool, stop).
ok = poolgirl:stop(Pool).

pool_worker_spin_down() ->
{ok, Pool} = new_pool(4),
poolgirl:spin(down, Pool, 2),
?assertEqual({ready, 2}, poolgirl:status(Pool)),
ok = pool_call(Pool, stop).
ok = poolgirl:stop(Pool).

pool_only_local_workers() ->
{ok, Pool} = new_pool(5),
Worker = poolgirl:checkout(Pool),
?assertEqual(node(), node(Worker)),
ok = pool_call(Pool, stop).
ok = poolgirl:stop(Pool).

pool_worker_depletion() ->
{ok, Pool} = new_pool(5),
Expand All @@ -121,7 +124,7 @@ pool_worker_depletion() ->
%% allow for the pool to replenish the crashed workers
timer:sleep(50),
?assertEqual({ready, 5}, poolgirl:status(Pool)),
ok = pool_call(Pool, stop).
ok = poolgirl:stop(Pool).

multiple_pools() ->
{ok, Pool1} = new_pool(poolgirl_test1, 5),
Expand All @@ -146,14 +149,14 @@ multiple_pools() ->
?assertEqual({ready, 5}, poolgirl:status(Pool1)),
?assertEqual({ready, 5}, poolgirl:status(Pool2)),
?assertEqual({ready, 5}, poolgirl:status(Pool3)),
ok = pool_call(Pool1, stop),
ok = pool_call(Pool2, stop),
ok = pool_call(Pool3, stop).
ok = poolgirl:stop(Pool1),
ok = poolgirl:stop(Pool2),
ok = poolgirl:stop(Pool3).

pool_proper_cleanup_on_stop() ->
{ok, Pool1} = new_pool(poolgirl_test1, 5),
?assertEqual(1, length(supervisor:which_children(poolgirl_app_sup))),
ok = pool_call(Pool1, stop),
ok = poolgirl:stop(Pool1),
%% a little pause to allow the dust to settle after a death
timer:sleep(500),
?assertEqual(0, length(supervisor:which_children(poolgirl_app_sup))).
Expand All @@ -170,7 +173,7 @@ pool_proper_restart() ->
{ok, Pool1} = new_pool(poolgirl_test1, 5),
%% a little pause to allow the dust to settle after a death
% timer:sleep(500),
ok = pool_call(Pool1, stop),
ok = poolgirl:stop(Pool1),
%% a little pause to allow the dust to settle after a death
{ok, Pool1} = new_pool(poolgirl_test1, 5).

Expand Down Expand Up @@ -211,6 +214,33 @@ independent_childspec_pool() ->
ok = supervisor:terminate_child(ExtSupervisor, PoolId),
ok = supervisor:delete_child(ExtSupervisor, PoolId).

graceful_internal_directory_concurrency() ->
_ = process_flag(trap_exit, true),
InternalDirectory = poolgirl_internal_directory:new(),
?assertEqual(error, poolgirl_internal_directory:find(InternalDirectory, foobar)),

TestPid = self(),
Pid1 = spawn_link(
fun () ->
?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory,
foobar)),
TestPid ! proceed,
receive stop -> ok end
end),
receive proceed -> ok end,
?assertEqual({ok, Pid1}, poolgirl_internal_directory:find(InternalDirectory, foobar)),

% Unable to register - conflicting process is alive
?assertEqual(false, poolgirl_internal_directory:register(InternalDirectory, foobar)),
Pid1 ! stop,
receive {'EXIT', Pid1, _} -> ok end,

% Able to register - conflicting process has terminated
?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory, foobar)),

% Able to re-register
?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory, foobar)).

%%
%% Internal
%%
Expand All @@ -224,6 +254,3 @@ new_pool(Name, Size) ->
{size, Size}]),
timer:sleep(500),
{ok, Name}.

pool_call(ServerRef, Request) ->
gen_server:call(ServerRef, Request).

0 comments on commit af523d9

Please sign in to comment.