雨露均沾的OkHttp—WebSocket長鏈接的使用&源碼解析

前言

最近老闆又來新需求了,要作一個物聯網相關的app,其中有個需求是客戶端須要收發服務器不按期發出的消息。
心裏OS:
🤔 這咋整呢?經過接口輪詢?定時訪問接口,有數據就更新?
🤔 不行不行,這樣浪費資源了,還耗電,會致使不少請求都是無效的網絡操做。
🤔 那就長鏈接唄?WebSocket協議好像不錯,經過握手創建長鏈接後,能夠隨時收發服務器的消息。那就它了!
🤔 怎麼集成呢?正好前段時間複習OkHttp源碼的時候發現了它是支持Websocket協議的,那就用它試試吧!(戲好多,演不下去了🤮)java

開淦!git

WebSocket介紹

先簡單介紹下WebSocket
咱們都知道Http是處於應用層的一個通訊協議,可是隻支持單向主動通訊,作不到服務器主動向客戶端推送消息。並且Http是無狀態的,即每次通訊都沒有關聯性,致使跟服務器關係不緊密。github

爲了解決和服務器長時間通訊的痛點呢,HTML5規範引出了WebSocket協議(知道這名字咋來的吧,人家HTML5規範引出的,隨爸姓),是一種創建在TCP協議基礎上的全雙工通訊的協議。他跟Http同屬於應用層協議,下層仍是須要經過TCP創建鏈接。web

可是,WebSocketTCP鏈接創建後,還要經過Http進行一次握手,也就是經過Http發送一條GET請求消息給服務器,告訴服務器我要創建WebSocket鏈接了,你準備好哦,具體作法就是在頭部信息中添加相關參數。而後服務器響應我知道了,而且將鏈接協議改爲WebSocket,開始創建長鏈接。面試

這裏貼上請求頭和響應頭信息,從網上找了一張圖:服務器

3851594110877_.pic.jpg

簡單說明下參數:websocket

  • URL通常是以ws或者wss開頭,ws對應Websocket協議,wss對應在TLS之上的WebSocket。相似於HttpHttps的關係。
  • 請求方法爲GET方法。
  • Connection:Upgrade,表示客戶端要鏈接升級,不用Http協議。
  • Upgrade:websocket, 表示客戶端要升級創建Websocket鏈接。
  • Sec-Websocket-Key:key, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效。
  • Sec-WebSocket-Version:13, websocket使用的協議,通常就是13。
  • Sec-webSocket-Extension:permessage-deflate,客戶端指定的一些擴展協議,好比這裏permessage-deflate就是WebSocket的一種壓縮協議。
  • 響應碼101,表示響應協議升級,後續的數據交互都按照Upgradet指定的WebSocket協議來。

OkHttp實現

添加OkHttp依賴

implementation("com.squareup.okhttp3:okhttp:4.7.2")

實現代碼

首先是初始化OkHttpClientWebSocket實例:網絡

/**
     * 初始化WebSocket
     */
    public void init() {
        mWbSocketUrl = "ws://echo.websocket.org";
        mClient = new OkHttpClient.Builder()
                .pingInterval(10, TimeUnit.SECONDS)
                .build();
        Request request = new Request.Builder()
                .url(mWbSocketUrl)
                .build();
        mWebSocket = mClient.newWebSocket(request, new WsListener());
    }

這裏主要是配置了OkHttp的一些參數,以及WebSocket的鏈接地址。其中newWebSocket方法就是進行WebSocket的初始化和鏈接。app

這裏要注意的點是pingInterval方法的配置,這個方法主要是用來設置WebSocket鏈接的保活。
相信作過長鏈接的同窗都知道,一個長鏈接通常要隔幾秒發送一條消息告訴服務器我在線,而服務器也會回覆一個消息表示收到了,這樣就確認了鏈接正常,客戶端和服務器端都在線。less

若是服務器沒有按時收到這個消息那麼服務器可能就會主動關閉這個鏈接,節約資源。
客戶端沒有正常收到這個返回的消息,也會作一些相似重連的操做,因此這個保活消息很是重要。

咱們稱這個消息叫做心跳包,通常用PING,PONG表示,像乒乓球同樣,一來一回。
因此這裏的pingInterval就是設置心跳包發送的間隔時間,設置了這個方法以後,OkHttp就會自動幫咱們發送心跳包事件,也就是ping包。當間隔時間到了,沒有收到pong包的話,監聽事件中的onFailure方法就會被調用,此時咱們就能夠進行重連。

可是因爲實際業務需求不同,以及okhttp中心跳包事件給予咱們權限較少,因此咱們也能夠本身完成心跳包事件,即在WebSocket鏈接成功以後,開始定時發送ping包,在下一次發送ping包以前檢查上一個pong包是否收到,若是沒收到,就視爲異常,開始重連。感興趣的同窗能夠看看文末的相關源碼。

創建鏈接後,咱們就能夠正常發送和讀取消息了,也就是在上文WsListener監聽事件中表現:

//監聽事件,用於收消息,監聽鏈接的狀態
    class WsListener extends WebSocketListener {
        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosed(webSocket, code, reason);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            super.onClosing(webSocket, code, reason);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
            super.onFailure(webSocket, t, response);
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            super.onMessage(webSocket, text);
            Log.e(TAG, "客戶端收到消息:" + text);
            onWSDataChanged(DATE_NORMAL, text);
           //測試發消息
            webSocket.send("我是客戶端,你好啊");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
            super.onMessage(webSocket, bytes);
        }

        @Override
        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            Log.e(TAG,"鏈接成功!");
        }
    }
    
    
    //發送String消息
    public void send(final String message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }
    
    /**
     * 發送byte消息
     * @param message
     */
    public void send(final ByteString message) {
        if (mWebSocket != null) {
            mWebSocket.send(message);
        }
    }    

    //主動斷開鏈接
    public void disconnect(int code, String reason) {
        if (mWebSocket != null)
            mWebSocket.close(code, reason);
    }

這裏要注意,回調的方法都是在子線程回調的,若是須要更新UI,須要切換到主線程。

基本操做就這麼多,仍是很簡單的吧,初始化Websocket——鏈接——鏈接成功——收發消息。

其中WebSocket類是一個操做接口,主要提供瞭如下幾個方法

  • send(text: String) 發送一個String類型的消息
  • send(bytes: ByteString) 發送一個二進制類型的消息
  • close(code: Int, reason: String?) 關閉WebSocket鏈接

若是有同窗想測試下WebSocket的功能可是又沒有實際的服務器,怎麼辦呢?
其實OkHttp官方有一個MockWebSocket服務,能夠用來模擬服務端,下面咱們一塊兒試一下:

模擬服務器

首先集成MockWebSocket服務庫:

implementation 'com.squareup.okhttp3:mockwebserver:4.7.2'

而後就能夠新建MockWebServer,並加入MockResponse做爲接收消息的響應。

MockWebServer mMockWebServer = new MockWebServer();
        MockResponse response = new MockResponse()
                .withWebSocketUpgrade(new WebSocketListener() {
                    @Override
                    public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
                        super.onOpen(webSocket, response);
                        //有客戶端鏈接時回調
                        Log.e(TAG, "服務器收到客戶端鏈接成功回調:");
                        mWebSocket = webSocket;
                        mWebSocket.send("我是服務器,你好呀");
                    }

                    @Override
                    public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
                        super.onMessage(webSocket, text);

                        Log.e(TAG, "服務器收到消息:" + text);
                    }

                    @Override
                    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
                        super.onClosed(webSocket, code, reason);
                        Log.e(TAG, "onClosed:");
                    }
                });

        mMockWebServer.enqueue(response);

這裏服務器端在收到客戶端鏈接成功消息後,給客戶端發送了一條消息。
要注意的是這段代碼要在子線程執行,由於主線程不能進行網絡操做。

而後就能夠去初始化Websocket客戶端了:

//獲取鏈接url,初始化websocket客戶端
        String websocketUrl = "ws://" + mMockWebServer.getHostName() + ":" + mMockWebServer.getPort() + "/";
        WSManager.getInstance().init(websocketUrl);

ok,運行項目

//運行結果
    E/jimu: mWbSocketUrl=ws://localhost:38355/
    E/jimu: 服務器收到客戶端鏈接成功回調:
    E/jimu: 鏈接成功!
    E/jimu: 客戶端收到消息:我是服務器,你好呀
    E/jimu: 服務器收到消息:我是客戶端,你好啊

相關的WebSocket管理類和模擬服務器類我也上傳到github了,有須要的同窗能夠文末自取。

源碼解析

WebSocket整個流程無非三個功能:鏈接,接收消息,發送消息。下面咱們就從這三個方面分析下具體是怎麼實現的。

鏈接

經過上面的代碼咱們得知,WebSocket鏈接是經過newWebSocket方法。直接點進去看這個方法:

override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
    val webSocket = RealWebSocket(
        taskRunner = TaskRunner.INSTANCE,
        originalRequest = request,
        listener = listener,
        random = Random(),
        pingIntervalMillis = pingIntervalMillis.toLong(),
        extensions = null, // Always null for clients.
        minimumDeflateSize = minWebSocketMessageToCompress
    )
    webSocket.connect(this)
    return webSocket
  }

這裏作了兩件事:

  • 初始化RealWebSocket,主要是設置了一些參數(好比pingIntervalMillis心跳包時間間隔,還有監聽事件之類的)
  • connect 方法進行WebSocket鏈接

繼續查看connect方法:

connect(WebSocket鏈接握手)

fun connect(client: OkHttpClient) {
    //***
    val webSocketClient = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build()
    val request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Extensions", "permessage-deflate")
        .build()
    call = RealCall(webSocketClient, request, forWebSocket = true)
    call!!.enqueue(object : Callback {
      override fun onResponse(call: Call, response: Response) {
        
        //獲得數據流
        val streams: Streams
        try {
          checkUpgradeSuccess(response, exchange)
          streams = exchange!!.newWebSocketStreams()
        } 
        
        //***
        // Process all web socket messages.
        try {
          val name = "$okHttpName WebSocket ${request.url.redact()}"
          initReaderAndWriter(name, streams)
          listener.onOpen(this@RealWebSocket, response)
          loopReader()
        } catch (e: Exception) {
          failWebSocket(e, null)
        }
      }
    })
  }

上一篇使用篇文章中說過,Websocket鏈接須要一次Http協議的握手,而後才能把協議升級成WebSocket。因此這段代碼就體現出這個功能了。

首先就new了一個用來進行Http鏈接的request,其中Header的參數就表示我要進行WebSocket鏈接了,參數解析以下:

  • Connection:Upgrade,表示客戶端要鏈接升級
  • Upgrade:websocket, 表示客戶端要升級創建Websocket鏈接
  • Sec-Websocket-Key:key, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效
  • Sec-WebSocket-Version:13, websocket使用的版本,通常就是13
  • Sec-webSocket-Extension:permessage-deflate,客戶端指定的一些擴展協議,好比這裏permessage-deflate就是WebSocket的一種壓縮協議。

Header設置好以後,就調用了callenqueue方法,這個方法你們應該都很熟悉吧,OkHttp裏面對於Http請求的異步請求就是這個方法。
至此,握手結束,服務器返回響應碼101,表示協議升級。

而後咱們繼續看看獲取服務器響應以後又作了什麼?
在發送Http請求成功以後,onResponse響應方法裏面主要表現爲四個處理邏輯:

  • Http流轉換成WebSocket流,獲得Streams對象,這個流後面會轉化成輸入流和輸出流,也就是進行發送和讀取的操做流
  • listener.onOpen(this@RealWebSocket, response),回調了接口WebSocketListeneronOpen方法,告訴用戶WebSocket已經鏈接
  • initReaderAndWriter(name, streams)
  • loopReader()

前兩個邏輯仍是比較好理解,主要是後兩個方法,咱們分別解析下。
首先看initReaderAndWriter方法。

initReaderAndWriter(初始化輸入流輸出流)

//RealWebSocket.kt

  @Throws(IOException::class)
  fun initReaderAndWriter(name: String, streams: Streams) {
    val extensions = this.extensions!!
    synchronized(this) {
      //***
      
      //寫數據,發送數據的工具類
      this.writer = WebSocketWriter()
      
      //設置心跳包事件
      if (pingIntervalMillis != 0L) {
        val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
        taskQueue.schedule("$name ping", pingIntervalNanos) {
          writePingFrame()
          return@schedule pingIntervalNanos
        }
      }
      //***
    }

		//***
		
		//讀取數據的工具類
    reader = WebSocketReader(     
      ***
      frameCallback = this,
      ***
    )
  }
  
  internal fun writePingFrame() {
   //***
    try {
      writer.writePing(ByteString.EMPTY)
    } catch (e: IOException) {
      failWebSocket(e, null)
    }
  }

這個方法主要乾了兩件事:

  • 實例化輸出流輸入流工具類,也就是WebSocketWriterWebSocketReader,用來處理數據的收發。
  • 設置心跳包事件。若是pingIntervalMillis參數不爲0,就經過計時器,每隔pingIntervalNanos發送一個ping消息。其中writePingFrame方法就是發送了ping幀數據。

接收消息處理消息

loopReader

接着看看這個loopReader方法是幹什麼的,看這個名字咱們大膽猜想下,難道這個方法就是用來循環讀取數據的?去代碼裏找找答案:

fun loopReader() {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader!!.processNextFrame()
    }
  }

代碼很簡單,一個while循環,循環條件是receivedCloseCode == -1的時候,作的事情是reader!!.processNextFrame()方法。繼續:

//WebSocketWriter.kt
  fun processNextFrame() {
    //讀取頭部信息
    readHeader()
    if (isControlFrame) {
      //若是是控制幀,讀取控制幀內容
      readControlFrame()
    } else {
      //讀取普通消息內容
      readMessageFrame()
    }
  }
  
  //讀取頭部信息
  @Throws(IOException::class, ProtocolException::class)
  private fun readHeader() {
    if (closed) throw IOException("closed")
    
    try {
     //讀取數據,獲取數據幀的前8位
      b0 = source.readByte() and 0xff
    } finally {
      source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
    }    
    //***
    //獲取數據幀的opcode(數據格式)
    opcode = b0 and B0_MASK_OPCODE
    //是否爲最終幀
    isFinalFrame = b0 and B0_FLAG_FIN != 0
    //是否爲控制幀(指令)
    isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0

    //判斷最終幀,獲取幀長度等等
  }  
  
  
  //讀取控制幀(指令)
    @Throws(IOException::class)
  private fun readControlFrame() {
    if (frameLength > 0L) {
      source.readFully(controlFrameBuffer, frameLength)
    }

    when (opcode) {
      OPCODE_CONTROL_PING -> {
      //ping 幀
        frameCallback.onReadPing(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_PONG -> {
        //pong 幀
        frameCallback.onReadPong(controlFrameBuffer.readByteString())
      }
      OPCODE_CONTROL_CLOSE -> {
        //關閉 幀
        var code = CLOSE_NO_STATUS_CODE
        var reason = ""
        val bufferSize = controlFrameBuffer.size
        if (bufferSize == 1L) {
          throw ProtocolException("Malformed close payload length of 1.")
        } else if (bufferSize != 0L) {
          code = controlFrameBuffer.readShort().toInt()
          reason = controlFrameBuffer.readUtf8()
          val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
          if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
        }
        //回調onReadClose方法
        frameCallback.onReadClose(code, reason)
        closed = true
      }
    }
  }
  
  //讀取普通消息
  @Throws(IOException::class)
  private fun readMessageFrame() {
    
    readMessage()

    if (readingCompressedMessage) {
      val messageInflater = this.messageInflater
          ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
      messageInflater.inflate(messageFrameBuffer)
    }

    if (opcode == OPCODE_TEXT) {
      frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
    } else {
      frameCallback.onReadMessage(messageFrameBuffer.readByteString())
    }
  }

代碼仍是比較直觀,這個processNextFrame其實就是讀取數據用的,首先讀取頭部信息,獲取數據幀的類型,判斷是否爲控制幀,再分別去讀取控制幀數據或者普通消息幀數據。

數據幀格式

問題來了,什麼是數據頭部信息,什麼是控制幀
這裏就要說下WebSocket的數據幀了,先附上一個數據幀格式:

0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
  +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
  |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
  |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126)
  |N|V|V|V|       |  |S|             |
  | |1|2|3|       |  |K|             |
  +-+-+-+-+-------+  +-+-------------+
  |                      Extended payload length(if LENGTH=127)
  +                                  +-------------------------------
  |      Extended payload length     | Masking-key,if Mask set to 1
  +----------------------------------+-------------------------------
  |   Masking-key                    |       Data
  +----------------------------------+-------------------------------
  |                                Data
  +----------------------------------+-------------------------------

我認可,我懵逼了。
冷靜冷靜,一步一步分析下吧。

首先每一行表明4個字節,一共也就是32位數,哦,那也就是幾個字節而已嘛,每一個字節有他本身的表明意義唄,這樣想是否是就很簡單了,下面來具體看看每一個字節。

第1個字節:

  • 第一位是FIN碼,其實就是一個標示位,由於數據可能多幀操做嘛,因此多幀狀況下,只有最後一幀的FIN設置成1,標示結束幀,前面全部幀設置爲0。
  • 第二位到第四位是RSV碼,通常通訊兩端沒有設置自定義協議,就默認爲0。
  • 後四位是opcode,咱們叫它操做碼。這個就是判斷這個數據幀的類型了,通常有如下幾個被定義好的類型:

1) 0x0 表示附加數據幀
2) 0x1 表示文本數據幀
3) 0x2 表示二進制數據幀
4) 0x3-7 保留用於將來的非控制幀
5) 0x8 表示鏈接關閉
6) 0x9 表示ping
7) 0xA 表示pong
8) 0xB-F 保留用於將來的非控制幀

是否是發現了些什麼,這不就對應了咱們應用中的幾種格式嗎?2和3對應的是普通消息幀,包括了文本和二進制數據。567對應的就是控制幀格式,包括了close,ping,pong

第2個字節:

  • 第一位是Mask掩碼,其實就是標識數據是否加密混淆,1表明數據通過掩碼的,0是沒有通過掩碼的,若是是1的話,後續就會有4個字節表明掩碼key,也就是數據幀中Masking-key所處的位置。
  • 後7位是LENGTH,用來標示數據長度。由於只有7位,因此最大隻能儲存1111111對應的十進制數127長度的數據,若是須要更大的數據,這個儲存長度確定就不夠了。
    因此規定來了,1) 小於126長度則數據用這七位表示實際長度。2) 若是長度設置爲126,也就是二進制1111110,就表明取額外2個字節表示數據長度,共是16位表示數據長度。3) 若是長度設置爲127,也就是二進制1111111,就表明取額外8個字節,共是64位表示數據長度。

須要注意的是LENGHT的三種狀況在一個數據幀裏面只會出現一種狀況,不共存,因此在圖中是用if表示。一樣的,Masking-key也是當Mask爲1的時候才存在。

因此也就有了數據幀裏面的Extended payload length(LENGTH=126)所處的2個字節,以及Extended payload length(LENGTH=127)所處的8個字節。

最後的字節部分天然就是掩碼key(Mask爲1的時候才存在)和具體的傳輸數據了。
仍是有點暈吧😷,來張圖總結下:
數據幀格式.jpeg

好了,瞭解了數據幀格式後,咱們再來讀源碼就清晰多了。
先看看怎麼讀的頭部信息並解析的:

//取數據幀前8位數據
  b0 = source.readByte() and 0xff
  //獲取數據幀的opcode(數據格式)
  opcode = b0 and B0_MASK_OPCODE(15)
  //是否爲最終幀
  isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
  //是否爲控制幀(指令)
  isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
  • 第一句獲取頭信息,and是按位與計算,and 0xff 意思就是按位與11111111,因此頭部信息其實就是取了數據幀的前8位數據,一個字節。
  • 第二句獲取opcodeand 15也就是按位與00001111,其實也就是取了後四位數據,恰好對應上opcode的位置,第一個字節的後四位。
  • 第三句獲取是否爲最終幀,剛纔數據幀格式中說過,第一位FIN標識了是否爲最後一幀數據,1表明結束幀,因此這裏and 128 也就是按位與10000000,也就是取的第一位數。
  • 第四句獲取是否爲控制幀,and 8也就是按位與00001000,取得是第五位,也就是opcode的第一位,這是什麼意思呢?咱們看看剛纔的數據幀格式,發現從0x8開始就是所謂的控制幀了。0x8對應的二進制是1000,0x7對應的二進制是0111。發現了吧,若是爲控制幀的時候,opcode第一位確定是爲1的,因此這裏就判斷的第五位。

後面還有讀取第二個字節的代碼,你們能夠本身沿着這個思路本身看看,包括了讀取MASK,讀取數據長度的三種長度等。

因此這個processNextFrame方法主要作了三件事:

  • readHeader方法中,判斷了是否爲控制幀,是否爲結束幀,而後獲取了Mask標識,幀長度等參數
  • readControlFrame方法中,主要處理了該幀數據爲ping,pong,close三種狀況,而且在收到close關閉幀的狀況下,回調了onReadClose方法,這個待會要細看下。
  • readMessageFrame方法中,主要是讀取了消息後,回調了onReadMessage方法。

至此能夠發現,其實WebSocket傳輸數據並非一個簡單的事,只是OkHttp都幫咱們封裝好了,咱們只須要直接傳輸數據便可,感謝這些三方庫爲咱們開發做出的貢獻,不知道何時我也能作出點貢獻呢🤔。

對了,剛纔說回調也很重要,接着看看。onReadCloseonReadMessage回調到哪了呢?還記得上文初始化WebSocketWriter的時候設置了回調接口嗎。因此就是回調給RealWebSocket了:

//RealWebSocket.kt
  override fun onReadClose(code: Int, reason: String) {
    require(code != -1)

    var toClose: Streams? = null
    var readerToClose: WebSocketReader? = null
    var writerToClose: WebSocketWriter? = null
    synchronized(this) {
      check(receivedCloseCode == -1) { "already closed" }
      receivedCloseCode = code
      receivedCloseReason = reason 
      //...
    }

    try {
      listener.onClosing(this, code, reason)

      if (toClose != null) {
        listener.onClosed(this, code, reason)
      }
    } finally {
      toClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }
  
  @Throws(IOException::class)
  override fun onReadMessage(text: String) {
    listener.onMessage(this, text)
  }

  @Throws(IOException::class)
  override fun onReadMessage(bytes: ByteString) {
    listener.onMessage(this, bytes)
  }

onReadClose回調方法裏面有個關鍵的參數,receivedCloseCode。還記得這個參數嗎?上文中解析消息的循環條件就是receivedCloseCode == -1,因此當收到關閉幀的時候,receivedCloseCode就再也不等於-1(規定大於1000),也就再也不去讀取解析消息了。這樣整個流程就結束了。

其中還有一些WebSocketListener的回調,好比onClosing,onClosed,onMessage等,就直接回調給用戶使用了。至此,接收消息處理消息說完了。

發消息

好了。接着說發送,看看send方法:

@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
    // ***
    // Enqueue the message frame.
    queueSize += data.size.toLong()
    messageAndCloseQueue.add(Message(formatOpcode, data))
    runWriter()
    return true
  }

首先,把要發送的data封裝成Message對象,而後入隊列messageAndCloseQueue。最後執行runWriter方法。這都不用猜了,runWriter確定就要開始發送消息了,繼續看:

//RealWebSocket.kt
  private fun runWriter() {
    this.assertThreadHoldsLock()

    val writerTask = writerTask
    if (writerTask != null) {
      taskQueue.schedule(writerTask)
    }
  }
  
  private inner class WriterTask : Task("$name writer") {
    override fun runOnce(): Long {
      try {
        if (writeOneFrame()) return 0L
      } catch (e: IOException) {
        failWebSocket(e, null)
      }
      return -1L
    }
  }  
  
  //如下是schedule方法轉到WriterTask的runOnce方法過程

  //TaskQueue.kt
  fun schedule(task: Task, delayNanos: Long = 0L) {
    synchronized(taskRunner) {
      if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
        taskRunner.kickCoordinator(this)
      }
    }
  }
  
  internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
    //***
    if (insertAt == -1) insertAt = futureTasks.size
    futureTasks.add(insertAt, task)

    // Impact the coordinator if we inserted at the front.
    return insertAt == 0
  }  

  //TaskRunner.kt
  internal fun kickCoordinator(taskQueue: TaskQueue) {
    this.assertThreadHoldsLock()
    
    if (taskQueue.activeTask == null) {
      if (taskQueue.futureTasks.isNotEmpty()) {
        readyQueues.addIfAbsent(taskQueue)
      } else {
        readyQueues.remove(taskQueue)
      }
    }    
    
    if (coordinatorWaiting) {
      backend.coordinatorNotify(this@TaskRunner)
    } else {
      backend.execute(runnable)
    }
  }  
  
  private val runnable: Runnable = object : Runnable {
    override fun run() {
      while (true) {
        val task = synchronized(this@TaskRunner) {
          awaitTaskToRun()
        } ?: return

        logElapsed(task, task.queue!!) {
          var completedNormally = false
          try {
            runTask(task)
            completedNormally = true
          } finally {
            // If the task is crashing start another thread to service the queues.
            if (!completedNormally) {
              backend.execute(this)
            }
          }
        }
      }
    }
  }
  
  private fun runTask(task: Task) {
    try {
      delayNanos = task.runOnce()
    } 
  }

代碼有點長,這裏是從runWriter開始跟的幾個方法,拿到writerTask實例後,存到TaskQueuefutureTasks列表裏,而後到runnable這裏能夠看到是一個while死循環,不斷的從futureTasks中取出Task並執行runTask方法,直到Task爲空,循環中止。

其中涉及到兩個新的類:

  • TaskQueue類主要就是管理消息任務列表,保證按順序執行
  • TaskRunner類主要就是作一些任務的具體操做,好比線程池裏執行任務,記錄消息任務的狀態(準備發送的任務隊列readyQueues,正在執行的任務隊列busyQueues等等)

而每個Task最後都是執行到了WriterTaskrunOnce方法,也就是writeOneFrame方法:

internal fun writeOneFrame(): Boolean {
    synchronized(this@RealWebSocket) {
      if (failed) {
        return false // Failed web socket.
      }
      writer = this.writer
      pong = pongQueue.poll()
      if (pong == null) {
        messageOrClose = messageAndCloseQueue.poll()
        if (messageOrClose is Close) {
        } else if (messageOrClose == null) {
            return false // The queue is exhausted.
        }
      }
    }

   //發送消息邏輯,包括`pong`消息,普通消息,關閉消息
    try {
      if (pong != null) {
        writer!!.writePong(pong)
      } else if (messageOrClose is Message) {
        val message = messageOrClose as Message
        writer!!.writeMessageFrame(message.formatOpcode, message.data)
        synchronized(this) {
          queueSize -= message.data.size.toLong()
        }
      } else if (messageOrClose is Close) {
        val close = messageOrClose as Close
        writer!!.writeClose(close.code, close.reason)
        // We closed the writer: now both reader and writer are closed.
        if (streamsToClose != null) {
          listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
        }
      } 
      return true
    } finally {
      streamsToClose?.closeQuietly()
      readerToClose?.closeQuietly()
      writerToClose?.closeQuietly()
    }
  }

這裏就會執行發送消息的邏輯了,主要有三種消息狀況處理:

  • pong消息,這個主要是爲服務器端準備的,發送給客戶端迴應心跳包。
  • 普通消息,就會把數據類型Opcode和具體數據發送過去
  • 關閉消息,其實當用戶執行close方法關閉WebSocket的時候,也是發送了一條Close控制幀消息給服務器告知這個關閉需求,並帶上code狀態碼reason關閉緣由,而後服務器端就會關閉當前鏈接。

好了。最後一步了,就是把這些數據組裝成WebSocket數據幀並寫入流,分紅控制幀數據和普通消息數據幀

//寫入(發送)控制幀
  private fun writeControlFrame(opcode: Int, payload: ByteString) {
    if (writerClosed) throw IOException("closed")
    
    val length = payload.size
    require(length <= PAYLOAD_BYTE_MAX) {
      "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
    }
    val b0 = B0_FLAG_FIN or opcode
    sinkBuffer.writeByte(b0)

    var b1 = length
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
      sinkBuffer.writeByte(b1)
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (length > 0) {
        val payloadStart = sinkBuffer.size
        sinkBuffer.write(payload)
        sinkBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(payloadStart)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    } else {
      sinkBuffer.writeByte(b1)
      sinkBuffer.write(payload)
    }

    sink.flush()
  }


  //寫入(發送)普通消息數據幀
  @Throws(IOException::class)
  fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
    if (writerClosed) throw IOException("closed")

    messageBuffer.write(data)

    var b0 = formatOpcode or B0_FLAG_FIN
    val dataSize = messageBuffer.size
    sinkBuffer.writeByte(b0)

    var b1 = 0
    if (isClient) {
      b1 = b1 or B1_FLAG_MASK
    }
    when {
      dataSize <= PAYLOAD_BYTE_MAX -> {
        b1 = b1 or dataSize.toInt()
        sinkBuffer.writeByte(b1)
      }
      dataSize <= PAYLOAD_SHORT_MAX -> {
        b1 = b1 or PAYLOAD_SHORT
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeShort(dataSize.toInt())
      }
      else -> {
        b1 = b1 or PAYLOAD_LONG
        sinkBuffer.writeByte(b1)
        sinkBuffer.writeLong(dataSize)
      }
    }

    if (isClient) {
      random.nextBytes(maskKey!!)
      sinkBuffer.write(maskKey)

      if (dataSize > 0L) {
        messageBuffer.readAndWriteUnsafe(maskCursor!!)
        maskCursor.seek(0L)
        toggleMask(maskCursor, maskKey)
        maskCursor.close()
      }
    }

    sinkBuffer.write(messageBuffer, dataSize)
    sink.emit()
  }

你們應該都能看懂了吧,其實就是組裝數據幀,包括Opcode,mask,數據長度等等。兩個方法的不一樣就在於普通數據須要判斷數據長度的三種狀況,再組裝數據幀。最後都會經過sinkBuffer寫入到輸出數據流。

終於,基本的流程說的差很少了。其中還有不少細節,同窗們能夠本身花時間看看琢磨琢磨,好比Okio部分。仍是那句話,但願你們有空本身也讀一讀相關源碼,這樣理解才能深入,並且你確定會發現不少我沒說到的細節,歡迎你們討論。我也會繼續努力,最後你們給我加個油點個贊吧,感謝感謝。

總結

再來個圖總結下吧!🎉
OkHttp-WebSocket源碼.jpg

參考

OkHttp源碼
《WebSocket協議翻譯》

附件

OkHttp源碼
WebSocket功能實現源碼


個人公衆號:碼上積木,天天三問面試題,詳細剖析,助你成爲offer收割機。

謝謝你的閱讀,若是你以爲寫的還行,就點個贊支持下吧!感謝!
你的一個👍,就是我分享的動力❤️。

相關文章
相關標籤/搜索