Netty源碼分析第7章(編碼器和寫數據)---->第3節: 寫buffer隊列

 

Netty源碼分析七章: 編碼器和寫數據html

 

第三節: 寫buffer隊列promise

 

 

以前的小節咱們介紹過, writeAndFlush方法其實最終會調用write和flush方法緩存

write方法最終會傳遞到head節點, 調用HeadContext的write方法:oop

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }

這裏經過unsafe對象的write方法, 將消息寫入到緩存中, 具體的執行邏輯, 咱們在這個小節進行剖析源碼分析

咱們跟到AbstractUnsafe的write方法中:this

public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //負責緩衝寫進來的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { //非堆外內存轉化爲堆外內存
        msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //插入寫隊列
 outboundBuffer.addMessage(msg, size, promise); }

首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer 編碼

ChannelOutboundBuffer的功能就是緩存寫入的ByteBufspa

咱們繼續看try塊中的 msg = filterOutboundMessage(msg) 3d

這步的意義就是將非對外內存轉化爲堆外內存指針

filterOutboundMessage方法方法最終會調用AbstractNioByteChannel中的filterOutboundMessage方法:

protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //是堆外內存, 直接返回
        if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }

首先判斷msg是否byteBuf對象, 若是是, 判斷是否堆外內存, 若是是堆外內存, 則直接返回, 不然, 經過newDirectBuffer(buf)這種方式轉化爲堆外內存

回到write方法中:

outboundBuffer.addMessage(msg, size, promise)將已經轉化爲堆外內存的msg插入到寫隊列

咱們跟到addMessage方法當中, 這是ChannelOutboundBuffer中的方法:

public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(size, false); }

首先經過 Entry.newInstance(msg, size, total(msg), promise) 的方式將msg封裝成entry

而後經過調整tailEntry, flushedEntry, unflushedEntry三個指針, 完成entry的添加

這三個指針均是ChannelOutboundBuffer的成員變量

flushedEntry指向第一個被flush的entry

unflushedEntry指向第一個未被flush的entry

也就是說, 從flushedEntry到unflushedEntry之間的entry, 都是被已經被flush的entry

tailEntry指向最後一個entry, 也就是從unflushedEntry到tailEntry之間的entry都是沒flush的entry

咱們回到代碼中:

建立了entry以後首先判斷尾指針是否爲空, 在第一次添加的時候, 均是空, 因此會將flushedEntry設置爲null, 而且將尾指針設置爲當前建立的entry

最後判斷unflushedEntry是否爲空, 若是第一次添加這裏也是空, 因此這裏將unflushedEntry設置爲新建立的entry

第一次添加以下圖所示

7-3-1

若是不是第一次調用write方法, 則會進入 if (tailEntry == null) 中else塊:

 Entry tail = tailEntry  這裏tail就是當前尾節點

 tail.next = entry  表明尾節點的下一個節點指向新建立的entry

 tailEntry = entry  將尾節點也指向entry

這樣就完成了添加操做, 其實就是將新建立的節點追加到原來尾節點以後

第二次添加 if (unflushedEntry == null) 會返回false, 因此不會進入if塊

第二次添加以後指針的指向狀況以下圖所示:

7-3-4

之後每次調用write, 若是沒有調用flush的話都會在尾節點以後進行追加

回到代碼中, 看這一步incrementPendingOutboundBytes(size, false)

這步時統計當前有多少字節須要被寫出, 咱們跟到這個方法中:

private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } //TOTAL_PENDING_SIZE_UPDATER當前緩衝區裏面有多少待寫的字節
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //getWriteBufferHighWaterMark() 最高不能超過64k
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } }

看這一步:

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)

TOTAL_PENDING_SIZE_UPDATER表示當前緩衝區還有多少待寫的字節, addAndGet就是將當前的ByteBuf的長度進行累加, 累加到newWriteBufferSize中

在繼續看判斷 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) 

 channel.config().getWriteBufferHighWaterMark() 表示寫buffer的高水位值, 默認是64k, 也就是說寫buffer的最大長度不能超過64k

若是超過了64k, 則會調用setUnwritable(invokeLater)方法設置寫狀態

咱們跟到setUnwritable(invokeLater)方法中:

private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0 && newValue != 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }

這裏經過自旋和cas操做, 傳播一個ChannelWritabilityChanged事件, 最終會調用handler的channelWritabilityChanged方法進行處理

以上就是寫buffer的相關邏輯

 

上一節: MessageToByteEncoder

下一節: 刷新buffer隊列

相關文章
相關標籤/搜索