Erlang源碼閱讀筆記之proc_lib 上篇

概述

在erlang otp源碼中,隨處可見proc_lib的身影,能夠發現,在otp中spawn一個進程,都不是經過erlang:spawn函數,而是經過proc_lib:spawn。那經過這倆庫spawn出的進程有啥區別呢?咱們要理解erlang otp其它組件的源碼,必需要先去了解proc_lib作了什麼事情。node

官方文檔對proc_lib的解釋是這樣的:app

Functions for asynchronous and synchronous start of processes adhering to the OTP design principles.框架

就是說proc_lib提供符合OTP設計原則的同步或異步進程啓動函數。關於OTP設計原則,後續會單開一篇進行講述,它大體描述了一系列的代碼組織標準,包括進程、模塊以及項目目錄組織結構等等。經過proc_lib啓動的進程,會容易符合這種設計原則的要求。異步

proc_lib開放的API不少,但基本能夠分紅spawn、start、hibernate、init_ack、init_p、format、initial_call、stop這幾組,下面咱們一組一組的來看。async

spawn 組

spawn組的函數有spawn/1, spawn_link/1, spawn/2, spawn_link/2, spawn/3, spawn_link/3, spawn/4, spawn_link/4, spawn_opt/2, spawn_opt/3, spawn_opt/4, spawn_opt/5函數

先看最爲簡單的spawn/1:atom

-spec spawn(Fun) -> pid() when
      Fun :: function().

spawn(F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(?MODULE, init_p, [Parent,Ancestors,F]).

邏輯以下:spa

  1. 經過get_my_name函數獲取當前進程的註冊名。
  2. 經過get_ancestors()函數獲取進程祖先列表
  3. 經過原生的erlang:spawn/3建立一個新的進程,以init_p函數做爲新的進程邏輯,當前進程名、祖先進程列表和要執行的目標函數做爲init_p的執行參數。

get_my_name實現的細節:hibernate

get_my_name() ->
    case proc_info(self(),registered_name) of
	{registered_name,Name} -> Name;
	_                      -> self()
    end.

...

proc_info(Pid,Item) when node(Pid) =:= node() ->
    process_info(Pid,Item);
proc_info(Pid,Item) ->
    case lists:member(node(Pid),nodes()) of
	true ->
	    check(rpc:call(node(Pid), erlang, process_info, [Pid, Item]));
	_ ->
	    hidden
    end.

get_my_name()根據進程是本地仍是遠程,從process_info返回進程註冊名稱。 process_info是個頗有用的函數,process_info/1能夠返回指定進程的所有信息:設計

1> Pid = spawn(fun() -> receive hehe -> hehe end end).
<0.35.0>
2> process_info(Pid).
[{current_function,{prim_eval,'receive',2}},
 {initial_call,{erlang,apply,2}},
 {status,waiting},
 {message_queue_len,0},
 {messages,[]},
 {links,[]},
 {dictionary,[]},
 {trap_exit,false},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.26.0>},
 {total_heap_size,233},
 {heap_size,233},
 {stack_size,9},
 {reductions,17},
 {garbage_collection,[{min_bin_vheap_size,46422},
                      {min_heap_size,233},
                      {fullsweep_after,65535},
                      {minor_gcs,0}]},
 {suspending,[]}]
3> Pid ! hehe.
hehe
4> process_info(Pid).
undefined

而經過process_info/2能夠得到某個段的信息。

get_ancestors的實現細節:

get_ancestors() ->
    case get('$ancestors') of
	A when is_list(A) -> A;
	_                 -> []
    end.

本地版的很簡單,直接讀進程字典並判斷類型是否正確便可。另外還有一個遠程版本:

get_ancestors(Pid) ->
    case get_dictionary(Pid,'$ancestors') of
	{'$ancestors',Ancestors} ->
	    {ancestors,Ancestors};
	_ ->
	    {ancestors,[]}
    end.

...

get_dictionary(Pid,Tag) ->
    case get_process_info(Pid,dictionary) of
	{dictionary,Dict} ->
	    case lists:keysearch(Tag,1,Dict) of
		{value,Value} -> Value;
		_             -> undefined
	    end;
	_ ->
	    undefined
    end.

...

get_process_info(Pid, Tag) ->
 translate_process_info(Tag, catch proc_info(Pid, Tag)).

translate_process_info(registered_name, []) ->
  {registered_name, []};
translate_process_info(_ , {'EXIT', _}) ->
  undefined;
translate_process_info(_, Result) ->
  Result.

這一長串其實就作了一件事情,從進程字典中讀出'$ancestors'這個屬性,但由於涉及到遠程進程的訪問,沒法直接使用get,因此須要經過proc_info這個函數(process_info是能夠返回進程字典內容的),另外translate_process_info對proc_info返回的結果做了包裝,包括異常發生的狀況。

最後,咱們重點來看init_p這個函數,init_p裏面所包含的邏輯纔是proc_lib真正的對外出售內容 —— 符合OTP設計原則的進程。init_p的實現:

init_p(Parent, Ancestors, Fun) when is_function(Fun) ->
    put('$ancestors', [Parent|Ancestors]),
    Mfa = erlang:fun_info_mfa(Fun),
    put('$initial_call', Mfa),
    try
	Fun()
    catch
	Class:Reason ->
	    exit_p(Class, Reason, erlang:get_stacktrace())
    end.

邏輯以下:

  1. 將Parent進程合併入Ancestors列表並加入到進程字典中。
  2. 獲取目標函數的MFA信息(MFA即Module、Function、Args,咱們在erlang中會處處看到這個縮寫)
  3. 將MFA信息也保存到進程字典中。
  4. 在try catch中運行目標函數。

咱們看到init_p爲進程增長了更多元信息以及提供了一個錯誤處理框架,目標函數所發生的異常都會由exit_p來處理。exit_p的實現:

exit_p(Class, Reason, Stacktrace) ->
    case get('$initial_call') of
	{M,F,A} when is_atom(M), is_atom(F), is_integer(A) ->
	    MFA = {M,F,make_dummy_args(A, [])},
	    crash_report(Class, Reason, MFA, Stacktrace),
	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace);
	_ ->
	    %% The process dictionary has been cleared or
	    %% possibly modified.
	    crash_report(Class, Reason, [], Stacktrace),
	    erlang:raise(exit, exit_reason(Class, Reason, Stacktrace), Stacktrace)
    end.

exit_reason(error, Reason, Stacktrace) ->
    {Reason, Stacktrace};
exit_reason(exit, Reason, _Stacktrace) ->
    Reason;
exit_reason(throw, Reason, Stacktrace) ->
    {{nocatch, Reason}, Stacktrace}.

exit_p作了兩件事情,一是調用crash_report生成錯誤報告,二是經過exit_reason函數從新對異常緣由進行標準化包裝,而後再次拋出。

crash_report(exit, normal, _, _)       -> ok;
crash_report(exit, shutdown, _, _)     -> ok;
crash_report(exit, {shutdown,_}, _, _) -> ok;
crash_report(Class, Reason, StartF, Stacktrace) ->
    OwnReport = my_info(Class, Reason, StartF, Stacktrace),
    LinkReport = linked_info(self()),
    Rep = [OwnReport,LinkReport],
    error_logger:error_report(crash_report, Rep).

以上分析就是proc_lib:spawn/1所作的主要工做了,spawn/2也是大同小異,只不過增長了Node參數:

spawn(Node, F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,F]).

spawn/3和spawn/4所調用的init_p有些差異,目標函數是經過apply調用的:

spawn(M,F,A) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(?MODULE, init_p, [Parent,Ancestors,M,F,A]).

spawn(Node, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn(Node, ?MODULE, init_p, [Parent,Ancestors,M,F,A]).

...

init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    put('$ancestors', [Parent|Ancestors]),
    put('$initial_call', trans_init(M, F, A)),
    init_p_do_apply(M, F, A).

init_p_do_apply(M, F, A) ->
    try
	apply(M, F, A) 
    catch
	Class:Reason ->
	    exit_p(Class, Reason, erlang:get_stacktrace())
    end.

另外spawn_link作的事情也同樣,只不過是經過erlang:spawn_link函數來建立進程的,在當前進程和新建立的進程之間創建了一個link關係:

spawn_link(F) when is_function(F) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn_link(?MODULE, init_p, [Parent,Ancestors,F]).

關於spawn_opt,邏輯也同樣,重點在於能夠傳遞一些建立進程的控制參數,這裏並不許備去研究這些參數,後續會專門拿出一篇來說述。

start組

前面說過,proc_lib包含的是同步和異步的進程啓動API,spawn組的函數無疑都是異步的,而start組提供的都是同步的。相對於spawn,start組提供的函數要少一些:start/3, start/4, start/5, start_link/3, start_link/4, start_link/5

先看具備表明性的start/4的實現:

start(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->
    PidRef = spawn_mon(M, F, A),
    sync_wait_mon(PidRef, Timeout).

...

spawn_mon(M,F,A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    erlang:spawn_monitor(?MODULE, init_p, [Parent,Ancestors,M,F,A]).

...

sync_wait_mon({Pid, Ref}, Timeout) ->
    receive
	{ack, Pid, Return} ->
	    erlang:demonitor(Ref, [flush]),
	    Return;
	{'DOWN', Ref, _Type, Pid, Reason} ->
	    {error, Reason};
	{'EXIT', Pid, Reason} -> %% link as spawn_opt?
	    erlang:demonitor(Ref, [flush]),
	    {error, Reason}
    after Timeout ->
	    erlang:demonitor(Ref, [flush]),
	    exit(Pid, kill),
	    flush(Pid),
	    {error, timeout}
    end.

能夠看到,start/4的工做分爲兩個過程,首先是基於init_p建立新進程,並於當前進程建立monitor的關係;接下來會同步等待新進程反饋的信息,分ack、'DOWN'、'EXIT'、超時四種狀況,並作了不一樣的處理。須要注意的是,erlang:demonitor函數能夠取消進程的監控關係,若是超時,會強制殺掉目標進程,超時有一個細節就是flush函數:

flush(Pid) ->
    receive
	{'EXIT', Pid, _} ->
	    true
    after 0 ->
	    true
    end.

這個函數有什麼用途?由於當發生超時後,在咱們顯式調用demonitor函數結束以前,函數可能已經向監控進程發出了exit消息,這條消息就會積攢在當前進程的郵箱裏得不到消費,flush能夠清空郵箱中的這些消息,指定了after 0的receive語句會率先將郵箱裏全部的消息進行匹配後當即返回而不會阻塞。讓咱們再複習一下receive ... after的執行規則:

  1. 若是包含after,進入receive語句時會先啓動一個定時器。
  2. 取出郵箱裏面的第一個消息,並嘗試同Pattern一、Pattern2等模式匹配,若是匹配成功,系統會從郵箱刪除這個消息,並執行模式後面的表達式。
  3. 若是receive裏的全部模式都不匹配郵箱的第一個消息,系統會從郵箱中移除這個消息並把它放入一個保存隊列,而後繼續嘗試郵箱裏的第二個消息,這一過程會不斷重複,直到發現匹配消息或者郵箱裏的全部消息都檢查過了爲止。
  4. 若是郵箱裏的全部消息都不匹配,進程就會被掛起並從新調度,直到新的消息進入郵箱纔會繼續執行。新消息到達後,保存隊列裏的全部消息不會從新匹配,只有新消息纔會進行匹配。
  5. 一旦某個消息匹配成功,保存隊列裏的全部消息就會按照到達進程的順序從新進入郵箱,若是設置了定時器,就會清除它。
  6. 若是定時器在咱們等待消息時到期了,系統就會執行after後的表達式,並把全部保存的消息按照它們的到達進程的順序從新放回郵箱。

其它start的實現也是這兩個步驟,只不過參數重載有差別。再來看start_link的實現,這裏選取的是start_link/4:

start_link(M, F, A, Timeout) when is_atom(M), is_atom(F), is_list(A) ->
    Pid = ?MODULE:spawn_link(M, F, A),
    sync_wait(Pid, Timeout).

...

sync_wait(Pid, Timeout) ->
    receive
	{ack, Pid, Return} ->
	    Return;
	{'EXIT', Pid, Reason} ->
	    {error, Reason}
    after Timeout ->
	    unlink(Pid),
	    exit(Pid, kill),
	    flush(Pid),
	    {error, timeout}
    end.

也是分爲兩個步驟,重點是sync_link的實現,相對比於monitor,沒有了'DOWN'的狀況。

相關文章
相關標籤/搜索