Worktile中的實時消息推送服務實現

在團隊協同工具worktile的使用過程當中,你會發現不管是右上角的消息通知,仍是在任務面板中拖動任務,還有用戶的在線狀態,都是實時刷新。Worktile中的推送服務是採用的是基於xmpp協議、erlang語言實現的ejabberd,並在其源碼基礎上,結合咱們的業務,對源碼做了修改以適配咱們自身的需求。另外,基於amqp協議也能夠做爲實時消息推送的一種選擇,踢踢網就是採用rabbitmq+stomp協議實現的消息推送服務。本文將結合我在worktile和踢踢網的項目實踐,介紹下消息推送服務的具體實現。html

實時推送的幾種實現方式

相較於手機端的消息推送(通常都是以socket方式實現),web端是基於http協議,很難像tcp同樣保持長鏈接。但隨着技術的發展,出現了websocket, comet等新的技術能夠達到相似長鏈接的效果,這些技術大致可分爲如下幾類:java

  1. 短輪詢。頁面端經過js定時異步刷新,這種方式實時效果較差。node

  2. 長輪詢。頁面端經過js異步請求服務端,服務端在接收到請求後,若是該次請求沒有數據,則掛起此次請求,直到有數據到達或時間片(服務端設定)到,則返回本次請求,客戶端接着下一次請求。示例以下:mysql

comet

  1. Websocket。瀏覽器經過websocket協議鏈接服務端,實現了瀏覽器和服務器端的全雙工通訊。須要服務端和瀏覽器都支持websocket協議。
    以上幾種方式中,方式1實現較簡單,但效率和實時效果較差。方式2對服務端實現的要求比較高,尤爲是併發量大的狀況下,對服務端的壓力很大。方式3效率較高,但對較低版本的瀏覽器不支持,另外服務端也須要有支持websocket的實現。Worktile的web端實時消息推送,採用的是xmpp擴展協議xep-0124 BOSH(http://xmpp.org/extensions/xep-0124.html),本質是採用方式2長輪詢的方式。踢踢網則採用了websocket鏈接rabbitmq的方式實現,下面我會具體介紹如何用這兩種方式實現Server Push。

運行時環境準備

服務端的實現中,不管採用ejabberd仍是rabbitmq,都是基於erlang語言開發的,因此必須安裝erlang運行時環境。Erlang是一種函數式語言,具備容錯、高併發的特色,藉助OTP的函數庫,很容易構建一個健壯的分佈式系統。目前,基於erlang開發的產品有,數據庫方面:Riak(Dynamo實現)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中間件有rabbitmq等。對於服務端程序員來講,erlang提供的高併發、容錯、熱部署等特性是其餘語言沒法達到的。不管在實時通訊仍是在遊戲程序中,用erlang能夠很容易爲每個上線用戶建立一個對應的process,對一臺4核8個G的服務器來講,承載上百萬個這樣的process是很是輕鬆的事。下圖是erlang程序發起process的通常性示意圖:nginx

erlang process

如圖所示,Session manager(or gateway)負責爲每一個用戶(uid)建立相對應的process, 並把這個對應關係(map)存放到數據表中。每一個process則對應用戶數據,而且他們之間能夠相互發送消息。Erlang的優點就是在內存足夠的狀況下建立上百萬個這樣的process,並且它的建立和銷燬比java的thread要輕量的多,二者不是一個數量級的。git

好了,咱們如今開始着手erlang環境的搭建(實驗的系統爲ubuntu12.04, 4核8個G內存):程序員

一、依賴庫安裝github

sudo apt-get install build-essential
    sudo apt-get install libncurses5-dev
    sudo apt-get install libssl-dev libyaml-dev
    sudo apt-get install m4
    sudo apt-get install unixodbc unixodbc-dev
    sudo apt-get install freeglut3-dev libwxgtk2.8-dev
    sudo apt-get install xsltproc
    sudo apt-get install fop tk8.5 libxml2-utils

二、官網下載otp源碼包(http://www.erlang.org/download.html), 解壓並安裝:web

\>\> tar zxvf otpsrcR16B01.tar.gz
   \>\> cd otpsrcR16B01
   \>\> configure
   \>\> make & make install

至此,erlang運行環境就完成了。下面將分別介紹rabbitmq和ejabberd構建實時消息服務。redis

基於RabbitMQ的實時消息服務

RabbitMQ是在業界普遍應用的消息中間件,也是對AMQP協議實現最好的一種中間件。AMQP協議中定義了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等實體,他們的關係以下圖所示:

amqp entity

消息發佈者(Producer)鏈接交換器(Exchange), 交換器和消息隊列(Message Queue)經過key進行Binding,Binding是根據Exchange的類型(分爲fanout、direct、topic、header)分別對消息做不一樣形式的派發。Message Queue又分爲durable、temporary、auto-delete三種類型,durable queue是持久化隊列,不會由於服務shutdown而消失,temporary queue則服務重啓後會消失,auto-delete則是在沒有consumer鏈接時自動刪除。另外RabbitMQ有不少第三方插件,能夠基於AMQP協議基礎之上作出不少擴展的應用。下面咱們將介紹web stomp插件構建基於AMQP之上的stomp文本協議,經過瀏覽器websocket達到實時的消息傳輸。系統的結構如圖:

rabbitmq

如圖所示,web端咱們使用stomp.js和sockjs.js與rabbitmq的web stomp plugin通訊,手機端能夠用stompj, gozirra(Android)或者objc-stomp(IOS)經過stomp協議與rabbitmq收發消息。由於咱們是實時消息系統一般都是要與已有的用戶系統結合,rabbitmq能夠經過第三方插件rabbitmq-auth-backend-http來適配已有的用戶系統,這個插件能夠經過http接口完成用戶鏈接時的認證過程。固然,認證方式還有ldap等其餘方式。下面介紹具體步驟:

\>\> tar zxf rabbitmq-server-x.x.x.tar.gz
    \>\> cd rabbitmq-server-x.x.x
    \>\> make & make install
  • 爲rabbitmq安裝web-stomp插件
\>\> cd /path/to/your/rabbitmq
    \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp
    \>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples
    \>\> ./sbin/rabbitmqctl stop
    \>\> ./sbin/rabbitmqctl start
    \>\> ./sbin/rabbitmqctl status

將會顯示下圖所示的運行的插件列表

rabbitmq status

  • 安裝用戶受權插件
\>\> cd /path/to/your/rabbitmq/plugins
   \>\>wget http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez
   \>\> cd ..
   \>\> ./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http

編輯rabbitmq.config文件(默認存放於/etc/rabbitmq/下),添加:

[
    ...
    {rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},
    ...
    {rabbitmq_auth_backend_http,
    [{user_path, 「http://your-server/auth/user」},
     {vhost_path, 「http://your-server/auth/vhost」},
     {resource_path, 「http://your-server/auth/resource」}
    ]}
    ...
   ].

其中,user_path是根據用戶名密碼進行校驗,vhost_path是校驗是否有權限訪問vhost, resource_path是校驗用戶對傳入的exchange、queue是否有權限。我下面的代碼是用nodejs實現的這三個接口的示例:

var express = require('express');
    var app = express();
    app.get('/auth/user', function(req, res){
    var name = req.query.username;
    var pass = req.query.password;
    console.log("name : " + name + ", pass : " + pass);
if(name === 'guest' && pass === "guest"){
    console.log("allow");
    res.send("allow");
}else{
    res.send('deny');
}
});
app.get('/auth/vhost', function(req, res){
    console.log("/auth/vhost");
    res.send("allow");
});
app.get('/auth/resource', function(req, res){
    console.log("/auth/resource");
    res.send("allow");
});
app.listen(3000);
  • 瀏覽器端js實現,示例代碼以下:
......
var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
var print_first = pipe('#first', function(data) {
client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);
});
var on_connect = function(x) {
id = client.subscribe("/exchange/feed/user_x", function(d) {
print_first(d.body);
});
};
var on_error = function() {
console.log('error');
};
client.connect('guest1', 'guest1', on_connect, on_error, '/');
......

須要說明的時,在這裏咱們首先要在rabbitmq實例中建立feed這個exchange,咱們用stomp.js鏈接成功後,根據當前登錄用戶的id(user_x)綁定到這個exchange,即 subscribe("/exchange/feed/user_x", ...) 這個操做的行爲,這樣在向rabbitmq中feed exchange發送消息並指定用戶id(user_x)爲key,頁面端就會經過websocket實時接收到這條消息。

到目前爲止,基於rabbitmq+stomp實現web端消息推送就已經完成,其中不少的細節須要小夥伴們親自去實踐了,這裏就很少說了。實踐過程當中能夠參照官方文檔:

以上的實現是我本人在踢踢網時採用的方式,下面接着介紹一下如今在Worktile中如何經過ejabberd實現消息推送。

基於ejabberd的實時消息推送

與rabbitmq不一樣,ejabberd是xmpp協議的一種實現,與amqp相比,xmpp普遍應用於即時通訊領域。Xmpp協議的實現有不少種,好比java的openfire,但相較其餘實現,ejabberd的併發性能無疑使最優秀的。Xmpp協議的前身是jabber協議,早期的jabber協議主要包括在線狀態(presence)、好友花名冊(roster)、IQ(Info/Query)幾個部分。如今jabber已經成爲rfc的官方標準,如rfc2799, rfc4622, rfc6121,以及xmpp的擴展協議(xep)。Worktile Web端的消息提醒功能就是基於XEP-012四、XEP-0206定義的BOSH擴展協議。

因爲自身業務的須要,咱們對ejabberd的用戶認證和好友列表模塊的源碼進行修改,經過redis保存用戶的在線狀態,而不是mnesia和mysql。另外好友這塊咱們是從已有的數據庫中(mongodb)中獲取項目或團隊的成員。Web端經過strophe.js來鏈接(http-bind),strophe.js能夠以長輪詢和websocket兩種方式來鏈接,因爲ejabberd尚未好的websocket的實現,就採用了BOSH的方式模擬長鏈接。整個系統的結構以下:

worktile ejabberd

  • Web端用strophe.js經過http-bind進行鏈接nginx代理,nginx反向代理ejabberd cluster。
  • IOS用xmpp-framwork鏈接, Android能夠用smack直接連ejabberd服務器集羣。這些都是現有的庫,無需對client進行開發。
  • 在線狀態根據用戶uid做爲key定義了在線、離線、忙等狀態存放於redis中。好友列表從mongodb的project表中獲取。

用戶認證直接修改了ejabberd_auth_internal.erl文件,經過mongodb驅動鏈接用戶庫,在線狀態等功能是新加了模塊,其部分代碼以下:

-module(wt_mod_proj).
    -behaviour(gen_mod).
    -behaviour(gen_server).
    -include("ejabberd.hrl").
    -include("logger.hrl").
    -include("jlib.hrl").
    -define(SUPERVISOR, ejabberd_sup).
    ...
    -define(ONLINE, 1).
    -define(OFFLINE, 0).
    -define(BUSY, 2).
    -define(LEAVE, 3).
    ...
    %% API
    -export([start_link/2, get_proj_online_users/2]).
    %% gen_mod callbacks
    -export([start/2, stop/1]).
    %% gen_server callbacks
    -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
    %% Hook callbacks
    -export([user_available/1, unset_presence/3, set_presence/4]).
    -export([get_redis/1, remove_online_user/3, append_online_user/3]).
    ...
    -record(state,{host = <<"">>, server_host, rconn, mconn}).

    start_link(Host, Opts) ->
        Proc = gen_mod:get_module_proc(Host, ?MODULE),
        gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
        user_available(New) ->
        LUser = New#jid.luser, LServer = New#jid.lserver,
        Proc = gen_mod:get_module_proc(LServer, ?MODULE),
        gen_server:cast(Proc, {user_available, LUser, LServer}).

    append_online_user(Uid, Proj, Host) ->
        Proc = gen_mod:get_module_proc(Host, ?MODULE),
        gen_server:call(Proc, {append_online_user, Uid, Proj}).
        remove_online_user(Uid, Proj, Host) ->
        Proc = gen_mod:get_module_proc(Host, ?MODULE),
        gen_server:call(Proc, {remove_online_user, Uid, Proj}).
    ...
    set_presence(User, Server, Resource, Packet) ->
        Proc = gen_mod:get_module_proc(Server, ?MODULE),
        gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}).
    ...

    start(Host, Opts) ->
         Proc = gen_mod:get_module_proc(Host, ?MODULE),
         ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
         transient, 2000, worker, [?MODULE]},
         supervisor:start_child(?SUPERVISOR, ChildSpec).

    stop(Host) ->
         Proc = gen_mod:get_module_proc(Host, ?MODULE),
         gen_server:call(Proc, stop),
         supervisor:delete_child(?SUPERVISOR, Proc).

    init([Host, Opts]) ->
         MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>),
         RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST),
         RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT),
         ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),
         ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),
         ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),
         MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST),
         MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT),
         {ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),
         C = c(RedisHost, RedisPort),
         ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}.
    terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) ->
         ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),
         ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),
         ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),
         eredis:stop(C),
         ok.
        ...
    handle_call({append_online_user, Uid, ProjId}, _From, State) ->
        C = State#state.rconn,
        Key = <<?PRE_RPOJ_ONLINE_USERS/binary, ProjId/binary>>,
        Resp = eredis:q(C, ["SADD", Key, Uid]),
        {reply, Resp, State};
    handle_call({remove_online_user, Uid, ProjId}, _From, State) ->
        ...
    handle_call({get_proj_online_users, ProjId}, _From, State) ->
    ...
    handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) ->
        C = State#state.rconn,
        Key = <<?USER_PRESENCE/binary, User/binary>>,
        Pids = get_user_projs(User, Mongo),
        Cmd = get_proj_key(Pids, ["SUNION"]),
        case xml:get_subtag_cdata(Packet, <<"show">>) of
        <<"away">> ->
            eredis:q(C, ["SET", Key, ?LEAVE]);
        <<"offline">> ->
            ...
     handle_cast(_Msg, State) -> {noreply, State}.

     handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) ->
         case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of
         {'EXIT', Reason} ->
              ?ERROR_MSG("~p", [Reason]);
         _ ->
              ok
     end,
     {noreply, State};

    handle_info(_Info, State) -> {noreply, State}.

    code_change(_OldVsn, State, _Extra) -> {ok, State}.

    ...

其中,user\_available\_hook和sm\_remove\_connection\_hook 就是用戶上線和用戶斷開鏈接觸發的事件,ejabberd 中正是因爲這些hook,才能很容易擴展功能。

在用tsung對ejabberd進行壓力測試,測試機器爲4核心8G內存的普通PC,以3臺客戶機模擬用戶登陸、設置在線狀態、發送一條文本消息、關閉鏈接操做,在同時在線達到30w時,CPU佔用不到3%,內存大概到3個G左右,隨着用戶數增多,主要內存的損耗較大。因爲壓力測試比較耗時,再等到有時間的時候,會在作一些更深刻的測試。

對於ejabberd的安裝與集羣的搭建,你們能夠參照官方文檔,這裏再也不贅述。若是在使用過程當中有什麼問題,能夠加入worktile官方羣(110257147),進行討論。

相關文章
相關標籤/搜索