#Dubbo處理TCP拆包粘包問題java
在TCP網絡傳輸工程中,因爲TCP包的緩存大小限制,每次請求數據有可能不在一個TCP包裏面,或者也可能多個請求的數據在一個TCP包裏面。那麼若是合理的decode接受的TCP數據很重要,須要考慮TCP拆包和粘包的問題。咱們知道在Netty提供了各類Decoder來解決此類問題,好比LineBasedFrameDecoder
,LengthFieldBasedFrameDecoder
等等,可是這些都是處理一些通用簡單的協議棧,並不能處理高度自定義的協議棧。因爲dubbo協議是自定義協議棧,而且包含消息頭和消息體兩部分,而消息頭中包含消息類型、協議版本、協議魔數以及payload長度等信息。因此使用Netty自帶的處理方案可能沒法知足Dubbo解析自身協議的需求,因此須要Dubbo本身來處理,那本身處理,就須要本身處理TCP的拆包和粘包的問題。這裏就對Dubbo處理此類問題進行探討,從而加深本身對它的理解。web
##說明 此處所描述的協議是dubbo協議,其餘的協議好比http,webservice等協議不是這裏討論範圍。而且這裏使用的通訊框架以Netty來說解,Mina以及grizzly也不在種類討論範圍。緩存
##NettyCodecAdapter NettyCodecAdapter
是對dubbo協議解析的入口,裏面包含decoder和encoder兩部分,而TCP的拆包和粘包主要是decoder部分,因此encoder這裏不進行討論。在NettyCodecAdapter
中的decoder是由InternalDecoder
來實現,它的父類是Netty的SimpleChannelUpstreamHandler
能夠接受全部inbound消息,那麼就能夠對接受的消息進行decode。這裏須要說明一下對於某一個Channel都有一個私有的InternalDecoder
對象,並非和其餘的Channel共享,這裏就避免了併發問題,因此在InternalDecoder
裏面能夠用單線程的方式去看待,這樣就比較容易理解。網絡
###InternalDecoder 每一個channel的inbound消息都會發送到InternalDecoder
的messageReceived
方法,而dubbo會先將接受的消息緩存到InternalDecoder
的buffer
屬性中,這個變量很重要,後面會討論。下面是messageReceived
方法中將接受的消息負載到buffer
實現。併發
private class InternalDecoder extends SimpleChannelUpstreamHandler { private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Object o = event.getMessage(); if (! (o instanceof ChannelBuffer)) { ctx.sendUpstream(event); return; } ChannelBuffer input = (ChannelBuffer) o; int readable = input.readableBytes(); if (readable <= 0) { return; } com.alibaba.dubbo.remoting.buffer.ChannelBuffer message; if (buffer.readable()) { if (buffer instanceof DynamicChannelBuffer) { buffer.writeBytes(input.toByteBuffer()); message = buffer; } else { int size = buffer.readableBytes() + input.readableBytes(); message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer( size > bufferSize ? size : bufferSize); message.writeBytes(buffer, buffer.readableBytes()); message.writeBytes(input.toByteBuffer()); } } else { message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer( input.toByteBuffer()); } NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message); } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) { Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable()); } finally { if (message.readable()) { message.discardReadBytes(); buffer = message; } else { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; } NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { ctx.sendUpstream(e); } }
首先是判斷當前decoder對象的buffer
中是否有能夠讀取的消息,若是有則進行合併,而且把對象引用賦予message
局部變量,因此message
則獲取了當前channel的inbound消息。獲得inbound消息以後,那麼接下來就是對協議的解析了。app
do { saveReaderIndex = message.readerIndex(); try { msg = codec.decode(channel, message); } catch (IOException e) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw e; } if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { if (saveReaderIndex == message.readerIndex()) { buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER; throw new IOException("Decode without read data."); } if (msg != null) { Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress()); } } } while (message.readable());
這裏首先要作的是把當前message的讀索引保存到局部變量saveReaderIndex
中,用於後面的消息回滾。後面緊接着是對消息的decode,這裏的codec
是DubboCountCodec
對象實體,這裏須要注意一點,DubboCountCodec
的decode
每次只會解析出一個完整的dubbo協議棧,帶着這個看看decode
的實現。框架
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { buffer.readerIndex(save); break; } else { result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true); if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0); } return result; }
這裏暫存了當前buffer
的讀索引,一樣也是爲了後面的回滾。能夠看到當decode返回的是NEED_MORE_INPUT則表示當前的buffer
中數據不足,不能完整解析出一個dubbo協議棧,同時將buffer的讀索引回滾到以前暫存的索引而且退出循環,將結果返回。那接下來看看何時會返回NEED_MORE_INPUT,最終會定位到在ExchangeCodec
的decode
方法會解析出協議棧。tcp
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i ++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len = Bytes.bytes2int(header, 12); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if( readable < tt ) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } }
這個方法開始是對telnet協議進行解析(因爲dubbo支持telnet鏈接,因此這裏提供了支持,能夠忽略這一部分)。看到會有兩個地方返回NEED_MORE_INPUT
,一個是當前buffer
的可讀長度尚未消息頭長,說明當前buffer
連協議棧的頭都不完整,因此須要繼續讀取inbound數據,另外一個是當前buffer
包含了完整的消息頭,即可以獲得payload的長度,發現它的可讀的長度,並無包含整個協議棧的數據,因此也須要繼續讀取inbound數據。若是上面兩個狀況都不復核,那麼說明當前的buffer
至少包含一個dubbo協議棧的數據,那麼從當前buffer
中讀取一個dubbo協議棧的數據,解析出一個dubbo數據,固然這裏可能讀取完一個dubbo數據以後還會有剩餘的數據。ide
上面對dubbo解析出一個完整的dubbo協議棧過程進行了討論,可是尚未對TCP的拆包和粘包問題作過多的討論。下面結合上面內容作一個綜合討論。url
我這裏對TCP拆包和粘包分別列舉一個場景來討論。
###當反生TCP拆包問題時候 這裏假設以前尚未發生過任何數據交互,系統剛剛初始化好,那麼這個時候在InternalDecoder
裏面的buffer
屬性會是EMPTY_BUFFER
。當發生第一次inbound數據的時候,第一次在InternalDecoder
裏面接收的確定是dubbo消息頭的部分(這個由TCP協議保證),因爲發生了拆包狀況,那麼此時接收的inbound消息可能存在一下幾種狀況
一、當前inbound消息只包含dubbo協議頭的一部分
二、當前inbound消息只包含dubbo的協議頭
三、當前inbound消息只包含dubbo消息頭和部分payload消息
經過上面的討論,咱們知道發生上面三種狀況,都會觸發ExchangeCodec
返回NEED_MORE_INPUT
,因爲在DubboCountCodec
對餘返回NEED_MORE_INPUT
會回滾讀索引,因此此時的buffer
裏面的數據能夠看成並無發生過讀取操做,而且DubboCountCodec
的decode也會返回NEED_MORE_INPUT
,在InternalDecoder
對於當判斷返回NEED_MORE_INPUT
,也會進行讀索引回滾,而且退出循環,最後會執行finally
內容,這裏會判斷inbound消息是否還有可讀的,因爲在DubboCountCodec
裏面進行了讀索引回滾,因此次數的buffer
裏面是完整的inbound消息,等待第二次的inbound消息的到來,當第二次inbound消息過來的時候,再次通過上面的判斷。
###當發生TCP粘包的時候 當發生粘包的時候是tcp將一個以上的dubbo協議棧放在一個tcp包中,那麼有可能發生下面幾種狀況
一、當前inbound消息只包含一個dubbo協議棧
二、當前inbound消息包含一個dubbo協議棧,同時包含部分另外一個或者多個dubbo協議棧內容
若是發生只包含一個協議棧,那麼當前buffer
經過ExchangeCodec
解析協議以後,當前的buffer
的readeIndex位置應該是 buffer
尾部,那麼在返回到InternalDecoder
中message
的方法readable
返回的是false,那麼就會對buffer
從新賦予EMPTY_BUFFER
實體,而針對包含一個以上的dubbo協議棧,固然也會解析出其中一個dubbo協議棧,可是通過ExchangeCodec
解析以後,message
的readIndex不在message
尾部,因此message
的readable
方法返回的是true
。那麼則會繼續遍歷message
,讀取下面的信息。最終要麼message
恰好整數倍包含完整的dubbo協議棧,要不ExchangeCodec
返回NEED_MORE_INPUT
,最後將未讀完的數據緩存到buffer
中,等待下次inbound事件,將buffer
中的消息合併到下次的inbound消息中,種類又回到了拆包的問題上。
##總結
dubbo在處理tcp的粘包和拆包時是藉助InternalDecoder
的buffer
緩存對象來緩存不完整的dubbo協議棧數據,等待下次inbound事件,合併進去。因此說在dubbo中解決TCP拆包和粘包的時候是經過buffer
變量來解決的。