歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java
ByteToMessageDecoder在Netty中起着很大的做用,用來解決半包字節累積問題。粘貼部分重要代碼(固然自己方法不是很
git
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter { ByteBuf cumulation; private boolean singleDecode; private boolean first; protected ByteToMessageDecoder() { if (getClass().isAnnotationPresent(Sharable.class)) {//由於每個ByteToMessageDecoder都有針對某個socket的累積對象 //故是一個不能夠共享的對象類型 throw new IllegalStateException("@Sharable annotation is not allowed"); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { //緩衝區的大小沒有超過須要寫入的數據的大小 if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes()); } cumulation.writeBytes(data);//將數據寫入到積累對象中 data.release();//釋放bytebuffer(heap或者direct)--經過引用的方式進行釋放緩衝區 } //收集完畢以後解析收集到的字符串 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //若是累積對象中沒有數據了(由於全部發送的數據剛恰好n個msg) if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); decodeWasNull = size == 0; //針對解析後的out結果,逐個調用message for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } } private void expandCumulation(ChannelHandlerContext ctx, int readable) { ByteBuf oldCumulation = cumulation;//新的容量=舊的容量+可讀取的數量 ---在此處的擴展和初次的分配都是經過同一個allocator進行分配的 cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation);//複製的過程 oldCumulation.release();//釋放老的對象 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { if (cumulation != null && !first) {//cumulation可讀的數據爲0那麼就不會調用緊跟着的代碼段 cumulation.discardSomeReadBytes();//若是存在半包得話,那麼就釋放沒必要要的空間(有時間的話,咱們會將一個Netty中ByteBuf的構造) } if (decodeWasNull) { decodeWasNull = false; if (!ctx.channel().config().isAutoRead()) { ctx.read(); } } ctx.fireChannelReadComplete(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance(); try {//若是channel不活動了得話,對累積對象進行解碼。 if (cumulation != null) { callDecode(ctx, cumulation, out); decodeLast(ctx, cumulation, out); } else { decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null) { cumulation.release(); cumulation = null; } int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } ctx.fireChannelInactive(); out.recycle(); } } /** * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @param in the {@link ByteBuf} from which to read data * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out);//特別注意: 調用完若是msg被解析出來的話,那麼累積對象的readableBytes必定會發生變化 // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) {//若是此handler被移除 break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } }
由此總結以下:github
累積對象主要是承載socket接收的字節數的。每一次decode,當有msg生成的時候,readableBytes都會減少socket
當一個socket處於inactive時,會對累積對象的數據進行解析。而後釋放累積對象ide
噹噹前的累積對象不能承載數據的時候,須要進行擴展(調用Allocator建立個新的bytebuf,而後copy一下數據)oop
該handler是一個有狀態的handler,須要記憶每個socket的發送的字節.而後再去decode成msg對象
ui