Netty源碼分析第7章(編碼器和寫數據)---->第4節: 刷新buffer隊列

 

Netty源碼分析第七章: 編碼器和寫數據html

 

第四節: 刷新buffer隊列java

 

上一小節學習了writeAndFlush的write方法, 這一小節咱們剖析flush方法promise

經過前面的學習咱們知道, flush方法經過事件傳遞, 最終會傳遞到HeadContext的flush方法:socket

public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }

這裏最終會調用AbstractUnsafe的flush方法:oop

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }

這裏首先也是拿到ChannelOutboundBuffer對象源碼分析

而後咱們看這一步:學習

outboundBuffer.addFlush();

這一步一樣也是調整ChannelOutboundBuffer的指針this

跟進addFlush方法:編碼

public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }

首先聲明一個entry指向unflushedEntry, 也就是第一個未flush的entryspa

一般狀況下unflushedEntry是不爲空的, 因此進入if

再未刷新前flushedEntry一般爲空, 因此會執行到flushedEntry = entry

也就是flushedEntry指向entry

通過上述操做, 緩衝區的指針狀況如圖所示:

7-4-1

而後經過do-while將, 不斷尋找unflushedEntry後面的節點, 直到沒有節點爲止

flushed自增表明須要刷新多少個節點

循環中咱們關注這一步

decrementPendingOutboundBytes(pending, false, true);

這一步也是統計緩衝區中的字節數, 可是是和上一小節的incrementPendingOutboundBytes正好是相反, 由於這裏是刷新, 因此這裏要減掉刷新後的字節數,

咱們跟到方法中:

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } //從總的大小減去 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //直到減到小於某一個閾值32個字節 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //設置寫狀態  setWritable(invokeLater); } }

一樣TOTAL_PENDING_SIZE_UPDATER表明緩衝區的字節數, 這裏的addAndGet中參數是-size, 也就是減掉size的長度

再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) 

getWriteBufferLowWaterMark()表明寫buffer的第水位值, 也就是32k, 若是寫buffer的長度小於這個數, 就經過setWritable方法設置寫狀態

也就是通道由原來的不可寫改爲可寫

回到addFlush方法:

遍歷do-while循環結束以後, 將unflushedEntry指爲空, 表明全部的entry都是可寫的

通過上述操做, 緩衝區的指針狀況以下圖所示:

7-4-2

回到AbstractUnsafe的flush方法:

指針調整完以後, 咱們跟到flush0()方法中:

protected void flush0() { if (inFlush0) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }

 if (inFlush0) 表示判斷當前flush是否在進行中, 若是在進行中, 則返回, 避免重複進入

咱們重點關注doWrite方法

跟到AbstractNioByteChannel的doWrite方法中去:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { //每次拿到當前節點 Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //轉化成ByteBuf ByteBuf buf = (ByteBuf) msg; //若是沒有可寫的值 int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //移除  in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { //將buf寫入到socket裏面 //localFlushedAmount表明向jdk底層寫了多少字節 int localFlushedAmount = doWriteBytes(buf); //若是一個字節沒寫, 直接break if (localFlushedAmount == 0) { setOpWrite = true; break; } //統計總共寫了多少字節 flushedAmount += localFlushedAmount; //若是buffer所有寫到jdk底層 if (!buf.isReadable()) { //標記全寫道 done = true; break; } } in.progress(flushedAmount); if (done) { //移除當前對象  in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略 } else { throw new Error(); } } incompleteWrite(setOpWrite); }

首先是一個無限for循環

 Object msg = in.current() 這一步是拿到flushedEntry指向的entry中的msg

跟到current()方法中:

public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }

這裏直接拿到flushedEntry指向的entry中關聯的msg, 也就是一個ByteBuf

回到doWrite方法:

若是msg爲null, 說明沒有能夠刷新的entry, 則調用clearOpWrite()方法清除寫標識

若是msg不爲null, 則會判斷是不是ByteBuf類型, 若是是ByteBuf, 就進入if塊中的邏輯

if塊中首先將msg轉化爲ByteBuf, 而後判斷ByteBuf是否可讀, 若是不可讀, 則經過in.remove()將當前的byteBuf所關聯的entry移除, 而後跳過此次循環進入下次循環

remove方法稍後分析, 這裏咱們先繼續往下看

 boolean done = false 這裏設置一個標識, 標識刷新操做是否執行完成, 這裏默認值爲false表明走到這裏沒有執行完成

 writeSpinCount = config().getWriteSpinCount() 這裏是得到一個寫操做的循環次數, 默認是16

而後根據這個循環次數, 進行循環的寫操做

在循環中, 關注這一步:

int localFlushedAmount = doWriteBytes(buf);

這一步就是將buf的內容寫到channel中, 並返回寫的字節數, 這裏會調用NioSocketChannel的doWriteBytes

咱們跟到doWriteBytes方法中:

protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }

這裏首先拿到buf的可讀字節數, 而後經過readBytes將可讀字節寫入到jdk底層的channel中

回到doWrite方法:

將內容寫的jdk底層的channel以後, 若是一個字節都沒寫, 說明如今channel可能不可寫, 將setOpWrite設置爲true, 用於標識寫操做位, 並退出循環

若是已經寫出字節, 則經過 flushedAmount += localFlushedAmount 累加寫出的字節數

而後根據是buf是否沒有可讀字節數判斷是否buf的數據已經寫完, 若是寫完, 將done設置爲true, 說明寫操做完成, 並退出循環

由於有時候不必定一次就能將byteBuf全部的字節寫完, 因此這裏會繼續經過循環進行寫出, 直到循環到16次

若是ByteBuf內容徹底寫完, 會經過in.remove()將當前entry移除掉

咱們跟到remove方法中:

public boolean remove() { //拿到當前第一個flush的entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }

首先拿到當前的flushedEntry

咱們重點關注removeEntry這步, 跟進去:

private void removeEntry(Entry e) { if (-- flushed == 0) { //位置爲空 flushedEntry = null; //若是是最後一個節點 if (e == tailEntry) { //所有設置爲空 tailEntry = null; unflushedEntry = null; } } else { //移動到下一個節點 flushedEntry = e.next; } }

 if (-- flushed == 0) 表示當前節點是否爲須要刷新的最後一個節點, 若是是, 則flushedEntry指針設置爲空

若是當前節點是tailEntry節點, 說明當前節點是最後一個節點, 將tailEntry和unflushedEntry兩個指針所有設置爲空

若是當前節點不是須要刷新的最後的一個節點, 則經過 flushedEntry = e.nex t這步將flushedEntry指針移動到下一個節點

以上就是flush操做的相關邏輯

 

上一節: 寫buffer隊列

下一節: Future和Promies

相關文章
相關標籤/搜索