socket.io 原理詳解

在項目中用到socket.io在WEB端作消息推送,遂花了點時間看了socket.io實現,作個簡單分析,若有錯漏,歡迎指正。javascript

1 概述

socket.io是一個基於WebSocket的CS的實時通訊庫,它底層基於engine.io。engine.io使用WebSocket和xhr-polling(或jsonp)封裝了一套本身的協議,在不支持WebSocket的低版本瀏覽器中(支持websocket的瀏覽器版本見這裏)使用了長輪詢(long polling)來代替。socket.io在engine.io的基礎上增長了namespace,room,自動重連等特性。html

本文接下來會先簡單介紹websocket協議,而後在此基礎上講解下engine.io和socket.io協議以及源碼分析,後續再經過例子說明socket.io的工做流程。java

2 WebSocket協議

咱們知道,在HTTP 協議開發的時候,並非爲了雙向通訊程序準備的,起初的 web 應用程序只須要 「請求-響應」 就夠了。因爲歷史緣由,在建立擁有雙向通訊機制的 web 應用程序時,就只能利用 HTTP 輪詢的方式,由此產生了 「短輪詢」 和 「長輪詢」(注意區分短鏈接和長鏈接)。python

短輪詢經過客戶端按期輪詢來詢問服務端是否有新的信息產生,缺點也是顯而易見,輪詢間隔大了則信息不夠實時,輪詢間隔太小又會消耗過多的流量,增長服務器的負擔。長輪詢是對短輪詢的優化,須要服務端作相應的修改來支持。客戶端向服務端發送請求時,若是此時服務端沒有新的信息產生,並不馬上返回,而是Hang住一段時間等有新的信息或者超時再返回,客戶端收到服務器的應答後繼續輪詢。能夠看到長輪詢比短輪詢能夠減小大量無用的請求,而且客戶端接收取新消息也會實時很多。nginx

雖然長輪詢比短輪詢優化了很多,可是每次請求仍是都要帶上HTTP請求頭部,並且在長輪詢的鏈接結束以後,服務器端積累的新消息要等到下次客戶端鏈接時才能傳遞。更好的方式是隻用一個TCP鏈接來實現客戶端和服務端的雙向通訊,WebSocket協議正是爲此而生。WebSocket是基於TCP的一個獨立的協議,它與HTTP協議的惟一關係就是它的握手請求能夠做爲一個Upgrade request經由HTTP服務器解析,且與HTTP使用同樣的端口。WebSocket默認對普通請求使用80端口,協議爲ws://,對TLS加密請求使用443端口,協議爲wss://git

握手是經過一個HTTP Upgrade request開始的,一個請求和響應頭部示例以下(去掉了無關的頭部)。WebSocket握手請求頭部與HTTP請求頭部是兼容的(見RFC2616)。github

## Request Headers ##
Connection: Upgrade
Host: socket.io.demo.com
Origin: http://socket.io.demo.com
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: mupA9l2rXciZKoMNQ9LphA==
Sec-WebSocket-Version: 13
Upgrade: websocket

## Response Headers ##
101 Web Socket Protocol Handshake
Connection: upgrade
Sec-WebSocket-Accept: s4VAqh7eedG0a11ziQlwTzJUY3s=
Sec-WebSocket-Origin: http://socket.io.demo.com
Server: nginx/1.6.2
Upgrade: WebSocket
複製代碼
  • Upgrade 是HTTP/1.1中規定的用於轉換當前鏈接的應用層協議的頭部,表示客戶端但願用現有的鏈接轉換到新的應用層協議WebSocket協議。web

  • Origin 用於防止跨站攻擊,瀏覽器通常會使用這個來標識原始域,對於非瀏覽器的客戶端應用能夠根據須要使用。redis

  • 請求頭中的 Sec-WebSocket-Version 是WebSocket版本號,Sec-WebSocket-Key 是用於握手的密鑰。Sec-WebSocket-Extensions 和 Sec-WebSocket-Protocol 是可選項,暫不討論。chrome

  • 響應頭中的 Sec-WebSocket-Accept 是將請求頭中的 Sec-WebSocket-Key 的值加上一個固定魔數258EAFA5-E914-47DA-95CA-C5AB0DC85B11經SHA1+base64編碼後獲得。計算過程的python代碼示例(uwsgi中的實現見 core/websockets.c的 uwsgi_websocket_handshake函數):

    magic_number = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
    key = 'mupA9l2rXciZKoMNQ9LphA=='
    accept = base64.b64encode(hashlib.sha1(key + magic_number).digest())
    assert(accept == 's4VAqh7eedG0a11ziQlwTzJUY3s=')
    複製代碼
  • 客戶端會檢查響應頭中的status code 和 Sec-WebSocket-Accept 值是不是期待的值,若是發現Accept的值不正確或者狀態碼不是101,則不會創建WebSocket鏈接,也不會發送WebSocket數據幀。

WebSocket協議使用幀(Frame)收發數據,幀格式以下。基於安全考量客戶端發送給服務端的幀必須經過4字節的掩碼(Masking-key)加密,服務端收到消息後,用掩碼對數據幀的Payload Data進行異或運算解碼獲得數據(詳見uwsgi的 core/websockets.c 中的uwsgi_websockets_parse函數),若是服務端收到未經掩碼加密的數據幀,則應該立刻關閉該WebSocket。而服務端發給客戶端的數據則不須要掩碼加密,客戶端若是收到了服務端的掩碼加密的數據,則也必須關閉它。

0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-------+-+-------------+-------------------------------+
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
     |     Extended payload length continued, if payload len == 127  |
     + - - - - - - - - - - - - - - - +-------------------------------+
     |                               |Masking-key, if MASK set to 1  |
     +-------------------------------+-------------------------------+
     | Masking-key (continued)       |          Payload Data         |
     +-------------------------------- - - - - - - - - - - - - - - - +
     :                     Payload Data continued ...                :
     +---------------------------------------------------------------+
複製代碼

幀分爲控制幀和數據幀,控制幀不能分片,數據幀能夠分片。主要字段說明以下:

  • FIN: 沒有分片的幀的FIN爲1,分片幀的第一個分片的FIN爲0,最後一個分片FIN爲1。
  • opcode: 幀類型編號,其中控制幀:0x8 (Close), 0x9 (Ping), and 0xA (Pong),數據幀主要有:0x1 (Text), 0x2 (Binary)。
  • MASK:客戶端發給服務端的幀MASK爲1,Masking-key爲加密掩碼。服務端發往客戶端的MASK爲0,Masking-key爲空。
  • Payload len和Payload Data分別是幀的數據長度和數據內容。

3 engine.io和socket.io

前面提到socket.io是基於engine.io的封裝,engine.io(協議版本3)有一套本身的協議,任何engine.io服務器都必須支持polling(包括jsonp和xhr)和websocket兩種傳輸方式。engine.io使用websocket時有一套本身的ping/pong機制,使用的是opcode爲0x1(Text)類型的數據幀,不是websocket協議規定的ping/pong類型的幀,標準的 ping/pong 幀被uwsgi使用

engine.io的數據編碼分爲Packet和Payload,其中 Packet是數據包,有6種類型:

  • 0 open:從服務端發出,標識一個新的傳輸方式已經打開。
  • 1 close:請求關閉這條傳輸鏈接,可是它自己並不關閉這個鏈接。
  • 2 ping:客戶端週期性發送ping,服務端響應pong。注意這個與uwsgi自帶的ping/pong不同,uwsgi裏面發送ping,而瀏覽器返回pong。
  • 3 pong:服務端發送。
  • 4 message:實際發送的消息。
  • 5 upgrade:在轉換transport前,engine.io會發送探測包測試新的transport(如websocket)是否可用,若是OK,則客戶端會發送一個upgrade消息給服務端,服務端關閉老的transport而後切換到新的transport。
  • 6 noop:空操做數據包,客戶端收到noop消息會將以前等待暫停的輪詢暫停,用於在接收到一個新的websocket強制一個新的輪詢週期。

而Payload是指一系列綁定到一塊兒的編碼後的Packet,它只用在poll中,websocket裏面使用websocket幀裏面的Payload字段來傳輸數據。若是客戶端不支持XHR2,則payload格式以下,其中length是數據包Packet的長度,而packet則是編碼後的數據包內容(測試發現客戶端發送給服務端的poll請求中的payload用的這種字符編碼)。

<length1>:<packet1>[<length2>:<packet2>[...]]
複製代碼

若支持XHR2,則payload中內容所有以字節編碼,其中第1位0表示字符串,1表示二進制數據,然後面接着的數字則是表示packet長度,而後以\xff結尾。若是一個長度爲109的字符類型的數據包,則前面長度編碼是 \x00\x01\x00\x09\xff,而後後面接packet內容。(測試發現服務端返回給客戶端的payload爲這種字節編碼)

<0 for string data, 1 for binary data><Any number of numbers between 0 and 9><The number 255><packet1 (first type,
then data)>[...]
複製代碼

engine.io服務器維護了一個socket的字典結構用於管理鏈接到該機的客戶端,而客戶端的標識就是sid。若是有多個worker,則須要保證同一個客戶端的請求落在同一臺worker上(能夠配置nginx根據sid分發)。由於每一個worker只維護了一部分客戶端鏈接,若是要支持廣播,room等特性,則後端須要使用 redis 或者 RabbitMQ 消息隊列,使用redis的話則是經過redis的訂閱發佈機制實現多機多worker之間的消息推送。

socket.io是engine.io的封裝,在其基礎上增長了自動重連,多路複用,namespace,room等特性。socket.io自己也有一套協議,它Packet類型分爲(CONNECT 0, DISCONNECT 1, EVENT 2, ACK 3, ERROR 4, BINARY_EVENT 5, BINARY_ACK 6)。注意與engine.io的Packet類型有所不一樣,可是socket.io的packet實際是藉助的engine.io的Message類型發送的,在後面實例中能夠看到Packet的編碼方式。當鏈接出錯的時候,socket.io會經過自動重連機制從新鏈接。

4 源碼分析

在創建鏈接後,每一個客戶端會被自動加入到一個默認的命名空間/。在每一個命名空間中,socket會被默認加入兩個名爲Nonesid的房間。None的房間用於廣播,而sid是當前客戶端的session id,用於單播。除默認的房間外,咱們能夠根據須要將對應socket加入自定義房間,roomid惟一便可。socket.io基於engine.io,支持websocket和long polling。若是是long polling,會定時發送GET, POST請求,當沒有數據時,GET請求在拉取隊列消息時會hang住(超時時間爲pingTimeout),若是hang住期間服務器一直沒有數據產生,則須要等到客戶端發送下一個POST請求時,此時服務器會往隊列中存儲POST請求中的消息,這樣上一個GET請求才會返回。若是upgrade到了websocket鏈接,則探測成功以後會按期ping/pong來保活鏈接。流程以下圖所示:

socketio通訊流程圖

爲方便描述,下面提到的engine.io服務器對應源文件是engineio/server.py,engine.io套接字對應源文件engineio/socket.py,而socket.io服務器則對應socketio/server.py。下面分析下socket.io鏈接創建、消息接收和發送、鏈接關閉過程。socket.io版本爲1.9.0,engine.io版本爲2.0.4。

鏈接創建

首先,客戶端會發送一個polling請求來創建鏈接。此時的請求參數沒有sid,表示要創建鏈接。 engine.io服務器經過handle_get_request()handle_post_request()方法來分別處理初始化鏈接以及長輪詢中的 GET 和 POST 請求。

socket.io在初始化時便註冊了3個事件到engine.io的handlers中,分別是connect(處理函數_handle_eio_connect),message(_handle_eio_message),disconnect(_handle_eio_disconnect),在engine.io套接字接收到了上述三個類型的消息後,在自身作了對應處理後都會觸發socket.io中的對應的處理函數作進一步處理。

當接收到GET請求且沒有sid參數時,則engine.io服務器會調用 _handle_connect()方法來創建鏈接。這個方法主要工做是爲當前客戶端生成sid,建立Socket對象並保存到engine.io服務器的sockets集合中。作了這些初始化工做後,engine.io服務器會發送一個OPEN類型的數據包給客戶端,接着會觸發socket.io服務器的connect事件。

客戶端第一次鏈接的時候,socket.io也要作一些初始化的工做,這是在socket.io服務器的_handle_eio_connect()處理的。這裏作的事情主要有幾點:

  • 初始化manager,好比用的是redis作後端隊列的話,則須要初始化redis_manager,包括設置redis鏈接配置,若是沒有訂閱頻道則還要訂閱頻道flask_socketio(默認頻道是"socket.io"),若是用到gevent,則還要對redis模塊的socket庫打monkey-patch等。

  • 將該客戶端加入到默認房間None,sid中。

  • 調用代碼中對connect事件註冊的函數。以下面這個,注意下,socket.io中也有個用於事件處理的handlers,它保存的是在後端代碼中對socket.io事件註冊的函數(開發者定義的),而engine.io的handlers中保存的函數是socket.io註冊的那三個針對connect,message和disconnect事件的固定的處理函數。

    socketio.on("connect")
    def test_connect():
        print "client connected"
    複製代碼
  • 發送一個sockeio的connect數據包給客戶端。

最後在響應中engine.io會爲客戶端設置一個名爲io值爲sid的cookie,響應內容payload包括兩個數據包,一個是engine.io的OPEN數據包,內容爲sid,pingTimeout等配置和參數;另外一個是socket.io的connect數據包,內容爲40。其中4表示的是engine.io的message消息,0則表示socket.io的connect消息,以字節流返回。這裏的pingTimeout客戶端和服務端共享這個配置,用於檢測對端是否超時。

接着會發送一個輪詢請求和websocket握手請求,若是websocket握手成功後客戶端會發送2 probe探測幀,服務端迴應3 probe,而後客戶端會發送內容爲5的Upgrade幀,服務端迴應內容爲6的noop幀。探測幀檢查經過後,客戶端中止輪詢請求,將傳輸通道轉到websocket鏈接,轉到websocket後,接下來就開始按期(默認是25秒)的 ping/pong(這是socket.io自定義的ping/pong,除此以外,uwsgi也會按期(默認30秒)對客戶端ping,客戶端迴應pong,這個在chrome的Frames裏面是看不到的,須要藉助wireshark或者用其餘瀏覽器插件來觀察)。

服務端消息接收流程

對接收消息的則統一經過engine.io套接字的receive()函數處理:

  • 對於輪詢,一旦收到了polling的POST請求,則會調用receive往該socket的消息隊列裏面發送消息,從而釋放以前hang住的GET請求。
  • 對於websocket:
    • 收到了ping,則會立刻響應一個pong。
    • 接收到了upgrade消息,則立刻發送一個noop消息。
    • 接收到了message,則調用socket.io註冊到engine.io的_handle_eio_message方法來處理socket.io本身定義的各類消息。
  • 由於服務端接收消息並無用到消息隊列來處理,因此要求同一個客戶端的請求必須落到同一個worker上面,不然接收消息時會報Invalid session錯誤。

服務端消息發送流程

而服務端要給客戶端發送消息,則須要經過socket.io服務器的emit方法,注意emit方法是針對room來發送消息的,若是是context-aware的,則emit默認是對namespace爲/且room名爲sid的房間發送,若是是context-free的,則默認是廣播即對全部鏈接的客戶端發送消息(固然在context-free的場景下面,你也能夠指定room來只給指定room推送消息)。

socket.io要實現多進程以及廣播,房間等功能,勢必須要接入一個redis之類的消息隊列,進而socket.io的emit會調用對應隊列管理器pubsub_manager的emit方法,好比用redis作消息隊列則最終調用 redis_manager中的_publish() 方法經過redis的訂閱發佈功能將消息推送到flask_socketio頻道。另外一方面,每一個進程在初始化時都訂閱了 flask_socketio頻道,並且都有一個協程(或線程)在監聽頻道中是否有消息,一旦有消息,就會調用pubsub_manager._handle_emit()方法對本機對應的socket發送對應的消息,最終是經過socket.io服務器的_emit_internal()方法實現對本機中room爲sid的全部socket發送消息的,若是room爲None,則就是廣播,即對全部鏈接到本機的全部客戶端推送消息。

socket.io服務器發送消息要基於engine.io消息包裝,因此歸結到底仍是調用的engine.io套接字中的send()方法。engine.io爲每一個客戶端都會維護一個消息隊列,發送數據都是先存到隊列裏面待拉取,websocket除了探測幀以外的其餘數據幀也都是經過該消息隊列發送。

關閉鏈接(只分析websocket)

websocket可能異常關閉的狀況不少。好比客戶端發了ping後等待pong超時關閉,服務端接收到ping跟上一個ping之間超過了pingTimeout;用的uwsgi的話,uwsgi發送ping,若是在websockets-pong-tolerance(默認3秒)內接收不到pong迴應,也會關閉鏈接;還有若是nginx的proxy_read_timeout配置的比pingInterval小等。

只要不是客戶端主動關閉鏈接,socket.io就會在鏈接出錯後不斷重試以創建鏈接。重試間隔和重試次數由reconnectionDelayMax(默認5秒)和reconnectionAttempts(默認一直重連)設定。下面討論客戶端正常關閉的狀況,各類異常關閉狀況請具體狀況具體分析。

客戶端主動關閉

假定客戶端調用socket.close()主動關閉websocket鏈接,則會先發送一個消息41(4:engine.io的message,1:socket.io的disconnect)再關閉鏈接。如前面提到,engine.io套接字接收到消息後會交給socket.io服務器註冊的 _handle_eio_message()處理。最終是調用的socket.io的_handle_disconnect(),該函數工做包括調用socketio.on("disconnect")註冊的函數,將該客戶端從加入的房間中移除,清理環境變量等。

uwsgi而接收到客戶端關閉websocket鏈接消息後會關閉服務端到客戶端的鏈接。engine.io服務器的websocket數據接收例程ws.wait()由於鏈接關閉報IOError,觸發服務端循環收發數據過程中止,並從維護的sockets集合中移除這個關閉的sid。而後調用engine.io套接字的close(wait=True, abort=True)方法,因爲是客戶端主動關閉,這裏就不會再給客戶端發送一個CLOSE消息。而 engine.io服務器的close方法同樣會觸發socket.io以前註冊的disconnect事件處理函數,因爲前面已經調用_handle_disconnect()處理了關閉鏈接事件,因此這裏_handle_eio_disconnect()不須要再作其餘操做(這個操做不是多餘的,其做用見後一節)。

瀏覽器關閉

直接關閉瀏覽器發送的是websocket的標準CLOSE消息,opcode爲8。socket.io服務端處理方式基本一致,因爲這種狀況下並無發送socket.io的關閉消息41,socket.io的關閉操做須要等到engine.io觸發的_handle_eio_disconnect()中處理,這就是前一節中爲何engine.io服務器後面還要多調用一次 _handle_eio_disconnect()的緣由所在。

5 實例

協議說明容易讓人有點迷糊,websocket,engine.io,socket.io,各自協議是如何工做的,看看實例可能會比較清晰,爲了方便測試,我寫了個Dockerfile,安裝了docker的童鞋能夠拉取代碼執行 bin/start.sh 便可啓動擁有完整的 nginx+uwsgi+gevent+flask_socketio測試環境的容器開始測試,瀏覽器打開http://127.0.0.1便可測試。flask_socketio支持的異步模式有threading, eventlet, gevent 和 gevent_uwsgi等,個人測試環境async_mode用的是gevent_uwsgi,完整代碼見 這裏

對於不支持websocket的低版本瀏覽器,socket.io會退化爲長輪詢的方式,經過按期的發送GET, POST請求來拉取數據。沒有數據時,會將請求數據的GET請求hang住,直到服務端有數據產生或者客戶端的POST請求將GET請求釋放,釋放以後會緊接着再次發送一個GET請求,除此以外,數據編解碼和處理流程與websocket方式基本一致。實例只針對websocket進行分析,若是要測試長輪詢,能夠將nginx配置中的proxy_set_header中的Connection和Upgrade去掉便可。

爲了觀察socket.io客戶端的調用流程,能夠設置localStorage.debug = '*';,測試的前段代碼片斷以下(完整代碼見倉庫):

<script type="text/javascript" charset="utf-8">
    var socket = io.connect('/', {
        "reconnectionDelayMax": 10000,
        "reconnectionAttempts": 10
    });
    socket.on('connect', function() {
        $('#log').append('<br>' + $('<div/>').text('connected').html());
    })

    $(document).ready(function() {

        socket.on('server_response', function(msg) {
            $('#log').append('<br>' + $('<div/>').text('Received from server: ' + ': ' + msg.data).html());
        });

        $('form#emit').submit(function(event) {
            socket.emit('client_event', {data: $('#emit_data').val()});
            return false;
        });
    });

 </script>
複製代碼

測試代碼比較簡單,引入socket.io的js庫文件,而後在鏈接成功後在頁面顯示「connected」,在輸入框輸入文字,能夠經過鏈接發送至服務器,而後服務器將瀏覽器發送的字符串加上server標識回顯回來。

創建鏈接

在chrome中打開頁面能夠看到發了3個請求,分別是:

1 http://127.0.0.1/socket.io/?EIO=3&transport=polling&t=MAkXxBR
2 http://127.0.0.1/socket.io/? EIO=3&transport=polling&t=MAkXxEz&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4
3 ws://127.0.0.1/socket.io/?EIO=3&transport=websocket&sid=9c54f9c1759c4dbab8f3ce20c1fe43a4
複製代碼

請求默認路徑是/socket.io,注意命名空間並不會在路徑中,而是在參數中傳遞。第1個請求是polling,EIO是engine.io協議的版本號,t是一個隨機字符串,第一個請求時還尚未生成sid。服務端接收到消息後會調用engine.io/server.py_handle_connect()創建鏈接。

返回的結果是

## Response Headers: Content-Type: application/octet-stream ##
�ÿ0{"pingInterval":25000,"pingTimeout":60000,"upgrades":["websocket"],"sid":"9c54f9c1759c4dbab8f3ce20c1fe43a4"}�ÿ40
複製代碼

能夠看到,這裏返回的是字節流的payload,content-type爲"application/octet-stream"。這個payload其實包含兩個packet,第一個packet是engine.io的OPEN消息,類型爲0,它的內容爲pingInterval,pingTimeout,sid等;第二個packet類型是4(message),而它的數據內容是0,表示socket.io的CONNECT。而其中的看起來亂碼的部分實則是前面提到的payload編碼中的長度的編碼\x00\x01\x00\x09\xff\x00\x02\xff

若是在js代碼中將io.connect的namespace參數不用默認的/,而設置爲/demo,那麼鏈接時還會發一個POST請求帶上7:40/demo的字符格式payload(其中7是數據長度,4是engineio的message,0則是表示socket.io的connect類型消息),服務器接收到該POST請求後會將該客戶端再加入到/demo命名空間中。

  • 第2個請求是輪詢請求,若是websocket創建並測試成功(使用內容爲probe的ping/pong幀)後,會暫停輪詢請求。能夠看到輪詢請求一直hang住到websocket創建並測試成功後才返回,響應結果是�ÿ6,前面亂碼部分是payload長度編碼\x00\x01\xff,後面的數字6是engine.io的noop消息。

  • 第3個請求是websocket握手請求,握手成功後,能夠在chrome的Frames裏面看到websocket的數據幀交互流程,能夠看到如前面分析,確實是先發的探測幀,而後是Upgrade幀,接着就是按期的ping/pong幀了。

    2probe
    3probe
    5
    2
    3
    ...
    複製代碼

客戶端發送消息給服務端

若是要發送消息給服務器,在瀏覽器輸入框輸入test,點擊echo按鈕,能夠看到websocket發送的幀的內容以下,其中4是engine.io的message類型標識,2是socket.io的EVENT類型標識,然後面則是事件名稱和數據,數據能夠是字符串,字典,列表等類型。

42["client_event",{"data":"test"}]
複製代碼

服務端接收消息流程

而服務端接收消息並返回一個新的event爲"server_response",數據爲"TEST",代碼以下,其中socketio是flask_socketio模塊的SocketIO對象,它提供了裝飾器方法 on將自定義的client_event和處理函數test_client_event註冊到sockerio服務器的handlers中。

當接收到 client_event 消息時,會經過sockerio/server.py中的 _handle_eio_message()方法處理消息,對於socket.io的EVENT類型的消息最終會經過_trigger_event()方法處理,該方法也就是從handlers中拿到client_event對應的處理函數並調用之。

from flask_socketio import SocketIO, emit
socketio = SocketIO(...)
    
@socketio.on("client_event")
def test_client_event(msg):
    emit("server_response", {"data": msg["data"].upper()})
複製代碼

服務端發送消息到客戶端

服務端發送消息經過 flask_socketio提供的emit方法實現,如前一節分析的,最終仍是經過的engine.io包裝成engine.io的消息格式後發出。

42["server_response",{"data":"TEST"}]
複製代碼

關閉鏈接

客戶端要主動關閉鏈接,在JS中調用 socket.close() 便可,此時發送的數據包爲 41,其中4表明的是engine.io的消息類型message,而數據1則是指的socket.io的消息類型disconnect,關閉流程見上一章的說明。

幾個小點

假如客戶端鏈接時namespace爲/demo,而服務端發送消息emit(namespace="/")指定的命名空間爲默認的/,那這個消息是否會發給客戶端?答案是會。由於前面說到,每一個客戶端默認加入到了/中,因此,服務端的消息確定會發給客戶端的,可是客戶端接收到消息會檢查namespace是否與其connect時的namespace一致,若是不一致,雖然接收到了消息可是並不會觸發客戶端的操做。

若是客戶端想知道本身發送的事件是否被服務端成功接收,能夠在emit裏面加回調函數,以下所示。加了回調函數後客戶端發送的消息格式爲421["client_event",{"data":"test"}],即在原來基礎上多加了一個id標識1,服務端接收到事件後,發現消息中有id,則會多發送一個socket.io的ACK包給客戶端,內容爲該事件處理函數的返回值,客戶端收到ACK包後會調用下面的callback。

socket.emit('client_event', {data: $('#message').val()}, callback);
複製代碼

而服務端若是要確認發送的消息是否被客戶端接收到,能夠在emit函數裏面指定 callback參數,而客戶端的事件監聽裏面回調函數加多一個ack參數並調用ack函數便可,這樣客戶端收到了服務端的消息後,調用ack時就會發送一個ACK消息給服務端,ack函數裏面也能夠傳參數給服務端。

### 服務端
flask_socketio.emit("server_response", {"data": "xxx"}, callback=callback)

### 客戶端
socket.on('server_response', function(msg, ack) {
      ...  
      ack();
 });
複製代碼

6 總結

本文示例中,爲了便於分析,只用了默認的namespace和room,而在項目中能夠根據業務須要使用namespace,room等高級特性。在nginx+uwsgi使用socket.io時,注意nginx的超時配置proxy_read_timeout和uwsgi的websocket超時配置websocket-ping-freq和websockets-pong-tolerance,配置不當會致使socke.io由於websocket的ping/pong超時而不斷重連。若是要禁用websocket,能夠在SocketIO參數裏面加上allow_upgrades=False便可。

調研了一些其餘系統WEB端的推送機制,微信網頁版沒有用websocket,而是統一用的長輪詢的方式。今日頭條WEB版其實都沒有實時推送信息流,而是定時提示用戶去手動點擊刷新。即刻WEB版則是用的短鏈接按期拉取是否有未讀消息,不過它也用到了socket.io。

須要注意,不要在服務端socketio.on("connect)"調用emit函數或者過多的其餘操做,不然容易引發服務端鏈接不關閉的問題。

相關文章
相關標籤/搜索