Netty源碼分析第七章: 編碼器和寫數據html
第四節: 刷新buffer隊列java
上一小節學習了writeAndFlush的write方法, 這一小節咱們剖析flush方法promise
經過前面的學習咱們知道, flush方法經過事件傳遞, 最終會傳遞到HeadContext的flush方法:socket
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
這裏最終會調用AbstractUnsafe的flush方法:oop
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
這裏首先也是拿到ChannelOutboundBuffer對象源碼分析
而後咱們看這一步:學習
outboundBuffer.addFlush();
這一步一樣也是調整ChannelOutboundBuffer的指針this
跟進addFlush方法:編碼
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }
首先聲明一個entry指向unflushedEntry, 也就是第一個未flush的entryspa
一般狀況下unflushedEntry是不爲空的, 因此進入if
再未刷新前flushedEntry一般爲空, 因此會執行到flushedEntry = entry
也就是flushedEntry指向entry
通過上述操做, 緩衝區的指針狀況如圖所示:
7-4-1
而後經過do-while將, 不斷尋找unflushedEntry後面的節點, 直到沒有節點爲止
flushed自增表明須要刷新多少個節點
循環中咱們關注這一步
decrementPendingOutboundBytes(pending, false, true);
這一步也是統計緩衝區中的字節數, 可是是和上一小節的incrementPendingOutboundBytes正好是相反, 由於這裏是刷新, 因此這裏要減掉刷新後的字節數,
咱們跟到方法中:
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } //從總的大小減去 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //直到減到小於某一個閾值32個字節 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //設置寫狀態 setWritable(invokeLater); } }
一樣TOTAL_PENDING_SIZE_UPDATER表明緩衝區的字節數, 這裏的addAndGet中參數是-size, 也就是減掉size的長度
再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark())
getWriteBufferLowWaterMark()表明寫buffer的第水位值, 也就是32k, 若是寫buffer的長度小於這個數, 就經過setWritable方法設置寫狀態
也就是通道由原來的不可寫改爲可寫
回到addFlush方法:
遍歷do-while循環結束以後, 將unflushedEntry指爲空, 表明全部的entry都是可寫的
通過上述操做, 緩衝區的指針狀況以下圖所示:
7-4-2
回到AbstractUnsafe的flush方法:
指針調整完以後, 咱們跟到flush0()方法中:
protected void flush0() { if (inFlush0) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
if (inFlush0) 表示判斷當前flush是否在進行中, 若是在進行中, 則返回, 避免重複進入
咱們重點關注doWrite方法
跟到AbstractNioByteChannel的doWrite方法中去:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { //每次拿到當前節點 Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //轉化成ByteBuf ByteBuf buf = (ByteBuf) msg; //若是沒有可寫的值 int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //移除 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { //將buf寫入到socket裏面 //localFlushedAmount表明向jdk底層寫了多少字節 int localFlushedAmount = doWriteBytes(buf); //若是一個字節沒寫, 直接break if (localFlushedAmount == 0) { setOpWrite = true; break; } //統計總共寫了多少字節 flushedAmount += localFlushedAmount; //若是buffer所有寫到jdk底層 if (!buf.isReadable()) { //標記全寫道 done = true; break; } } in.progress(flushedAmount); if (done) { //移除當前對象 in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略 } else { throw new Error(); } } incompleteWrite(setOpWrite); }
首先是一個無限for循環
Object msg = in.current() 這一步是拿到flushedEntry指向的entry中的msg
跟到current()方法中:
public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
這裏直接拿到flushedEntry指向的entry中關聯的msg, 也就是一個ByteBuf
回到doWrite方法:
若是msg爲null, 說明沒有能夠刷新的entry, 則調用clearOpWrite()方法清除寫標識
若是msg不爲null, 則會判斷是不是ByteBuf類型, 若是是ByteBuf, 就進入if塊中的邏輯
if塊中首先將msg轉化爲ByteBuf, 而後判斷ByteBuf是否可讀, 若是不可讀, 則經過in.remove()將當前的byteBuf所關聯的entry移除, 而後跳過此次循環進入下次循環
remove方法稍後分析, 這裏咱們先繼續往下看
boolean done = false 這裏設置一個標識, 標識刷新操做是否執行完成, 這裏默認值爲false表明走到這裏沒有執行完成
writeSpinCount = config().getWriteSpinCount() 這裏是得到一個寫操做的循環次數, 默認是16
而後根據這個循環次數, 進行循環的寫操做
在循環中, 關注這一步:
int localFlushedAmount = doWriteBytes(buf);
這一步就是將buf的內容寫到channel中, 並返回寫的字節數, 這裏會調用NioSocketChannel的doWriteBytes
咱們跟到doWriteBytes方法中:
protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
這裏首先拿到buf的可讀字節數, 而後經過readBytes將可讀字節寫入到jdk底層的channel中
回到doWrite方法:
將內容寫的jdk底層的channel以後, 若是一個字節都沒寫, 說明如今channel可能不可寫, 將setOpWrite設置爲true, 用於標識寫操做位, 並退出循環
若是已經寫出字節, 則經過 flushedAmount += localFlushedAmount 累加寫出的字節數
而後根據是buf是否沒有可讀字節數判斷是否buf的數據已經寫完, 若是寫完, 將done設置爲true, 說明寫操做完成, 並退出循環
由於有時候不必定一次就能將byteBuf全部的字節寫完, 因此這裏會繼續經過循環進行寫出, 直到循環到16次
若是ByteBuf內容徹底寫完, 會經過in.remove()將當前entry移除掉
咱們跟到remove方法中:
public boolean remove() { //拿到當前第一個flush的entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }
首先拿到當前的flushedEntry
咱們重點關注removeEntry這步, 跟進去:
private void removeEntry(Entry e) { if (-- flushed == 0) { //位置爲空 flushedEntry = null; //若是是最後一個節點 if (e == tailEntry) { //所有設置爲空 tailEntry = null; unflushedEntry = null; } } else { //移動到下一個節點 flushedEntry = e.next; } }
if (-- flushed == 0) 表示當前節點是否爲須要刷新的最後一個節點, 若是是, 則flushedEntry指針設置爲空
若是當前節點是tailEntry節點, 說明當前節點是最後一個節點, 將tailEntry和unflushedEntry兩個指針所有設置爲空
若是當前節點不是須要刷新的最後的一個節點, 則經過 flushedEntry = e.nex t這步將flushedEntry指針移動到下一個節點
以上就是flush操做的相關邏輯