Netty writeAndFlush() 流程與異步

Netty writeAndFlush()方法分爲兩步, 先 write 再 flushjava

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next;
        next = findContextOutbound(MASK_WRITE);
        ReferenceCountUtil.touch(msg, next);
        next.invoker.invokeWrite(next, msg, promise);
        next = findContextOutbound(MASK_FLUSH);
        next.invoker.invokeFlush(next);
        return promise;
    }

以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可見其實是先調用了write, 而後調用flushgit

1. writegithub

write方法從TailHandler開始, 穿過中間自定義的各類handler之後到達HeadHandler, 而後調用了HeadHandler的成員變量Unsafe的writepromise

以下異步

        @Override
        public void write(Object msg, ChannelPromise promise) {
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }
            outboundBuffer.addMessage(msg, promise);
        }

最終會把須要write的msg和promise(也就是一個future, 咱們拿到手的future, 添加Listener的也是這個)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一個自定義的結構體Entry.socket

也就是說調用write方法實際上並非真的將消息寫出去, 而是將消息和這次操做的promise放入到了一個隊列中ide

2. flushoop

flush也是從Tail開始, 最後到Head, 最終調用的也是Head裏的unsafe的flush0()方法, 而後flush0()裏再調用doWrite()方法, 以下:this

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = -1;

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                break;
            }

            if (msg instanceof ByteBuf) {
                ByteBuf buf = (ByteBuf) msg;
                int readableBytes = buf.readableBytes();
                if (readableBytes == 0) {
                    in.remove();
                    continue;
                }

                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    int localFlushedAmount = doWriteBytes(buf); // 這裏纔是實際將數據寫出去的地方if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (!buf.isReadable()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);

                if (done) {
                    in.remove();
                } else {
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else if (msg instanceof FileRegion) {
                FileRegion region = (FileRegion) msg;
                boolean setOpWrite = false;
                boolean done = false;
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i --) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);

                if (done) {
                    in.remove(); // 根據寫出的數據的數量狀況, 來判斷操做是否完成, 若是完成則調用 in.remove()
                } else {
                    incompleteWrite(setOpWrite);
                    break;
                }
            } else {
                throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
            }
        }
    }

紅字部分就是最後將數據寫出去的地方, 這裏寫數據最終調用的是 GatheringByteChannel 的 write() 方法, 這是個原生Java接口, 具體實現依賴於實現這個接口的Java類, 例如會調用 NIO 的 SocketChannel 的write()方法, 至此, 實際寫數據的過程出現了, SocketChannel能夠運行在non-blocking模式, 也就是非阻塞異步模式, write數據會立刻返回寫入的數據數量 (並不必定是全部數據都寫入成功, 對因而否寫入了全部數據, Netty有本身的處理邏輯, 也就是上面代碼中的紅字的那段for循環, 具體參看下SocketChannel的javadoc和netty源碼).spa

當全部數據寫入SocketChannel成功, 開始調用in.remove(), 這個 in 就是第一步 1. write 裏的那個 outboundBuffer, 他的類型是 ChannelOutboundBuffer, 代碼以下:

    public final boolean remove() {
        if (isEmpty()) {
            return false;
        }

        Entry e = buffer[flushed];
        Object msg = e.msg;
        if (msg == null) {
            return false;
        }

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        e.clear();

        flushed = flushed + 1 & buffer.length - 1;

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            safeRelease(msg);
            safeSuccess(promise); // 這裏, 調用了promise的trySuccess()方法, 觸發Listener
            decrementPendingOutboundBytes(size);
        }

        return true;
    }

最後會調用Promise的notifyListeners()操做, 觸發Listener完成整個異步流程

---------

最後, 回到咱們應用netty的時候的代碼

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // do sth
                } else {
                    // do sth
                }
            }
        });
    }

這就是整個流程

 

最後提一下, Netty的AbstractNioChannel裏封裝了selectionKey, 在accept socket的時候, socket會被註冊到eventLoop()的Selector, 這個selectionKey就會被賦值,  以下

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

在之後Selector的select()的時候,  則會經過這個key來獲取到channel, 而後調用 AbstractChannel 裏的 DefaultChannelPipeline 來觸發 Handler 的 connect, read, write 等等事件...

相關文章
相關標籤/搜索