Netty的底層是基於TCP實現的,TCP協議在傳輸數據的過程當中一個完整的業務可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,所以咱們須要考慮Netty的粘包拆包問題java
Netty提供了拆包的基類ByteToMessageDecoder,若是咱們爲引用程序添加了解碼器每次從TCP緩衝區讀到數據都會調用到ByteToMessageDecoder的channelRead方法,它是Netty解碼的入口web
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
從上訴代碼能夠看出Netty拆包的過程主要分爲一下四個流程bash
1.累加數據 2.將累加到的數據傳遞給業務進行業務拆包 3.清理字節容器 4.傳遞業務數據包給業務解碼器處理
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
累加數據是經過如上代碼實現的,主要功能就是將讀取到的數據塞到ButeBuf中去。上面涉及到一個累加器cumulator(實現的功能就是往ByteBuf追加數據),在該類中定義了以下兩個累加器(默認狀況下會使用MERGE_CUMULATOR)ide
public static final Cumulator MERGE_CUMULATOR
public static final Cumulator COMPOSITE_CUMULATOR
下面咱們看一下MERGE_CUMULATOR是如何將新讀取到的數據累加到字節容器裏的svg
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
Netty中ByteBuf的抽象使得累加很是簡單,經過一個簡單的API調用buffer.writeBytes(in)便將新數據累加到字節容器中,爲了防止字節容器大小不夠在累加以前還進行了擴容處理,擴容也是一個內存拷貝操做新增的大小便是新讀取數據的大小函數
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
到這一步字節容器裏的數據已經是目前未拆包部分的全部的數據了編碼
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
callDecode將嘗試將字節容器的數據拆分紅業務數據包塞到業務數據容器out中spa
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.isReadable()) {
// 記錄一下字節容器中有多少字節待拆
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
if (out.size() == 0) {
// 拆包器未讀取任何數據
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 拆包器已讀取部分數據,還須要繼續
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
}
在解碼以前先記錄一下字節容器中有多少字節待拆,而後調用抽象函數decode進行拆包code
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
Netty中對各類用戶協議的支持就體如今這個抽象函數中,全部的拆包器最終都實現了該抽象方法xml
業務拆包完成以後若是發現並無拆到一個完整的數據包,這個時候又分兩種狀況
1.拆包器什麼數據也沒讀取,可能數據還不夠業務拆包器處理,直接break等待新的數據
2.拆包器已讀取部分數據,說明解碼器仍然在工做,繼續解碼
業務拆包完成以後若是發現已經解析到數據包可是並無讀取任何數據,這個時候就會拋出一個Runtime異常,告訴你什麼數據都沒讀取卻解析出一個業務數據包這是有問題的
業務拆包完成以後只是從字節容器中取走了數據,可是這部分空間對於字節容器來講依然保留着,而字節容器每次累加字節數據的時候都是將字節數據追加到尾部,若是不對字節容器作清理那麼時間一長就會OOM
正常狀況下其實每次讀取完數據,Netty都會在下面這個方法中將字節容器清理
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
可是當發送端發送數據過快channelReadComplete沒有辦法及時清理可能會引起OOM,因此爲防止發送端發送數據過快Netty會在每次讀取到一次數據,業務拆包以後對字節字節容器作清理,清理部分的代碼以下
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
若是字節容器當前已無數據可讀取,直接釋放該容器而且將cumulation置爲null減小下次拆包時計數器累加的工做,若是連續16次(discardAfterReads的默認值)字節容器中仍然有未被業務拆包器讀取的數據那就作一次壓縮,將有效數據段移到容器首部
discardSomeReadBytes以前,字節累加器中的數據分佈
+--------------+----------+----------+
| readed | unreaded | writable | +--------------+----------+----------+
discardSomeReadBytes以後,字節容器中的數據分佈
+----------+-------------------------+
| unreaded | writable | +----------+-------------------------+
這樣字節容器又能夠承載更多的數據了
以上三個步驟完成以後,就能夠將拆成的包丟到業務解碼器處理了,代碼以下
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
最後調用fireChannelRead將拆到的業務數據包都傳遞到後續的handler,若是未解析到有效的數據包此處的msgs長度爲0,即若是在拆包過程當中未解析到有效的數據包,讀事件不會往下傳遞
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
這樣就能夠把一個個完整的業務數據包傳遞到後續的業務解碼器進行解碼,隨後處理業務邏輯
關於消息編碼器原理與消息解碼器相似,不一樣的是消息編碼器的抽象是MessageToByteDecoder。因此此處不展開分析