原文:A Million-user Comet Application with Mochiweb, Part 2html
參考資料:Comet--基於 HTTP 長鏈接、無須在瀏覽器端安裝插件的「服務器推」技術爲「Comet」前端
MochiWeb--創建輕量級HTTP服務器的Erlang庫node
在第一部分 , 咱們構建了一個每10秒向客戶端發送一條消息的mochiweb comet應用(沒什麼用處)。咱們微調了一下linux內核,作了一個可以創建大量網絡鏈接以測試應用性能和所耗內存的工具 。咱們發現每一個鏈接花費大約45K內存。linux
本系列的第二部分講的主要是把應用變得更加有用,更加節省內存:web
用一個login/logout/send API實現一個消息路由器更新mochiweb應用使之可以從路由器接收消息創建一個分佈式erlang系統,這樣咱們能夠在不一樣的節點和主機上運行路由器寫一個能給路由器發送大量無用信息的工具超過24小時的內存用量圖,優化mochiweb應用以節約內存shell
這就意味着咱們須要把消息發送邏輯從mochiweb應用中剝離出來。利用第一部分的壓力測試工具,咱們能夠創建一個更接近產品級別的基準測試。瀏覽器
實現消息路由器bash
路由器的API只有3個函數:服務器
login(Id, Pid) 爲Id註冊一個接收消息的進程(Pid )logout(Pid) 中止接受消息send(Id, Msg)向已登陸客戶端(Id)發送消息(Msg)網絡
注意,從設計上來講,多個不一樣的用戶登錄到同一個進程是有可能的。
這個實例路由器模塊用了2個ets表存儲Pids和Ids的雙向映射(pid2id和id2pid在下面的#state記錄定義中)。
router.erl:
-module( router) .-behaviour( gen_server) . -export([start_link /0]) .-export([ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 , code_change/3]) . -export([ send/2 , login/2 , logout/1]) . -define(SERVER , global:whereis_name( ?MODULE)) . % will hold bidirectional mapping between id <–> pid-record( state, { pid2id, id2pid}) . start_link() -> gen_server :start_link({ global, ?MODULE} , ?MODULE , [] , []) . % sends Msg to anyone logged in as Idsend(Id , Msg) -> gen_server :call( ?SERVER , { send, Id , Msg}) . login(Id , Pid) when is_pid(Pid) -> gen_server :call( ?SERVER , { login, Id , Pid}) . logout(Pid) when is_pid(Pid) -> gen_server :call( ?SERVER , { logout, Pid}) . %% init([]) -> % set this so we can catch death of logged in pids: process_flag( trap_exit, true) , % use ets for routing tables { ok, #state{ pid2id = ets:new( ?MODULE , [ bag]) , id2pid = ets:new( ?MODULE , [ bag]) } } . handle_call({ login, Id , Pid} , _From , State) when is_pid(Pid) -> ets :insert(State #state.pid2id, {Pid , Id}) , ets:insert(State #state.id2pid, {Id , Pid}) , link(Pid) , % tell us if they exit, so we can log them out io:format("~w logged in as ~w\n " ,[Pid , Id]) , { reply, ok, State} ; handle_call({ logout, Pid} , _From , State) when is_pid(Pid) -> unlink(Pid) , PidRows = ets:lookup(State #state.pid2id, Pid) , casePidRowsof [] -> ok ; _ -> IdRows = [{I ,P} || {P ,I} <- PidRows] , % invert tuples % delete all pid->id entries ets:delete(State #state.pid2id, Pid) , % and all id->pid [ ets:delete_object(State #state.id2pid, Obj) || Obj <- IdRows] end , io:format("pid ~w logged out\n " ,[Pid]) , { reply, ok, State} ; handle_call({ send, Id , Msg} , _From , State) -> % get pids who are logged in as this Id Pids = [P || { _Id , P} <- ets:lookup(State #state.id2pid, Id)] , % send Msg to them all M = { router_msg, Msg} , [Pid ! M || Pid <- Pids] , { reply, ok, State} . % handle death and cleanup of logged in processeshandle_info(Info , State) -> caseInfoof {‘EXIT’ , Pid , _Why} -> % force logout: handle_call({ logout, Pid} , blah, State) ; Wtf -> io :format("Caught unhandled message: ~w\n " , [Wtf]) end , { noreply, State} . handle_cast( _Msg , State) -> { noreply, State} .terminate( _Reason , _State) -> ok .code_change( _OldVsn , State , _Extra) -> { ok, State} .
更新mochiweb應用
讓咱們假設用戶是由基於連入mochieweb的URl中的Id號所描述的,咱們用那個id向消息路由器註冊。 取代阻塞10秒後發送消息,mochiweb的loop循環將組塞在從路由器接收消息上,路由器給mochiweb進程發送消息後mochiweb進程就會向客戶端發送Http數據塊:
客戶端從http://localhost:8000/test/123鏈接mochiwebMochiweb應用爲id爲123的用戶的那個鏈接向消息路由器註冊進程(pid)假如你用id爲123向路由器發送一條消息,他將轉發到正確的mochiweb進程,繼而消息會出如今那個用戶的瀏覽器上
這是mochiconntest_web.erl的更新版本:
-module( mochiconntest_web) . -export([ start/1 , stop/0 , loop/2]) . %% External API start(Options) -> {DocRoot , Options1} = get_option( docroot, Options) , Loop = fun (Req) -> ?MODULE :loop(Req , DocRoot) end , % we’ll set our maximum to 1 million connections. (default: 2048) mochiweb_http:start([{ max, 1000000} , { name, ?MODULE} , { loop, Loop} | Options1]) . stop() -> mochiweb_http :stop( ?MODULE) . loop(Req , DocRoot) -> "/" ++ Path = Req :get( path) , caseReq :get( method)of Method when Method =:= ‘GET’ ; Method =:= ‘HEAD’ -> casePathof "test/" ++ Id -> Response = Req :ok({"text/html; charset=utf-8" , [{"Server" ,"Mochiweb-Test"}] , chunked}) , % login using an integer rather than a string {IdInt , _} = string:to_integer(Id) , router:login(IdInt , self()) , feed(Response , IdInt , 1) ; _ -> Req :not_found() end ; ‘POST’ -> casePathof _ -> Req :not_found() end ; _ -> Req :respond({501 , [] , []}) end . feed(Response , Id , N) -> receive { router_msg, Msg} -> Html = io_lib:format("Recvd msg #~w: ‘~s’" , [N , Msg]) , Response :write_chunk(Html) end , feed(Response , Id , N+1) . %% Internal API get_option(Option , Options) -> { proplists:get_value(Option , Options) , proplists:delete(Option , Options)} .
動起來!
如今讓咱們讓它活起來 - 咱們用兩個erlang shell, 一個用於mochiweb,一個用於路由器。 編輯start-dev.sh , 用於啓動mochiweb, 下面的額外參數是用於erl的:
-sname n1 命名節點‘n1′+K true 使kernel-poll有效。 當處理當量的鏈接時看起來不是那麼的不知所措+P 134217727 缺省的你能調度的最大進程數爲 32768. 想像一下咱們每一個鏈接就用一個進程(我不知道不那樣作的更好緣由) 我建議設置這個參數爲最大的可能值。 根據 「man erl」,134,217,727 是最大的
如今運行make && ./start-dev.sh 你將看到一個像(n1@localhost)1>的提示符 -你的mochiweb應用已經那個運行起來了,erlang節點也有了名字。
如今運行另外的erlang shell,就像這樣:
erl -sname n2
如今兩個erlang實例彼此不知道對方,更正它:
(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]
如今從這個shell上編譯啓動路由器:
(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}
如今更好玩點, 從瀏覽器中執行 http://localhost:8000/test/123 (或者從終端執行lynx --source "http://localhost:8000/test/123" )。 檢查運行路由器的shell,你將看到已經有一個用戶登錄了。
你如今能夠向路由器發送消息而且在瀏覽器上看到她們。如今只是發送字符串,由於咱們在feed函數中用~s 來格式化io_lib:fomart中的參數,原子將使其崩潰:
借用你運行路由器的shell:
(n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").
檢查你的瀏覽器,你已經獲得了comet,呵呵
在分佈式erlang系統中運行
感受上路由器和mochiweb前端運行在不一樣的機器上。 假設你有一對備用機用來測試,你應該把erlangshell做爲分佈式節點啓動,也就是說用 -name n1@host1.example.com 取代 -sname n1 (n2也同樣)。確信他們能夠看到彼此,就像上面似的用 net_adm:ping(...) 。
注意router.erl中的16行, 路由器進程的名字(’router’)被註冊成全局的,由於咱們在對gen_server的調用中用隨後的宏去標誌和定位路由器,它已經在分佈式系統中很好的工做了:
-define(SERVER, global:whereis_name(?MODULE))。
在分佈式系統中爲進程註冊全局名是elang爲你作的很天然的事情之一。
生成大量信息
在實際環境中咱們可能看到像用例模型似的長尾想象,有一些很活躍的用戶和不少不活躍用戶。可是在這個測試中咱們將不分青紅皁白的爲隨機用戶生成無用的消息。
msggen.erl:
-module( msggen) .-export([ start/3]) . start(0 , _, _) -> ok ;start(Num , Interval , Max) -> Id = random:uniform(Max) , router:send(Id , "Fake message Num = " ++ Num) , receiveafterInterval -> start(Num-1 , Interval , Max)end .
這將向id在1到max之間的隨機用戶發送Num個消息,每次發送等待Interval 毫秒。
你能夠看到這些東西假如你運行路由器和mochiweb應用後用瀏覽器鏈接http://localhost:8000/test/3之 後執行
erl -sname test (test@localhost)1> net_adm:ping(n1@localhost).pong (test@localhost)2> c(msggen).{ok,msggen} (test@localhost)3> msggen:start(20, 10, 5).ok
這將向隨機id在1-5之間的用戶發送20條消息,每條消息之間有10毫秒等待。 Id 3有機會收到一條或者四條消息。
咱們能夠均等的並行運行一些進程以模擬多個消息源。這裏的例子是生成10個進程,每一個進程發送20條消息到1-5號用戶,每條消息之間間隔100毫秒:
[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ].
[<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.
再次感覺C10K測試
如今咱們須要運行另一個有更大伸縮行的測試;客戶端連接到mochiweb應用,mochiweb把他們註冊到路由器。咱們能生成更多的虛假消息來考驗路由器,路由器將把這些消息發送到任何註冊的客戶端。讓咱們再次運行在Part 1 使用的10,000個併發用戶的測試,可是此次咱們將在傳輸大量消息以前保持全部的客戶鏈接一段時間。
假設你按照在 部分提到的操做調整了你的內核,增長了最大文件限制,這很簡單。你已經運行了mochiweb應用和路由器,讓咱們查看下流量狀況。
在沒有任何客戶鏈接的狀況下, 每一個pochiweb beam進程用了大約40MB內存(常駐):
$ ps -o rss= -p `pgrep -f 'sname n1'`
40156
帶‘sname n1'的greps命令是爲了獲得咱們的mochiweb erlang進程ID的,而後用帶有格式化選項的ps命令打印常駐內存大小 (KB)
我組合出了這討厭的每60秒用一行打印時間戳, 當前內存用量 (常駐 KB), 當前創建的鏈接數的語句 - 在另一個運行mochiweb的機子的終端中聽任他運行:
假若有誰知道爲一個進程長時間動態剝離出內存用量更好的方法,請留個言。
如今在一個新的erl shell中運行第一部分使用的floodtest工具:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
他已經達到了10K個併發鏈接(加上我用firefox打開的這個),mochiweb常駐內存的大小也在90MB左右(90964KB)。
也就是100個進程各自以每秒10條消息的速度向隨機的id爲1到10000的客戶端發送1百萬條消息。那就意味着路由器每秒鐘看到1000條消息,平均咱們10K個客戶端每10秒獲得一條信息。
當10K個客戶端已經鏈接後,你能看到內存已經由40MB漲到了90M,運行更長一段時間漲到了125M。
我運行這個24小時, mochiweb進程的內存用量信息被寫到mochimem.log文件中。這是10000個鏈接客戶端,每秒1000條消息發送給隨機的客戶端。
下面的bash/awk語句是爲了把mochimem.log信息轉成圖例:
內存用量,c10k鏈接, 1000條消息/秒,24小時
這個圖展現內存用量 (10k 活躍鏈接, 1000條消息/秒) 24小時持續在250M。有兩個大點的下掉, 一個在測試開始一個在結束, 這是當在mochiweb進程中處於好奇運行這個:
erl> [erlang:garbage_collect(P) || P <- erlang:processes()].
他迫使全部的進程進行垃圾回收,這收回大約100MB的內存-下面咱們研究一些不用手動強迫進行垃圾回收的方法以節約內存。
在mochiweb減小內存的方法
看起來mochiweb應用只是發送消息而後當即跌掉她們,內存用量不該該隨消息發送數的增加而增加。
對於Erlang內存管理我是個新手,可是我繼續假設可以頻繁的進行垃圾回收,這將容許咱們剩下大量內存, 最終讓咱們能用比較少的系統內存服務更多用戶。 咱們可能利用更多點的cpu佔用率, 可是是能夠接收的。
深挖
有這麼幾個選項:
erlang:system_flag(fullsweep_after, Number)
Number是一個標誌在沒有全掃描的狀況下多少次垃圾回收能夠作的一個非負常數。這個值適用於新進程;已經運行的進程不受影響。
在低內存的系統中(特別沒有虛擬內存),設置這個值爲0能夠幫助節約內存
另外一個設置次值可選的方法是經過(操做系統)環境變量ERL_FULLSWEEP_AFTER。
聽起來挺有意思,可是他僅僅適用於新進程並且將對虛擬機中的全部進程產生做用,只是除了咱們的mochiweb進程,呵呵
接下來
erlang:system_flag(min_heap_size, MinHeapSize)
爲進程設置缺省的堆大小。以字爲單位。新的min_heap_size僅僅影響當min_heap_size改變後生成的進程。經過spawn_opt/N or process_flag/2 min_heap_size能夠爲單獨進行設置
可能有用,可是我更願意確保咱們的mochiweb進程有一個比缺省值大點的內存堆。我更喜歡儘量避免爲了加spawn選項而對mochieweb源代碼打補丁
下面的吸引了個人眼球
erlang:hibernate(Module, Function, Args)
把正在調用的進程處於等待狀態,它的內存分配就會盡量的少,假如進程不想在短期內接收任何數據了那麼這是很是有用的。
進程在有消息發送過來時被喚醒, 跟着調用棧被清空,帶有由Args給定參數的Module:Function將獲得控制權, 意味着這個進程當函數返回時將被終止。 這樣erlang:hibernate/3將永遠也返回不到調用他的地方
假如進程在消息隊列裏有任何消息,進程也會以上面的方式被當即喚醒。
用更專業的術語來講,erlang:hibernate/3 作了下面幾點。他丟棄進程的調用棧以後進行垃圾回收。在回收後,全部活躍數據都在一個連續的堆中。這個堆而後把空間壓縮到恰好能容納的了活躍數據的尺寸 (即便這個尺寸比進程的最小堆的還小).
假如進程活躍數據的大小小於最小堆尺寸,第一次垃圾回收也會在進程被喚醒後發生,這樣確保堆尺寸會變到不小於最小堆尺寸。
注意,清空調用棧意味着任何異常處理都被移除並在休眠後從新插入。一個影響是進程要用proc_lib啓動(間接的, gen_server也能夠), 用proc_lib:hibernate/3代替主要是確保異常處理在進程被喚醒後可以繼續工做。
聽起來很合理 - 讓咱們發送完每一個消息後試着休眠,看到底發生了什麼
編輯mochiconntest_web.erl ,改變以下:
讓 feed(Response, Id, N)函數的 最後一行調用hibernate,而不是調用他本身登進路由器後立馬調用hibernate,而不是調用feed並阻塞在receive上記住導出feed/3 ,這樣hibernate在喚醒時能夠回調回這個函數
用hibernation更新後的mochiconntest_web.erl :
-module( mochiconntest_web) . -export([ start/1 , stop/0 , loop/2 , feed/3]) . %% External API start(Options) -> {DocRoot , Options1} = get_option( docroot, Options) , Loop = fun (Req) -> ?MODULE :loop(Req , DocRoot) end , % we’ll set our maximum to 1 million connections. (default: 2048) mochiweb_http:start([{ max, 1000000} , { name, ?MODULE} , { loop, Loop} | Options1]) . stop() -> mochiweb_http :stop( ?MODULE) . loop(Req , DocRoot) -> "/" ++ Path = Req :get( path) , caseReq :get( method)of Method when Method =:= ‘GET’ ; Method =:= ‘HEAD’ -> casePathof "test/" ++ IdStr -> Response = Req :ok({"text/html; charset=utf-8" , [{"Server" ,"Mochiweb-Test"}] , chunked}) , {Id , _} = string:to_integer(IdStr) , router:login(Id , self()) , % Hibernate this process until it receives a message: proc_lib:hibernate( ?MODULE , feed, [Response , Id , 1]) ; _ -> Req :not_found() end ; ‘POST’ -> casePathof _ -> Req :not_found() end ; _ -> Req :respond({501 , [] , []}) end . feed(Response , Id , N) -> receive { router_msg, Msg} -> Html = io_lib:format("Recvd msg #~w: ‘~w’<br/>" , [N , Msg]) , Response :write_chunk(Html) end , % Hibernate this process until it receives a message: proc_lib:hibernate( ?MODULE , feed, [Response , Id , N+1]) . %% Internal API get_option(Option , Options) -> { proplists:get_value(Option , Options) , proplists:delete(Option , Options)} .
我作了這些改變,運行make從新構建mochiweb,而後重作一樣的c10k測試 (1000條消息/秒 24小時).
運行24小時後的結果 w/ proc_lib:hibernate()
內存用量 c10k, 1000條消息/秒, 24小時, 用hibernate()
恰如其分的,用了hibernate,10K個鏈接的mochiweb應用的內存用量維持在78MB這個水平, 要比咱們在
看到的450MB好的多。CPU佔用率也沒明顯增長。
總結
咱們基於mochiweb作了個comet應用,他讓咱們推送任意消息給由ID標誌的客戶端。在以每秒1000條消息的速度推送24小時後, 10,000個鏈接用戶,咱們發現它用了80MB內存,或者說每一個用戶8KB 。 咱們一樣也作了很漂亮的圖。
相對於在
咱們所看到的每一個用戶45KB,這是一個很大的改進。這些優化節省是歸因於讓咱們的應用表現的更加貼近實際, 爲mochiweb進程在每條消息之間用 hibernate。
下一步
後續, 我將調整它到一百萬個鏈接客戶端。我將部署這個測試應用到擁有充沛內存的多核64位服務器上 。這將展現有什麼不一樣,若是有的話也能夠運行在64位虛擬機上。爲了模擬一百萬客戶鏈接我將詳細介紹一些額外的技巧和調整 。
這個應用將發展成一系列公共子系統,在那訂閱被關聯於用戶ID 並被存儲於這個應用, 而不是當用戶鏈接時由他們提供。咱們將調入一個典型的社會網絡數據集: friends。這將容許一個用戶用一個用戶ID登錄而且自動接收任何有他朋友生成的消息。