http的反向推送一般使用"長輪詢"或"長鏈接"的方式。 html
所謂"長輪詢"是指客戶端發送請求給服務器,服務器發現沒有數據須要發送給客戶端因而hold住不及時返回,等有數據須要發送給客戶端時,進行回覆,而後關閉鏈接,客戶端收到回覆後再發送新的http請求,以便服務器能有對應的請求用於消息的反向推送。 java
而"長鏈接"是在長輪詢的基礎上增長"keep-alive"屬性,服務器收到請求後不直接回復,等有數據須要發送給客戶端時再進行response,可是並不關閉鏈接,這樣客戶端收到服務器的response後在同一鏈接上再次發送http請求。 服務器
在ejabberd的實現中,採用了bosh技術來完成對應的工做,具體定義可參考: session
英文: http://xmpp.org/extensions/xep-0124.html socket
中文: http://wiki.jabbercn.org/XEP-0124 tcp
大概實現原理:ejabberd收到一個客戶端http請求後會爲該客戶端最終建立三個進程:ejabberd_http, ejabberd_http_bind, ejabberd_c2s。 spa
ejabberd_http進程不斷的從對應的socket上收客戶端的請求,並轉發交給對應的ejabberd_http_bind進程進行處理,而後同步等待處理結果,並將結果返回給客戶端。 code
init() -> ... receive_headers(State). receive_headers(#state{trail=Trail} = State) -> SockMod = State#state.sockmod, Socket = State#state.socket, Data = SockMod:recv(Socket, 0, 300000), case State#state.sockmod of gen_tcp -> NewState = process_header(State, Data), case NewState#state.end_of_request of true -> ok; _ -> receive_headers(NewState) end; _ -> case Data of {ok, D} -> parse_headers(State#state{trail = <<Trail/binary, D/binary>>}); {error, _} -> ok end end. process_header(State, Data) -> case Data of ... {ok, http_eoh} -> ... Out = process_request(State2), send_text(State2, Out), case State2#state.request_keepalive of true -> ... #state{sockmod = SockMod, socket = Socket, request_handlers = State#state.request_handlers}; _ -> #state{end_of_request = true, request_handlers = State#state.request_handlers}
從代碼中可看出,未設置keep-alive屬性的時候,該進程處理完一次http請求後便本身結束(長輪詢模式)。設置了keep-alive屬性的時候,該進程不斷的循環接收http請求,並轉發接收與響應(長鏈接模式)。 xml
================== htm
ejabberd_http_bind進程負責hold住http請求,對於正常的客戶端請求,ejabberd_http_bind進程會將請求轉發給對應的ejabberd_c2s進程進行實際業務的處理,而對於空的請求(便於服務器反向推送數據),ejabberd_http_bind設置定時器,等待ejabberd_c2s進程對實際請求的響應或者是須要推送給客戶端的消息。
handle_sync_event({send_xml,Packet},_From,StateName, #state{http_receiver = undefined} = StateData) -> Output = [Packet | StateData#state.output], Reply = ok, {reply, Reply, StateName, StateData#state{output = Output}}; handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> Output = [Packet | StateData#state.output], cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), HTTPReply = {ok, Output}, gen_fsm:reply(StateData#state.http_receiver, HTTPReply), cancel_timer(StateData#state.wait_timer), Rid = StateData#state.rid, ReqList = [#hbr{rid = Rid,key = StateData#state.key, out = Output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ]], Reply = ok, {reply, Reply, StateName, StateData#state{output = [], http_receiver = undefined, req_list = ReqList, wait_timer = undefined, timer = Timer}}; handle_sync_event({http_get,Rid,Wait,Hold},From,StateName, StateData) -> %% setup timer send_receiver_reply(StateData#state.http_receiver, {ok, empty}), cancel_timer(StateData#state.wait_timer), TNow = tnow(), if (Hold > 0) and (StateData#state.output == []) and ((TNow -StateData#state.ctime<(Wait*1000*1000)) and (StateData#state.rid == Rid) and (StateData#state.input /= cancel) and (StateData#state.pause == 0) -> WaitTimer = erlang:start_timer(Wait * 1000, self(), []), %% MR: Not sure we should cancel the state timer here. cancel_timer(StateData#state.timer), {next_state,StateName, StateData#state{http_receiver = From, wait_timer = WaitTimer, timer = undefined}}; (StateData#state.input == cancel) -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), Reply = {ok, cancel}, {reply, Reply, StateName, StateData#state{input = queue:new(), http_receiver = undefined, wait_timer = undefined, timer = Timer}}; true -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), Reply = {ok, StateData#state.output}, %% save request ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = StateData#state.output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid] ], {reply, Reply, StateName, StateData#state{output = [], http_receiver = undefined, wait_timer = undefined, timer = Timer, req_list = ReqList}} end; handle_info({timeout, WaitTimer, _}, StateName, #state{wait_timer = WaitTimer} = StateData) -> if StateData#state.http_receiver /= undefined -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), Rid = StateData#state.rid, ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = [] } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {next_state, StateName, StateData#state{http_receiver = undefined, req_list = ReqList, wait_timer = undefined, timer = Timer}}; true -> {next_state, StateName, StateData} end;
ejabberd_http進程最終會調用ejabberd_http_bind的http_get方法獲取請求的響應結果或者是須要推送的數據,ejabberd_http_bind進程收到請求後會進行相應處理,好比有數據則直接回復,或者設置定時器。當收到ejabberd_c2s進程推送過來的數據時,中止定時器並將數據當即回覆給ejabberd_http進程,若是定時器超時則回覆一個空的消息。
=============================
ejabberd_c2s爲客戶端對應的會話進程,負責維護客戶端的在線狀態,聯繫人列表,請求處理等等。
============
服務器在建立ejabberd_http_bind進程時,會生成一個惟一的sid,用於標識該進程,該sid與ejabberd_http_bind進程pid的對應關係會存儲到mnesia中,同時服務器會將sid也會告訴客戶端,客戶端後續的請求也都須要帶上該sid。http_bind收到請求後根據sid從mnesia查找匹配的ejabberd_http_bind進程。
process_request(Data, IP) -> ... case catch parse_request(Data, PayloadSize, MaxStanzaSize) of %% No existing session: {ok, {"", Rid, Attrs, Payload}} -> ... Sid = sha:sha(term_to_binary({now(), make_ref()})), case start(XmppDomain, Sid, "", IP) of {ok, Pid} -> handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,Payload, PayloadSize, IP) ... end; {ok, {Sid, Rid, Attrs, Payload1}} -> ... handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, StreamStart, IP); handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, PayloadSize, IP) -> ... mnesia:dirty_write(#http_bind{id = Sid, pid = Pid, to = {XmppDomain,XmppVersion}, hold = Hold, wait = Wait, process_delay = Pdelay, version = Version}), handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> case mnesia:dirty_read({http_bind, Sid}) of [] -> {error, not_exists}; [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] -> ... {gen_fsm:sync_send_all_state_event(FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, payload_size = PayloadSize, hold = Hold, stream = NewStream, ip = IP}, 30000), Sess} end.
ejabberd_http_bind與ejabberd_c2s會互相記住對方的進程pid,這樣每一個客戶端都有本身惟一的ejabberd_http_bind與ejabberd_c2s進程進行相應的請求處理。