Erlang實現進程池

開發工做中,常常會碰到進程池或者線程池,或者其它的資源池。在這裏,用erlang實現一個簡單的進程池。api

erlang進程是很是輕量級的,這個進程池的主要目的是用一種通用的方式去管理和限制系統中運行的資源佔用。當運行的工做者進程數量達到上限,進程池還能夠把任務放到隊列中,只要進程資源被釋放,排隊的任務就能得到運行,不然任務只能阻塞。async

這是進程池的監督樹:atom

supervisor tree

ppool_supersup監督着全部的進程池。一個進程池由ppool_sup、ppool_serv和worker_sup監督的工做者進程池組成。ppool_serv提供對外的進程池調用api,ppool_sup負責監督單個進程池。線程

下面是實現代碼。3d

%% ppool_supersup
-module(ppool_supersup).
-behavior(supervisor).
-export([start_link/0, stop/0, start_pool/3, stop_pool/1]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ppool}, ?MODULE, []).

stop() ->
    case whereis(ppool) of
        P when is_pid(P) ->
            exit(P, kill);
        _ ->
            ok
    end.

init([]) ->
    MaxRestart = 6,
    MaxTime = 3600,
    {ok, {{one_for_one, MaxRestart, MaxTime}, []}}.

start_pool(Name, Limit, MFA) ->
%%每一個進程池的最大終止時間設置爲10500,這個值並無什麼特殊意義,只是保證足夠大,全部子進程都有足夠的時間終止。若是實在不知道設置爲多大,能夠試試infinity。
    ChildSpec = {Name,
                 {ppool_sup, start_link, [Name, Limit, MFA]},
                 permanent, 10500, supervisor, [ppool_sup]},
    supervisor:start_child(ppool, ChildSpec).

stop_pool(Name) ->
    supervisor:terminate_child(ppool, Name),
    supervisor:delete_child(ppool, Name).
%% ppool_sup
-module(ppool_sup).
-export([start_link/3, init/1]).
-behavior(supervisor).

start_link(Name, Limit, MFA) ->
    supervisor:start_link(?MODULE, {Name, Limit, MFA}).

init({Name, Limit, MFA}) ->
    MaxRestart = 1,
    MaxTime = 3600,
    {ok, {{one_for_all, MaxRestart, MaxTime},
          [{serv, 
            {ppool_serv, start_link, [Name, Limit, self(), MFA]},
            permanent,
            5000,
            worker,
            [ppool_serv]}]}}.
%% ppool_worker_sup
-module(ppool_worker_sup).
-export([start_link/1, init/1]).
-behavior(supervisor).

start_link(MFA = {_, _, _}) ->
    supervisor:start_link(?MODULE, MFA).

init({M, F, A}) ->
    MaxRestart = 5,
    MaxTime = 3600,
    {ok, {{simple_one_for_one, MaxRestart, MaxTime},
          [{ppool_worker,
           {M, F, A},
           temporary, 5000, worker, [M]}]}}.

ppool_serv是最複雜的一個模塊了。由於ppool_serv對外提供接口,它須要能聯繫到worker_sup。若是由ppool_sup同時啓動ppool_serv和worker_sup,存在亂序的風險,除非都註冊進程名。但erlang中對於原子的使用必定要慎重,能少用就少用。因此在這兒,由ppool_serv動態添加worker_sup到ppool_sup。code

ppool_serv提供了三種添加任務的方式:orm

  • 若是進程池中有空間,馬上運行;若是已滿,給出沒法運行的指示。
  • 若是進程池中有空間,馬上運行;若是已滿,調用者進程阻塞等待,任務入隊列。
  • 若是進程池中有空間,馬上運行;若是已滿,任務入隊列,調用者進程不阻塞。
%% ppool_serv
-module(ppool_serv).
-behavior(gen_server).
-export([start/4, start_link/4, run/2, sync_queue/2, async_queue/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

-define(SPEC(MFA),
        {worker_sup,
         {ppool_worker_sup, start_link, [MFA]},
         permanent,
         10000,
         supervisor,
         [ppool_woker_sup]}).

-record(state, {limit = 0,
                sup,
                refs,
                queue = queue:new()}).

start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
    gen_server:start({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).

start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit) ->
    gen_server:start_link({local, Name}, ?MODULE, {Limit, MFA, Sup}, []).

run(Name, Args) ->
    gen_server:call(Name, {run, Args}).

sync_queue(Name, Args) ->
    gen_server:call(Name, {sync, Args}, infinity).

async_queue(Name, Args) ->
    gen_server:cast(Name, {async, Args}).

stop(Name) ->
    gen_server:call(Name, stop).

init({Limit, MFA, Sup}) ->
    self() ! {start_worker_supervisor, Sup, MFA},
    {ok, #state{limit = Limit, refs = gb_sets:empty()}}.

handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs = Refs}) ->
    case gb_sets:is_element(Ref, Refs) of
        true ->
            handle_down_worker(Ref, S);
        false ->
            {noreply, S}
    end;
handle_info({start_worker_supervisor, Sup, MFA}, S = #state{}) ->
    {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)),
    link(Pid),
    {noreply, S#state{sup = Pid}};
handle_info(_Msg, State) ->
    {noreply, State}.

handle_call({run, Args}, _From, S = #state{limit = N, sup = Sup, refs = R}) when N > 0 ->
    {ok, Pid} = supervisor:start_child(Sup, Args),
    Ref = erlang:monitor(process, Pid),
    {reply, {ok, Pid}, S#state{limit = N - 1, refs = gb_sets:add(Ref, R)}};
handle_call({run, _Args}, _From, S = #state{limit = N}) when N =< 0 ->
    {reply, noalloc, S};
handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=R}) when N > 0 ->
    {ok, Pid} = supervisor:start_child(Sup, Args),
    Ref = erlang:monitor(process, Pid),
    {reply, {ok,Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}};
handle_call({sync, Args},  From, S = #state{queue=Q}) ->
    {noreply, S#state{queue=queue:in({From, Args}, Q)}};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_Msg, _From, State) ->
    {noreply, State}.

handle_cast({async, Args}, S=#state{limit=N, sup=Sup, refs=R}) when N > 0 ->
    {ok, Pid} = supervisor:start_child(Sup, Args),
    Ref = erlang:monitor(process, Pid),
    {noreply, S#state{limit=N-1, refs=gb_sets:add(Ref,R)}};
handle_cast({async, Args}, S=#state{limit=N, queue=Q}) when N =< 0 ->
    {noreply, S#state{queue=queue:in(Args,Q)}};
handle_cast(_Msg, State) ->
    {noreply, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(_Reason, _State) ->
    ok.

handle_down_worker(Ref, S = #state{limit = L, sup = Sup, refs = Refs}) ->
    case queue:out(S#state.queue) of
        {{value, {From, Args}}, Q} ->
            {ok, Pid} = supervisor:start_child(Sup, Args),
            NewRef = erlang:monitor(process, Pid),
            NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
            gen_server:reply(From, {ok, Pid}),
            {noreply, S#state{refs = NewRefs, queue = Q}};
        {{value, Args}, Q} ->
            {ok, Pid} = supervisor:start_child(Sup, Args),
            NewRef = erlang:monitor(process, Pid),
            NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
            {noreply, S#state{refs = NewRefs, queue = Q}};
        {empty, _} ->
            {noreply, S#state{limit = L + 1, refs = gb_sets:delete(Ref, Refs)}}
    end.

摘自《learn you some Erlang for great good》,最近出了中文版本,人民郵電出版社的《Erlang趣學指南》

相關文章
相關標籤/搜索