diff --git a/CHANGELOG.md b/CHANGELOG.md index 6723aeb..401e457 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `rebar3_hank [Paulo Oliveira] +- `rebar3_hank` [Paulo Oliveira] ### Changed - CI container approach to `setup-beam` with cache [Paulo Oliveira] +- worker initialization as to make it synchronous with the pool's [Guilherme Andrade] ## [2.1.0] - 2021-03-04 diff --git a/README.md b/README.md index 9b6dda3..77aba15 100644 --- a/README.md +++ b/README.md @@ -191,7 +191,7 @@ code_change(_OldVsn, State, _Extra) -> - `worker_module`: the module that represents the workers - `size`: maximum pool size -## Authors +## Original Author - Luis Rascão (lrascao) diff --git a/src/poolgirl.erl b/src/poolgirl.erl index 250ee05..287a9e1 100644 --- a/src/poolgirl.erl +++ b/src/poolgirl.erl @@ -41,7 +41,7 @@ -record(state, { name, - parent = undefined :: undefined | pid(), + internal_directory :: poolgirl_internal_directory:t(), supervisor = undefined :: undefined | pid(), workers = [] :: [{reference(), pid()}], worker_module = undefined :: atom(), @@ -82,16 +82,16 @@ start_link(PoolArgs, WorkerArgs) -> poolgirl_app_sup:start_child(PoolArgs, WorkerArgs). %% @private --spec start_link2(Parent :: pid(), +-spec start_link2(InternalDirectory :: poolgirl_internal_directory:t(), PoolArgs :: proplists:proplist(), WorkerArgs :: proplists:proplist()) -> start_ret(). -start_link2(Parent, PoolArgs, WorkerArgs) -> +start_link2(InternalDirectory, PoolArgs, WorkerArgs) -> case proplists:get_value(name, PoolArgs) of undefined -> - gen_server:start_link(?MODULE, {Parent, PoolArgs, WorkerArgs}, []); + gen_server:start_link(?MODULE, {InternalDirectory, PoolArgs, WorkerArgs}, []); PoolName -> - gen_server:start_link(PoolName, ?MODULE, {Parent, PoolArgs, WorkerArgs}, []) + gen_server:start_link(PoolName, ?MODULE, {InternalDirectory, PoolArgs, WorkerArgs}, []) end. %% @doc Fetch a worker from a pool @@ -136,7 +136,8 @@ spin(down, PoolId, HowMany) -> %% @param PoolId The unique pool id -spec stop(PoolId :: pool()) -> ok. stop(PoolId) -> - gen_server:call(PoolId, stop). + {ok, TopSupPid} = gen_server:call(PoolId, get_pid_of_top_pool_supervisor), + sys:terminate(TopSupPid, _Reason = shutdown). %% @doc List your pool's workers %% @param PoolId The unique pool id @@ -145,8 +146,9 @@ get_workers(PoolId) -> poolgirl_pg:get_members(PoolId). %% @private -init({Parent, PoolArgs, WorkerArgs}) -> - init(PoolArgs, WorkerArgs, #state{parent = Parent}). +init({InternalDirectory, PoolArgs, WorkerArgs}) -> + _ = process_flag(trap_exit, true), % Always call `:terminate/2' (unless killed) + init(PoolArgs, WorkerArgs, #state{internal_directory = InternalDirectory}). %% @private init([{name, {local, PoolName}} | Rest], WorkerArgs, State) -> @@ -170,8 +172,23 @@ init([{workers, Workers0} | Rest], WorkerArgs, State) when is_list(Workers0) -> init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); init([], _WorkerArgs, State) -> - gen_server:cast(self(), init), - {ok, State}. + % create the pg group + ok = poolgirl_pg:create(State#state.name), + %% check if we were supplied with pre-started workers + case State#state.workers of + [] -> + {ok, Sup} = poolgirl_internal_directory:find(State#state.internal_directory, + poolgirl_worker_sup), + Workers = populate(State#state.size, Sup, State#state.name), + {ok, State#state{supervisor = Sup, + workers = Workers}}; + Workers -> + lists:foreach(fun({_Ref, Pid}) -> + % join the worker to the pg group + ok = poolgirl_pg:join(State#state.name, Pid) + end, Workers), + {ok, State#state{supervisor = undefined}} + end. %% @private handle_call(status, _From, #state{supervisor = Sup} = State) -> @@ -193,48 +210,15 @@ handle_call({spin_down, N}, _From, #state{name = PoolName, poolgirl_pg:leave(PoolName, WorkerPid) end, Victims), {reply, ok, State#state{workers = Workers -- Victims}}; -handle_call(stop, _From, #state{name = PoolName, - supervisor = undefined} = State) -> - poolgirl_pg:delete(PoolName), - {stop, normal, ok, State}; -handle_call(stop, _From, State) -> - gen_server:cast(self(), stop), - {reply, ok, State}; +handle_call(get_pid_of_top_pool_supervisor, _From, State) -> + #state{internal_directory = InternalDirectory} = State, + Reply = {ok, _Pid} = poolgirl_internal_directory:find(InternalDirectory, poolgirl_sup), + {reply, Reply, State}; handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, {reply, Reply, State}. %% @private -handle_cast(init, #state{name = PoolName, - parent = Parent, - size = Size, - workers = Workers0} = State) -> - % create the pg group - ok = poolgirl_pg:create(PoolName), - %% check if we were supplied with pre-started workers - case Workers0 of - [] -> - %% get the chilren of our supervisor - %% our brother is the worker supervisor - Children = supervisor:which_children(Parent), - {poolgirl_worker_sup, Sup, supervisor, [poolgirl_worker_sup]} = - lists:keyfind(poolgirl_worker_sup, 1, Children), - Workers = populate(Size, Sup, PoolName), - {noreply, State#state{supervisor = Sup, - workers = Workers}}; - _ -> - lists:foreach(fun({_Ref, Pid}) -> - % join the worker to the pg group - ok = poolgirl_pg:join(PoolName, Pid) - end, Workers0), - {noreply, State#state{supervisor = undefined, - workers = Workers0}} - end; -handle_cast(stop, #state{name = PoolName, - parent = Parent} = State) -> - poolgirl_pg:delete(PoolName), - ok = supervisor:terminate_child(poolgirl_app_sup, Parent), - {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. diff --git a/src/poolgirl_internal_directory.erl b/src/poolgirl_internal_directory.erl new file mode 100644 index 0000000..0c273cf --- /dev/null +++ b/src/poolgirl_internal_directory.erl @@ -0,0 +1,66 @@ +%% @private +%% Poolgirl - A sexy Erlang worker pool factory + +-module(poolgirl_internal_directory). + +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([new/0, + register/2, + find/2]). + +%%% +%%% Types +%%% + +-opaque t() :: ets:tab(). +-export_type([t/0]). + +%%% +%%% Exported functions +%%% + +-spec new() -> t(). +new() -> + ets:new('poolgirl.internal_directory', [public]). + +-spec register(t(), atom()) -> boolean(). +register(Directory, Name) -> + case find(Directory, Name) of + {ok, ExistingPid} -> + maybe_over_register(Directory, Name, ExistingPid); + error -> + ets:insert_new(Directory, {Name, self()}) + end. + +-spec find(t(), atom()) -> {ok, pid()} | error. +find(Directory, Name) -> + case ets:lookup(Directory, Name) of + [{Name, Pid}] -> + {ok, Pid}; + [] -> + error + end. + +%%% +%%% Local functions +%%% + +maybe_over_register(Directory, Name, ExistingPid) -> + (ExistingPid =:= self() orelse not is_process_alive(ExistingPid)) + andalso over_register(Directory, Name, ExistingPid). + +over_register(Directory, Name, ExistingPid) -> + MatchSpec = over_registration_match_spec(Name, ExistingPid), + case _AmountReplaced = ets:select_replace(Directory, MatchSpec) of + 1 -> true; + 0 -> false + end. + +over_registration_match_spec(Name, ExistingPid) -> + % Atomic replacement of the entry we've just looked up + ets:fun2ms( + fun ({EntryName, EntryPid}) when EntryName =:= Name, + EntryPid =:= ExistingPid -> + {EntryName, self()} + end). diff --git a/src/poolgirl_sup.erl b/src/poolgirl_sup.erl index 7de4b23..a47ad39 100644 --- a/src/poolgirl_sup.erl +++ b/src/poolgirl_sup.erl @@ -15,11 +15,14 @@ start_link(PoolArgs, WorkerArgs) -> supervisor:start_link(?MODULE, {PoolArgs, WorkerArgs}). init({PoolArgs, WorkerArgs}) -> + InternalDirectory = poolgirl_internal_directory:new(), + true = poolgirl_internal_directory:register(InternalDirectory, ?MODULE), + WorkerModule = proplists:get_value(worker_module, PoolArgs), Flags = {rest_for_one, 0, 5}, Workers = [{poolgirl_worker_sup, - {poolgirl_worker_sup, start_link, [WorkerModule, WorkerArgs]}, + {poolgirl_worker_sup, start_link, [InternalDirectory, WorkerModule, WorkerArgs]}, permanent, infinity, supervisor, [poolgirl_worker_sup]}, - {poolgirl, {poolgirl, start_link2, [self(), PoolArgs, WorkerArgs]}, + {poolgirl, {poolgirl, start_link2, [InternalDirectory, PoolArgs, WorkerArgs]}, permanent, 5000, worker, [poolgirl]}], {ok, {Flags, Workers}}. diff --git a/src/poolgirl_worker_sup.erl b/src/poolgirl_worker_sup.erl index e90b3d5..49d344c 100644 --- a/src/poolgirl_worker_sup.erl +++ b/src/poolgirl_worker_sup.erl @@ -4,17 +4,18 @@ -module(poolgirl_worker_sup). -behaviour(supervisor). --export([start_link/2]). +-export([start_link/3]). %% supervisor. -export([init/1]). --ignore_xref([start_link/2]). +-ignore_xref([start_link/3]). -start_link(Mod, Args) -> - supervisor:start_link(?MODULE, {Mod, Args}). +start_link(InternalDirectory, WorkerMod, WorkerArgs) -> + supervisor:start_link(?MODULE, {InternalDirectory, WorkerMod, WorkerArgs}). -init({Mod, Args}) -> +init({InternalDirectory, WorkerMod, WorkerArgs}) -> + true = poolgirl_internal_directory:register(InternalDirectory, ?MODULE), {ok, {{simple_one_for_one, 0, 1}, - [{Mod, {Mod, start_link, [Args]}, - temporary, 5000, worker, [Mod]}]}}. + [{WorkerMod, {WorkerMod, start_link, [WorkerArgs]}, + temporary, 5000, worker, [WorkerMod]}]}}. diff --git a/test/poolgirl_tests.erl b/test/poolgirl_tests.erl index 3846270..7e22051 100644 --- a/test/poolgirl_tests.erl +++ b/test/poolgirl_tests.erl @@ -56,6 +56,9 @@ pool_test_() -> }, {<<"Childspec'd pool isn't supervised by poolgirl">>, fun independent_childspec_pool/0 + }, + {<<"Internal directory handles race conditions gracefully">>, + fun graceful_internal_directory_concurrency/0 } ] }. @@ -75,7 +78,7 @@ pool_startup() -> ?assertEqual(10, length(poolgirl:get_workers(Pid))), _ = poolgirl:checkout(Pid), ?assertEqual(10, length(poolgirl:get_workers(Pid))), - ok = pool_call(Pid, stop). + ok = poolgirl:stop(Pid). worker_death() -> {ok, Pid} = new_pool(5), @@ -85,30 +88,30 @@ worker_death() -> %% a little pause to allow the dust to settle after a death timer:sleep(1000), ?assertEqual(5, length(poolgirl:get_workers(Pid))), - ok = pool_call(Pid, stop). + ok = poolgirl:stop(Pid). pool_returns_status() -> {ok, Pool} = new_pool(2), ?assertEqual({ready, 2}, poolgirl:status(Pool)), - ok = pool_call(Pool, stop). + ok = poolgirl:stop(Pool). pool_worker_spin_up() -> {ok, Pool} = new_pool(2), poolgirl:spin(up, Pool, 2), ?assertEqual({ready, 4}, poolgirl:status(Pool)), - ok = pool_call(Pool, stop). + ok = poolgirl:stop(Pool). pool_worker_spin_down() -> {ok, Pool} = new_pool(4), poolgirl:spin(down, Pool, 2), ?assertEqual({ready, 2}, poolgirl:status(Pool)), - ok = pool_call(Pool, stop). + ok = poolgirl:stop(Pool). pool_only_local_workers() -> {ok, Pool} = new_pool(5), Worker = poolgirl:checkout(Pool), ?assertEqual(node(), node(Worker)), - ok = pool_call(Pool, stop). + ok = poolgirl:stop(Pool). pool_worker_depletion() -> {ok, Pool} = new_pool(5), @@ -121,7 +124,7 @@ pool_worker_depletion() -> %% allow for the pool to replenish the crashed workers timer:sleep(50), ?assertEqual({ready, 5}, poolgirl:status(Pool)), - ok = pool_call(Pool, stop). + ok = poolgirl:stop(Pool). multiple_pools() -> {ok, Pool1} = new_pool(poolgirl_test1, 5), @@ -146,14 +149,14 @@ multiple_pools() -> ?assertEqual({ready, 5}, poolgirl:status(Pool1)), ?assertEqual({ready, 5}, poolgirl:status(Pool2)), ?assertEqual({ready, 5}, poolgirl:status(Pool3)), - ok = pool_call(Pool1, stop), - ok = pool_call(Pool2, stop), - ok = pool_call(Pool3, stop). + ok = poolgirl:stop(Pool1), + ok = poolgirl:stop(Pool2), + ok = poolgirl:stop(Pool3). pool_proper_cleanup_on_stop() -> {ok, Pool1} = new_pool(poolgirl_test1, 5), ?assertEqual(1, length(supervisor:which_children(poolgirl_app_sup))), - ok = pool_call(Pool1, stop), + ok = poolgirl:stop(Pool1), %% a little pause to allow the dust to settle after a death timer:sleep(500), ?assertEqual(0, length(supervisor:which_children(poolgirl_app_sup))). @@ -170,7 +173,7 @@ pool_proper_restart() -> {ok, Pool1} = new_pool(poolgirl_test1, 5), %% a little pause to allow the dust to settle after a death % timer:sleep(500), - ok = pool_call(Pool1, stop), + ok = poolgirl:stop(Pool1), %% a little pause to allow the dust to settle after a death {ok, Pool1} = new_pool(poolgirl_test1, 5). @@ -211,6 +214,33 @@ independent_childspec_pool() -> ok = supervisor:terminate_child(ExtSupervisor, PoolId), ok = supervisor:delete_child(ExtSupervisor, PoolId). +graceful_internal_directory_concurrency() -> + _ = process_flag(trap_exit, true), + InternalDirectory = poolgirl_internal_directory:new(), + ?assertEqual(error, poolgirl_internal_directory:find(InternalDirectory, foobar)), + + TestPid = self(), + Pid1 = spawn_link( + fun () -> + ?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory, + foobar)), + TestPid ! proceed, + receive stop -> ok end + end), + receive proceed -> ok end, + ?assertEqual({ok, Pid1}, poolgirl_internal_directory:find(InternalDirectory, foobar)), + + % Unable to register - conflicting process is alive + ?assertEqual(false, poolgirl_internal_directory:register(InternalDirectory, foobar)), + Pid1 ! stop, + receive {'EXIT', Pid1, _} -> ok end, + + % Able to register - conflicting process has terminated + ?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory, foobar)), + + % Able to re-register + ?assertEqual(true, poolgirl_internal_directory:register(InternalDirectory, foobar)). + %% %% Internal %% @@ -224,6 +254,3 @@ new_pool(Name, Size) -> {size, Size}]), timer:sleep(500), {ok, Name}. - -pool_call(ServerRef, Request) -> - gen_server:call(ServerRef, Request).