最近老闆又來新需求了,要作一個物聯網相關的app
,其中有個需求是客戶端須要收發服務器不按期發出的消息。
心裏OS:
🤔 這咋整呢?經過接口輪詢?定時訪問接口,有數據就更新?
🤔 不行不行,這樣浪費資源了,還耗電,會致使不少請求都是無效的網絡操做。
🤔 那就長鏈接唄?WebSocket協議
好像不錯,經過握手創建長鏈接後,能夠隨時收發服務器的消息。那就它了!
🤔 怎麼集成呢?正好前段時間複習OkHttp
源碼的時候發現了它是支持Websocket
協議的,那就用它試試吧!(戲好多,演不下去了🤮)java
開淦!git
先簡單介紹下WebSocket
。
咱們都知道Http是處於應用層的一個通訊協議
,可是隻支持單向主動通訊,作不到服務器主動向客戶端推送消息。並且Http是無狀態
的,即每次通訊都沒有關聯性,致使跟服務器關係不緊密。github
爲了解決和服務器長時間通訊的痛點呢,HTML5
規範引出了WebSocket
協議(知道這名字咋來的吧,人家HTML5
規範引出的,隨爸姓),是一種創建在TCP
協議基礎上的全雙工通訊的協議。他跟Http
同屬於應用層協議,下層仍是須要經過TCP創建鏈接。web
可是,WebSocket
在TCP
鏈接創建後,還要經過Http
進行一次握手,也就是經過Http
發送一條GET請求
消息給服務器,告訴服務器我要創建WebSocket鏈接
了,你準備好哦,具體作法就是在頭部信息中添加相關參數。而後服務器響應我知道了,而且將鏈接協議改爲WebSocket
,開始創建長鏈接。面試
這裏貼上請求頭和響應頭信息,從網上找了一張圖:服務器
簡單說明下參數:websocket
ws
或者wss
開頭,ws
對應Websocket
協議,wss
對應在TLS
之上的WebSocket
。相似於Http
和Https
的關係。Connection:Upgrade
,表示客戶端要鏈接升級,不用Http協議。Upgrade:websocket
, 表示客戶端要升級創建Websocket
鏈接。Sec-Websocket-Key:key
, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效。Sec-WebSocket-Version:13
, websocket使用的協議,通常就是13。Sec-webSocket-Extension:permessage-deflate
,客戶端指定的一些擴展協議,好比這裏permessage-deflate
就是WebSocket
的一種壓縮協議。響應碼101,
表示響應協議升級,後續的數據交互都按照Upgradet指定的WebSocket
協議來。implementation("com.squareup.okhttp3:okhttp:4.7.2")
首先是初始化OkHttpClient
和WebSocket
實例:網絡
/** * 初始化WebSocket */ public void init() { mWbSocketUrl = "ws://echo.websocket.org"; mClient = new OkHttpClient.Builder() .pingInterval(10, TimeUnit.SECONDS) .build(); Request request = new Request.Builder() .url(mWbSocketUrl) .build(); mWebSocket = mClient.newWebSocket(request, new WsListener()); }
這裏主要是配置了OkHttp
的一些參數,以及WebSocket
的鏈接地址。其中newWebSocket
方法就是進行WebSocket
的初始化和鏈接。app
這裏要注意的點是pingInterval
方法的配置,這個方法主要是用來設置WebSocket
鏈接的保活。
相信作過長鏈接的同窗都知道,一個長鏈接通常要隔幾秒發送一條消息告訴服務器我在線,而服務器也會回覆一個消息表示收到了,這樣就確認了鏈接正常,客戶端和服務器端都在線。less
若是服務器沒有按時收到
這個消息那麼服務器可能就會主動關閉
這個鏈接,節約資源。
客戶端沒有正常收到
這個返回的消息,也會作一些相似重連的操做
,因此這個保活消息很是重要。
咱們稱這個消息叫做心跳包
,通常用PING,PONG
表示,像乒乓球同樣,一來一回。
因此這裏的pingInterval
就是設置心跳包發送的間隔時間,設置了這個方法以後,OkHttp
就會自動幫咱們發送心跳包事件,也就是ping
包。當間隔時間到了,沒有收到pong
包的話,監聽事件中的onFailure
方法就會被調用,此時咱們就能夠進行重連。
可是因爲實際業務需求不同,以及okhttp
中心跳包事件給予咱們權限較少,因此咱們也能夠本身完成心跳包事件,即在WebSocket
鏈接成功以後,開始定時發送ping
包,在下一次發送ping
包以前檢查上一個pong
包是否收到,若是沒收到,就視爲異常,開始重連。感興趣的同窗能夠看看文末的相關源碼。
創建鏈接後,咱們就能夠正常發送和讀取消息了,也就是在上文WsListener
監聽事件中表現:
//監聽事件,用於收消息,監聽鏈接的狀態 class WsListener extends WebSocketListener { @Override public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosed(webSocket, code, reason); } @Override public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosing(webSocket, code, reason); } @Override public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { super.onFailure(webSocket, t, response); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { super.onMessage(webSocket, text); Log.e(TAG, "客戶端收到消息:" + text); onWSDataChanged(DATE_NORMAL, text); //測試發消息 webSocket.send("我是客戶端,你好啊"); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) { super.onMessage(webSocket, bytes); } @Override public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { super.onOpen(webSocket, response); Log.e(TAG,"鏈接成功!"); } } //發送String消息 public void send(final String message) { if (mWebSocket != null) { mWebSocket.send(message); } } /** * 發送byte消息 * @param message */ public void send(final ByteString message) { if (mWebSocket != null) { mWebSocket.send(message); } } //主動斷開鏈接 public void disconnect(int code, String reason) { if (mWebSocket != null) mWebSocket.close(code, reason); }
這裏要注意,回調的方法都是在子線程回調的,若是須要更新UI
,須要切換到主線程。
基本操做就這麼多,仍是很簡單的吧,初始化Websocket
——鏈接——鏈接成功——收發消息。
其中WebSocket
類是一個操做接口,主要提供瞭如下幾個方法
send(text: String)
發送一個String類型的消息send(bytes: ByteString)
發送一個二進制類型的消息close(code: Int, reason: String?)
關閉WebSocket鏈接若是有同窗想測試下WebSocket
的功能可是又沒有實際的服務器,怎麼辦呢?
其實OkHttp
官方有一個MockWebSocket
服務,能夠用來模擬服務端,下面咱們一塊兒試一下:
首先集成MockWebSocket
服務庫:
implementation 'com.squareup.okhttp3:mockwebserver:4.7.2'
而後就能夠新建MockWebServer
,並加入MockResponse
做爲接收消息的響應。
MockWebServer mMockWebServer = new MockWebServer(); MockResponse response = new MockResponse() .withWebSocketUpgrade(new WebSocketListener() { @Override public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { super.onOpen(webSocket, response); //有客戶端鏈接時回調 Log.e(TAG, "服務器收到客戶端鏈接成功回調:"); mWebSocket = webSocket; mWebSocket.send("我是服務器,你好呀"); } @Override public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { super.onMessage(webSocket, text); Log.e(TAG, "服務器收到消息:" + text); } @Override public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { super.onClosed(webSocket, code, reason); Log.e(TAG, "onClosed:"); } }); mMockWebServer.enqueue(response);
這裏服務器端在收到客戶端鏈接成功消息後,給客戶端發送了一條消息。
要注意的是這段代碼要在子線程執行,由於主線程不能進行網絡操做。
而後就能夠去初始化Websocket
客戶端了:
//獲取鏈接url,初始化websocket客戶端 String websocketUrl = "ws://" + mMockWebServer.getHostName() + ":" + mMockWebServer.getPort() + "/"; WSManager.getInstance().init(websocketUrl);
ok,運行項目
//運行結果 E/jimu: mWbSocketUrl=ws://localhost:38355/ E/jimu: 服務器收到客戶端鏈接成功回調: E/jimu: 鏈接成功! E/jimu: 客戶端收到消息:我是服務器,你好呀 E/jimu: 服務器收到消息:我是客戶端,你好啊
相關的WebSocket
管理類和模擬服務器類我也上傳到github
了,有須要的同窗能夠文末自取。
WebSocket
整個流程無非三個功能:鏈接,接收消息,發送消息。下面咱們就從這三個方面
分析下具體是怎麼實現的。
經過上面的代碼咱們得知,WebSocket
鏈接是經過newWebSocket
方法。直接點進去看這個方法:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket { val webSocket = RealWebSocket( taskRunner = TaskRunner.INSTANCE, originalRequest = request, listener = listener, random = Random(), pingIntervalMillis = pingIntervalMillis.toLong(), extensions = null, // Always null for clients. minimumDeflateSize = minWebSocketMessageToCompress ) webSocket.connect(this) return webSocket }
這裏作了兩件事:
RealWebSocket
,主要是設置了一些參數(好比pingIntervalMillis
心跳包時間間隔,還有監聽事件之類的)connect
方法進行WebSocket
鏈接繼續查看connect方法:
fun connect(client: OkHttpClient) { //*** val webSocketClient = client.newBuilder() .eventListener(EventListener.NONE) .protocols(ONLY_HTTP1) .build() val request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .header("Sec-WebSocket-Extensions", "permessage-deflate") .build() call = RealCall(webSocketClient, request, forWebSocket = true) call!!.enqueue(object : Callback { override fun onResponse(call: Call, response: Response) { //獲得數據流 val streams: Streams try { checkUpgradeSuccess(response, exchange) streams = exchange!!.newWebSocketStreams() } //*** // Process all web socket messages. try { val name = "$okHttpName WebSocket ${request.url.redact()}" initReaderAndWriter(name, streams) listener.onOpen(this@RealWebSocket, response) loopReader() } catch (e: Exception) { failWebSocket(e, null) } } }) }
上一篇使用篇文章中說過,Websocket
鏈接須要一次Http
協議的握手,而後才能把協議升級成WebSocket
。因此這段代碼就體現出這個功能了。
首先就new
了一個用來進行Http
鏈接的request
,其中Header
的參數就表示我要進行WebSocket
鏈接了,參數解析以下:
Connection:Upgrade
,表示客戶端要鏈接升級Upgrade:websocket
, 表示客戶端要升級創建Websocket鏈接Sec-Websocket-Key:key
, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效Sec-WebSocket-Version:13
, websocket使用的版本,通常就是13Sec-webSocket-Extension:permessage-deflate
,客戶端指定的一些擴展協議,好比這裏permessage-deflate
就是WebSocket
的一種壓縮協議。Header
設置好以後,就調用了call
的enqueue
方法,這個方法你們應該都很熟悉吧,OkHttp
裏面對於Http
請求的異步請求就是這個方法。
至此,握手結束,服務器返回響應碼101
,表示協議升級。
而後咱們繼續看看獲取服務器響應以後又作了什麼?
在發送Http
請求成功以後,onResponse
響應方法裏面主要表現爲四個處理邏輯:
Http
流轉換成WebSocket
流,獲得Streams
對象,這個流後面會轉化成輸入流和輸出流,也就是進行發送和讀取的操做流listener.onOpen(this@RealWebSocket, response)
,回調了接口WebSocketListener
的onOpen
方法,告訴用戶WebSocket
已經鏈接initReaderAndWriter(name, streams)
loopReader()
前兩個邏輯仍是比較好理解,主要是後兩個方法,咱們分別解析下。
首先看initReaderAndWriter
方法。
//RealWebSocket.kt @Throws(IOException::class) fun initReaderAndWriter(name: String, streams: Streams) { val extensions = this.extensions!! synchronized(this) { //*** //寫數據,發送數據的工具類 this.writer = WebSocketWriter() //設置心跳包事件 if (pingIntervalMillis != 0L) { val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis) taskQueue.schedule("$name ping", pingIntervalNanos) { writePingFrame() return@schedule pingIntervalNanos } } //*** } //*** //讀取數據的工具類 reader = WebSocketReader( *** frameCallback = this, *** ) } internal fun writePingFrame() { //*** try { writer.writePing(ByteString.EMPTY) } catch (e: IOException) { failWebSocket(e, null) } }
這個方法主要乾了兩件事:
WebSocketWriter
和WebSocketReader
,用來處理數據的收發。pingIntervalMillis
參數不爲0,就經過計時器,每隔pingIntervalNanos
發送一個ping
消息。其中writePingFrame
方法就是發送了ping
幀數據。接着看看這個loopReader
方法是幹什麼的,看這個名字咱們大膽猜想下,難道這個方法就是用來循環讀取數據的?去代碼裏找找答案:
fun loopReader() { while (receivedCloseCode == -1) { // This method call results in one or more onRead* methods being called on this thread. reader!!.processNextFrame() } }
代碼很簡單,一個while
循環,循環條件是receivedCloseCode == -1
的時候,作的事情是reader!!.processNextFrame()
方法。繼續:
//WebSocketWriter.kt fun processNextFrame() { //讀取頭部信息 readHeader() if (isControlFrame) { //若是是控制幀,讀取控制幀內容 readControlFrame() } else { //讀取普通消息內容 readMessageFrame() } } //讀取頭部信息 @Throws(IOException::class, ProtocolException::class) private fun readHeader() { if (closed) throw IOException("closed") try { //讀取數據,獲取數據幀的前8位 b0 = source.readByte() and 0xff } finally { source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS) } //*** //獲取數據幀的opcode(數據格式) opcode = b0 and B0_MASK_OPCODE //是否爲最終幀 isFinalFrame = b0 and B0_FLAG_FIN != 0 //是否爲控制幀(指令) isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0 //判斷最終幀,獲取幀長度等等 } //讀取控制幀(指令) @Throws(IOException::class) private fun readControlFrame() { if (frameLength > 0L) { source.readFully(controlFrameBuffer, frameLength) } when (opcode) { OPCODE_CONTROL_PING -> { //ping 幀 frameCallback.onReadPing(controlFrameBuffer.readByteString()) } OPCODE_CONTROL_PONG -> { //pong 幀 frameCallback.onReadPong(controlFrameBuffer.readByteString()) } OPCODE_CONTROL_CLOSE -> { //關閉 幀 var code = CLOSE_NO_STATUS_CODE var reason = "" val bufferSize = controlFrameBuffer.size if (bufferSize == 1L) { throw ProtocolException("Malformed close payload length of 1.") } else if (bufferSize != 0L) { code = controlFrameBuffer.readShort().toInt() reason = controlFrameBuffer.readUtf8() val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code) if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage) } //回調onReadClose方法 frameCallback.onReadClose(code, reason) closed = true } } } //讀取普通消息 @Throws(IOException::class) private fun readMessageFrame() { readMessage() if (readingCompressedMessage) { val messageInflater = this.messageInflater ?: MessageInflater(noContextTakeover).also { this.messageInflater = it } messageInflater.inflate(messageFrameBuffer) } if (opcode == OPCODE_TEXT) { frameCallback.onReadMessage(messageFrameBuffer.readUtf8()) } else { frameCallback.onReadMessage(messageFrameBuffer.readByteString()) } }
代碼仍是比較直觀,這個processNextFrame
其實就是讀取數據用的,首先讀取頭部信息,獲取數據幀的類型,判斷是否爲控制幀,再分別去讀取控制幀數據或者普通消息幀數據。
問題來了,什麼是數據頭部信息,什麼是控制幀?
這裏就要說下WebSocket
的數據幀了,先附上一個數據幀格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 +-+-+-+-+-------+ +-+-------------+ +-----------------------------+ |F|R|R|R| OP | |M| LENGTH | Extended payload length |I|S|S|S| CODE | |A| | (if LENGTH=126) |N|V|V|V| | |S| | | |1|2|3| | |K| | +-+-+-+-+-------+ +-+-------------+ | Extended payload length(if LENGTH=127) + +------------------------------- | Extended payload length | Masking-key,if Mask set to 1 +----------------------------------+------------------------------- | Masking-key | Data +----------------------------------+------------------------------- | Data +----------------------------------+-------------------------------
我認可,我懵逼了。
冷靜冷靜,一步一步分析下吧。
首先每一行表明4個字節,一共也就是32位數,哦,那也就是幾個字節而已嘛,每一個字節有他本身的表明意義唄,這樣想是否是就很簡單了,下面來具體看看每一個字節。
第1個字節:
FIN碼
,其實就是一個標示位,由於數據可能多幀操做嘛,因此多幀狀況下,只有最後一幀的FIN
設置成1,標示結束幀,前面全部幀設置爲0。RSV碼
,通常通訊兩端沒有設置自定義協議,就默認爲0。opcode
,咱們叫它操做碼。這個就是判斷這個數據幀的類型了,通常有如下幾個被定義好的類型:1) 0x0
表示附加數據幀
2) 0x1
表示文本數據幀
3) 0x2
表示二進制數據幀
4) 0x3-7
保留用於將來的非控制幀
5) 0x8
表示鏈接關閉
6) 0x9
表示ping
7) 0xA
表示pong
8) 0xB-F
保留用於將來的非控制幀
是否是發現了些什麼,這不就對應了咱們應用中的幾種格式嗎?2和3
對應的是普通消息幀,包括了文本和二進制數據。567
對應的就是控制幀格式,包括了close,ping,pong
。
第2個字節:
Mask
掩碼,其實就是標識數據是否加密混淆,1表明數據通過掩碼的,0是沒有通過掩碼的,若是是1的話,後續就會有4個字節表明掩碼key
,也就是數據幀中Masking-key
所處的位置。LENGTH
,用來標示數據長度。由於只有7位,因此最大隻能儲存1111111對應的十進制數127長度
的數據,若是須要更大的數據,這個儲存長度確定就不夠了。小於126長度
則數據用這七位表示實際長度。2) 若是長度設置爲126
,也就是二進制1111110,就表明取額外2個字節
表示數據長度,共是16位表示數據長度。3) 若是長度設置爲127
,也就是二進制1111111,就表明取額外8個字節
,共是64位表示數據長度。須要注意的是LENGHT的三種狀況在一個數據幀裏面只會出現一種狀況,不共存,因此在圖中是用if表示。一樣的,Masking-key也是當Mask爲1的時候才存在。
因此也就有了數據幀裏面的Extended payload length(LENGTH=126)
所處的2個字節,以及Extended payload length(LENGTH=127)
所處的8個字節。
最後的字節部分天然就是掩碼key
(Mask爲1的時候才存在)和具體的傳輸數據
了。
仍是有點暈吧😷,來張圖總結下:
好了,瞭解了數據幀格式後,咱們再來讀源碼就清晰多了。
先看看怎麼讀的頭部信息
並解析的:
//取數據幀前8位數據 b0 = source.readByte() and 0xff //獲取數據幀的opcode(數據格式) opcode = b0 and B0_MASK_OPCODE(15) //是否爲最終幀 isFinalFrame = b0 and B0_FLAG_FIN(128) != 0 //是否爲控制幀(指令) isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
and
是按位與計算,and 0xff
意思就是按位與11111111,因此頭部信息其實就是取了數據幀的前8位數據
,一個字節。opcode
,and 15
也就是按位與00001111,其實也就是取了後四位數據,恰好對應上opcode
的位置,第一個字節的後四位。最終幀
,剛纔數據幀格式中說過,第一位FIN
標識了是否爲最後一幀數據,1表明結束幀,因此這裏and 128
也就是按位與10000000,也就是取的第一位數。and 8
也就是按位與00001000,取得是第五位,也就是opcode
的第一位,這是什麼意思呢?咱們看看剛纔的數據幀格式,發現從0x8
開始就是所謂的控制幀了。0x8
對應的二進制是1000,0x7
對應的二進制是0111。發現了吧,若是爲控制幀的時候,opcode
第一位確定是爲1的,因此這裏就判斷的第五位。後面還有讀取第二個字節的代碼,你們能夠本身沿着這個思路本身看看,包括了讀取MASK
,讀取數據長度的三種長度等。
因此這個processNextFrame
方法主要作了三件事:
readHeader
方法中,判斷了是否爲控制幀,是否爲結束幀
,而後獲取了Mask
標識,幀長度等參數readControlFrame
方法中,主要處理了該幀數據爲ping,pong,close
三種狀況,而且在收到close關閉幀
的狀況下,回調了onReadClose
方法,這個待會要細看下。readMessageFrame
方法中,主要是讀取了消息後,回調了onReadMessage方法。至此能夠發現,其實WebSocket
傳輸數據並非一個簡單的事,只是OkHttp
都幫咱們封裝好了,咱們只須要直接傳輸數據便可,感謝這些三方庫爲咱們開發做出的貢獻,不知道何時我也能作出點貢獻呢🤔。
對了,剛纔說回調也很重要,接着看看。onReadClose
和onReadMessage
回調到哪了呢?還記得上文初始化WebSocketWriter
的時候設置了回調接口嗎。因此就是回調給RealWebSocket
了:
//RealWebSocket.kt override fun onReadClose(code: Int, reason: String) { require(code != -1) var toClose: Streams? = null var readerToClose: WebSocketReader? = null var writerToClose: WebSocketWriter? = null synchronized(this) { check(receivedCloseCode == -1) { "already closed" } receivedCloseCode = code receivedCloseReason = reason //... } try { listener.onClosing(this, code, reason) if (toClose != null) { listener.onClosed(this, code, reason) } } finally { toClose?.closeQuietly() readerToClose?.closeQuietly() writerToClose?.closeQuietly() } } @Throws(IOException::class) override fun onReadMessage(text: String) { listener.onMessage(this, text) } @Throws(IOException::class) override fun onReadMessage(bytes: ByteString) { listener.onMessage(this, bytes) }
onReadClose
回調方法裏面有個關鍵的參數,receivedCloseCode
。還記得這個參數嗎?上文中解析消息的循環條件就是receivedCloseCode == -1
,因此當收到關閉幀的時候,receivedCloseCode
就再也不等於-1(規定大於1000),也就再也不去讀取解析消息了。這樣整個流程就結束了。
其中還有一些WebSocketListener
的回調,好比onClosing,onClosed,onMessage
等,就直接回調給用戶使用了。至此,接收消息處理消息說完了。
好了。接着說發送,看看send
方法:
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean { // *** // Enqueue the message frame. queueSize += data.size.toLong() messageAndCloseQueue.add(Message(formatOpcode, data)) runWriter() return true }
首先,把要發送的data
封裝成Message
對象,而後入隊列messageAndCloseQueue
。最後執行runWriter
方法。這都不用猜了,runWriter
確定就要開始發送消息了,繼續看:
//RealWebSocket.kt private fun runWriter() { this.assertThreadHoldsLock() val writerTask = writerTask if (writerTask != null) { taskQueue.schedule(writerTask) } } private inner class WriterTask : Task("$name writer") { override fun runOnce(): Long { try { if (writeOneFrame()) return 0L } catch (e: IOException) { failWebSocket(e, null) } return -1L } } //如下是schedule方法轉到WriterTask的runOnce方法過程 //TaskQueue.kt fun schedule(task: Task, delayNanos: Long = 0L) { synchronized(taskRunner) { if (scheduleAndDecide(task, delayNanos, recurrence = false)) { taskRunner.kickCoordinator(this) } } } internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean { //*** if (insertAt == -1) insertAt = futureTasks.size futureTasks.add(insertAt, task) // Impact the coordinator if we inserted at the front. return insertAt == 0 } //TaskRunner.kt internal fun kickCoordinator(taskQueue: TaskQueue) { this.assertThreadHoldsLock() if (taskQueue.activeTask == null) { if (taskQueue.futureTasks.isNotEmpty()) { readyQueues.addIfAbsent(taskQueue) } else { readyQueues.remove(taskQueue) } } if (coordinatorWaiting) { backend.coordinatorNotify(this@TaskRunner) } else { backend.execute(runnable) } } private val runnable: Runnable = object : Runnable { override fun run() { while (true) { val task = synchronized(this@TaskRunner) { awaitTaskToRun() } ?: return logElapsed(task, task.queue!!) { var completedNormally = false try { runTask(task) completedNormally = true } finally { // If the task is crashing start another thread to service the queues. if (!completedNormally) { backend.execute(this) } } } } } } private fun runTask(task: Task) { try { delayNanos = task.runOnce() } }
代碼有點長,這裏是從runWriter
開始跟的幾個方法,拿到writerTask
實例後,存到TaskQueue
的futureTasks列表
裏,而後到runnable
這裏能夠看到是一個while
死循環,不斷的從futureTasks
中取出Task
並執行runTask
方法,直到Task
爲空,循環中止。
其中涉及到兩個新的類:
TaskQueue類
主要就是管理消息任務列表,保證按順序執行TaskRunner類
主要就是作一些任務的具體操做,好比線程池裏執行任務,記錄消息任務的狀態(準備發送的任務隊列readyQueues
,正在執行的任務隊列busyQueues
等等)而每個Task最後都是執行到了WriterTask
的runOnce
方法,也就是writeOneFrame
方法:
internal fun writeOneFrame(): Boolean { synchronized(this@RealWebSocket) { if (failed) { return false // Failed web socket. } writer = this.writer pong = pongQueue.poll() if (pong == null) { messageOrClose = messageAndCloseQueue.poll() if (messageOrClose is Close) { } else if (messageOrClose == null) { return false // The queue is exhausted. } } } //發送消息邏輯,包括`pong`消息,普通消息,關閉消息 try { if (pong != null) { writer!!.writePong(pong) } else if (messageOrClose is Message) { val message = messageOrClose as Message writer!!.writeMessageFrame(message.formatOpcode, message.data) synchronized(this) { queueSize -= message.data.size.toLong() } } else if (messageOrClose is Close) { val close = messageOrClose as Close writer!!.writeClose(close.code, close.reason) // We closed the writer: now both reader and writer are closed. if (streamsToClose != null) { listener.onClosed(this, receivedCloseCode, receivedCloseReason!!) } } return true } finally { streamsToClose?.closeQuietly() readerToClose?.closeQuietly() writerToClose?.closeQuietly() } }
這裏就會執行發送消息的邏輯了,主要有三種消息狀況處理:
pong消息
,這個主要是爲服務器端準備的,發送給客戶端迴應心跳包。普通消息
,就會把數據類型Opcode
和具體數據發送過去關閉消息
,其實當用戶執行close
方法關閉WebSocket
的時候,也是發送了一條Close控制幀
消息給服務器告知這個關閉需求,並帶上code狀態碼
和reason關閉緣由
,而後服務器端就會關閉當前鏈接。好了。最後一步了,就是把這些數據組裝成WebSocket
數據幀並寫入流,分紅控制幀
數據和普通消息數據幀
:
//寫入(發送)控制幀 private fun writeControlFrame(opcode: Int, payload: ByteString) { if (writerClosed) throw IOException("closed") val length = payload.size require(length <= PAYLOAD_BYTE_MAX) { "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX" } val b0 = B0_FLAG_FIN or opcode sinkBuffer.writeByte(b0) var b1 = length if (isClient) { b1 = b1 or B1_FLAG_MASK sinkBuffer.writeByte(b1) random.nextBytes(maskKey!!) sinkBuffer.write(maskKey) if (length > 0) { val payloadStart = sinkBuffer.size sinkBuffer.write(payload) sinkBuffer.readAndWriteUnsafe(maskCursor!!) maskCursor.seek(payloadStart) toggleMask(maskCursor, maskKey) maskCursor.close() } } else { sinkBuffer.writeByte(b1) sinkBuffer.write(payload) } sink.flush() } //寫入(發送)普通消息數據幀 @Throws(IOException::class) fun writeMessageFrame(formatOpcode: Int, data: ByteString) { if (writerClosed) throw IOException("closed") messageBuffer.write(data) var b0 = formatOpcode or B0_FLAG_FIN val dataSize = messageBuffer.size sinkBuffer.writeByte(b0) var b1 = 0 if (isClient) { b1 = b1 or B1_FLAG_MASK } when { dataSize <= PAYLOAD_BYTE_MAX -> { b1 = b1 or dataSize.toInt() sinkBuffer.writeByte(b1) } dataSize <= PAYLOAD_SHORT_MAX -> { b1 = b1 or PAYLOAD_SHORT sinkBuffer.writeByte(b1) sinkBuffer.writeShort(dataSize.toInt()) } else -> { b1 = b1 or PAYLOAD_LONG sinkBuffer.writeByte(b1) sinkBuffer.writeLong(dataSize) } } if (isClient) { random.nextBytes(maskKey!!) sinkBuffer.write(maskKey) if (dataSize > 0L) { messageBuffer.readAndWriteUnsafe(maskCursor!!) maskCursor.seek(0L) toggleMask(maskCursor, maskKey) maskCursor.close() } } sinkBuffer.write(messageBuffer, dataSize) sink.emit() }
你們應該都能看懂了吧,其實就是組裝數據幀,包括Opcode,mask,數據長度
等等。兩個方法的不一樣就在於普通數據須要判斷數據長度的三種狀況,再組裝數據幀。最後都會經過sinkBuffer
寫入到輸出數據流。
終於,基本的流程說的差很少了。其中還有不少細節,同窗們能夠本身花時間看看琢磨琢磨,好比Okio
部分。仍是那句話,但願你們有空本身也讀一讀相關源碼,這樣理解才能深入,並且你確定會發現不少我沒說到的細節,歡迎你們討論。我也會繼續努力,最後你們給我加個油點個贊吧,感謝感謝。
再來個圖總結下吧!🎉
個人公衆號:碼上積木,天天三問面試題,詳細剖析,助你成爲offer收割機。
謝謝你的閱讀,若是你以爲寫的還行,就點個贊支持下吧!感謝!
你的一個👍,就是我分享的動力❤️。