掃描下方二維碼或者微信搜索公衆號
菜鳥飛呀飛
,便可關注微信公衆號,閱讀更多Spring源碼分析
和Java併發編程
文章。java
在上一篇文章中分析到了 Netty 服務端是如何進行新鏈接的接入的,那麼當新鏈接接入後,就能夠開始數據的讀寫操做了。在進行數據讀寫操做時,對於 TCP 鏈接而言,netty 就須要解決 TCP 中粘包、半包的問題,這將是本文今天重點分析的內容。在開始閱讀本文以前,能夠先思考一下如下兩個問題。編程
在 TCP/IP 協議模型中,TCP 和 UDP 協議屬於傳輸層協議,這兩個協議在數據傳輸過程當中存在很大的差別。設計模式
對於 UDP 協議而言,它傳輸的數據是基於數據報來進行收發的,在 UDP 協議的頭中,會有一個 16bit 的字段來表示 UDP 數據報文的長度,在應用層能很好的將不一樣的數據報文區分開。能夠理解爲,UDP 協議傳輸的數據是有邊界的,所以它不會存在粘包、半包的問題。微信
而對於 TCP 協議而言,它傳輸數據是基於字節流傳輸的。應用層在傳輸數據時,實際上會先將數據寫入到 TCP 套接字的緩衝區,當緩衝區被寫滿後,數據纔會被寫出去,這就可能形成粘包、半包的問題。並且當接收方接收到數據後,實際上接收到的是一個字節流,所謂的流,能夠理解爲河流同樣。既然是流,多個數據包相互之間是沒有邊界的,並且在 TCP 的協議頭中,沒有一個單獨的字段來表示數據包的長度,這樣在接收方的應用層,從字節流中讀取到數據後,是沒辦法將兩個數據包區分開的。網絡
當發送方連續向接收方發送兩個完整的數據包時,若是使用 TCP 協議進行傳輸,就可能存在如下幾種狀況。下圖中 packet1 和 packet2 分別表示發送方發送的兩個完整的數據包。數據結構
第一種狀況,沒有發生粘包、半包的現象,即接收方正常接收到兩個獨立的完整數據包 packet一、packet2,這種狀況是屬於正常狀況。如圖 1 所示。併發
第二種狀況,發生了粘包現象,即發送方將數據包 packet1 寫入到本身的 TCP 套接字的緩衝區後,TCP 並無當即將數據發送出去,由於此時緩衝區可能尚未慢。接着發送方又發送了一個數據包 packet2,仍然是先寫入到 TCP 套接字的緩衝區,此時緩衝區滿了,而後 TCP 纔將緩衝區的數據一塊兒發送出去,這時候接收方接收到的數據看起來只有一個數據包。在 TCP 的協議頭中,沒有一個單獨的字段來表示數據包的長度,這樣接收方根本就沒法區分出 packet1 和 packet2,這就是所謂的粘包問題。另外,當接收方的 TCP 層接收到數據後,因爲應用層沒有及時從 TCP 套接字中讀取數據,也會形成粘包現象。如圖 2 所示。框架
第三種狀況,發生了半包現象,即發送方依舊是前後發送了兩個數據包 packet1 和 packet2,可是 TCP 在傳輸時,分了幾回傳輸,每次傳輸的內容中包含的不是 packet1 和 packet2 的完整包,只是 packet1 或者 packet2 的一部分,就至關於把兩個數據包的內容拆分了,所以也稱之爲拆包現象。如圖 3 所示。ide
從上面的示意圖中,咱們大體能夠知道產生粘包、半包的主要緣由以下。oop
但歸根結底,產生粘包、半包的根本緣由是由於 TCP 是基於字節流來傳輸數據的,數據包相互之間沒有邊界,致使接收方沒法準確的分辨出每個單獨的數據包。
做爲一個應用層的開發者,咱們沒法去改變 TCP 基於字節流來傳輸數據的特性,除非咱們自定義一個相似於 TCP 的協議,可是難度太大,設計出來的性能還不必定比現有的 TCP 協議性能好,何況目前 TCP 協議的使用十分普遍。而 netty 做爲一款高性能的網絡框架,必然就要有對 TCP 協議的支持,既然支持 TCP 協議,那就要解決 TCP 中粘包、半包的問題,不然若是開發人員本身去解決,那就費時費力了。
netty 中經過提供一系列的編解碼器來解決 TCP 的粘包、半包問題,顧名思義,編解碼器就是經過將從 TCP 套接字中讀取的字節流經過必定的規則,將其進行編碼或者解碼,編碼成二進制字節流或者解析出一個個完整的數據包。在 netty 中提供了不少通用的編解碼器,對於解碼器而言,它們均繼承自抽象類ByteToMessageDecoder;對於編碼器而言,它們均繼承與抽象類MessageToByteEncoder。
今天主要先簡單分析下抽象類解碼器ByteToMessageDecoder類的源碼,對於具體的解碼器實現將在後面兩篇文章中詳細分析其原理,對於編碼器而言,編碼過程與解碼過程剛好相反,所以就再也不單獨贅述,有興趣的朋友能夠自行閱讀,歡迎分享。
ByteToMessageDecoder實際上就是一個 ChannelHandler,它的具體實現類須要被添加到 pipeline 中才會起做用。當將解碼器添加到 pipeline 中後,當出現 OP_READ 事件時,就會經過 pipeline 傳播執行全部 handler 的 channelRead() 方法。在抽象類解碼器中,就定義了 channelRead()方法。其源碼以下。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 用來存放解碼出來的數據對象,能夠將它當作一個集合
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
//第一次直接賦值
cumulation = data;
} else {
//累加數據
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 調用解碼的方法
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
// 省略部分代碼...
// size是解碼出來的數據對象的數量,
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 向下傳播,若是size爲0,就表示沒有解碼出一個對象,所以不會向下傳播,而是等到下一次繼續讀到數據後解碼
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
複製代碼
netty 中解碼器的核心邏輯就是先經過一個累加器將讀到的字節流數據累計,而後調用解碼器的實現類對累加到的數據進行解碼,若是能解碼出一個數據對象,就表示讀到了一個完整的數據包,那就將解碼出來的數據對象沿着 pipeline 向下傳播,交由業務代碼去執行後面的業務邏輯。若是沒能解碼出一個數據對象,那就表示尚未讀到一個完整的數據包,就不向下進行傳播,而是等待下一次繼續有數據讀,繼續累加字節流數據,直到累加到的數據能解碼出一個數據對象,而後再向下傳播。
這裏有 3 個比較重要的點,第一:字節流數據的累加器;第二:具體的解碼操做,這一步是在具體的解碼器實現類中完成的;第三:解碼出來的數據對象,在由具體的解碼器將字節流數據解碼出數據對象後,會將對象存放到一個 list 中,而後將數據對象沿着 pipeline 向下傳播。接下來咱們結合上面的源碼,來分析下這幾個步驟。
首先會判斷 msg 是不是 ByteBuf 類型的對象,對於已經解碼出來的字節流數據,此時不會是 ByteBuf 類型,所以也不須要進行解碼,從而進入 else 邏輯中,直接將數據對象向下進行傳播。對於沒有被解碼的字節流數據,此時 msg 就是 ByteBuf 類型,所以會進入到 if 邏輯塊中進行解碼。
在 if 邏輯中,先定義了一個out對象,這個對象能夠簡單的把它當作一個集合對象,它用來存放成功解碼出來的數據對象,也就是上面第三點中提到的 list。接下來會碰到 cumulation 這個對象,它就是一個字節流數據累加器,默認值爲MERGE_CUMULATOR,經過判斷它是否爲空,從而知道是不是第一次讀取數據,若是爲空,表示前面沒有累加數據,所以直接讓 msg 等於 cumulation,意思就是將當前的字節流數據 msg 所有累加到累加器 cumulation 中;若是累加器不爲空,表示前面累加器中存在一部分數據(前面出現了 TCP 半包現象),所以須要將當前讀到的字節流數據 msg 累加到累加器中。
如何累加字節流數據到累加器中的呢?那就是調用累加器的 cumulate() 方法,這裏採用的是策略設計模式,默認的累加器爲MERGE_CUMULATOR,其源碼以下。
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
// 若是由於空間滿了寫不了本次的新數據 就擴容
// cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() 能夠裝換爲以下:
// cumulation.writeIndex() + in.readableBytes()>cumulation.maxCapacity
// 即 寫指針的位置+可讀的數據的長度,若是超過了ByteBuf的最大長度,那麼就須要擴容
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// 擴容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 將新數據寫入
buffer.writeBytes(in);
return buffer;
} finally {
// 釋放內存,防止OOM
in.release();
}
}
複製代碼
在代碼中能夠看到,在將數據累加到累加器中以前,會先判斷是否須要擴容,若是須要擴容,就調用 expandCumulation() 方法先進行擴容。最後調用 writeBytes() 方法將數據寫入到累加器中,而後將累加器返回。關於擴容的方法,因爲這裏的累加器是MERGE_CUMULATOR,所以其底層就是進行內存複製。在 netty 中還提供了另外一種類型的累加器:COMPOSITE_CUMULATOR,它擴容的時候不須要進行內存複製,而是經過組合 ByteBuf,即CompositeByteBuf類來實現擴容的。
那麼問題來了,顯然基於內存複製的操做會更慢一點,那 netty 爲何會默認使用基於內存複製的累加器呢?netty 源碼裏面給的解釋以下:
/**
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
* and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
*/
複製代碼
大體意思就是:累加器只是累加數據,具體的解碼操做是由抽象解碼器的實現類來作的,對於解碼器的實現類,此時咱們並不知道解碼器的實現類具體是如何進行解碼的,可能它基於 CompositeByteBuf 類型數據結構,解碼起來會更慢,並不必定比直接使用 ByteBuf 快,效率不必定高,所以 netty 默認就直接使用了MERGE_CUMULATOR,也就是基於內存複製的累加器。
當將數據累加到累計器後,就會調用callDecode(ctx, cumulation, out) 來進行解碼了。其精簡後的源碼以下。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 只要有數據可讀,就循環讀取數據
while (in.isReadable()) {
int outSize = out.size();
// 若是out中有對象,這說明已經解碼出一個數據對象了,能夠向下傳播了
if (outSize > 0) {
// 向下傳播,並清空out
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 記錄下解碼以前的可讀字節數
int oldInputLength = in.readableBytes();
// 調用解碼的方法
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
// 若是解碼先後,out中對象的數量沒變,這代表沒有解碼出新的對象
if (outSize == out.size()) {
// 當沒解碼出新的對象時,累計器中可讀的字節數在解碼先後也沒變,說明本次while循環讀到的數據,
// 不夠解碼出一個對象,所以中斷循環,等待下一次讀到數據
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
// out中的對象數量變了,說明解碼除了新的對象,可是解碼先後,累計器中的可讀數據並無變化,這表示出現了異常
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
複製代碼
該方法的第二個參數 in 就是前面咱們提到的累積器,第三個參數 out,就是前面提到的存儲成功解碼出來的數據對象。該方法的邏輯能夠參考上面代碼中的註釋,解碼的核心代碼在這一行:
// 調用解碼的方法
decodeRemovalReentryProtection(ctx, in, out);
複製代碼
這個方法的源代碼以下。在它的代碼中,會就會真正去調用解碼器子類的 decode()方法,進行數據解碼。
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//調用子類的解碼方法
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
複製代碼
decode(ctx, in, out) 方法是一個抽象方法,它由解碼器的具體實現類去實現具體的邏輯。顯然,在父類中,調用一個抽象方法,抽象方法具體的邏輯由子類本身實現,這裏用到的是模板方法設計模式。netty 中經常使用的解碼器有以下幾種,以下所示。關於子類解碼的核心邏輯,後面兩篇文章分析。
FixedLengthFrameDecoder(基於固定長度的解碼器)
LineBasedFrameDecoder (基於行分隔符的解碼器)
DelimiterBasedFrameDecoder (基於自定義分割符的解碼器)
LengthFieldBasedFrameDecoder (基於長度字段的解碼器)
複製代碼
最後回到channelRead() 方法的 finally 語句塊中(代碼以下),會先獲取 out 中解碼出來的數據對象的數量,而後調用 fireChannelRead() 方法將解析出來的數據對象向下進行傳播處理。若是 size 爲 0,就表示沒有解碼出一個對象,所以不會向下傳播,而是等到下一次繼續讀到數據後解碼。
finally {
// 省略部分代碼...
// size是解碼出來的數據對象的數量,
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 向下傳播,若是size爲0,就表示沒有解碼出一個對象,所以不會向下傳播,而是等到下一次繼續讀到數據後解碼
fireChannelRead(ctx, out, size);
out.recycle();
}
複製代碼
本文先介紹了什麼是 TCP 的粘包、半包現象,就是將多個獨立的數據包合成或者拆分紅多個數據包發送,以及產生粘包、半包現象的根本緣由是 TCP 協議是基於字節流傳輸數據的。而後結合源碼介紹了 netty 經過編解碼器是如何來解決粘包、半包問題。最後關於具體的解碼操做的源碼分析,將會在後面兩篇文章中分析。