開發工做中,常常會碰到進程池或者線程池,或者其它的資源池。在這裏,用erlang實現一個簡單的進程池。api
erlang進程是很是輕量級的,這個進程池的主要目的是用一種通用的方式去管理和限制系統中運行的資源佔用。當運行的工做者進程數量達到上限,進程池還能夠把任務放到隊列中,只要進程資源被釋放,排隊的任務就能得到運行,不然任務只能阻塞。async
這是進程池的監督樹:atom
ppool_supersup監督着全部的進程池。一個進程池由ppool_sup、ppool_serv和worker_sup監督的工做者進程池組成。ppool_serv提供對外的進程池調用api,ppool_sup負責監督單個進程池。線程
下面是實現代碼。3d
%% ppool_supersup -module(ppool_supersup). -behavior(supervisor). -export([start_link/0, stop/0, start_pool/3, stop_pool/1]). -export([init/1]). start_link() -> supervisor:start_link({local, ppool}, ?MODULE, []). stop() -> case whereis(ppool) of P when is_pid(P) -> exit(P, kill); _ -> ok end. init([]) -> MaxRestart = 6, MaxTime = 3600, {ok, {{one_for_one, MaxRestart, MaxTime}, []}}. start_pool(Name, Limit, MFA) -> %%每一個進程池的最大終止時間設置爲10500,這個值並無什麼特殊意義,只是保證足夠大,全部子進程都有足夠的時間終止。若是實在不知道設置爲多大,能夠試試infinity。 ChildSpec = {Name, {ppool_sup, start_link, [Name, Limit, MFA]}, permanent, 10500, supervisor, [ppool_sup]}, supervisor:start_child(ppool, ChildSpec). stop_pool(Name) -> supervisor:terminate_child(ppool, Name), supervisor:delete_child(ppool, Name).
%% ppool_sup -module(ppool_sup). -export([start_link/3, init/1]). -behavior(supervisor). start_link(Name, Limit, MFA) -> supervisor:start_link(?MODULE, {Name, Limit, MFA}). init({Name, Limit, MFA}) -> MaxRestart = 1, MaxTime = 3600, {ok, {{one_for_all, MaxRestart, MaxTime}, [{serv, {ppool_serv, start_link, [Name, Limit, self(), MFA]}, permanent, 5000, worker, [ppool_serv]}]}}.
%% ppool_worker_sup -module(ppool_worker_sup). -export([start_link/1, init/1]). -behavior(supervisor). start_link(MFA = {_, _, _}) -> supervisor:start_link(?MODULE, MFA). init({M, F, A}) -> MaxRestart = 5, MaxTime = 3600, {ok, {{simple_one_for_one, MaxRestart, MaxTime}, [{ppool_worker, {M, F, A}, temporary, 5000, worker, [M]}]}}.
ppool_serv是最複雜的一個模塊了。由於ppool_serv對外提供接口,它須要能聯繫到worker_sup。若是由ppool_sup同時啓動ppool_serv和worker_sup,存在亂序的風險,除非都註冊進程名。但erlang中對於原子的使用必定要慎重,能少用就少用。因此在這兒,由ppool_serv動態添加worker_sup到ppool_sup。code
ppool_serv提供了三種添加任務的方式:orm
%% ppool_serv -module(ppool_serv). -behavior(gen_server). -export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -define(SPEC(MFA), {worker_sup, {ppool_worker_sup, start_link, [MFA]}, permanent, 10000, supervisor, [ppool_woker_sup]}). -record(state, {limit = 0, sup, refs, queue = queue:new()}). start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) -> gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []). run(Name, Args) -> gen_server:call(Name, {run, Args}). sync_queue(Name, Args) -> gen_server:call(Name, {sync, Args}, infinity). async_queue(Name, Args) -> gen_server:cast(Name, {async, Args}). stop(Name) -> gen_server:call(Name, stop). init({Limit, MFA, Sup}) -> self() ! {start_worker_supervisor, Sup, MFA}, {ok, #state{limit = Limit, refs = gb_sets:empty()}}. handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs = Refs}) -> case gb_sets:is_element(Ref, Refs) of true -> handle_down_worker(Ref, S); false -> {noreply, S} end; handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) -> {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)), link(Pid), {noreply, S#state{sup = Pid}}; handle_info(_Msg, State) -> {noreply, State}. handle_call({run, Args}, _From, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {reply, {ok, Pid}, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}}; handle_call({run, _Args}, _From, S = #state{limit = N}) when N =< 0 -> {reply, noalloc, S}; handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; handle_call({sync, Args}, From, S = #state{queue=Q}) -> {noreply, S#state{queue=queue:in({From, Args}, Q)}}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(_Msg, _From, State) -> {noreply, State}. handle_cast({async, Args}, S=#state{limit=N, sup=Sup, refs=R}) when N > 0 -> {ok, Pid} = supervisor:start_child(Sup, Args), Ref = erlang:monitor(process, Pid), {noreply, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}}; handle_cast({async, Args}, S=#state{limit=N, queue=Q}) when N =< 0 -> {noreply, S#state{queue=queue:in(Args,Q)}}; handle_cast(_Msg, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, _State) -> ok. handle_down_worker(Ref, S = #state{limit = L, sup = Sup, refs = Refs}) -> case queue:out(S#state.queue) of {{value, {From, Args}}, Q} -> {ok, Pid} = supervisor:start_child(Sup, Args), NewRef = erlang:monitor(process, Pid), NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)), gen_server:reply(From, {ok, Pid}), {noreply, S#state{refs = NewRefs, queue = Q}}; {{value, Args}, Q} -> {ok, Pid} = supervisor:start_child(Sup, Args), NewRef = erlang:monitor(process, Pid), NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)), {noreply, S#state{refs = NewRefs, queue = Q}}; {empty, _} -> {noreply, S#state{limit = L + 1, refs = gb_sets:delete(Ref, Refs)}} end.