Netty5 Write和Flush事件處理過程_源碼講解

歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java

write處理流程git


業務邏輯handler調用context的write方法,將欲發送的數據發送到帶發送緩衝區中.github

看看write流程的觸發代碼(就是在一個業務handler中調用一下write方法便可):數組

public class DiscardServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(final ChannelHandlerContext ctx,final  Object msg) throws Exception {
        ByteBuf bufferBuf =(ByteBuf)msg;
    System.out.println(new String(bufferBuf.array()));
    ctx.channel().write(bufferBuf);
}

追蹤一下,ctx.channel().write(bufferBuf)的實現(假設out pipeline中沒有其餘的encode handler了,),咱們會看到,最終會由AbstractUnsafe(AbstractUnsafe是channel的一個內部類對象)的write方法(很好找,順着找就好了,記住,默認pipeline一定會有tail和head兩個handler)進行處理,上代碼:promise

 public void write(Object msg, ChannelPromise promise) {
            if (!isActive()) {
                // Mark the write request as failure if the channel is inactive.
                if (isOpen()) {
                    promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
                } else {
                    promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
                }
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
            } else {//往緩存中添加一個消息對象
                outboundBuffer.addMessage(msg, promise);
            }
        }

這裏關注一下outboundBuffer.addMessage() 到此處,你們就會恍然大悟,知道怎麼回事兒了,就是這樣,僅僅將要寫入的message object寫入到一個buffer中。下面咱們來看一下outboundBuffer.addmessage的實現。緩存

注意: outboundBuffer是一個ChannelOutboundBuffer類型的兌現,每個channel都會一個ChannelOutboundBuffer對象與之關聯,用來盛放欲發送的消息.上代碼證實一切:網絡

 protected abstract class AbstractUnsafe implements Unsafe {
    
        private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this);
        private boolean inFlush0;
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {


    private MessageSizeEstimator.Handle estimatorHandle;

    private final Channel parent;
    private final ChannelId id = DefaultChannelId.newInstance();
    private final Unsafe unsafe;//channel中有Unsafe引用
    private final DefaultChannelPipeline pipeline;
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
   /**省略部分代碼***/
    private final EventLoop eventLoop;

Unsafe對象裏有一個outboundBuffer ,而channel裏有個unsafe引用,因此能夠說,channel與outboundBuffer有has-a關係

socket

看一下ChannelOutboundBuffer outboundBuffer的addMessage實現:
ide

 void addMessage(Object msg, ChannelPromise promise) {
        //預測message的size
        int size = channel.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
        //建立一個Entry對象,一個entry就是一個欲發送的message以及描述信息
        Entry e = buffer[tail++];
        e.msg = msg;
        e.pendingSize = size;
        e.promise = promise;
        e.total = total(msg); //由此能夠看出total和pendingSize是相等的
        tail &= buffer.length - 1;
        if (tail == flushed) {//擴展容量
            addCapacity();
        }
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(size);//更新一下,欲發送的字節大小。
    }

都很容易看懂,就是對欲發送的message封裝成entry後,將其註冊到一個鏈表中,若是鏈表大小不夠用的話就調用addCapacity進行擴容。下面咱們看一下,addCapaciry()方法的實現,上代碼:oop

 private void addCapacity() {
//更新鏈表的標誌位
        int p = flushed;
        int n = buffer.length;
        int r = n - p; // number of elements to the right of p 剩餘沒有刷出去的
        int s = size();

        int newCapacity = n << 1;//擴大一倍 注意   哈哈 
        if (newCapacity < 0) {
            throw new IllegalStateException();
        }

        Entry[] e = new Entry[newCapacity];//建立對象數組。擴容後的哦!
        System.arraycopy(buffer, p, e, 0, r);
        System.arraycopy(buffer, 0, e, r, p);//拷貝的和爲拷貝的數據(不包括二進制bytebuf數據哦)都要進行復制
        for (int i = n; i < e.length; i++) {//將e數組中n到
            e[i] = new Entry();
        }

        buffer = e;
        flushed = 0;
        unflushed = s;
        tail = n;
    }

哈哈 很容易理解的,就是對數組進行擴展。而後複製

到目前爲止,咱們已經講解完了,write的的處理流程。咱們把message放入到buffer中,目的是爲了將其發送到目的socket的內核緩衝區中,何時發送(固然是對應的socket發送write可寫事件的時候)呢? 當writer事件發送的時候,就是咱們將緩衝起來的message flush到socket的內核緩衝區的時候了!!如今開始下一主題:

Flush處理流程

剛纔已經說道flush的發生,意味着socket的write事件發生了,因而咱們天然而然的就想到了NioEventLoop的處理write事件的代碼塊,上代碼:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            /**簡潔起見,省略部分代碼**/
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對於半包消息進行輸出操做
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            /**簡潔起見,省略部分代碼*/
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

從上面能夠看到實際上worker線程也是調用的當前遍歷的socketChannel的unsafe的forceFlush方法。直接上代碼看具體實現(最終會調用到AbstractUnsafe的force0方法):

protected void flush0() {
            if (inFlush0) { //若是對於一個channel正在進行刷新
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;//若是緩存爲空,則直接返回
            }

            inFlush0 = true;//設置正在刷新標誌位

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
                    } else {
                        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                outboundBuffer.failFlushed(t);
            } finally {
                inFlush0 = false;
            }
        }

不用多說,若是write事件發生,可是緩衝區爲空得話,那麼就會直接返回,若是不是的話,再去調用doWrite方法。直接上代碼(doWrite是個抽象方法,由NioSocketChannel實現,本身想爲何選這個類!!能夠看源代碼!從鏈接的建立開始看哦!我有寫鏈接建立的博文哦! 本身看吧! ):

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {//不斷寫入---將全部的數據flush到內核網絡緩衝區中
            // Do non-gathering write for a single buffer case.
            final int msgCount = in.size(); //對於只有一個message的狀況,直接寫便可
            if (msgCount <= 1) {
                super.doWrite(in);
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();//將全部的消息轉換成java.nio.ByteBuf數組
            if (nioBuffers == null) {//msg不是ByteBuf類型,則也不須要採用gathering write的方式,能夠直接調用父類AbstractNioByteChannel的doWrite方法
                super.doWrite(in);
                return;
            }

            int nioBufferCnt = in.nioBufferCount(); //buffer的個數
            long expectedWrittenBytes = in.nioBufferSize();//總的buffer的byte的數量

            final SocketChannel ch = javaChannel();
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                //雖然是每次都是針對的messag進行遍歷,可是寫入的時候確實針對的全體內部的buffer進行遍歷
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//localWrittenBytes寫入的字節數目可能超過了本次遍歷的msg的buffer的承載的個數,因此若是寫回半包得話,須要比較current.msg的大小與此值的大小
                if (localWrittenBytes == 0) {
                    setOpWrite = true;//中止寫入,多是由於socket如今不可寫了
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {//若是但願寫入的數量爲0,代表所有寫入完畢
                    done = true;
                    break;
                }
            }

            if (done) {//若是所有發送完畢
                // Release all buffers
                for (int i = msgCount; i > 0; i --) {
                    in.remove();//刪除buffer數組中的元素--挨個刪除 代碼 888
                }

                // Finish the write loop if no new messages were flushed by in.remove().
                if (in.isEmpty()) {//若是所有完成的話,,那麼就將其寫半包標誌刪除
                    clearOpWrite();
                    break;
                }
            } else {
                // Did not write all buffers completely.
                // Release the fully written buffers and update the indexes of the partially written buffer.
                //若是沒有所有刷寫完成,那麼就釋放已經寫入的buffer,更新部分寫入的buffer的索引
                for (int i = msgCount; i > 0; i --) { //代碼 999
                    final ByteBuf buf = (ByteBuf) in.current();//獲得當前的直接內存,current()方法調用的不是很深,由於當前的msg已經通過niobuffers轉換成直接類型的了,因此基本上會直接返回。
                    final int readerIndex = buf.readerIndex();
                    final int readableBytes = buf.writerIndex() - readerIndex;//獲得可寫的數據字節數

                    if (readableBytes < writtenBytes) {//若是寫入的部分大於當前緩存指針的大小的話,那麼就將其釋放
                        in.progress(readableBytes);
                        in.remove();//移動指針,移動到下一個buffer中
                        writtenBytes -= readableBytes;
                    } else if (readableBytes > writtenBytes) {
                        buf.readerIndex(readerIndex + (int) writtenBytes);//從新設置當前的buffer的大小
                        in.progress(writtenBytes);
                        break;
                    } else { // readableBytes == writtenBytes  寫入的部分
                        in.progress(readableBytes);
                        in.remove();//直接移除(實際上是刪除引用個數)
                        break;
                    }//代碼 1000
                }

                incompleteWrite(setOpWrite);//註冊write興趣事件
                break;
            }
        }
    }

這個方法是flush的核心代碼,可是代碼很長,在此咱們講一下基本流程,而後再講幾個重要方法是幹嗎的!!:

流程: 

  1. 判斷一下outbuffer欲發送的message的大小,若是爲1的話,調用父類的doWriter方法.

  2. 而後調用outbuffer的nioBuffers方法,niobuffers方法主要就是對outbuffer中的bytebuffer解包(由於一個buffer可能會封裝多個buffer)、發送字節數的統計(expectedWrittenBytes)、底層最小單位緩衝對象的個數(nioBufferCnt)統計。

  3. 調用java的原生API進行數據寫入,ch.write(nioBuffers, 0, nioBufferCnt).注意 寫入次數是能夠經過WriteSpinCount配置項限制的哦!! 該配置項咱們能夠在初始化或者運行的過程當中經過調用ctx.channel().config()進行配置或者更改。

  4. 若是全發送完成,那麼就將緩衝區中的緩衝對象依次清空 見 代碼 888

  5. 若是沒有所有發送,就將所有刷出的bytebuf釋放(至於怎樣釋放,會寫一個專門的Netty內存管理的文章),僅僅發送一部分的byte的bytebuf的readerindex進行更改。 見代碼  999和1000之間的代碼

 

咱們粘貼一下nioBuffers的實現,很簡單:

public ByteBuffer[] nioBuffers() {
        long nioBufferSize = 0;//用來記錄全部須要發送的數據的字節大小
        int nioBufferCount = 0;//用來記錄最底層的buffer的個數多少
        final int mask = buffer.length - 1;
        final ByteBufAllocator alloc = channel.alloc();//用於將heap緩衝轉換成direct類型緩衝
        ByteBuffer[] nioBuffers = this.nioBuffers; //底層的niobuffer,會對buffer的進行一個解包操做
        Object m;
        int i = flushed;
        while (i != unflushed && (m = buffer[i].msg) != null) {//逐個遍歷即將發送的bytebuf數據
            if (!(m instanceof ByteBuf)) {
                this.nioBufferCount = 0;
                this.nioBufferSize = 0;
                return null;
            }
            //
            Entry entry = buffer[i];
            ByteBuf buf = (ByteBuf) m;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;//能夠讀取的字節數組

            if (readableBytes > 0) {
                nioBufferSize += readableBytes;
                int count = entry.count;//獲得低層的buffer的個數
                if (count == -1) {
                    entry.count = count = buf.nioBufferCount();
                }
                //總的buffer的個數
                int neededSpace = nioBufferCount + count;
                if (neededSpace > nioBuffers.length) {//若是buffer的個數超過了nioBuffers的length進行擴張,按照2倍的係數擴張
                    this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                }

                if (buf.isDirect() || threadLocalDirectBufferSize <= 0) {
                    if (count == 1) {//沒有封裝內部的緩存
                        ByteBuffer nioBuf = entry.buf;
                        if (nioBuf == null) {
                            // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                            // derived buffer
                            entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                        }
                        nioBuffers[nioBufferCount ++] = nioBuf;
                    } else {//內部有多個buffer
                        ByteBuffer[] nioBufs = entry.buffers;
                        if (nioBufs == null) {
                            // cached ByteBuffers as they may be expensive to create in terms of Object allocation
                            entry.buffers = nioBufs = buf.nioBuffers();//獲得內部緩存
                        }
                        //進行解壓,並返回內部的全部的緩存的個數(解壓後的哦)
                        nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount);
                    }
                } else {
                    nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,//將heap緩存轉換層direct類型
                            readableBytes, alloc, nioBuffers, nioBufferCount);
                }
            }
            i = i + 1 & mask;
        }
        this.nioBufferCount = nioBufferCount;
        this.nioBufferSize = nioBufferSize;

        return nioBuffers;

很容易看明白,就對數據的統計。例如bytebuf的個數、發送的字節數的統計

線面咱們來看一下 current()方法的實現:

  public Object current() {
        return current(true);
    }

    /**
     * 將當前即將要刷出的數據寫入到directByteBuf中
     * @param preferDirect
     * @return
     */
    public Object current(boolean preferDirect) {
        if (isEmpty()) { //若是緩存爲空,則直接返回
            return null;
        } else {
            // TODO: Think of a smart way to handle ByteBufHolder messages
            Object msg = buffer[flushed].msg;//獲得即將要刷新的數據緩衝區--buffer[flushed]表示即將要刷新的數據緩衝去
            if (threadLocalDirectBufferSize <= 0 || !preferDirect) {//若是線程中沒有直接內存緩衝區可用,不喜歡用堆外緩存
                return msg;
            }
            if (msg instanceof ByteBuf) { //由此能夠看出message必須是bytebuf類修的
                ByteBuf buf = (ByteBuf) msg;
                if (buf.isDirect()) {//是否是直接內存中分配的
                    //對於nioBuffers以後,已經將全部bytebuf所有轉換成direct類型的了!! 
                    return buf;
                } else {
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) { //若是沒有了的話,就直接返回
                        return buf;
                    }

                    // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O.
                    // We can do a better job by using our pooled allocator. If the current allocator does not
                    // pool a direct buffer, we use a ThreadLocal based pool.
                    /**
                     * 非直接內存會被拷貝的jdk本身的內部的直接緩衝區中。咱們能夠經過具備池子功能的分配器來將工做作的更好。
                     * 若是當前的分配器沒有起到一個緩衝直接內存的做用得話,那麼咱們就會使用基於線程的threadLocal的池子
                     * */
                    ByteBufAllocator alloc = channel.alloc();//獲得內存分配器
                    ByteBuf directBuf;
                    if (alloc.isDirectBufferPooled()) {//是否爲直接內存---分配的內存是否爲直接緩衝做用的
                        directBuf = alloc.directBuffer(readableBytes);
                    } else {//不然的話,就用與線程綁定的ThreadLocalPooledByteBuf進行二進制數據分配
                        directBuf = ThreadLocalPooledByteBuf.newInstance(); //從當前線程棧中獲取一個bytebuf--ByteBuffer.allocateDirect(initialCapacity)
                    }
                    //進行必要的數據拷貝--將堆內的數據拷貝到直接內存中
                    directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
                    current(directBuf);//將原先非direct類型的內存釋放,而且替換成 direct的bytebuf
                    return directBuf;
                }
            }
            return msg;
        }
    }

    /**
     * Replace the current msg with the given one.
     * The replaced msg will automatically be released  用一個指定的buffer去替換原先的buffer msg對象。唄替換的對象會自動釋放
     */
    public void current(Object msg) {
        Entry entry =  buffer[flushed];
        safeRelease(entry.msg);//
        entry.msg = msg;
    }

從上面的代碼中咱們會看到,netty會將全部的byteBuf解包後,所有轉換成直接類型的內存,而後再發送。

到目前爲止,咱們已經將必要的flush和write時,發生的事件進行必要的闡述。固然也有幾個方法沒有講到,不過本人以爲沒有必要再去講解,由於那樣得話,太冗餘了。

給一下write事件的流程圖(稍後給出):

給一下flush處理的流程圖(稍後給出):

可是還有幾個方面你們煮注意一下:

  1. 發送完成的bytebuf內存是怎樣釋放的?

  2. 爲何要推薦使用direct類型的內存進行推送?

    關於上面的兩個問題不是這篇文章要講述的內容,本人會寫一個關於Netty5內存管理的博文,來詳細的講述上面兩個問題,讓你們吐槽一下!!

本文是本人學習Netty後的我的總結的分享,須要說明的是本人僅僅表明我的的見解,不是什麼官方權威的版本,若是有不當之處,還請賜教!!歡迎吐槽!!

相關文章
相關標籤/搜索