Netty源碼分析——flush流程java
前言數組
承接上篇寫流程,這篇看下flush流程。以前文章中咱們已經提到過,writeAndFlush操做其實是經過pipeline分別進行了write和flush操做。具體咱們就不看了,咱們直接看下flush。promise
flush緩存
flush操做一樣是經過pipeline最終傳遞給HeadContext:unsafe.flush();:socket
123456789101112public final void flush() { //確保不是外部調用 assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } //添加flush節點 outboundBuffer.addFlush(); //把節點裏的數據寫到socket裏 flush0();}oop
最主要的實際上是兩個步驟,上文已經標註了,一個就是添加flush節點,一個就是真正的寫操做。源碼分析
添加flush節點優化
追進去看下:this
1234567891011121314151617public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; }}.net
咱們按照上篇文章的狀態來講,當前調用了兩次write的狀態是這樣的:
調用完addFlush以後是這樣的:
看到了嗎,其實是把flushedEntry和unFlushedEntry交換了一下。
再假設一下,若是咱們在調用flush以前調用了三次write,再調用flush,鏈表是這樣的:
添加節點以後會繼續執行flush0:
123if (!isFlushPending()) { super.flush0();}
看下這個isFlushPending:
12SelectionKey selectionKey = selectionKey();return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
這裏實際上是在校驗,當前這個Channel是否有OP_WRITE,若是當前selectionKey是寫事件,說明有線程執行flush過程,結合上面的一句:!isFlushPending(),說明若是有線程在進行flush過程,就直接返回。
這裏其實我還沒吃透,個人疑問是這裏是否有必要進行!isFlushPending()判斷。由於咱們以前已經說過了,任何flush操做開頭都進行了一個校驗:assertEventLoop(),說白了,只有Reactor線程能夠調用flush,那麼當前線程在執行的時候,怎麼可能有別的線程進行了flush操做呢?
這個問題我會去debug一下,而後求證一下做者,有結果了會在文章後面加上。
繼續看,若是當前channel沒有被其餘線程操做,這裏會調用super.flush0,回到io.netty.channel.AbstractChannel.AbstractUnsafe#flush0裏:
12345678910111213141516171819202122232425262728293031323334353637383940if (inFlush0) { // 防止重複調用 return;}// 若是沒有數據要flush就返回final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null || outboundBuffer.isEmpty()) { return;}inFlush0 = true;// 若是channel失效,把全部待刷的數據設置爲失敗if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return;}try { // 真正的寫操做 doWrite(outboundBuffer);} catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } }} finally { inFlush0 = false;}
繼續看doWrite操做,這裏會直接走到io.netty.channel.socket.nio.NioSocketChannel#doWrite裏:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849SocketChannel ch = javaChannel();// 獲取循環次數,至關於一個自旋,保證寫成功int writeSpinCount = config().getWriteSpinCount();do { // 若是buffer裏空的,則清理OP_WRITE,防止Reacotr線程再次處理這個Channel if (in.isEmpty()) { clearOpWrite(); return; } // 聚合 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: writeSpinCount -= doWrite0(in); break; case 1: { // 若是隻有一個buffer的狀況下,直接把這個buffer寫進去 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // 多個buffer的狀況下,寫nioBufferCnt個buffer進去 long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } }} while (writeSpinCount > 0);// 是否寫完成incompleteWrite(writeSpinCount < 0);
上面的過程當中,我只是粗略的寫了一下過程,其實裏面的細節很是多,咱們一點一點來看。
先看着幾句:
123int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);int nioBufferCnt = in.nioBufferCount();
首先獲取聚合寫的最大字節數,聚合寫,在原生NIO的概念中,是把幾個Buffer的數據寫到一個Channel裏,就是說一次最多寫多少字節數據。而後進入到nioBuffers方法,這個方法作什麼註釋上有說:若是緩衝區裏全都是ByteBuf,則返回直接NIO緩衝區的Buffer數組(其實就是把ByteBuf裏的數據寫到原生Buffer裏),nioBufferCount和nioBufferSize分別表明返回數組中原生NIO Buffer的數量和NIO緩衝區的可讀字節總數。看代碼,有點長拆開看:
123456789long nioBufferSize = 0;int nioBufferCount = 0;final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);Entry entry = flushedEntry;while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { //...}return nioBuffers;
先是從當前線程裏獲取ByteBuffer,這裏能夠看出,其實ByteBuffer是被緩存了的(若是沒有建立一個長度爲1024的ByteBuffer數組),不須要每次建立。
而後是循環全部的flushedEntry,這裏,咱們回顧一下上面addFlush以後的圖,其實循環中會不停地把flushedEntry前移,直到flushedEntry和tailEntry中的節點所有都被處理。isFlushEntry的代碼:e != null && e != unflushedEntry;,其實就是,不是unflushedEntry的都是flushedEntry。
這裏咱們能夠看到,另一個循環的條件就是entry.msg instanceof ByteBuf,說明這個方法只處理ByteBuf。
繼續看循環裏:
1234567891011121314151617181920212223ByteBuf buf = (ByteBuf) entry.msg;final int readerIndex = buf.readerIndex();final int readableBytes = buf.writerIndex() - readerIndex;// 若是有數據if (readableBytes > 0) { if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) { break; } // NIO Buffer可讀字節數+ByteBuf的可讀字節數 nioBufferSize += readableBytes; int count = entry.count; if (count == -1) { // 初始化entry的count entry.count = count = buf.nioBufferCount(); } int neededSpace = min(maxCount, nioBufferCount + count); if (neededSpace > nioBuffers.length) { //若是實際須要的空間,比以前獲得的ByteBuffer數組數量大,就擴容,而後緩存起來 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); }}
這裏說下這個流程,聲明一下,ByteBuffer指的是原生Buffer,ByteBuf是Netty本身封裝的Buffer。
先拿出來一個Entry的ByteBuf,看看可讀的字節多少。而後初始化entry的count,用的是組成這個ByteBuf的ByteBuffer(原生)的數量(這裏這個nioBufferCount大部分狀況下返回1,及一個ByteBuf對應一個ByteBuffer)。
而後看看須要的空間是多少,須要的空間默認狀況下是1024和已有須要的ByteBuffer數量+count(及ByteBuf.nioBufferCount)兩者之間的最小值,及,最多搞1024個ByteBuffer。
而後對比須要空間和提供空間,及對比以前分配的ByteBuffer[]的length和須要的ByteBuffer的數量,若是須要的空間大,就擴容(這裏能夠類比一下ArrayList的擴容)。也就是,無論怎麼樣,都會分配足夠的ByteBuffer使用。
可能這麼看起來有點繞,我舉個例子:加羣617434785獲取文中知識點。
假設咱們有八個節點,每一個節點的ByteBuf在Flush的時候,數據都會寫入到1個ByteBuffer裏,而後咱們開始循環這個八個節點,循環以前我記錄一下一共須要多少個ByteBuffer數組(好比叫count,循環前就是0),而後咱們有一個分配給咱們的ByteBuffer數組(好比叫fenpei)。
代碼應該是這樣的:
123456789101112// 循環八個節點int count = 0;for (Entry e : entries) { ByteBuf b = e.msg; // 這裏大部分ByteBuf會返回1 int c = b.nioBufferCount(); count += c; if (count > fenpei.length) { //擴容 expandNioBufferArray(fenpei) }}
這樣就比較直接了,再看不懂的。。。emmm,哈哈哈哈哈~
繼續看:
12345678910111213141516171819202122232425if (count == 1) { // 1個ByteBuff對應1個ByteBuffer ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // 初始化ByteBuffer, entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } // 放到數組裏 nioBuffers[nioBufferCount++] = nioBuf;} else { // 一個ByteBuf對應多個ByteBuffer,初始化多個ByteBuffer,循環放到數組裏 ByteBuffer[] nioBufs = entry.bufs; if (nioBufs == null) { entry.bufs = nioBufs = buf.nioBuffers(); } for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) { ByteBuffer nioBuf = nioBufs[i]; if (nioBuf == null) { break; } else if (!nioBuf.hasRemaining()) { continue; } nioBuffers[nioBufferCount++] = nioBuf; }}
這個方法咱們總結一下,就是分配數組,這個數組一開始會初始化一個長度爲1024的ByteBuffer數組,若是不夠用就擴容,裏面的ByteBuffer能容納的數據,對應每一個節點ByteBuf裏有的數據。這個地方其實並無看到ByteBuf向對應的ByteBuffer裏寫數據的地方,關於這個問題,你們能夠跟一下buf.internalNioBuffer(readerIndex, readableBytes)這裏,這裏是會把數據搞到ByteBuffer裏的。
至此分配就結束了,而後繼續往下看doWrite,接下來進入了一個switch,條件就是有多少個原生ByteBuffer要寫,咱們看看default的狀況:
12345678910111213141516// 整個Entry鏈的可讀數據long attemptedBytes = in.nioBufferSize();// 向管道中寫數據final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);// 若是寫失敗了,這個要細說if (localWrittenBytes <= 0) { incompleteWrite(true); return;}// 調整最大聚合寫的字節數adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);// remove節點 in.removeBytes(localWrittenBytes);// 自旋次數-1--writeSpinCount;
向channel中寫入數據,若是localWrittenBytes < 0,這裏是說明這個Channel不可用,其實就是寫失敗了(或者說沒有所有寫到Channel裏?這個地方存疑,我沒有驗證過)。
這裏寫失敗了怎麼辦,咱們看下incompleteWrite,注意入參是true,下面會說到:
1234567891011121314151617// 這裏這個setOpWrite就是入參trueif (setOpWrite) { setOpWrite();} else { clearOpWrite(); eventLoop().execute(flushTask);}setOpWrite方法:final SelectionKey key = selectionKey();if (!key.isValid()) { return;}final int interestOps = key.interestOps();if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE);}
若是失敗了,就去setOpWrite,其實就是給Channel註冊了一個OP_WRITE,而後就return了。
這裏爲何要打上OP_WRITE呢,打上有什麼用呢?還記得上文中我提出的問題麼,關於爲何flush0以前要進行判斷isFlushPending,這裏讓我細細說來(爲何判斷isFlushPending的疑惑已經解開了,這篇文章拖了幾天,這幾天中來來回回的看源碼、請教大神、debug。。終於有所突破)。
先放下isFlushPending,繼續說這個OP_WRITE。這裏還記得何時打上OP_WRITE麼,是Channel寫失敗的時候!咱們先無論這個OP_WRITE的具體含義,就認爲是一個標記,標記這個管道不可用,這時候,請問:管道不可用的狀況下,若是我還想進行Flush操做,即向管道中寫數據,這時候能成功麼?答案是不行!怎麼優化?太簡單了,提早返回就能夠了,每次Flush的時候先看看管道是否可用!
到這,isFlushPending的做用就體現出來了!OP_WRITE咱們就把它當成一個普通的標記,若是Channel上有這個標記,就表示不可寫。
那麼爲何用OP_WRITE標識不可寫呢,命名OP_WRITE的含義就是可寫啊!這裏就要說回到Reacotr要作的三件事中的處理select到的事件了,看一下processSelectedKey:
1234if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush();}
其中有這麼一句,這裏的註釋我也抄過來了,就是說,若是輪訓到寫事件,就去執行forceFlush,而後清理掉OP_WRITE。
回想咱們寫失敗的地方。若是寫失敗,就給管道註冊OP_WRITE,而後Reacotr線程會不斷地去select,一旦Channel可用,那麼這個Channel因爲以前註冊了OP_WRITE,就會被Reactor線程select出來,而後進行forceFlush,這個forceFlush其實就是調用了flush0,從新走了一遍flush操做,注意兩個地方:
forceFlush不會添加flush節點(不會調用addFlush)
forceFlush不會進行isFlushPending校驗
爲何不進行isFlushPending:咱們說過了,OP_WRITE的意思是管道不可用,那麼被select出來的管道必定是可用的,直接進行寫操做。
這樣咱們就理順了寫失敗的流程:
若是寫失敗,給管道註冊一個OP_WRITE
其餘的flush操做都會直接返回(被isFlushPending)攔截
管道可用後,被Reacotr線程select到,進行forceFlush操做
收一下,看回到doWrite方法裏,若是寫成功,進行in.removeBytes(localWrittenBytes);操做,remove掉這個節點。
注意,正常狀況下,結束doWrite操做是在:
1234if (in.isEmpty()) { clearOpWrite(); return;}
這裏返回的。若是樂觀鎖的默認16次都循環完,操做還沒結束,又會進行incompleteWrite(writeSpinCount < 0)操做,若是執行了16次循環之後,ChannelOutboundBuffer中還有Entry,writeSpinCount < 0成立,設置一個OP_WRITE,而後等着被Reacotr線程select。若是大於0,這種狀況比較特殊,寫入的ByteBuf或者FileRegion只有一個,可是這個ByteBuf是不可讀的,或者region.transferred() >= region.count()。這時候會走到incompleteWrite(false)裏,這裏執行clearOpWrite();和eventLoop().execute(flushTask);,清理掉OP_WRITE讓通道繼續可寫,而後再次扔了一個flushTask到NioEventLoop裏,這裏其是讓出資源,讓Reacotr能夠處理其餘的task。至此,整個寫流程就結束了,寫的比較細,你們多多琢磨,多多思考,纔會有更多收穫!