diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..6c1f74a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,14 @@ +# EditorConfig is awesome: http://EditorConfig.org + +# top-most EditorConfig file +root = true + +# Unix-style newlines with a newline ending every file +[*] +end_of_line = lf +insert_final_newline = true + +# 4 space indentation +[*.{erl,src}] +indent_style = space +indent_size = 4 diff --git a/.gitignore b/.gitignore index 33df706..c6adcd0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ -*.app -*.beam -.test +.rebar .eunit .dialyzer_plt +erl_crash.dump +ebin diff --git a/.travis.yml b/.travis.yml index f64d33e..4166698 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,10 @@ notifications: webhooks: http://basho-engbot.herokuapp.com/travis?key=f298b56e20009568a29135485bca1220d0733d05 email: eng@basho.com otp_release: - - R15B01 - - R15B - - R14B04 - - R14B03 + - 17.1 + - 17.0 + - R16B03-1 + - R16B03 + - R16B02 + - R16B01 + - R16B diff --git a/Makefile b/Makefile index 1453a77..5b05539 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ -REBAR = ./rebar +REBAR = rebar DIALYZER = dialyzer DIALYZER_WARNINGS = -Wunmatched_returns -Werror_handling \ -Wrace_conditions -Wunderspecs -.PHONY: all compile test clean get-deps build-plt dialyze +.PHONY: all compile test qc clean get-deps build-plt dialyze all: compile @@ -23,10 +23,11 @@ clean: get-deps: @$(REBAR) get-deps -build-plt: +.dialyzer_plt: @$(DIALYZER) --build_plt --output_plt .dialyzer_plt \ --apps kernel stdlib -dialyze: compile - @$(DIALYZER) --src src --plt .dialyzer_plt $(DIALYZER_WARNINGS) | \ - fgrep -vf .dialyzer-ignore-warnings +build-plt: .dialyzer_plt + +dialyze: build-plt + @$(DIALYZER) --src src --plt .dialyzer_plt $(DIALYZER_WARNINGS) diff --git a/README.md b/README.md index 67fff6c..3f7baab 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ [![Build Status](https://secure.travis-ci.org/basho/poolboy.png?branch=master)](http://travis-ci.org/basho/poolboy) +Poolboy is a **lightweight**, **generic** pooling library for Erlang with a +focus on **simplicity**, **performance**, and **rock-solid** disaster recovery. + ## Usage ```erl-sh @@ -113,6 +116,7 @@ start_link(Args) -> gen_server:start_link(?MODULE, Args, []). init(Args) -> + process_flag(trap_exit, true), Hostname = proplists:get_value(hostname, Args), Database = proplists:get_value(database, Args), Username = proplists:get_value(username, Args), @@ -161,3 +165,6 @@ code_change(_OldVsn, State, _Extra) -> Poolboy is available in the public domain (see `UNLICENSE`). Poolboy is also optionally available under the Apache License (see `LICENSE`), meant especially for jurisdictions that do not recognize public domain works. + + +[![Bitdeli Badge](https://d2weczhvl823v0.cloudfront.net/devinus/poolboy/trend.png)](https://bitdeli.com/free "Bitdeli Badge") diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..f0bb29e --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.3.0 diff --git a/package.exs b/package.exs new file mode 100644 index 0000000..ae4ba16 --- /dev/null +++ b/package.exs @@ -0,0 +1,19 @@ +defmodule Poolboy.Mixfile do + use Mix.Project + + @version File.read!("VERSION") |> String.strip + + def project do + [app: :poolboy, + version: @version, + description: "A hunky Erlang worker pool factory", + package: package] + end + + defp package do + [files: ~w(src rebar.config README.md LICENSE UNLICENSE VERSION), + contributors: ["Devin Torres", "Andrew Thompson", "Kurt Williams"], + licenses: ["Unlicense", "Apache 2.0"], + links: [{"GitHub", "https://github.com/devinus/poolboy"}]] + end +end diff --git a/rebar.config b/rebar.config index f8d97f5..128628f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,7 @@ -{erl_opts, [debug_info, warnings_as_errors]}. +{erl_opts, [ + debug_info, + {platform_define, "^R", pre17} +]}. + {eunit_opts, [verbose]}. {cover_enabled, true}. diff --git a/src/poolboy.app.src b/src/poolboy.app.src index 56f8fe0..80608a0 100644 --- a/src/poolboy.app.src +++ b/src/poolboy.app.src @@ -1,6 +1,6 @@ {application, poolboy, [ {description, "A hunky Erlang worker pool factory"}, - {vsn, git}, + {vsn, {cmd, "echo `cat VERSION`"}}, {applications, [kernel, stdlib]}, {registered, [poolboy]} ]}. diff --git a/src/poolboy.erl b/src/poolboy.erl index 5dad6c1..f6944b0 100644 --- a/src/poolboy.erl +++ b/src/poolboy.erl @@ -1,14 +1,15 @@ %% Poolboy - A hunky Erlang worker pool factory -module(poolboy). --behaviour(gen_fsm). +-behaviour(gen_server). -export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2, - child_spec/2, child_spec/3, start/1, start/2, start_link/1, - start_link/2, stop/1, status/1]). --export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3, - handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, - code_change/4]). + transaction/3, child_spec/2, child_spec/3, start/1, start/2, + start_link/1, start_link/2, stop/1, status/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + + -ifdef(PULSE). -compile(export_all). -compile({parse_transform, pulse_instrument}). @@ -19,300 +20,242 @@ -define(TIMEOUT, 5000). +-ifdef(pre17). +-type pid_queue() :: queue(). +-else. +-type pid_queue() :: queue:queue(). +-endif. + +-type pool() :: + Name :: atom() | + {Name :: atom(), node()} | + {local, Name :: atom()} | + {global, GlobalName :: any()} | + {via, Module :: atom(), ViaName :: any()}. + +% Copied from gen:start_ret/0 +-type start_ret() :: {'ok', pid()} | 'ignore' | {'error', term()}. + -record(state, { supervisor :: pid(), - workers :: queue(), - waiting :: queue(), + workers :: [pid()], + waiting :: pid_queue(), monitors :: ets:tid(), size = 5 :: non_neg_integer(), overflow = 0 :: non_neg_integer(), max_overflow = 10 :: non_neg_integer() }). --spec checkout(Pool :: node()) -> pid(). +-spec checkout(Pool :: pool()) -> pid(). checkout(Pool) -> checkout(Pool, true). --spec checkout(Pool :: node(), Block :: boolean()) -> pid() | full. +-spec checkout(Pool :: pool(), Block :: boolean()) -> pid() | full. checkout(Pool, Block) -> checkout(Pool, Block, ?TIMEOUT). --spec checkout(Pool :: node(), Block :: boolean(), Timeout :: timeout()) +-spec checkout(Pool :: pool(), Block :: boolean(), Timeout :: timeout()) -> pid() | full. checkout(Pool, Block, Timeout) -> - gen_fsm:sync_send_event(Pool, {checkout, Block, Timeout}, Timeout). + try + gen_server:call(Pool, {checkout, Block}, Timeout) + catch + Class:Reason -> + gen_server:cast(Pool, {cancel_waiting, self()}), + erlang:raise(Class, Reason, erlang:get_stacktrace()) + end. --spec checkin(Pool :: node(), Worker :: pid()) -> ok. +-spec checkin(Pool :: pool(), Worker :: pid()) -> ok. checkin(Pool, Worker) when is_pid(Worker) -> - gen_fsm:send_event(Pool, {checkin, Worker}). + gen_server:cast(Pool, {checkin, Worker}). --spec transaction(Pool :: node(), Fun :: fun((Worker :: pid()) -> any())) +-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any())) -> any(). transaction(Pool, Fun) -> - Worker = poolboy:checkout(Pool), + transaction(Pool, Fun, ?TIMEOUT). + +-spec transaction(Pool :: pool(), Fun :: fun((Worker :: pid()) -> any()), + Timeout :: timeout()) -> any(). +transaction(Pool, Fun, Timeout) -> + Worker = poolboy:checkout(Pool, true, Timeout), try Fun(Worker) after ok = poolboy:checkin(Pool, Worker) end. --spec child_spec(Pool :: node(), PoolArgs :: proplists:proplist()) +-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist()) -> supervisor:child_spec(). -child_spec(Pool, PoolArgs) -> - child_spec(Pool, PoolArgs, []). +child_spec(PoolId, PoolArgs) -> + child_spec(PoolId, PoolArgs, []). --spec child_spec(Pool :: node(), +-spec child_spec(PoolId :: term(), PoolArgs :: proplists:proplist(), WorkerArgs :: proplists:proplist()) -> supervisor:child_spec(). -child_spec(Pool, PoolArgs, WorkerArgs) -> - {Pool, {poolboy, start_link, [PoolArgs, WorkerArgs]}, +child_spec(PoolId, PoolArgs, WorkerArgs) -> + {PoolId, {poolboy, start_link, [PoolArgs, WorkerArgs]}, permanent, 5000, worker, [poolboy]}. -spec start(PoolArgs :: proplists:proplist()) - -> {ok, pid()}. + -> start_ret(). start(PoolArgs) -> start(PoolArgs, PoolArgs). -spec start(PoolArgs :: proplists:proplist(), WorkerArgs:: proplists:proplist()) - -> {ok, pid()}. + -> start_ret(). start(PoolArgs, WorkerArgs) -> start_pool(start, PoolArgs, WorkerArgs). -spec start_link(PoolArgs :: proplists:proplist()) - -> {ok, pid()}. + -> start_ret(). start_link(PoolArgs) -> %% for backwards compatability, pass the pool args as the worker args as well start_link(PoolArgs, PoolArgs). -spec start_link(PoolArgs :: proplists:proplist(), WorkerArgs:: proplists:proplist()) - -> {ok, pid()}. + -> start_ret(). start_link(PoolArgs, WorkerArgs) -> start_pool(start_link, PoolArgs, WorkerArgs). --spec stop(Pool :: node()) -> ok. +-spec stop(Pool :: pool()) -> ok. stop(Pool) -> - gen_fsm:sync_send_all_state_event(Pool, stop). + gen_server:call(Pool, stop). --spec status(Pool :: node()) -> {atom(), integer(), integer(), integer()}. +-spec status(Pool :: pool()) -> {atom(), integer(), integer(), integer()}. status(Pool) -> - gen_fsm:sync_send_all_state_event(Pool, status). + gen_server:call(Pool, status). init({PoolArgs, WorkerArgs}) -> process_flag(trap_exit, true), Waiting = queue:new(), Monitors = ets:new(monitors, [private]), - init(PoolArgs, WorkerArgs, #state{waiting=Waiting, monitors=Monitors}). + init(PoolArgs, WorkerArgs, #state{waiting = Waiting, monitors = Monitors}). init([{worker_module, Mod} | Rest], WorkerArgs, State) when is_atom(Mod) -> {ok, Sup} = poolboy_sup:start_link(Mod, WorkerArgs), - init(Rest, WorkerArgs, State#state{supervisor=Sup}); + init(Rest, WorkerArgs, State#state{supervisor = Sup}); init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) -> - init(Rest, WorkerArgs, State#state{size=Size}); + init(Rest, WorkerArgs, State#state{size = Size}); init([{max_overflow, MaxOverflow} | Rest], WorkerArgs, State) when is_integer(MaxOverflow) -> - init(Rest, WorkerArgs, State#state{max_overflow=MaxOverflow}); + init(Rest, WorkerArgs, State#state{max_overflow = MaxOverflow}); init([_ | Rest], WorkerArgs, State) -> init(Rest, WorkerArgs, State); -init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow}=State) -> +init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) -> Workers = prepopulate(Size, Sup), - StartState = case Size of - Size when Size < 1, MaxOverflow < 1 -> full; - Size when Size < 1 -> overflow; - Size -> ready - end, - {ok, StartState, State#state{workers=Workers}}. - -ready({checkin, Pid}, State) -> - Monitors = State#state.monitors, + {ok, State#state{workers = Workers}}. + +handle_cast({checkin, Pid}, State = #state{monitors = Monitors}) -> case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers}}; + NewState = handle_checkin(Pid, State), + {noreply, NewState}; [] -> - {next_state, ready, State} + {noreply, State} end; -ready(_Event, State) -> - {next_state, ready, State}. -ready({checkout, Block, Timeout}, {FromPid, _}=From, State) -> +handle_cast({cancel_waiting, Pid}, State) -> + Waiting = queue:filter(fun ({{P, _}, _}) -> P =/= Pid end, State#state.waiting), + {noreply, State#state{waiting = Waiting}}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_call({checkout, Block}, {FromPid, _} = From, State) -> #state{supervisor = Sup, workers = Workers, monitors = Monitors, + overflow = Overflow, max_overflow = MaxOverflow} = State, - case queue:out(Workers) of - {{value, Pid}, Left} -> + case Workers of + [Pid | Left] -> Ref = erlang:monitor(process, FromPid), true = ets:insert(Monitors, {Pid, Ref}), - NextState = case queue:is_empty(Left) of - true when MaxOverflow < 1 -> full; - true -> overflow; - false -> ready - end, - {reply, Pid, NextState, State#state{workers=Left}}; - {empty, Empty} when MaxOverflow > 0 -> + {reply, Pid, State#state{workers = Left}}; + [] when MaxOverflow > 0, Overflow < MaxOverflow -> {Pid, Ref} = new_worker(Sup, FromPid), true = ets:insert(Monitors, {Pid, Ref}), - {reply, Pid, overflow, State#state{workers=Empty, overflow=1}}; - {empty, Empty} when Block =:= false -> - {reply, full, full, State#state{workers=Empty}}; - {empty, Empty} -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{workers=Empty, waiting=Waiting}} - end; -ready(_Event, _From, State) -> - {reply, ok, ready, State}. - -overflow({checkin, Pid}, #state{overflow=0}=State) -> - Monitors = State#state.monitors, - case ets:lookup(Monitors, Pid) of - [{Pid, Ref}] -> - true = erlang:demonitor(Ref), - true = ets:delete(Monitors, Pid), - NextState = case State#state.size > 0 of - true -> ready; - false -> overflow - end, - Workers = queue:in(Pid, State#state.workers), - {next_state, NextState, State#state{overflow=0, workers=Workers}}; - [] -> - {next_state, overflow, State} - end; -overflow({checkin, Pid}, State) -> - #state{supervisor=Sup, monitors=Monitors, overflow=Overflow} = State, - case ets:lookup(Monitors, Pid) of - [{Pid, Ref}] -> - ok = dismiss_worker(Sup, Pid), - true = erlang:demonitor(Ref), - true = ets:delete(Monitors, Pid), - {next_state, overflow, State#state{overflow=Overflow-1}}; - [] -> - {next_state, overflow, State} - end; -overflow(_Event, State) -> - {next_state, overflow, State}. - -overflow({checkout, Block, Timeout}, From, - #state{overflow=Overflow, - max_overflow=MaxOverflow}=State) when Overflow >= MaxOverflow -> - case Block of - true -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{waiting=Waiting}}; - false -> - {reply, full, full, State} - end; -overflow({checkout, _Block, _Timeout}, {From, _}, State) -> - #state{supervisor = Sup, - overflow = Overflow, - max_overflow = MaxOverflow} = State, - {Pid, Ref} = new_worker(Sup, From), - true = ets:insert(State#state.monitors, {Pid, Ref}), - NewOverflow = Overflow + 1, - NextState = case NewOverflow >= MaxOverflow of - true -> full; - false -> overflow - end, - {reply, Pid, NextState, State#state{overflow=NewOverflow}}; -overflow(_Event, _From, State) -> - {reply, ok, overflow, State}. - -full({checkin, Pid}, State) -> - #state{monitors = Monitors} = State, - case ets:lookup(Monitors, Pid) of - [{Pid, Ref}] -> - true = erlang:demonitor(Ref), - true = ets:delete(Monitors, Pid), - checkin_while_full(Pid, State); + {reply, Pid, State#state{overflow = Overflow + 1}}; + [] when Block =:= false -> + {reply, full, State}; [] -> - {next_state, full, State} + Ref = erlang:monitor(process, FromPid), + Waiting = queue:in({From, Ref}, State#state.waiting), + {noreply, State#state{waiting = Waiting}} end; -full(_Event, State) -> - {next_state, full, State}. - -full({checkout, true, Timeout}, From, State) -> - Waiting = add_waiting(From, Timeout, State#state.waiting), - {next_state, full, State#state{waiting=Waiting}}; -full({checkout, false, _Timeout}, _From, State) -> - {reply, full, full, State}; -full(_Event, _From, State) -> - {reply, ok, full, State}. - -handle_event(_Event, StateName, State) -> - {next_state, StateName, State}. - -handle_sync_event(status, _From, StateName, State) -> - {reply, {StateName, queue:len(State#state.workers), State#state.overflow, - ets:info(State#state.monitors, size)}, - StateName, State}; -handle_sync_event(get_avail_workers, _From, StateName, State) -> + +handle_call(status, _From, State) -> + #state{workers = Workers, + monitors = Monitors, + overflow = Overflow} = State, + StateName = state_name(State), + {reply, {StateName, length(Workers), Overflow, ets:info(Monitors, size)}, State}; +handle_call(get_avail_workers, _From, State) -> Workers = State#state.workers, - WorkerList = queue:to_list(Workers), - {reply, WorkerList, StateName, State}; -handle_sync_event(get_all_workers, _From, StateName, State) -> + {reply, Workers, State}; +handle_call(get_all_workers, _From, State) -> Sup = State#state.supervisor, WorkerList = supervisor:which_children(Sup), - {reply, WorkerList, StateName, State}; -handle_sync_event(get_all_monitors, _From, StateName, State) -> + {reply, WorkerList, State}; +handle_call(get_all_monitors, _From, State) -> Monitors = ets:tab2list(State#state.monitors), - {reply, Monitors, StateName, State}; -handle_sync_event(stop, _From, _StateName, State) -> - Sup = State#state.supervisor, - true = exit(Sup, shutdown), + {reply, Monitors, State}; +handle_call(stop, _From, State) -> {stop, normal, ok, State}; -handle_sync_event(_Event, _From, StateName, State) -> +handle_call(_Msg, _From, State) -> Reply = {error, invalid_message}, - {reply, Reply, StateName, State}. + {reply, Reply, State}. -handle_info({'DOWN', Ref, _, _, _}, StateName, State) -> +handle_info({'DOWN', Ref, _, _, _}, State) -> case ets:match(State#state.monitors, {'$1', Ref}) of [[Pid]] -> - Sup = State#state.supervisor, - ok = supervisor:terminate_child(Sup, Pid), - %% Don't wait for the EXIT message to come in. - %% Deal with the worker exit right now to avoid - %% a race condition with messages waiting in the - %% mailbox. true = ets:delete(State#state.monitors, Pid), - handle_worker_exit(Pid, StateName, State); + NewState = handle_checkin(Pid, State), + {noreply, NewState}; [] -> - {next_state, StateName, State} + Waiting = queue:filter(fun ({_, R}) -> R =/= Ref end, State#state.waiting), + {noreply, State#state{waiting = Waiting}} end; -handle_info({'EXIT', Pid, _Reason}, StateName, State) -> +handle_info({'EXIT', Pid, _Reason}, State) -> #state{supervisor = Sup, monitors = Monitors} = State, case ets:lookup(Monitors, Pid) of [{Pid, Ref}] -> true = erlang:demonitor(Ref), true = ets:delete(Monitors, Pid), - handle_worker_exit(Pid, StateName, State); + NewState = handle_worker_exit(Pid, State), + {noreply, NewState}; [] -> - case queue:member(Pid, State#state.workers) of + case lists:member(Pid, State#state.workers) of true -> - W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, StateName, State#state{workers=queue:in(new_worker(Sup), W)}}; + W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers), + {noreply, State#state{workers = [new_worker(Sup) | W]}}; false -> - {next_state, StateName, State} + {noreply, State} end end; -handle_info(_Info, StateName, State) -> - {next_state, StateName, State}. -terminate(_Reason, _StateName, _State) -> +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + ok = lists:foreach(fun (W) -> unlink(W) end, State#state.workers), + true = exit(State#state.supervisor, shutdown), ok. -code_change(_OldVsn, StateName, State, _Extra) -> - {ok, StateName, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. start_pool(StartFun, PoolArgs, WorkerArgs) -> case proplists:get_value(name, PoolArgs) of undefined -> - gen_fsm:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []); + gen_server:StartFun(?MODULE, {PoolArgs, WorkerArgs}, []); Name -> - gen_fsm:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) + gen_server:StartFun(Name, ?MODULE, {PoolArgs, WorkerArgs}, []) end. new_worker(Sup) -> @@ -330,101 +273,62 @@ dismiss_worker(Sup, Pid) -> supervisor:terminate_child(Sup, Pid). prepopulate(N, _Sup) when N < 1 -> - queue:new(); + []; prepopulate(N, Sup) -> - prepopulate(N, Sup, queue:new()). + prepopulate(N, Sup, []). prepopulate(0, _Sup, Workers) -> Workers; prepopulate(N, Sup, Workers) -> - prepopulate(N-1, Sup, queue:in(new_worker(Sup), Workers)). - -add_waiting(Pid, Timeout, Queue) -> - queue:in({Pid, Timeout, os:timestamp()}, Queue). + prepopulate(N-1, Sup, [new_worker(Sup) | Workers]). -wait_valid(infinity, _Timeout) -> - true; -wait_valid(StartTime, Timeout) -> - Waited = timer:now_diff(os:timestamp(), StartTime), - (Waited div 1000) < Timeout. - -checkin_while_full(Pid, State) -> +handle_checkin(Pid, State) -> #state{supervisor = Sup, waiting = Waiting, monitors = Monitors, - max_overflow = MaxOverflow, overflow = Overflow} = State, case queue:out(Waiting) of - {{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} -> - case wait_valid(StartTime, Timeout) of - true -> - Ref1 = erlang:monitor(process, FromPid), - true = ets:insert(Monitors, {Pid, Ref1}), - gen_fsm:reply(From, Pid), - {next_state, full, State#state{waiting=Left}}; - false -> - checkin_while_full(Pid, State#state{waiting=Left}) - end; - {empty, Empty} when MaxOverflow < 1 -> - Workers = queue:in(Pid, State#state.workers), - {next_state, ready, State#state{workers=Workers, - waiting=Empty}}; - {empty, Empty} -> + {{value, {{FromPid, _} = From, _}}, Left} -> + Ref = erlang:monitor(process, FromPid), + true = ets:insert(Monitors, {Pid, Ref}), + gen_server:reply(From, Pid), + State#state{waiting = Left}; + {empty, Empty} when Overflow > 0 -> ok = dismiss_worker(Sup, Pid), - {next_state, overflow, State#state{waiting=Empty, - overflow=Overflow-1}} + State#state{waiting = Empty, overflow = Overflow - 1}; + {empty, Empty} -> + Workers = [Pid | State#state.workers], + State#state{workers = Workers, waiting = Empty, overflow = 0} end. -handle_worker_exit(Pid, StateName, State) -> +handle_worker_exit(Pid, State) -> #state{supervisor = Sup, - overflow = Overflow, - waiting = Waiting, monitors = Monitors, - max_overflow = MaxOverflow} = State, - case StateName of - ready -> - W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}}; - overflow when Overflow =:= 0 -> - W = queue:filter(fun (P) -> P =/= Pid end, State#state.workers), - {next_state, ready, State#state{workers=queue:in(new_worker(Sup), W)}}; - overflow -> - {next_state, overflow, State#state{overflow=Overflow-1}}; - full when MaxOverflow < 1 -> - case queue:out(Waiting) of - {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} -> - case wait_valid(StartTime, Timeout) of - true -> - MonitorRef = erlang:monitor(process, FromPid), - NewWorker = new_worker(Sup), - true = ets:insert(Monitors, {NewWorker, MonitorRef}), - gen_fsm:reply(From, NewWorker), - {next_state, full, State#state{waiting=LeftWaiting}}; - false -> - handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting}) - end; - {empty, Empty} -> - Workers2 = queue:in(new_worker(Sup), State#state.workers), - {next_state, ready, State#state{waiting=Empty, - workers=Workers2}} - end; - full when Overflow =< MaxOverflow -> - case queue:out(Waiting) of - {{value, {{FromPid, _}=From, Timeout, StartTime}}, LeftWaiting} -> - case wait_valid(StartTime, Timeout) of - true -> - MonitorRef = erlang:monitor(process, FromPid), - NewWorker = new_worker(Sup), - true = ets:insert(Monitors, {NewWorker, MonitorRef}), - gen_fsm:reply(From, NewWorker), - {next_state, full, State#state{waiting=LeftWaiting}}; - _ -> - handle_worker_exit(Pid, StateName, State#state{waiting=LeftWaiting}) - end; - {empty, Empty} -> - {next_state, overflow, State#state{overflow=Overflow-1, - waiting=Empty}} - end; - full -> - {next_state, full, State#state{overflow=Overflow-1}} + overflow = Overflow} = State, + case queue:out(State#state.waiting) of + {{value, {{FromPid, _} = From, _}}, LeftWaiting} -> + MonitorRef = erlang:monitor(process, FromPid), + NewWorker = new_worker(State#state.supervisor), + true = ets:insert(Monitors, {NewWorker, MonitorRef}), + gen_server:reply(From, NewWorker), + State#state{waiting = LeftWaiting}; + {empty, Empty} when Overflow > 0 -> + State#state{overflow = Overflow - 1, waiting = Empty}; + {empty, Empty} -> + Workers = + [new_worker(Sup) + | lists:filter(fun (P) -> P =/= Pid end, State#state.workers)], + State#state{workers = Workers, waiting = Empty} end. + +state_name(State = #state{overflow = Overflow}) when Overflow < 1 -> + #state{max_overflow = MaxOverflow, workers = Workers} = State, + case length(Workers) == 0 of + true when MaxOverflow < 1 -> full; + true -> overflow; + false -> ready + end; +state_name(#state{overflow = MaxOverflow, max_overflow = MaxOverflow}) -> + full; +state_name(_State) -> + overflow. diff --git a/src/poolboy_worker.erl b/src/poolboy_worker.erl index 7376756..062660f 100644 --- a/src/poolboy_worker.erl +++ b/src/poolboy_worker.erl @@ -2,13 +2,14 @@ -module(poolboy_worker). --export([behaviour_info/1]). -ifdef(PULSE). -compile(export_all). -compile({parse_transform, pulse_instrument}). -endif. -behaviour_info(callbacks) -> - [{start_link, 1}]; -behaviour_info(_Other) -> - undefined. +-callback start_link(WorkerArgs) -> {ok, Pid} | + {error, {already_started, Pid}} | + {error, Reason} when + WorkerArgs :: proplists:proplist(), + Pid :: pid(), + Reason :: term(). diff --git a/test/poolboy_eqc.erl b/test/poolboy_eqc.erl index 1c0957e..59031d4 100644 --- a/test/poolboy_eqc.erl +++ b/test/poolboy_eqc.erl @@ -67,7 +67,7 @@ start_poolboy(Args) -> Pid. stop_poolboy(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, stop), + gen_server:call(Pid, stop), timer:sleep(1). checkout_nonblock(Pool) -> @@ -78,7 +78,7 @@ checkout_block(Pool) -> checkin(Pool, {Worker, _}) -> Res = poolboy:checkin(Pool, Worker), - gen_fsm:sync_send_all_state_event(Pool, get_avail_workers), + gen_server:call(Pool, get_avail_workers), Res. kill_worker({Worker, _}) -> @@ -130,7 +130,7 @@ invariant(S = #state{pid=Pid},_) when Pid /= undefined -> OverFlow = max(0, length(S#state.checked_out) - S#state.size), Monitors = length(S#state.checked_out), - RealStatus = gen_fsm:sync_send_all_state_event(Pid, status), + RealStatus = gen_server:call(Pid, status), case RealStatus == {State, Workers, OverFlow, Monitors} of true -> true; diff --git a/test/poolboy_tests.erl b/test/poolboy_tests.erl index 762c341..807b3dc 100644 --- a/test/poolboy_tests.erl +++ b/test/poolboy_tests.erl @@ -2,9 +2,6 @@ -include_lib("eunit/include/eunit.hrl"). --define(sync(Pid, Event), - gen_fsm:sync_send_all_state_event(Pid, Event)). - pool_test_() -> {foreach, fun() -> @@ -13,7 +10,7 @@ pool_test_() -> fun(_) -> case whereis(poolboy_test) of undefined -> ok; - Pid -> ?sync(Pid, stop) + Pid -> pool_call(Pid, stop) end, error_logger:tty(true) end, @@ -60,7 +57,7 @@ pool_test_() -> %% Tell a worker to exit and await its impending doom. kill_worker(Pid) -> erlang:monitor(process, Pid), - gen_server:call(Pid, die), + pool_call(Pid, die), receive {'DOWN', _, process, Pid, _} -> ok @@ -76,48 +73,48 @@ checkin_worker(Pid, Worker) -> pool_startup() -> %% Check basic pool operation. {ok, Pid} = new_pool(10, 5), - ?assertEqual(10, length(?sync(Pid, get_avail_workers))), + ?assertEqual(10, length(pool_call(Pid, get_avail_workers))), poolboy:checkout(Pid), - ?assertEqual(9, length(?sync(Pid, get_avail_workers))), + ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), Worker = poolboy:checkout(Pid), - ?assertEqual(8, length(?sync(Pid, get_avail_workers))), + ?assertEqual(8, length(pool_call(Pid, get_avail_workers))), checkin_worker(Pid, Worker), - ?assertEqual(9, length(?sync(Pid, get_avail_workers))), - ?assertEqual(1, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(9, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). pool_overflow() -> %% Check that the pool overflows properly. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(7, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, checkin_worker(Pid, A), checkin_worker(Pid, B), - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(0, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). pool_empty() -> %% Checks that the the pool handles the empty condition correctly when %% overflow is enabled. {ok, Pid} = new_pool(5, 2), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(7, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(7, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E, F, G] = Workers, Self = self(), spawn(fun() -> @@ -141,29 +138,29 @@ pool_empty() -> after 500 -> ?assert(false) end, - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(2, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), checkin_worker(Pid, F), - ?assertEqual(4, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, G), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(0, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). pool_empty_no_overflow() -> %% Checks the pool handles the empty condition properly when overflow is %% disabled. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), [A, B, C, D, E] = Workers, Self = self(), spawn(fun() -> @@ -187,17 +184,17 @@ pool_empty_no_overflow() -> after 500 -> ?assert(false) end, - ?assertEqual(2, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, C), checkin_worker(Pid, D), - ?assertEqual(4, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(4, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), checkin_worker(Pid, E), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(0, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). worker_death() -> %% Check that dead workers are only restarted when the pool is not full @@ -205,19 +202,19 @@ worker_death() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(7, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(7, length(pool_call(Pid, get_all_workers))), kill_worker(A), - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(6, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(6, length(pool_call(Pid, get_all_workers))), kill_worker(B), kill_worker(C), - ?assertEqual(1, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(4, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(4, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). worker_death_while_full() -> %% Check that if a worker dies while the pool is full and there is a @@ -226,10 +223,10 @@ worker_death_while_full() -> {ok, Pid} = new_pool(5, 2), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), [A, B|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 6)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(7, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(7, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> poolboy:checkout(Pid), @@ -255,10 +252,10 @@ worker_death_while_full() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(6, length(?sync(Pid, get_all_workers))), - ?assertEqual(6, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(6, length(pool_call(Pid, get_all_workers))), + ?assertEqual(6, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). worker_death_while_full_no_overflow() -> %% Check that if a worker dies while the pool is full and there's no @@ -267,10 +264,10 @@ worker_death_while_full_no_overflow() -> {ok, Pid} = new_pool(5, 0), Worker = poolboy:checkout(Pid), kill_worker(Worker), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), [A, B, C|_Workers] = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), Self = self(), spawn(fun() -> poolboy:checkout(Pid), @@ -295,36 +292,36 @@ worker_death_while_full_no_overflow() -> 1000 -> ?assert(false) end, kill_worker(B), - ?assertEqual(1, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(1, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), kill_worker(C), - ?assertEqual(2, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(3, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(2, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(3, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). pool_full_nonblocking_no_overflow() -> %% Check that when the pool is full, checkouts return 'full' when the %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 0), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 4)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), ?assertEqual(full, poolboy:checkout(Pid, false)), A = hd(Workers), checkin_worker(Pid, A), ?assertEqual(A, poolboy:checkout(Pid)), - ?assertEqual(5, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(5, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). pool_full_nonblocking() -> %% Check that when the pool is full, checkouts return 'full' when the %% option to use non-blocking checkouts is used. {ok, Pid} = new_pool(5, 5), Workers = [poolboy:checkout(Pid) || _ <- lists:seq(0, 9)], - ?assertEqual(0, length(?sync(Pid, get_avail_workers))), - ?assertEqual(10, length(?sync(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(10, length(pool_call(Pid, get_all_workers))), ?assertEqual(full, poolboy:checkout(Pid, false)), A = hd(Workers), checkin_worker(Pid, A), @@ -332,8 +329,8 @@ pool_full_nonblocking() -> ?assertEqual(false, is_process_alive(A)), %% Overflow workers get shutdown ?assert(is_pid(NewWorker)), ?assertEqual(full, poolboy:checkout(Pid, false)), - ?assertEqual(10, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(10, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). owner_death() -> %% Check that a dead owner (a process that dies with a worker checked out) @@ -344,17 +341,17 @@ owner_death() -> receive after 500 -> exit(normal) end end), timer:sleep(1000), - ?assertEqual(5, length(?sync(Pid, get_avail_workers))), - ?assertEqual(5, length(?sync(Pid, get_all_workers))), - ?assertEqual(0, length(?sync(Pid, get_all_monitors))), - ok = ?sync(Pid, stop). + ?assertEqual(5, length(pool_call(Pid, get_avail_workers))), + ?assertEqual(5, length(pool_call(Pid, get_all_workers))), + ?assertEqual(0, length(pool_call(Pid, get_all_monitors))), + ok = pool_call(Pid, stop). checkin_after_exception_in_transaction() -> {ok, Pool} = new_pool(2, 0), - ?assertEqual(2, length(?sync(Pool, get_avail_workers))), + ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), Tx = fun(Worker) -> ?assert(is_pid(Worker)), - ?assertEqual(1, length(?sync(Pool, get_avail_workers))), + ?assertEqual(1, length(pool_call(Pool, get_avail_workers))), throw(it_on_the_ground), ?assert(false) end, @@ -363,15 +360,42 @@ checkin_after_exception_in_transaction() -> catch throw:it_on_the_ground -> ok end, - ?assertEqual(2, length(?sync(Pool, get_avail_workers))), - ok = ?sync(Pool, stop). + ?assertEqual(2, length(pool_call(Pool, get_avail_workers))), + ok = pool_call(Pool, stop). pool_returns_status() -> {ok, Pool} = new_pool(2, 0), ?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)), - ok = ?sync(Pool, stop). + poolboy:checkout(Pool), + ?assertEqual({ready, 1, 0, 1}, poolboy:status(Pool)), + poolboy:checkout(Pool), + ?assertEqual({full, 0, 0, 2}, poolboy:status(Pool)), + ok = pool_call(Pool, stop), + + {ok, Pool2} = new_pool(1, 1), + ?assertEqual({ready, 1, 0, 0}, poolboy:status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual({overflow, 0, 0, 1}, poolboy:status(Pool2)), + poolboy:checkout(Pool2), + ?assertEqual({full, 0, 1, 2}, poolboy:status(Pool2)), + ok = pool_call(Pool2, stop), + + {ok, Pool3} = new_pool(0, 2), + ?assertEqual({overflow, 0, 0, 0}, poolboy:status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual({overflow, 0, 1, 1}, poolboy:status(Pool3)), + poolboy:checkout(Pool3), + ?assertEqual({full, 0, 2, 2}, poolboy:status(Pool3)), + ok = pool_call(Pool3, stop), + + {ok, Pool4} = new_pool(0, 0), + ?assertEqual({full, 0, 0, 0}, poolboy:status(Pool4)), + ok = pool_call(Pool4, stop). new_pool(Size, MaxOverflow) -> poolboy:start_link([{name, {local, poolboy_test}}, {worker_module, poolboy_test_worker}, {size, Size}, {max_overflow, MaxOverflow}]). + +pool_call(ServerRef, Request) -> + gen_server:call(ServerRef, Request).