Erlang Process input queue

http://www.cnblogs.com/me-sa/archive/2011/11/05/erlang0012.htmlhtml

     Erlang進程有本身的消息隊列來保存接收到的消息,新接收到的消息放在隊列的尾部。Erlang的接收原語receive就是用來從消息隊列中選擇性提取消息的。receive提取消息的過程是:從消息隊列的第一條消息開始匹配,若是有一條消息能夠匹配上就從消息隊列中移除,並執行相應的消息處理邏輯。若是沒有模式能夠匹配消息隊列中的消息,這條消息就會保留在消息隊列中。若是檢查到消息隊列中的最後一條消息尚未找到能夠匹配的消息,進程就會阻塞直到接收到下一條消息再一次出發提取過程。shell

    咱們能不能直觀的看到這個過程呢?Erlang對運行時信息的提取提供了很好的支持,咱們要查看的是一個進程在運行時的信息,使用的方法:erlang:process_info/1 .這個方法接收的參數就是進程的PID,返回的數據結果:cookie

[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},
{links,[]},{dictionary,[]},
{trap_exit,false},{error_handler,error_handler},
{priority,normal},{group_leader,<0.29.0>},
{total_heap_size,233},{heap_size,233},
{stack_size,1},{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]oop

    上面erlang:process_info(Pid).的執行結果中,紅色標出就是消息隊列中的消息數量,藍色標出的就是消息隊列的內容。下面咱們就經過一系列的Demo來理解Erlang進程消息隊列的處理機制.測試

 

 消息隊列堆積的狀況優化

      咱們先把進行測試的腳手架代碼準備好,邏輯很簡單若是接收到abc消息就輸出一下而後繼續接收消息,這經過尾遞歸loop方法就能夠實現。ui

-module(looper).this

-compile(export_all).spa

loop() ->
     receive 
        abc -> 
                  io:format("Receive abc. ~n "),
                  loop();
        stop-> 
                  io:format("stop"),
                  stop
      end.    rest

 

    模擬消息堆積的方法很簡單,咱們不停向這個進程發送沒法匹配的消息就能夠了,而後咱們查看進程的運行時狀態,下面是shell中執行的結果,你們看註釋:

(demo@192.168.1.123)1> Pid= spawn(looper,loop,[]).    %%啓動進程返回進程PID
<0.38.0>
(demo@192.168.1.123)2> Pid!abc.                              %%向進程發送abc消息
Receive abc.                                                              %% abc消息正常處理
abc
(demo@192.168.1.123)4> Pid!abcd.                            %%向進程發送消息abcd
abcd
(demo@192.168.1.123)5> Pid!abcde.                            %%進程發送消息abcde
abcde
(demo@192.168.1.123)6> Pid!abcdef.
abcdef
(demo@192.168.1.123)7> Pid!abcdefg.
abcdefg
(demo@192.168.1.123)9> erlang:process_info(Pid).      %%查看進程狀態
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},              %%這裏能看到積壓的消息
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> Pid!haha.            %%再發送一條垃圾消息haha
haha
(demo@192.168.1.123)11> erlang:process_info(Pid).
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,5},
{messages,[abcd,abcde,abcdef,abcdefg,haha]},   %%能夠看到haha消息被放在了消息隊列的隊尾
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12>

 

按照優先級接收消息

 

下面的代碼範例來自LYSE,能夠看到首先是處理高優先級的消息,若是高優先級的消息處理完畢以後,處理低優先級的消息.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
important() ->
     receive
         {Priority, Message} when Priority > 10 ->
             [Message | important()]
     after 0 ->
         normal()
     end.
  
normal() ->
     receive
         {_, Message} ->
             [Message | normal()]
     after 0 ->
         []
     end.

  

 


  定時清理堆積的垃圾消息

   堆積的垃圾消息會慢慢吃掉內存,並且堆積的消息在Selective Receive過程當中會不斷地被遍歷檢查,成爲負擔,咱們如今就添加一個定時清理堆積消息的邏輯:

-module(looper).

-compile(export_all).

      
%% P= spawn(looper,loop2,[]).      
%% erlang:process_info(P).
%%
loop2() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  loop2();
        stop-> 
                  io:format("stop"),
                  stop
         after 15000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,         
        
                    io:format("clear . ~n "),
                    loop2()
      end. 

  作法也很簡單,添加一個超時,超時以後用一個能夠接收任意消息(Any)的receive代碼段來從消息隊列中提取一條消息.爲了留出足夠的時間來輸入命令,咱們把超時時間定爲15000(15s).好了,啓動shell從新來過,你們仍是看我添加的註釋:

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop2,[]).  %使用loop2建立進程
<0.38.0>
(demo@192.168.1.123)2> P!abcd.                    %快速輸入下面的命令向進程發送沒法匹配的垃圾進程
abcd
(demo@192.168.1.123)3> P!abcde.
abcde
(demo@192.168.1.123)4> P!abcdef.
abcdef
(demo@192.168.1.123)5> P!abcdefg.
abcdefg
(demo@192.168.1.123)6> P!abcdefgg.
abcdefgg
(demo@192.168.1.123)7> erlang:process_info(P).Receive abcd      %咱們輸入完erlang:process_info(P).的時候剛好遇到了清理邏輯執行
(demo@192.168.1.123)7> clear .
(demo@192.168.1.123)7>
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcde,abcdef,abcdefg,abcdefgg]}, %%除了已經被移除掉的abcd 其它垃圾消息還堆積在進程中
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,39},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)8> Receive abcde
(demo@192.168.1.123)8> clear .
(demo@192.168.1.123)8> erlang:process_info(P). %上面又執行了一次清理邏輯,咱們再次查看進程信息
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,3},
{messages,[abcdef,abcdefg,abcdefgg]},  %%看到了吧,又少了一條消息垃圾消息abcde
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,69},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)9> Receive abcdef
(demo@192.168.1.123)9> clear .
(demo@192.168.1.123)9> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,99},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> P!abc.
Receive abc.
abc
(demo@192.168.1.123)11> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,115},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12> Receive abcdefg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> Receive abcdefgg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},                   %好了,執行到這裏消息隊列已經清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,175},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)13>

 

 從消息隊列中提取消息的慢鏡頭

  receive原語執行過程當中,遇到匹配的消息,提取消息進行處理的過程稍縱即逝,咱們如今經過添加sleep,來看看這個過程的慢鏡頭.注意下面代碼的修改

-module(looper).

-compile(export_all).

     

%% P= spawn(looper,loop3,[]).      
%% erlang:process_info(P).
%%      
loop3() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  timer:sleep(10000),
                  io:format("sleep after receive abc done. ~n "),
                  loop3();
        stop-> 
                  io:format("stop"),
                  stop
         after 25000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,                 
                    io:format("clear . ~n "),
                    loop3()
      end.                  

下面的shell中,咱們向進程發送了一批能夠正常處理的abc消息,可是因爲處理邏輯中的sleep,消息提取會被拖慢,這個時間咱們能夠執行process_info

 

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop3,[]).
<0.38.0>
(demo@192.168.1.123)2> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,9},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)3> P!abc.  
Receive abc.
abc
(demo@192.168.1.123)4> P!abc.
abc
(demo@192.168.1.123)5> P!abc.
abc
(demo@192.168.1.123)6> P!abc.
abc
(demo@192.168.1.123)7> P!abc.
abc
(demo@192.168.1.123)8> P!abc.
abc
(demo@192.168.1.123)9> P!abc.
abc
(demo@192.168.1.123)10> P!abc.
abc
(demo@192.168.1.123)11> P!abcd.
abcd
(demo@192.168.1.123)12> P!abcdd.
abcdd
(demo@192.168.1.123)13> erlang:process_info(P).sleep after receive abc done.
(demo@192.168.1.123)13> Receive abc.
(demo@192.168.1.123)13>
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)14> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,6},
{messages,[abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,131},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abcd
(demo@192.168.1.123)16> clear .
(demo@192.168.1.123)16> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,1},
{messages,[abcdd]},  %執行到這裏只有一條垃圾數據堆積了
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,305},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)17> Receive abcdd
(demo@192.168.1.123)17> clear .
(demo@192.168.1.123)17> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},  %%消息隊列已經清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,335},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)18>

  日出而做子夜還,偶得週末半日閒,各位週末愉快!

 

2012-4-28 17:45補充

Process Data 
For each process there will be at least one =proc_stack and one =proc_heap tag followed by the raw memory information for the stack and heap of the process. 
For each process there will also be a =proc_messages tag if the process' message queue is non-empty and a =proc_dictionary tag if the process' dictionary (the put/2 and get/1 thing) is non-empty.

The raw memory information can be decoded by the Crashdump Viewer tool. You will then be able to see the stack dump, the message queue (if any) and the dictionary (if any). The stack dump is a dump of the Erlang process stack. Most of the live data (i.e., variables currently in use) are placed on the stack; thus this can be quite interesting. One has to "guess" what's what, but as the information is symbolic, thorough reading of this information can be very useful. As an example we can find the state variable of the Erlang primitive loader on line (5) in the example below:

(1) 3cac44 Return addr 0x13BF58 (<terminate process normally>) 
(2) y(0) 
["/view/siri_r10_dev/clearcase/otp/erts/lib/kernel/ebin","/view/siri_r10_dev/ 
(3) clearcase/otp/erts/lib/stdlib/ebin"] 
(4) y(1) <0.1.0> 
(5) y(2) 
{state,[],none,#Fun<erl_prim_loader.6.7085890>,undefined,#Fun<erl_prim_loader.7.900
0327>,#Fun<erl_prim_loader.8.116480692>,#Port<0.2>,infinity,#Fun<erl_prim_loader.9.
10708760>} 
(6) y(3) infinity 
When interpreting the data for a process, it is helpful to know that anonymous function objects (funs) are given a name constructed from the name of the function in which they are created, and a number (starting with 
0) indicating the number of that fun within that function.

 

2012-08-30 16:23 更新

 
      若是須要按照優先級收發消息,能夠使用二叉堆(min_heap)或者gb_trees,接收到的消息填充到這樣的結構裏面(把優先級數值放在第一個key用來排序).使用的時候只須要檢索最小或者最大值就能夠了.大部分狀況下這種方法均可以實現按優先級接受消息,但收到大量高優先級消息的狀況就會變慢;從R14A開始,Erlang編譯器有一個優化減小了消息接收時進程之間的反覆通訊,在消息通訊之初會建立一個reference附加在往來的消息中,這樣在reference建立成功以前自動過濾掉全部不包含這個Reference特徵的消息.
 
複製代碼
%% optimized in R14A
optimized(Pid) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, hello},
    receive
        {Pid, Ref, Msg} ->
            io:format("~p~n", [Msg])
    end.
複製代碼
相關文章
相關標籤/搜索