Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 71 additions & 32 deletions src/poolboy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
transaction/3, child_spec/2, child_spec/3, child_spec/4, start/1,
start/2, start_link/1, start_link/2, stop/1, status/1]).
start/2, start_link/1, start_link/2, stop/1, status/1, stats/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export_type([pool/0]).
Expand Down Expand Up @@ -44,7 +44,10 @@
size = 5 :: non_neg_integer(),
overflow = 0 :: non_neg_integer(),
max_overflow = 10 :: non_neg_integer(),
strategy = lifo :: lifo | fifo
strategy = lifo :: lifo | fifo,
stats = false,
quant_start = erlang:monotonic_time(),
time_used = 0
}).

-spec checkout(Pool :: pool()) -> pid().
Expand Down Expand Up @@ -145,6 +148,15 @@ stop(Pool) ->
status(Pool) ->
gen_server:call(Pool, status).

-spec stats(Pool :: pool()) -> {ok, float()} | {error, disabled}.
stats(Pool) ->
case gen_server:call(Pool, stats) of
{error, disabled} -> {error, disabled};
{ok, {TimeTotal, TimeUsed, Workers}} ->
{ok, (TimeUsed / TimeTotal / Workers)}
end.


init({PoolArgs, WorkerArgs}) ->
process_flag(trap_exit, true),
Waiting = queue:new(),
Expand All @@ -162,6 +174,8 @@ init([{strategy, lifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = lifo});
init([{strategy, fifo} | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State#state{strategy = fifo});
init([{stats, StatsEnabled} | Rest], WorkerArgs, State) when is_boolean(StatsEnabled) ->
init(Rest, WorkerArgs, State#state{stats = StatsEnabled});
init([_ | Rest], WorkerArgs, State) ->
init(Rest, WorkerArgs, State);
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
Expand All @@ -170,21 +184,21 @@ init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->

handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) ->
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
[{Pid, _, MRef, MaybeStartedAt}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
{noreply, State}
end;

handle_cast({cancel_waiting, CRef}, State) ->
case ets:match(State#state.monitors, {'$1', CRef, '$2'}) of
[[Pid, MRef]] ->
case ets:match(State#state.monitors, {'$1', CRef, '$2', '$3'}) of
[[Pid, MRef, MaybeStartedAt]] ->
demonitor(MRef, [flush]),
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
Cancel = fun({_, Ref, MRef}) when Ref =:= CRef ->
Expand All @@ -206,15 +220,20 @@ handle_call({checkout, CRef, Block}, {FromPid, _} = From, State) ->
monitors = Monitors,
overflow = Overflow,
max_overflow = MaxOverflow,
strategy = Strategy} = State,
strategy = Strategy,
stats = StatsEnabled} = State,
case get_worker_with_strategy(Workers, Strategy) of
{{value, Pid}, Left} ->
MRef = erlang:monitor(process, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
{reply, Pid, State#state{workers = Left}};
{empty, _Left} when MaxOverflow > 0, Overflow < MaxOverflow ->
{Pid, MRef} = new_worker(Sup, FromPid),
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
{reply, Pid, State#state{overflow = Overflow + 1}};
{empty, _Left} when Block =:= false ->
{reply, full, State};
Expand All @@ -230,6 +249,13 @@ handle_call(status, _From, State) ->
overflow = Overflow} = State,
StateName = state_name(State),
{reply, {StateName, queue:len(Workers), Overflow, ets:info(Monitors, size)}, State};
handle_call(stats, _From, #state{stats = false} = State) ->
{reply, {error, disabled}, State};
handle_call(stats, _From, State) ->
{reply, {ok, {
erlang:monotonic_time() - State#state.quant_start,
State#state.time_used, State#state.size
}}, State#state{quant_start = erlang:monotonic_time(), time_used = 0}};
handle_call(get_avail_workers, _From, State) ->
Workers = State#state.workers,
{reply, Workers, State};
Expand All @@ -239,7 +265,7 @@ handle_call(get_all_workers, _From, State) ->
{reply, WorkerList, State};
handle_call(get_all_monitors, _From, State) ->
Monitors = ets:select(State#state.monitors,
[{{'$1', '_', '$2'}, [], [{{'$1', '$2'}}]}]),
[{{'$1', '_', '$2', '_'}, [], [{{'$1', '$2'}}]}]),
{reply, Monitors, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
Expand All @@ -248,10 +274,10 @@ handle_call(_Msg, _From, State) ->
{reply, Reply, State}.

handle_info({'DOWN', MRef, _, _, _}, State) ->
case ets:match(State#state.monitors, {'$1', '_', MRef}) of
[[Pid]] ->
case ets:match(State#state.monitors, {'$1', '_', MRef, '$2'}) of
[[Pid, MaybeStartedAt]] ->
true = ets:delete(State#state.monitors, Pid),
NewState = handle_checkin(Pid, State),
NewState = handle_checkin(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
Waiting = queue:filter(fun ({_, _, R}) -> R =/= MRef end, State#state.waiting),
Expand All @@ -261,10 +287,10 @@ handle_info({'EXIT', Pid, _Reason}, State) ->
#state{supervisor = Sup,
monitors = Monitors} = State,
case ets:lookup(Monitors, Pid) of
[{Pid, _, MRef}] ->
[{Pid, _, MRef, MaybeStartedAt}] ->
true = erlang:demonitor(MRef),
true = ets:delete(Monitors, Pid),
NewState = handle_worker_exit(Pid, State),
NewState = handle_worker_exit(Pid, MaybeStartedAt, State),
{noreply, NewState};
[] ->
case queue:member(Pid, State#state.workers) of
Expand Down Expand Up @@ -328,40 +354,53 @@ prepopulate(0, _Sup, Workers) ->
prepopulate(N, Sup, Workers) ->
prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)).

handle_checkin(Pid, State) ->
handle_checkin(Pid, StartedAt, State0) ->
#state{supervisor = Sup,
waiting = Waiting,
monitors = Monitors,
overflow = Overflow} = State,
overflow = Overflow,
stats = StatsEnabled} = State0,
State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0),
case queue:out(Waiting) of
{{value, {From, CRef, MRef}}, Left} ->
true = ets:insert(Monitors, {Pid, CRef, MRef}),
MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time();
true -> undefined end,
true = ets:insert(Monitors, {Pid, CRef, MRef, MaybeStartedAt}),
gen_server:reply(From, Pid),
State#state{waiting = Left};
State1#state{waiting = Left};
{empty, Empty} when Overflow > 0 ->
ok = dismiss_worker(Sup, Pid),
State#state{waiting = Empty, overflow = Overflow - 1};
State1#state{waiting = Empty, overflow = Overflow - 1};
{empty, Empty} ->
Workers = queue:in(Pid, State#state.workers),
State#state{workers = Workers, waiting = Empty, overflow = 0}
Workers = queue:in(Pid, State1#state.workers),
State1#state{workers = Workers, waiting = Empty, overflow = 0}
end.

handle_worker_exit(Pid, State) ->
handle_checkin_stats(false, _, State) -> State;
handle_checkin_stats(_, undefined, State) -> State;
handle_checkin_stats(_, StartedAt, #state{time_used = TimeUsed} = State) ->
State#state{time_used = TimeUsed + (erlang:monotonic_time() - StartedAt)}.

handle_worker_exit(Pid, StartedAt, State0) ->
#state{supervisor = Sup,
monitors = Monitors,
overflow = Overflow} = State,
case queue:out(State#state.waiting) of
overflow = Overflow,
stats = StatsEnabled} = State0,
State1 = handle_checkin_stats(StatsEnabled, StartedAt, State0),
case queue:out(State1#state.waiting) of
{{value, {From, CRef, MRef}}, LeftWaiting} ->
NewWorker = new_worker(State#state.supervisor),
true = ets:insert(Monitors, {NewWorker, CRef, MRef}),
NewWorker = new_worker(State1#state.supervisor),
MaybeStartedAt = if StatsEnabled -> erlang:monotonic_time();
true -> undefined end,
true = ets:insert(Monitors, {NewWorker, CRef, MRef, MaybeStartedAt}),
gen_server:reply(From, NewWorker),
State#state{waiting = LeftWaiting};
State1#state{waiting = LeftWaiting};
{empty, Empty} when Overflow > 0 ->
State#state{overflow = Overflow - 1, waiting = Empty};
State1#state{overflow = Overflow - 1, waiting = Empty};
{empty, Empty} ->
W = filter_worker_by_pid(Pid, State#state.workers),
W = filter_worker_by_pid(Pid, State1#state.workers),
Workers = queue:in(new_worker(Sup), W),
State#state{workers = Workers, waiting = Empty}
State1#state{workers = Workers, waiting = Empty}
end.

state_name(State = #state{overflow = Overflow}) when Overflow < 1 ->
Expand Down
27 changes: 26 additions & 1 deletion test/poolboy_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ pool_test_() ->
{<<"Recover from timeout without exit handling">>,
fun transaction_timeout_without_exit/0},
{<<"Recover from transaction timeout">>,
fun transaction_timeout/0}
fun transaction_timeout/0},
{<<"Pool behaves when stats disabled">>,
fun pool_stats_disabled/0},
{<<"Pool behaves when stats enabled">>,
fun pool_stats_enabled/0}
]
}.

Expand Down Expand Up @@ -124,6 +128,21 @@ transaction_timeout() ->
?assertEqual({ready,1,0,0}, pool_call(Pid, status)).


pool_stats_disabled() ->
{ok, Pid} = new_pool(1, 0),
?assertMatch({error, disabled}, poolboy:stats(Pid)).

pool_stats_enabled() ->
{ok, Pid} = new_pool(1, 0, lifo, true),
?assertMatch({ok, 0.0}, poolboy:stats(Pid)),
Worker = poolboy:checkout(Pid),
timer:sleep(1000), %% emulating load?..
checkin_worker(Pid, Worker),
StatsRet = poolboy:stats(Pid),
?assertMatch({ok, Load} when Load > 0.5, StatsRet),
?assertNotMatch({ok, 0.0}, StatsRet),
Comment thread
shizzard marked this conversation as resolved.
Outdated
?assertMatch({ok, 0.0}, poolboy:stats(Pid)).

pool_startup() ->
%% Check basic pool operation.
{ok, Pid} = new_pool(10, 5),
Expand Down Expand Up @@ -546,5 +565,11 @@ new_pool(Size, MaxOverflow, Strategy) ->
{size, Size}, {max_overflow, MaxOverflow},
{strategy, Strategy}]).

new_pool(Size, MaxOverflow, Strategy, StatsEnabled) ->
poolboy:start_link([{name, {local, poolboy_test}},
{worker_module, poolboy_test_worker},
{size, Size}, {max_overflow, MaxOverflow},
{strategy, Strategy}, {stats, StatsEnabled}]).

pool_call(ServerRef, Request) ->
gen_server:call(ServerRef, Request).