先簡單介紹下WebSocket
。 咱們都知道Http是處於應用層的一個通訊協議
,可是隻支持單向主動通訊,作不到服務器主動向客戶端推送消息。並且Http是無狀態
的,即每次通訊都沒有關聯性,致使跟服務器關係不緊密。web
爲了解決和服務器長時間通訊的痛點呢,HTML5
規範引出了WebSocket
協議(知道這名字咋來的吧,人家HTML5
規範引出的,隨爸姓),是一種創建在TCP
協議基礎上的全雙工通訊的協議。他跟Http
同屬於應用層協議,下層仍是須要經過TCP創建鏈接。服務器
可是,WebSocket
在TCP
鏈接創建後,還要經過Http
進行一次握手,也就是經過Http
發送一條GET請求
消息給服務器,告訴服務器我要創建WebSocket鏈接
了,你準備好哦,具體作法就是在頭部信息中添加相關參數。而後服務器響應我知道了,而且將鏈接協議改爲WebSocket
,開始創建長鏈接。websocket
這裏貼上請求頭和響應頭信息,從網上找了一張圖:markdown
3851594110877_.pic.jpgdom
簡單說明下參數:異步
ws
或者wss
開頭,ws
對應Websocket
協議,wss
對應在TLS
之上的WebSocket
。相似於Http
和Https
的關係。Connection:Upgrade
,表示客戶端要鏈接升級,不用Http協議。Upgrade:websocket
, 表示客戶端要升級創建Websocket
鏈接。Sec-Websocket-Key:key
, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效。Sec-WebSocket-Version:13
, websocket使用的版本,通常就是13。Sec-webSocket-Extension:permessage-deflate
,客戶端指定的一些擴展協議,好比這裏permessage-deflate
就是WebSocket
的一種壓縮協議。響應碼101,
表示響應協議升級,後續的數據交互都按照Upgradet指定的WebSocket
協議來。首先是初始化OkHttpClient
和WebSocket
實例:socket
//初始化WebSocket
public void init() {
mWbSocketUrl = "ws://echo.websocket.org";
mClient = new OkHttpClient.Builder()
.pingInterval(10, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(mWbSocketUrl)
.build();
mWebSocket = mClient.newWebSocket(request, new WsListener());
}
複製代碼
複製代碼
這裏主要是配置了OkHttp
的一些參數,以及WebSocket
的鏈接地址。其中newWebSocket
方法就是進行WebSocket
的初始化和鏈接。ide
這裏要注意的點是pingInterval
方法的配置,這個方法主要是用來設置WebSocket
鏈接的保活。 相信作過長鏈接的同窗都知道,一個長鏈接通常要隔幾秒發送一條消息告訴服務器我在線,而服務器也會回覆一個消息表示收到了,這樣就確認了鏈接正常,客戶端和服務器端都在線。工具
若是服務器沒有按時收到
這個消息那麼服務器可能就會主動關閉
這個鏈接,節約資源。 客戶端沒有正常收到
這個返回的消息,也會作一些相似重連的操做
,因此這個保活消息很是重要。oop
咱們稱這個消息叫做心跳包
,通常用PING,PONG
表示,像乒乓球同樣,一來一回。 因此這裏的pingInterval
就是設置心跳包發送的間隔時間,設置了這個方法以後,OkHttp
就會自動幫咱們發送心跳包事件,也就是ping
包。當間隔時間到了,沒有收到pong
包的話,監聽事件中的onFailure
方法就會被調用,此時咱們就能夠進行斷線重連。
可是因爲實際業務需求不同,以及okhttp
中心跳包事件給予咱們權限較少,因此咱們也能夠本身完成心跳包事件,即在WebSocket
鏈接成功以後,開始定時發送ping
包,在下一次發送ping
包以前檢查上一個pong
包是否收到,若是沒收到,就視爲異常,開始斷線重連。感興趣的同窗能夠看看文末的相關源碼。
創建鏈接後,咱們就能夠正常發送和讀取消息了,也就是在上文WsListener
監聽事件中表現:
//監聽事件,用於收消息,監聽鏈接的狀態
class WsListener extends WebSocketListener {
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
super.onClosed(webSocket, code, reason);
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
super.onFailure(webSocket, t, response);
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
super.onMessage(webSocket, text);
Log.e(TAG, "客戶端收到消息:" + text);
onWSDataChanged(DATE_NORMAL, text);
//測試發消息
webSocket.send("我是客戶端,你好啊");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
super.onOpen(webSocket, response);
Log.e(TAG,"鏈接成功!");
}
}
//發送String消息
public void send(final String message) {
if (mWebSocket != null) {
mWebSocket.send(message);
}
}
//發送byte消息
public void send(final ByteString message) {
if (mWebSocket != null) {
mWebSocket.send(message);
}
}
//主動斷開鏈接
public void disconnect(int code, String reason) {
if (mWebSocket != null)
mWebSocket.close(code, reason);
}
複製代碼
複製代碼
這裏要注意,回調的方法都是在子線程回調的,若是須要更新UI
,須要切換到主線程。
基本操做就這麼多,仍是很簡單的吧,初始化Websocket
——鏈接——鏈接成功——收發消息。
其中WebSocket
類是一個操做接口,主要提供瞭如下幾個方法
send(text: String)
發送一個String類型的消息send(bytes: ByteString)
發送一個二進制類型的消息close(code: Int, reason: String?)
關閉WebSocket鏈接簡單貼下WebSocket
使用方法,方便下面解析:
//初始化
mClient = new OkHttpClient.Builder()
.pingInterval(10, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(mWbSocketUrl)
.build();
mWebSocket = mClient.newWebSocket(request, new WsListener());
//收到消息回調
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
super.onMessage(webSocket, text);
Log.e(TAG,"收到消息!");
onWSDataChanged(DATE_NORMAL, text);
}
//發送消息
mWebSocket.send(message);
//主動關閉鏈接
mWebSocket.close(code, reason);
複製代碼
WebSocket
整個流程無非三個功能:鏈接,接收消息,發送消息。下面咱們就從這三個方面
分析下具體是怎麼實現的。
經過上面的代碼咱們得知,WebSocket
鏈接是經過newWebSocket
方法。直接點進去看這個方法:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
val webSocket = RealWebSocket(
taskRunner = TaskRunner.INSTANCE,
originalRequest = request,
listener = listener,
random = Random(),
pingIntervalMillis = pingIntervalMillis.toLong(),
extensions = null, // Always null for clients.
minimumDeflateSize = minWebSocketMessageToCompress
)
webSocket.connect(this)
return webSocket
}
複製代碼
這裏作了兩件事:
RealWebSocket
,主要是設置了一些參數(好比pingIntervalMillis
心跳包時間間隔,還有監聽事件之類的)connect
方法進行WebSocket
鏈接public void connect(OkHttpClient client) {
client = client.newBuilder().eventListener(EventListener.NONE).protocols(ONLY_HTTP1).build();
final Request request = this.originalRequest.newBuilder().header("Upgrade", "websocket").header("Connection", "Upgrade").header("Sec-WebSocket-Key", this.key).header("Sec-WebSocket-Version", "13").build();
this.call = Internal.instance.newWebSocketCall(client, request);
this.call.enqueue(new Callback() {
public void onResponse(Call call, Response response) {
try {
RealWebSocket.this.checkResponse(response);
} catch (ProtocolException var7) {
RealWebSocket.this.failWebSocket(var7, response);
Util.closeQuietly(response);
return;
}
StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
streamAllocation.noNewStreams();
RealWebSocket.Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
try {
RealWebSocket.this.listener.onOpen(RealWebSocket.this, response);
String name = "OkHttp WebSocket " + request.url().redact();
RealWebSocket.this.initReaderAndWriter(name, streams);
streamAllocation.connection().socket().setSoTimeout(0);
RealWebSocket.this.loopReader();
} catch (Exception var6) {
RealWebSocket.this.failWebSocket(var6, (Response)null);
}
}
public void onFailure(Call call, IOException e) {
RealWebSocket.this.failWebSocket(e, (Response)null);
}
});
}
複製代碼
Websocket
鏈接須要一次Http
協議的握手,而後才能把協議升級成WebSocket
。因此這段代碼就體現出這個功能了。
首先就new
了一個用來進行Http
鏈接的request
,其中Header
的參數就表示我要進行WebSocket
鏈接了,參數解析以下:
Connection:Upgrade
,表示客戶端要鏈接升級Upgrade:websocket
, 表示客戶端要升級創建Websocket鏈接Sec-Websocket-Key:key
, 這個key是隨機生成的,服務器會經過這個參數驗證該請求是否有效Sec-WebSocket-Version:13
, websocket使用的版本,通常就是13Sec-webSocket-Extension:permessage-deflate
,客戶端指定的一些擴展協議,好比這裏permessage-deflate
就是WebSocket
的一種壓縮協議。Header
設置好以後,就調用了call
的enqueue
方法,這個方法你們應該都很熟悉吧,OkHttp
裏面對於Http
請求的異步請求就是這個方法。 至此,握手結束,服務器返回響應碼101
,表示協議升級。
而後咱們繼續看看獲取服務器響應以後又作了什麼? 在發送Http
請求成功以後,onResponse
響應方法裏面主要表現爲四個處理邏輯:
Http
流轉換成WebSocket
流,獲得Streams
對象,這個流後面會轉化成輸入流和輸出流,也就是進行發送和讀取的操做流listener.onOpen(this@RealWebSocket, response)
,回調了接口WebSocketListener
的onOpen
方法,告訴用戶WebSocket
已經鏈接initReaderAndWriter(name, streams)
loopReader()
前兩個邏輯仍是比較好理解,主要是後兩個方法,咱們分別解析下。 首先看initReaderAndWriter
方法。
public void initReaderAndWriter(String name, RealWebSocket.Streams streams) throws IOException {
synchronized(this) {
this.streams = streams;
this.writer = new WebSocketWriter(streams.client, streams.sink, this.random);
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (this.pingIntervalMillis != 0L) {
this.executor.scheduleAtFixedRate(new RealWebSocket.PingRunnable(), this.pingIntervalMillis,
this.pingIntervalMillis, TimeUnit.MILLISECONDS);
}
if (!this.messageAndCloseQueue.isEmpty()) {
this.runWriter();
}
}
this.reader = new WebSocketReader(streams.client, streams.source, this);
}
private final class PingRunnable implements Runnable {
PingRunnable() {
}
public void run() {
RealWebSocket.this.writePingFrame();
}
}
複製代碼
這個方法主要乾了兩件事:
WebSocketWriter
和WebSocketReader
,用來處理數據的收發。pingIntervalMillis
參數不爲0,就經過計時器,每隔pingIntervalNanos
發送一個ping
消息。其中writePingFrame
方法就是發送了ping
幀數據。接着看看這個loopReader
方法是幹什麼的,看這個名字咱們大膽猜想下,難道這個方法就是用來循環讀取數據的?去代碼裏找找答案:
public void loopReader() throws IOException {
while(this.receivedCloseCode == -1) {
this.reader.processNextFrame();
}
}
複製代碼
複製代碼
代碼很簡單,一個while
循環,循環條件是receivedCloseCode == -1
的時候,作的事情是reader!!.processNextFrame()
方法。繼續:
void processNextFrame() throws IOException {
this.readHeader();
if (this.isControlFrame) {
this.readControlFrame();
} else {
this.readMessageFrame();
}
}
複製代碼
代碼仍是比較直觀,這個processNextFrame
其實就是讀取數據用的,首先讀取頭部信息,獲取數據幀的類型,判斷是否爲控制幀,再分別去讀取控制幀數據或者普通消息幀數據。
問題來了,什麼是數據頭部信息,什麼是控制幀? 這裏就要說下WebSocket
的數據幀了,先附上一個數據幀格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-------+ +-+-------------+ +-----------------------------+
|F|R|R|R| OP | |M| LENGTH | Extended payload length
|I|S|S|S| CODE | |A| | (if LENGTH=126)
|N|V|V|V| | |S| |
| |1|2|3| | |K| |
+-+-+-+-+-------+ +-+-------------+
| Extended payload length(if LENGTH=127)
+ +-------------------------------
| Extended payload length | Masking-key,if Mask set to 1
+----------------------------------+-------------------------------
| Masking-key | Data
+----------------------------------+-------------------------------
| Data
+----------------------------------+-------------------------------
複製代碼
首先每一行表明4個字節,一共也就是32位數,哦,那也就是幾個字節而已嘛,每一個字節有他本身的表明意義唄,這樣想是否是就很簡單了,下面來具體看看每一個字節。
第1個字節:
FIN碼
,其實就是一個標示位,由於數據可能多幀操做嘛,因此多幀狀況下,只有最後一幀的FIN
設置成1,標示結束幀,前面全部幀設置爲0。RSV碼
,通常通訊兩端沒有設置自定義協議,就默認爲0。opcode
,咱們叫它操做碼。這個就是判斷這個數據幀的類型了,通常有如下幾個被定義好的類型:1) 0x0
表示附加數據幀 2) 0x1
表示文本數據幀 3) 0x2
表示二進制數據幀 4) 0x3-7
保留用於將來的非控制幀 5) 0x8
表示鏈接關閉 6) 0x9
表示ping 7) 0xA
表示pong 8) 0xB-F
保留用於將來的非控制幀
是否是發現了些什麼,這不就對應了咱們應用中的幾種格式嗎?2和3
對應的是普通消息幀,包括了文本和二進制數據。567
對應的就是控制幀格式,包括了close,ping,pong
。
第2個字節:
Mask
掩碼,其實就是標識數據是否加密混淆,1表明數據通過掩碼的,0是沒有通過掩碼的,若是是1的話,後續就會有4個字節表明掩碼key
,也就是數據幀中Masking-key
所處的位置。LENGTH
,用來標示數據長度。由於只有7位,因此最大隻能儲存1111111對應的十進制數127長度
的數據,若是須要更大的數據,這個儲存長度確定就不夠了。 因此規定來了,1) 小於126長度
則數據用這七位表示實際長度。2) 若是長度設置爲126
,也就是二進制1111110,就表明取額外2個字節
表示數據長度,共是16位表示數據長度。3) 若是長度設置爲127
,也就是二進制1111111,就表明取額外8個字節
,共是64位表示數據長度。須要注意的是LENGHT的三種狀況在一個數據幀裏面只會出現一種狀況,不共存,因此在圖中是用if表示。一樣的,Masking-key也是當Mask爲1的時候才存在。
因此也就有了數據幀裏面的Extended payload length(LENGTH=126)
所處的2個字節,以及Extended payload length(LENGTH=127)
所處的8個字節。
最後的字節部分天然就是掩碼key
(Mask爲1的時候才存在)和具體的傳輸數據
了。 仍是有點暈吧😷,來張圖總結下:
好了,瞭解了數據幀格式後,咱們再來讀源碼就清晰多了。 先看看怎麼讀的頭部信息
並解析的:
//取數據幀前8位數據
b0 = source.readByte() and 0xff
//獲取數據幀的opcode(數據格式)
opcode = b0 and B0_MASK_OPCODE(15)
//是否爲最終幀
isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
//是否爲控制幀(指令)
isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
複製代碼
複製代碼
and
是按位與計算,and 0xff
意思就是按位與11111111,因此頭部信息其實就是取了數據幀的前8位數據
,一個字節。opcode
,and 15
也就是按位與00001111,其實也就是取了後四位數據,恰好對應上opcode
的位置,第一個字節的後四位。最終幀
,剛纔數據幀格式中說過,第一位FIN
標識了是否爲最後一幀數據,1表明結束幀,因此這裏and 128
也就是按位與10000000,也就是取的第一位數。and 8
也就是按位與00001000,取得是第五位,也就是opcode
的第一位,這是什麼意思呢?咱們看看剛纔的數據幀格式,發現從0x8
開始就是所謂的控制幀了。0x8
對應的二進制是1000,0x7
對應的二進制是0111。發現了吧,若是爲控制幀的時候,opcode
第一位確定是爲1的,因此這裏就判斷的第五位。後面還有讀取第二個字節的代碼,你們能夠本身沿着這個思路本身看看,包括了讀取MASK
,讀取數據長度的三種長度等。
因此這個processNextFrame
方法主要作了三件事:
readHeader
方法中,判斷了是否爲控制幀,是否爲結束幀
,而後獲取了Mask
標識,幀長度等參數readControlFrame
方法中,主要處理了該幀數據爲ping,pong,close
三種狀況,而且在收到close關閉幀
的狀況下,回調了onReadClose
方法,這個待會要細看下。readMessageFrame
方法中,主要是讀取了消息後,回調了onReadMessage方法。至此能夠發現,其實WebSocket
傳輸數據並非一個簡單的事,只是OkHttp
都幫咱們封裝好了,咱們只須要直接傳輸數據便可,感謝這些三方庫爲咱們開發做出的貢獻,不知道何時我也能作出點貢獻呢🤔。
對了,剛纔說回調也很重要,接着看看。onReadClose
和onReadMessage
回調到哪了呢?還記得上文初始化WebSocketWriter
的時候設置了回調接口嗎。因此就是回調給RealWebSocket
了:
//RealWebSocket.kt
override fun onReadClose(code: Int, reason: String) {
require(code != -1)
var toClose: Streams? = null
var readerToClose: WebSocketReader? = null
var writerToClose: WebSocketWriter? = null
synchronized(this) {
check(receivedCloseCode == -1) { "already closed" }
receivedCloseCode = code
receivedCloseReason = reason
//...
}
try {
listener.onClosing(this, code, reason)
if (toClose != null) {
listener.onClosed(this, code, reason)
}
} finally {
toClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
@Throws(IOException::class)
override fun onReadMessage(text: String) {
listener.onMessage(this, text)
}
@Throws(IOException::class)
override fun onReadMessage(bytes: ByteString) {
listener.onMessage(this, bytes)
}
複製代碼
複製代碼
onReadClose
回調方法裏面有個關鍵的參數,receivedCloseCode
。還記得這個參數嗎?上文中解析消息的循環條件就是receivedCloseCode == -1
,因此當收到關閉幀的時候,receivedCloseCode
就再也不等於-1(規定大於1000),也就再也不去讀取解析消息了。這樣整個流程就結束了。
其中還有一些WebSocketListener
的回調,好比onClosing,onClosed,onMessage
等,就直接回調給用戶使用了。至此,接收消息處理消息說完了。
好了。接着說發送,看看send
方法:
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
// ***
// Enqueue the message frame.
queueSize += data.size.toLong()
messageAndCloseQueue.add(Message(formatOpcode, data))
runWriter()
return true
}
複製代碼
複製代碼
首先,把要發送的data
封裝成Message
對象,而後入隊列messageAndCloseQueue
。最後執行runWriter
方法。這都不用猜了,runWriter
確定就要開始發送消息了,繼續看:
//RealWebSocket.kt
private fun runWriter() {
this.assertThreadHoldsLock()
val writerTask = writerTask
if (writerTask != null) {
taskQueue.schedule(writerTask)
}
}
private inner class WriterTask : Task("$name writer") {
override fun runOnce(): Long {
try {
if (writeOneFrame()) return 0L
} catch (e: IOException) {
failWebSocket(e, null)
}
return -1L
}
}
//如下是schedule方法轉到WriterTask的runOnce方法過程
//TaskQueue.kt
fun schedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
taskRunner.kickCoordinator(this)
}
}
}
internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
//***
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)
// Impact the coordinator if we inserted at the front.
return insertAt == 0
}
//TaskRunner.kt
internal fun kickCoordinator(taskQueue: TaskQueue) {
this.assertThreadHoldsLock()
if (taskQueue.activeTask == null) {
if (taskQueue.futureTasks.isNotEmpty()) {
readyQueues.addIfAbsent(taskQueue)
} else {
readyQueues.remove(taskQueue)
}
}
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
backend.execute(runnable)
}
}
private val runnable: Runnable = object : Runnable {
override fun run() {
while (true) {
val task = synchronized(this@TaskRunner) {
awaitTaskToRun()
} ?: return
logElapsed(task, task.queue!!) {
var completedNormally = false
try {
runTask(task)
completedNormally = true
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
backend.execute(this)
}
}
}
}
}
}
private fun runTask(task: Task) {
try {
delayNanos = task.runOnce()
}
}
複製代碼
複製代碼
代碼有點長,這裏是從runWriter
開始跟的幾個方法,拿到writerTask
實例後,存到TaskQueue
的futureTasks列表
裏,而後到runnable
這裏能夠看到是一個while
死循環,不斷的從futureTasks
中取出Task
並執行runTask
方法,直到Task
爲空,循環中止。
其中涉及到兩個新的類:
TaskQueue類
主要就是管理消息任務列表,保證按順序執行TaskRunner類
主要就是作一些任務的具體操做,好比線程池裏執行任務,記錄消息任務的狀態(準備發送的任務隊列readyQueues
,正在執行的任務隊列busyQueues
等等)而每個Task最後都是執行到了WriterTask
的runOnce
方法,也就是writeOneFrame
方法:
internal fun writeOneFrame(): Boolean {
synchronized(this@RealWebSocket) {
if (failed) {
return false // Failed web socket.
}
writer = this.writer
pong = pongQueue.poll()
if (pong == null) {
messageOrClose = messageAndCloseQueue.poll()
if (messageOrClose is Close) {
} else if (messageOrClose == null) {
return false // The queue is exhausted.
}
}
}
//發送消息邏輯,包括`pong`消息,普通消息,關閉消息
try {
if (pong != null) {
writer!!.writePong(pong)
} else if (messageOrClose is Message) {
val message = messageOrClose as Message
writer!!.writeMessageFrame(message.formatOpcode, message.data)
synchronized(this) {
queueSize -= message.data.size.toLong()
}
} else if (messageOrClose is Close) {
val close = messageOrClose as Close
writer!!.writeClose(close.code, close.reason)
// We closed the writer: now both reader and writer are closed.
if (streamsToClose != null) {
listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
}
}
return true
} finally {
streamsToClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
複製代碼
複製代碼
這裏就會執行發送消息的邏輯了,主要有三種消息狀況處理:
pong消息
,這個主要是爲服務器端準備的,發送給客戶端迴應心跳包。普通消息
,就會把數據類型Opcode
和具體數據發送過去關閉消息
,其實當用戶執行close
方法關閉WebSocket
的時候,也是發送了一條Close控制幀
消息給服務器告知這個關閉需求,並帶上code狀態碼
和reason關閉緣由
,而後服務器端就會關閉當前鏈接。好了。最後一步了,就是把這些數據組裝成WebSocket
數據幀並寫入流,分紅控制幀
數據和普通消息數據幀
: