ejabberd中的http反向推送

http的反向推送一般使用"長輪詢"或"長鏈接"的方式。 html

所謂"長輪詢"是指客戶端發送請求給服務器,服務器發現沒有數據須要發送給客戶端因而hold住不及時返回,等有數據須要發送給客戶端時,進行回覆,而後關閉鏈接,客戶端收到回覆後再發送新的http請求,以便服務器能有對應的請求用於消息的反向推送。 java

而"長鏈接"是在長輪詢的基礎上增長"keep-alive"屬性,服務器收到請求後不直接回復,等有數據須要發送給客戶端時再進行response,可是並不關閉鏈接,這樣客戶端收到服務器的response後在同一鏈接上再次發送http請求。  服務器

在ejabberd的實現中,採用了bosh技術來完成對應的工做,具體定義可參考: session

英文: http://xmpp.org/extensions/xep-0124.html socket

中文: http://wiki.jabbercn.org/XEP-0124 tcp

大概實現原理:ejabberd收到一個客戶端http請求後會爲該客戶端最終建立三個進程:ejabberd_http, ejabberd_http_bind, ejabberd_c2s。 spa

ejabberd_http進程不斷的從對應的socket上收客戶端的請求,並轉發交給對應的ejabberd_http_bind進程進行處理,而後同步等待處理結果,並將結果返回給客戶端。 code

init() ->
    ...
    receive_headers(State).


receive_headers(#state{trail=Trail} = State) ->
    SockMod = State#state.sockmod,
    Socket = State#state.socket,
    Data = SockMod:recv(Socket, 0, 300000),
    case State#state.sockmod of
        gen_tcp ->
            NewState = process_header(State, Data),
            case NewState#state.end_of_request of
                true -> ok;
                _ -> receive_headers(NewState)
            end;
        _ ->
            case Data of
                {ok, D} ->
                    parse_headers(State#state{trail = <<Trail/binary, D/binary>>});
                {error, _} ->
                    ok
            end
    end.

process_header(State, Data) ->
    case Data of
        ...
        {ok, http_eoh} ->
            ...
            Out = process_request(State2),
            send_text(State2, Out),
            case State2#state.request_keepalive of
                true ->
                    ...
                    #state{sockmod = SockMod,
                           socket = Socket,
                           request_handlers = State#state.request_handlers};
                _ ->
                   #state{end_of_request = true,
                          request_handlers = State#state.request_handlers}

從代碼中可看出,未設置keep-alive屬性的時候,該進程處理完一次http請求後便本身結束(長輪詢模式)。設置了keep-alive屬性的時候,該進程不斷的循環接收http請求,並轉發接收與響應(長鏈接模式)。 xml

================== htm

ejabberd_http_bind進程負責hold住http請求,對於正常的客戶端請求,ejabberd_http_bind進程會將請求轉發給對應的ejabberd_c2s進程進行實際業務的處理,而對於空的請求(便於服務器反向推送數據),ejabberd_http_bind設置定時器,等待ejabberd_c2s進程對實際請求的響應或者是須要推送給客戶端的消息。

handle_sync_event({send_xml,Packet},_From,StateName,
                  #state{http_receiver = undefined} = StateData) ->
    Output = [Packet | StateData#state.output],
    Reply = ok,
    {reply, Reply, StateName, StateData#state{output = Output}};

handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
    Output = [Packet | StateData#state.output],
    cancel_timer(StateData#state.timer),
    Timer = set_inactivity_timer(StateData#state.pause,
                                 StateData#state.max_inactivity),
    HTTPReply = {ok, Output},
    gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
    cancel_timer(StateData#state.wait_timer),
    Rid = StateData#state.rid,
    ReqList = [#hbr{rid = Rid,key = StateData#state.key,
                    out = Output
                   } |
               [El || El <- StateData#state.req_list,
                      El#hbr.rid /= Rid ]],
    Reply = ok,
    {reply, Reply, StateName,
     StateData#state{output = [],
                     http_receiver = undefined,
                     req_list = ReqList,
                     wait_timer = undefined,
                     timer = Timer}};

handle_sync_event({http_get,Rid,Wait,Hold},From,StateName,
                  StateData) ->
    %% setup timer
    send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
    cancel_timer(StateData#state.wait_timer),
    TNow = tnow(),
    if
        (Hold > 0) and
        (StateData#state.output == []) and
        ((TNow -StateData#state.ctime<(Wait*1000*1000)) and
        (StateData#state.rid == Rid) and
        (StateData#state.input /= cancel) and
        (StateData#state.pause == 0) ->
            WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
            %% MR: Not sure we should cancel the state timer here.
            cancel_timer(StateData#state.timer),
                         {next_state,StateName,
                          StateData#state{http_receiver = From,
                                          wait_timer = WaitTimer,
                                          timer = undefined}};
        (StateData#state.input == cancel) ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                    StateData#state.max_inactivity),
            Reply = {ok, cancel},
            {reply, Reply, StateName,
             StateData#state{input = queue:new(),
                             http_receiver = undefined,
                             wait_timer = undefined,
                             timer = Timer}};
        true ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                     StateData#state.max_inactivity),
            Reply = {ok, StateData#state.output},
            %% save request
            ReqList = [#hbr{rid = Rid,
                            key = StateData#state.key,
                            out = StateData#state.output
                           } |
                       [El || El <- StateData#state.req_list,
                             El#hbr.rid /= Rid]
                      ],
            {reply, Reply, StateName,
             StateData#state{output = [],
                             http_receiver = undefined,
                             wait_timer = undefined,
                             timer = Timer,
                             req_list = ReqList}}
    end;

handle_info({timeout, WaitTimer, _}, StateName,
             #state{wait_timer = WaitTimer} = StateData) ->
    if
        StateData#state.http_receiver /= undefined ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                         StateData#state.max_inactivity),
            gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
            Rid = StateData#state.rid,
            ReqList = [#hbr{rid = Rid,
                            key = StateData#state.key,
                            out = []
                           } |
                       [El || El <- StateData#state.req_list,
                              El#hbr.rid /= Rid ]
                       ],
            {next_state, StateName,
             StateData#state{http_receiver = undefined,
                             req_list = ReqList,
                             wait_timer = undefined,
                             timer = Timer}};
        true ->
            {next_state, StateName, StateData}
    end;

ejabberd_http進程最終會調用ejabberd_http_bind的http_get方法獲取請求的響應結果或者是須要推送的數據,ejabberd_http_bind進程收到請求後會進行相應處理,好比有數據則直接回復,或者設置定時器。當收到ejabberd_c2s進程推送過來的數據時,中止定時器並將數據當即回覆給ejabberd_http進程,若是定時器超時則回覆一個空的消息。

=============================

ejabberd_c2s爲客戶端對應的會話進程,負責維護客戶端的在線狀態,聯繫人列表,請求處理等等。

============

服務器在建立ejabberd_http_bind進程時,會生成一個惟一的sid,用於標識該進程,該sid與ejabberd_http_bind進程pid的對應關係會存儲到mnesia中,同時服務器會將sid也會告訴客戶端,客戶端後續的請求也都須要帶上該sid。http_bind收到請求後根據sid從mnesia查找匹配的ejabberd_http_bind進程。

process_request(Data, IP) ->
    ...
    case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
        %% No existing session:
        {ok, {"", Rid, Attrs, Payload}} ->
            ...
            Sid = sha:sha(term_to_binary({now(), make_ref()})),
            case start(XmppDomain, Sid, "", IP) of
                {ok, Pid} ->
                    handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,Payload, PayloadSize, IP)
                ...
             end;
        {ok, {Sid, Rid, Attrs, Payload1}} ->
            ...
            handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
                            StreamStart, IP);

handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
                     Payload, PayloadSize, IP) ->
    ...
    mnesia:dirty_write(#http_bind{id = Sid,
                                  pid = Pid,
                                  to = {XmppDomain,XmppVersion},
                                  hold = Hold,
                                  wait = Wait,
                                  process_delay = Pdelay,
                                  version = Version}),
    handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).

http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
    case mnesia:dirty_read({http_bind, Sid}) of
        [] ->
            {error, not_exists};
        [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
            ...
            {gen_fsm:sync_send_all_state_event(FsmRef, 
                #http_put{rid = Rid, attrs = Attrs,
                          payload = Payload,
                          payload_size = PayloadSize, hold = Hold,
                          stream = NewStream, ip = IP}, 30000), Sess}
    end.

ejabberd_http_bind與ejabberd_c2s會互相記住對方的進程pid,這樣每一個客戶端都有本身惟一的ejabberd_http_bind與ejabberd_c2s進程進行相應的請求處理。

相關文章
相關標籤/搜索