About Autoread on Netty Channels

Netty 4 adds an autoread option to Channel. Its first purpose is to make the channel automatically call read() once more after certain events have fired (for example channelActive and channelReadComplete). The code:

DefaultChannelPipeline.java

    @Override
    public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();

        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }

-----------------------------------

    @Override
    public ChannelPipeline fireChannelReadComplete() {
        head.fireChannelReadComplete();
        if (channel.config().isAutoRead()) {
            read();
        }
        return this;
    }
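
To make the effect of the flag easier to see, here is a small toy model in plain Java. It is not Netty code: the class AutoReadModel and its methods are hypothetical names invented for this sketch, and it simplifies each read pass to a single message. It only illustrates the idea that inbound data is delivered when a read has been requested, and that autoread requests the next read for you.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (not Netty code): data is delivered only when a read was requested. */
class AutoReadModel {
    private final boolean autoRead;
    private final Queue<String> socketBuffer = new ArrayDeque<>(); // data waiting in the "socket"
    private final List<String> delivered = new ArrayList<>();      // data handed to the "handler"
    private boolean readRequested;

    AutoReadModel(boolean autoRead) {
        this.autoRead = autoRead;
    }

    /** Plays the role of Channel.read(): request one read pass. */
    void read() {
        readRequested = true;
    }

    /** Plays the role of fireChannelActive(): autoread requests the first read. */
    void fireChannelActive() {
        if (autoRead) {
            read();
        }
    }

    /** Data arrives from the peer and waits in the buffer. */
    void receive(String msg) {
        socketBuffer.add(msg);
    }

    /** One event-loop pass: deliver one message, but only if a read was requested. */
    void runEventLoopOnce() {
        if (!readRequested || socketBuffer.isEmpty()) {
            return;
        }
        readRequested = false;
        delivered.add(socketBuffer.poll()); // stands in for channelRead
        if (autoRead) {
            read();                         // stands in for the read() after channelReadComplete
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        AutoReadModel auto = new AutoReadModel(true);
        auto.fireChannelActive();
        auto.receive("a");
        auto.receive("b");
        auto.runEventLoopOnce();
        auto.runEventLoopOnce();
        System.out.println("autoread on:      " + auto.delivered());

        AutoReadModel manual = new AutoReadModel(false);
        manual.fireChannelActive();
        manual.receive("a");
        manual.receive("b");
        manual.runEventLoopOnce();
        manual.runEventLoopOnce();
        System.out.println("autoread off:     " + manual.delivered());

        manual.read(); // the handler asks for exactly one read
        manual.runEventLoopOnce();
        manual.runEventLoopOnce();
        System.out.println("after one read(): " + manual.delivered());
    }
}
```

In this model, the autoread channel delivers both messages without any help, while the manual channel delivers nothing until read() is called and then delivers exactly one message per call.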

 

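Its other important role shows up when certain events occur (for example, the socket being closed): when the OP_READ bit is set in NioEventLoop, autoread decides whether Netty actually reads from the channel. Reading from the channel matters for more than the data itself, because it is also how the channel's state gets updated, for example its active state. The code: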

NioEventLoop.java

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

AbstractNioByteChannel.java (unsafe.read())

    public void read() {
        final ChannelConfig config = config();
        // Check autoread: if it is off (and no read is pending), just clear the OP_READ flag and return
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
                // This is where the channel is actually read; a return value of -1 means the channel was closed.
                // If autoread is false, this point is never reached when the socket closes,
                // so the channel's active state is never updated.
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // nothing was read, release the buffer
                    byteBuf.release();
                    close = localReadAmount < 0;
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;

                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            // Close the channel and update its active state
            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
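
The underlying effect can be reproduced with plain JDK NIO, without Netty. The following is a minimal sketch under stated assumptions, not Netty's implementation: the class name ReadInterestDemo and the timeouts are made up for illustration. It registers an accepted socket with an empty interest set (roughly the situation after removeReadOp() has cleared OP_READ), lets the peer close, and shows that the selector stays silent until OP_READ is restored, at which point read() returns -1.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class ReadInterestDemo {
    /** Returns {keys selected without OP_READ, keys selected with OP_READ, result of read()}. */
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free loopback port
            SocketChannel client = SocketChannel.open(server.getLocalAddress());
            try (SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // Empty interest set: comparable to a channel whose OP_READ was removed.
                SelectionKey key = accepted.register(selector, 0);

                client.close(); // the peer closes the connection

                // Nothing is reported, so the close goes unnoticed.
                int withoutRead = selector.select(200);

                // Restore read interest: the selector now wakes up for the pending EOF.
                key.interestOps(SelectionKey.OP_READ);
                int withRead = selector.select(2000);

                // Only an actual read observes the close (-1 means end of stream).
                int bytes = accepted.read(ByteBuffer.allocate(16));
                return new int[] {withoutRead, withRead, bytes};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("selected without OP_READ: " + r[0]);
        System.out.println("selected with OP_READ:    " + r[1]);
        System.out.println("read() returned:          " + r[2]);
    }
}
```

On a loopback connection this should report 0 selected keys without read interest, 1 with it, and -1 from read(). This matches the comment in the Netty code above: while OP_READ is not being serviced, the close is not observed and the channel's active state stays unchanged.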