Netty源碼 新鏈接處理

上文咱們闡述了Netty的Reactor模型。在Reactor模型的第二階段,Netty會處理各類io事件。對於客戶端的各類請求就是在這個階段去處理的。本文便來分析一個新的鏈接是如何被處理的。java

代碼的入口就從read方法開始。這裏的unsafe的類型是NioMessageUnsafe,在服務端啓動時就肯定下來了。 react

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read(); if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }

 咱們省去部分代碼,read方法邏輯很是簡單。就是一個讀出加處理的過程promise

public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
                do {
                    //讀取消息
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //循環處理消息
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            //觸發讀取完畢事件
            pipeline.fireChannelReadComplete();
    }

 1.讀取消息

protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept(); try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

在doReadMessages首先accept一個新鏈接,因爲在一階段的時候已經有io事件產生了,因此這裏不會等待而是當即接受一個新鏈接並用SocketChannel表示。數據結構

接着又構造出了一個NioSocketChannel將java的channel封裝成netty本身的channel並添加到list中,咱們點進去看看。socket

public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

  

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

  

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

 

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

 

最終咱們到了AbstractChannel的類中,發現NioSocketChannel的創建會建立unsafe和pipeline。這裏咱們看下具體類型ide

unsafe的具體類型是由子類io.netty.channel.socket.nio.NioSocketChannel#newUnsafe決定的oop

protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

 

pipeline則是默認的DefaultChannelPipeline源碼分析

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this); head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

這裏咱們便引出了pipeline的概念,看上述代碼便知pipeline的數據結構是一個雙向鏈表。咱們也能夠把它想象成一個責任鏈或者更直白點就是流水線。任何鏈接請求都會經過pipeline處理最終返回到客戶端。this

如今顯得鏈接已經封裝成channel並添加到list中了,如今咱們再看下消息處理spa

int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }

2.消息處理

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

 

消息處理實際就是pipeline鏈式執行handle的過程。那麼對於服務端的channel,他在接受新鏈接的時候先執行那個handle呢。服務端處理新鏈接的pipeline中,已經自動添加了一個pipeline處理器 ServerBootstrapAcceptor

因此咱們先看下ServerBootstrapAcceptor的channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //1.泛型轉換新鏈接建立的channel
            final Channel child = (Channel) msg;
            //2.設置channel的handler
            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
            try {
                //channel綁定到一個raector線程上
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

 1.將剛剛建立的channel泛型轉換出來

2.調用用戶代碼的childHandler屬性,注意,這裏只是添加了一個ChannelInitializer,相應的初始化還未運行,

3.註冊該channel,將該channel綁定到一個reactor線程,後續關於這個channel的事件,任務都是由該reactor線程處理。

 

如今咱們點進註冊的代碼

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
public EventLoop next() {
        return (EventLoop) super.next();
    }

 

next方法返回的是一個reactor線程,咱們看下netty是如何挑選線程的。點擊super.next

public EventExecutor next() {
        return chooser.next();
    }

 

這裏出現一個chooser表明的是一個選擇策略,下面直接上代碼了

chooser = chooserFactory.newChooser(children);
public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

 

netty根據線程數量的奇偶性 會選擇出不一樣的選擇策略。二者惟一的區別就是一個是與運算,一個是取餘

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

 

在咱們肯定一個reactor線程以後,咱們便開始了註冊的流程

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)

public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

 

io.netty.channel.AbstractChannel.AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

 

 註冊的核心代碼即是register0了

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
 pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

 

  • doRegister以前在服務端分析時有過講解,這裏真正的吧channel與reactor線程綁定在一塊兒
  • pipeline.invokeHandlerAddedIfNeeded(); 

爲channel添加Handler,這裏將添加handler任務包裝成Task

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }
    }

 

最終調用io.netty.channel.ChannelInitializer#handlerAdded

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
    }

 

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

 

這也就是咱們的用戶代碼

 

  • pipeline.fireChannelRegistered(); channel註冊完以後的回調
  • pipeline.fireChannelActive() channel激活的回調

到這裏其實已經接近尾聲了。可是咱們的channel目前仍是沒法使用的。由於他並無註冊他感興趣的事件。他如今是一個沒有夢想的channel。因此咱們看下channel激活的具體邏輯

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }    
public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }
public Channel read() {
        pipeline.read();
        return this;
    }
public final ChannelPipeline read() {
        tail.read();
        return this;
    }
.......        
protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

最終在io.netty.channel.nio.AbstractNioChannel#doBeginRead中設置selectionKey對讀事件感興趣。

以上即是netty對新鏈接的處理

 

參考


 

 https://www.jianshu.com/p/0242b1d4dd21  【netty源碼分析之新鏈接接入全解析】

相關文章
相關標籤/搜索