在團隊協同工具worktile的使用過程當中,你會發現不管是右上角的消息通知,仍是在任務面板中拖動任務,還有用戶的在線狀態,都是實時刷新。Worktile中的推送服務是採用的是基於xmpp協議、erlang語言實現的ejabberd,並在其源碼基礎上,結合咱們的業務,對源碼做了修改以適配咱們自身的需求。另外,基於amqp協議也能夠做爲實時消息推送的一種選擇,踢踢網就是採用rabbitmq+stomp協議實現的消息推送服務。本文將結合我在worktile和踢踢網的項目實踐,介紹下消息推送服務的具體實現。html
相較於手機端的消息推送(通常都是以socket方式實現),web端是基於http協議,很難像tcp同樣保持長鏈接。但隨着技術的發展,出現了websocket, comet等新的技術能夠達到相似長鏈接的效果,這些技術大致可分爲如下幾類:java
短輪詢。頁面端經過js定時異步刷新,這種方式實時效果較差。node
長輪詢。頁面端經過js異步請求服務端,服務端在接收到請求後,若是該次請求沒有數據,則掛起此次請求,直到有數據到達或時間片(服務端設定)到,則返回本次請求,客戶端接着下一次請求。示例以下:mysql
服務端的實現中,不管採用ejabberd仍是rabbitmq,都是基於erlang語言開發的,因此必須安裝erlang運行時環境。Erlang是一種函數式語言,具備容錯、高併發的特色,藉助OTP的函數庫,很容易構建一個健壯的分佈式系統。目前,基於erlang開發的產品有,數據庫方面:Riak(Dynamo實現)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中間件有rabbitmq等。對於服務端程序員來講,erlang提供的高併發、容錯、熱部署等特性是其餘語言沒法達到的。不管在實時通訊仍是在遊戲程序中,用erlang能夠很容易爲每個上線用戶建立一個對應的process,對一臺4核8個G的服務器來講,承載上百萬個這樣的process是很是輕鬆的事。下圖是erlang程序發起process的通常性示意圖:nginx
如圖所示,Session manager(or gateway)負責爲每一個用戶(uid)建立相對應的process, 並把這個對應關係(map)存放到數據表中。每一個process則對應用戶數據,而且他們之間能夠相互發送消息。Erlang的優點就是在內存足夠的狀況下建立上百萬個這樣的process,並且它的建立和銷燬比java的thread要輕量的多,二者不是一個數量級的。git
好了,咱們如今開始着手erlang環境的搭建(實驗的系統爲ubuntu12.04, 4核8個G內存):程序員
一、依賴庫安裝github
sudo apt-get install build-essential sudo apt-get install libncurses5-dev sudo apt-get install libssl-dev libyaml-dev sudo apt-get install m4 sudo apt-get install unixodbc unixodbc-dev sudo apt-get install freeglut3-dev libwxgtk2.8-dev sudo apt-get install xsltproc sudo apt-get install fop tk8.5 libxml2-utils
二、官網下載otp源碼包(http://www.erlang.org/download.html), 解壓並安裝:web
\>\> tar zxvf otpsrcR16B01.tar.gz \>\> cd otpsrcR16B01 \>\> configure \>\> make & make install
至此,erlang運行環境就完成了。下面將分別介紹rabbitmq和ejabberd構建實時消息服務。redis
RabbitMQ是在業界普遍應用的消息中間件,也是對AMQP協議實現最好的一種中間件。AMQP協議中定義了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等實體,他們的關係以下圖所示:
消息發佈者(Producer)鏈接交換器(Exchange), 交換器和消息隊列(Message Queue)經過key進行Binding,Binding是根據Exchange的類型(分爲fanout、direct、topic、header)分別對消息做不一樣形式的派發。Message Queue又分爲durable、temporary、auto-delete三種類型,durable queue是持久化隊列,不會由於服務shutdown而消失,temporary queue則服務重啓後會消失,auto-delete則是在沒有consumer鏈接時自動刪除。另外RabbitMQ有不少第三方插件,能夠基於AMQP協議基礎之上作出不少擴展的應用。下面咱們將介紹web stomp插件構建基於AMQP之上的stomp文本協議,經過瀏覽器websocket達到實時的消息傳輸。系統的結構如圖:
如圖所示,web端咱們使用stomp.js和sockjs.js與rabbitmq的web stomp plugin通訊,手機端能夠用stompj, gozirra(Android)或者objc-stomp(IOS)經過stomp協議與rabbitmq收發消息。由於咱們是實時消息系統一般都是要與已有的用戶系統結合,rabbitmq能夠經過第三方插件rabbitmq-auth-backend-http來適配已有的用戶系統,這個插件能夠經過http接口完成用戶鏈接時的認證過程。固然,認證方式還有ldap等其餘方式。下面介紹具體步驟:
\>\> tar zxf rabbitmq-server-x.x.x.tar.gz \>\> cd rabbitmq-server-x.x.x \>\> make & make install
\>\> cd /path/to/your/rabbitmq \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples \>\> ./sbin/rabbitmqctl stop \>\> ./sbin/rabbitmqctl start \>\> ./sbin/rabbitmqctl status
將會顯示下圖所示的運行的插件列表
\>\> cd /path/to/your/rabbitmq/plugins \>\>wget http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez \>\> cd .. \>\> ./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http
編輯rabbitmq.config文件(默認存放於/etc/rabbitmq/下),添加:
[ ... {rabbit, [{auth_backends, [rabbit_auth_backend_http]}]}, ... {rabbitmq_auth_backend_http, [{user_path, 「http://your-server/auth/user」}, {vhost_path, 「http://your-server/auth/vhost」}, {resource_path, 「http://your-server/auth/resource」} ]} ... ].
其中,user_path是根據用戶名密碼進行校驗,vhost_path是校驗是否有權限訪問vhost, resource_path是校驗用戶對傳入的exchange、queue是否有權限。我下面的代碼是用nodejs實現的這三個接口的示例:
var express = require('express'); var app = express(); app.get('/auth/user', function(req, res){ var name = req.query.username; var pass = req.query.password; console.log("name : " + name + ", pass : " + pass); if(name === 'guest' && pass === "guest"){ console.log("allow"); res.send("allow"); }else{ res.send('deny'); } }); app.get('/auth/vhost', function(req, res){ console.log("/auth/vhost"); res.send("allow"); }); app.get('/auth/resource', function(req, res){ console.log("/auth/resource"); res.send("allow"); }); app.listen(3000);
...... var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp'); var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; client.debug = pipe('#second'); var print_first = pipe('#first', function(data) { client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data); }); var on_connect = function(x) { id = client.subscribe("/exchange/feed/user_x", function(d) { print_first(d.body); }); }; var on_error = function() { console.log('error'); }; client.connect('guest1', 'guest1', on_connect, on_error, '/'); ......
須要說明的時,在這裏咱們首先要在rabbitmq實例中建立feed這個exchange,咱們用stomp.js鏈接成功後,根據當前登錄用戶的id(user_x)
綁定到這個exchange,即 subscribe("/exchange/feed/user_x", ...)
這個操做的行爲,這樣在向rabbitmq中feed exchange發送消息並指定用戶id(user_x)爲key,頁面端就會經過websocket實時接收到這條消息。
到目前爲止,基於rabbitmq+stomp實現web端消息推送就已經完成,其中不少的細節須要小夥伴們親自去實踐了,這裏就很少說了。實踐過程當中能夠參照官方文檔:
以上的實現是我本人在踢踢網時採用的方式,下面接着介紹一下如今在Worktile中如何經過ejabberd實現消息推送。
與rabbitmq不一樣,ejabberd是xmpp協議的一種實現,與amqp相比,xmpp普遍應用於即時通訊領域。Xmpp協議的實現有不少種,好比java的openfire,但相較其餘實現,ejabberd的併發性能無疑使最優秀的。Xmpp協議的前身是jabber協議,早期的jabber協議主要包括在線狀態(presence)、好友花名冊(roster)、IQ(Info/Query)幾個部分。如今jabber已經成爲rfc的官方標準,如rfc2799, rfc4622, rfc6121,以及xmpp的擴展協議(xep)。Worktile Web端的消息提醒功能就是基於XEP-012四、XEP-0206定義的BOSH擴展協議。
因爲自身業務的須要,咱們對ejabberd的用戶認證和好友列表模塊的源碼進行修改,經過redis保存用戶的在線狀態,而不是mnesia和mysql。另外好友這塊咱們是從已有的數據庫中(mongodb)中獲取項目或團隊的成員。Web端經過strophe.js來鏈接(http-bind),strophe.js能夠以長輪詢和websocket兩種方式來鏈接,因爲ejabberd尚未好的websocket的實現,就採用了BOSH的方式模擬長鏈接。整個系統的結構以下:
xmpp-framwork
鏈接, Android能夠用smack直接連ejabberd服務器集羣。這些都是現有的庫,無需對client進行開發。用戶認證直接修改了ejabberd_auth_internal.erl文件,經過mongodb驅動鏈接用戶庫,在線狀態等功能是新加了模塊,其部分代碼以下:
-module(wt_mod_proj). -behaviour(gen_mod). -behaviour(gen_server). -include("ejabberd.hrl"). -include("logger.hrl"). -include("jlib.hrl"). -define(SUPERVISOR, ejabberd_sup). ... -define(ONLINE, 1). -define(OFFLINE, 0). -define(BUSY, 2). -define(LEAVE, 3). ... %% API -export([start_link/2, get_proj_online_users/2]). %% gen_mod callbacks -export([start/2, stop/1]). %% gen_server callbacks -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). %% Hook callbacks -export([user_available/1, unset_presence/3, set_presence/4]). -export([get_redis/1, remove_online_user/3, append_online_user/3]). ... -record(state,{host = <<"">>, server_host, rconn, mconn}). start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). user_available(New) -> LUser = New#jid.luser, LServer = New#jid.lserver, Proc = gen_mod:get_module_proc(LServer, ?MODULE), gen_server:cast(Proc, {user_available, LUser, LServer}). append_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {append_online_user, Uid, Proj}). remove_online_user(Uid, Proj, Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, {remove_online_user, Uid, Proj}). ... set_presence(User, Server, Resource, Packet) -> Proc = gen_mod:get_module_proc(Server, ?MODULE), gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}). ... start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, 2000, worker, [?MODULE]}, supervisor:start_child(?SUPERVISOR, ChildSpec). stop(Host) -> Proc = gen_mod:get_module_proc(Host, ?MODULE), gen_server:call(Proc, stop), supervisor:delete_child(?SUPERVISOR, Proc). init([Host, Opts]) -> MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>), RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST), RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT), ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50), MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST), MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT), {ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}), C = c(RedisHost, RedisPort), ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}. terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) -> ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100), ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50), ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50), eredis:stop(C), ok. ... handle_call({append_online_user, Uid, ProjId}, _From, State) -> C = State#state.rconn, Key = <<?PRE_RPOJ_ONLINE_USERS/binary, ProjId/binary>>, Resp = eredis:q(C, ["SADD", Key, Uid]), {reply, Resp, State}; handle_call({remove_online_user, Uid, ProjId}, _From, State) -> ... handle_call({get_proj_online_users, ProjId}, _From, State) -> ... handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) -> C = State#state.rconn, Key = <<?USER_PRESENCE/binary, User/binary>>, Pids = get_user_projs(User, Mongo), Cmd = get_proj_key(Pids, ["SUNION"]), case xml:get_subtag_cdata(Packet, <<"show">>) of <<"away">> -> eredis:q(C, ["SET", Key, ?LEAVE]); <<"offline">> -> ... handle_cast(_Msg, State) -> {noreply, State}. handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) -> case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]); _ -> ok end, {noreply, State}; handle_info(_Info, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. ...
其中,user\_available\_hook和sm\_remove\_connection\_hook
就是用戶上線和用戶斷開鏈接觸發的事件,ejabberd 中正是因爲這些hook,才能很容易擴展功能。
在用tsung對ejabberd進行壓力測試,測試機器爲4核心8G內存的普通PC,以3臺客戶機模擬用戶登陸、設置在線狀態、發送一條文本消息、關閉鏈接操做,在同時在線達到30w時,CPU佔用不到3%,內存大概到3個G左右,隨着用戶數增多,主要內存的損耗較大。因爲壓力測試比較耗時,再等到有時間的時候,會在作一些更深刻的測試。
對於ejabberd的安裝與集羣的搭建,你們能夠參照官方文檔,這裏再也不贅述。若是在使用過程當中有什麼問題,能夠加入worktile官方羣(110257147),進行討論。