netty源碼分析之新鏈接接入全解析

本文收穫

通讀本文,你會了解到html

  1. netty如何接受新的請求
  2. netty如何給新請求分配reactor線程
  3. netty如何給每一個新鏈接增長ChannelHandler

其實,遠不止這些~java

前序背景

讀這篇文章以前,最好掌握一些前序知識,包括netty中的reactor線程,以及服務端啓動過程 下面我帶你簡單地回顧一下react

1.netty中的reactor線程

netty中最核心的東西莫過於兩種類型的reactor線程,能夠看做netty中兩種類型的發動機,驅動着netty整個框架的運轉編程

一種類型的reactor線程是boos線程組,專門用來接受新的鏈接,而後封裝成channel對象扔給worker線程組;還有一種類型的reactor線程是worker線程組,專門用來處理鏈接的讀寫promise

不論是boos線程仍是worker線程,所作的事情均分爲如下三個步驟微信

  1. 輪詢註冊在selector上的IO事件
  2. 處理IO事件
  3. 執行異步task

對於boos線程來講,第一步輪詢出來的基本都是 accept 事件,表示有新的鏈接,而worker線程輪詢出來的基本都是read/write事件,表示網絡的讀寫事件網絡

2.服務端啓動

服務端啓動過程是在用戶線程中開啓,第一次添加異步任務的時候啓動boos線程被啓動,netty將處理新鏈接的過程封裝成一個channel,對應的pipeline會按順序處理新創建的鏈接(關於pipeline我後面會開篇詳細分析)框架

瞭解完兩個背景,咱們開始進入正題異步

新鏈接的創建

簡單來講,新鏈接的創建能夠分爲三個步驟socket

  1. 檢測到有新的鏈接
  2. 將新的鏈接註冊到worker線程組
  3. 註冊新鏈接的讀事件

下面帶你庖丁解牛,一步步分析整個過程

檢測到有新鏈接進入

咱們已經知道,當服務端綁啓動以後,服務端的channel已經註冊到boos reactor線程中,reactor不斷檢測有新的事件,直到檢測出有accept事件發生

NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
複製代碼

上面這段代碼是reactor線程三部曲中的第二部曲,表示boos reactor線程已經輪詢到 SelectionKey.OP_ACCEPT 事件,說明有新的鏈接進入,此時將調用channel的 unsafe來進行實際的操做

關於 unsafe,這篇文章我不打算細講,下面是netty做者對於unsafe的解釋

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport.

你只須要了解一個大概的概念,就是全部的channel底層都會有一個與unsafe綁定,每種類型的channel實際的操做都由unsafe來實現

而從上一篇文章,服務端的啓動過程中,咱們已經知道,服務端對應的channel的unsafe是 NioMessageUnsafe,那麼,咱們進入到它的read方法,進入新鏈接處理的第二步

註冊到reactor線程

NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }
    } while (allocHandle.continueReading());
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    pipeline.fireChannelReadComplete();
}
複製代碼

我省去了非關鍵部分的代碼,能夠看到,一上來,就用一條斷言肯定該read方法必須是reactor線程調用,而後拿到channel對應的pipeline和 RecvByteBufAllocator.Handle(先不解釋)

接下來,調用 doReadMessages 方法不斷地讀取消息,用 readBuf 做爲容器,這裏,其實能夠猜到讀取的是一個個鏈接,而後調用 pipeline.fireChannelRead(),將每條新鏈接通過一層服務端channel的洗禮

以後清理容器,觸發 pipeline.fireChannelReadComplete(),整個過程清晰明瞭,不含一絲雜質,下面咱們具體看下這兩個方法

1.doReadMessages(List) 2.pipeline.fireChannelRead(NioSocketChannel)

1.doReadMessages()

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
複製代碼

咱們終於窺探到netty調用jdk底層nio的邊界 javaChannel().accept();,因爲netty中reactor線程第一步就掃描到有accept事件發生,所以,這裏的accept方法是當即返回的,返回jdk底層nio建立的一條channel

netty將jdk的 SocketChannel 封裝成自定義的 NioSocketChannel,加入到list裏面,這樣外層就能夠遍歷該list,作後續處理

上篇文章中,咱們已經知道服務端的建立過程當中會建立netty中一系列的核心組件,包括pipeline,unsafe等等,那麼,接受一條新鏈接的時候是否也會建立這一系列的組件呢?

帶着這個疑問,咱們跟進去

NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}
複製代碼

咱們重點分析 super(parent, socket),config相關的分析咱們放到後面的文章中

NioSocketChannel的父類爲 AbstractNioByteChannel

AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
複製代碼

這裏,咱們看到jdk nio裏面熟悉的影子—— SelectionKey.OP_READ,通常在原生的jdk nio編程中,也會註冊這樣一個事件,表示對channel的讀感興趣

咱們繼續往上,追蹤到AbstractNioByteChannel的父類 AbstractNioChannel, 這裏,我相信讀了上篇文章的你對於這部分代碼確定是有印象的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
複製代碼

在建立服務端channel的時候,最終也會進入到這個方法,super(parent), 即是在AbstractChannel中建立一系列和該channel綁定的組件,以下

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
複製代碼

而這裏的 readInterestOp 表示該channel關心的事件是 SelectionKey.OP_READ,後續會將該事件註冊到selector,以後設置該通道爲非阻塞模式

到了這裏,我終於能夠將netty裏面最經常使用的channel的結構圖放給你看

簡化版channel繼承關係

這裏的繼承關係有所簡化,當前,咱們只須要了解這麼多。

首先

  1. channel 繼承 Comparable 表示channel是一個能夠比較的對象
  2. channel 繼承AttributeMap表示channel是能夠綁定屬性的對象,在用戶代碼中,咱們常用channel.attr(...)方法就是來源於此
  3. ChannelOutboundInvoker是4.1.x版本新加的抽象,表示一條channel能夠進行的操做
  4. DefaultAttributeMap用於AttributeMap抽象的默認方法,後面channel繼承了直接使用
  5. AbstractChannel用於實現channel的大部分方法,其中咱們最熟悉的就是其構造函數中,建立出一條channel的基本組件
  6. AbstractNioChannel基於AbstractChannel作了nio相關的一些操做,保存jdk底層的 SelectableChannel,而且在構造函數中設置channel爲非阻塞
  7. 最後,就是兩大channel,NioServerSocketChannel,NioSocketChannel對應着服務端接受新鏈接過程和新鏈接讀寫過程

讀到這,關於channel的總體框架你基本已經瞭解了一大半了

好了,讓咱們退棧,繼續以前的源碼分析,在建立出一條 NioSocketChannel以後,放置在List容器裏面以後,就開始進行下一步操做

2.pipeline.fireChannelRead(NioSocketChannel)

AbstractNioMessageChannel.java

pipeline.fireChannelRead(NioSocketChannel);
複製代碼

在沒有正式介紹pipeline以前,請讓我簡單介紹一下pipeline這個組件

在netty的各類類型的channel中,都會包含一個pipeline,字面意思是管道,咱們能夠理解爲一條流水線工藝,流水線工藝有起點,有結束,中間還有各類各樣的流水線關卡,一件物品,在流水線起點開始處理,通過各個流水線關卡的加工,最終到流水線結束

對應到netty裏面,流水線的開始就是HeadContxt,流水線的結束就是TailConextHeadContxt中調用Unsafe作具體的操做,TailConext中用於向用戶拋出pipeline中未處理異常以及對未處理消息的警告,關於pipeline的具體分析咱們後面再詳細探討

經過前面一篇文章,咱們已經知道在服務端處理新鏈接的pipeline中,已經自動添加了一個pipeline處理器 ServerBootstrapAcceptor, 並已經將用戶代碼中設置的一系列的參數傳入了構造函數,接下來,咱們就來看下ServerBootstrapAcceptor

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
複製代碼

前面的 pipeline.fireChannelRead(NioSocketChannel); 最終經過head->unsafe->ServerBootstrapAcceptor的調用鏈,調用到這裏的 ServerBootstrapAcceptorchannelRead方法

channelRead 一上來就把這裏的msg強制轉換爲 Channel, 爲何這裏能夠強制轉換?讀者能夠思考一下

而後,拿到該channel,也就是咱們以前new出來的 NioSocketChannel對應的pipeline,將用戶代碼中的 childHandler,添加到pipeline,這裏的 childHandler 在用戶代碼中的體現爲

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoServerHandler());
     }
 });
複製代碼

其實對應的是 ChannelInitializer,到了這裏,NioSocketChannel中pipeline對應的處理器爲 head->ChannelInitializer->tail,牢記,後面會再次提到!

接着,設置 NioSocketChannel 對應的 attr和option,而後進入到 childGroup.register(child),這裏的childGroup就是咱們在啓動代碼中new出來的NioEventLoopGroup,具體能夠參考這篇文章

咱們進入到NioEventLoopGroupregister方法,代理到其父類MultithreadEventLoopGroup

MultithreadEventLoopGroup.java

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
複製代碼

這裏又扯出來一個 next()方法,咱們跟進去

MultithreadEventLoopGroup.java

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
複製代碼

回到其父類

MultithreadEventExecutorGroup.java

@Override
public EventExecutor next() {
    return chooser.next();
}
複製代碼

這裏的chooser對應的類爲 EventExecutorChooser,字面意思爲事件執行器選擇器,放到咱們這裏的上下文中的做用就是從worker reactor線程組中選擇一個reactor線程

public interface EventExecutorChooserFactory {

    /** * Returns a new {@link EventExecutorChooser}. */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /** * Chooses the next {@link EventExecutor} to use. */
    @UnstableApi
    interface EventExecutorChooser {

        /** * Returns the new {@link EventExecutor} to use. */
        EventExecutor next();
    }
}
複製代碼

關於chooser的具體建立我不打算展開,相信前面幾篇文章中的源碼閱讀技巧能夠幫助你找出choose的始末,這裏,我直接告訴你(可是勸你仍是自行分析一下,簡單得很),chooser的實現有兩種

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
複製代碼

默認狀況下,chooser經過 DefaultEventExecutorChooserFactory被建立,在建立reactor線程選擇器的時候,會判斷reactor線程的個數,若是是2的冪,就建立PowerOfTowEventExecutorChooser,不然,建立GenericEventExecutorChooser

兩種類型的選擇器在選擇reactor線程的時候,都是經過Round-Robin的方式選擇reactor線程,惟一不一樣的是,PowerOfTowEventExecutorChooser是經過與運算,而GenericEventExecutorChooser是經過取餘運算,與運算的效率要高於求餘運算,可見,netty爲了效率優化簡直喪心病狂!

選擇完一個reactor線程,即 NioEventLoop 以後,咱們回到註冊的地方

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
複製代碼

代理到 NioEventLoop 的父類的register方法

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
複製代碼

其實,這裏已經和服務端啓動的過程同樣了,詳細步驟能夠參考服務端啓動詳解這篇文章,咱們直接跳到關鍵環節

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;

    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}
複製代碼

和服務端啓動過程同樣,先是調用 doRegister();作真正的註冊過程,以下

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
複製代碼

將該條channel綁定到一個selector上去,一個selector被一個reactor線程使用,後續該channel的事件輪詢,以及事件處理,異步task執行都是由此reactor線程來負責

綁定完reactor線程以後,調用 pipeline.invokeHandlerAddedIfNeeded()

前面咱們說到,到目前爲止NioSocketChannel 的pipeline中有三個處理器,head->ChannelInitializer->tail,最終會調用到 ChannelInitializerhandlerAdded 方法

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}
複製代碼

handlerAdded方法調用 initChannel 方法以後,調用remove(ctx);將自身刪除

AbstractNioChannel.java

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { 
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}
複製代碼

而這裏的 initChannel 方法又是神馬玩意?讓咱們回到用戶方法,好比下面這段用戶代碼

用戶代碼

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });
複製代碼

哦,原來最終跑到咱們本身的代碼裏去了啊!我就不解釋這段代碼是幹嗎的了,你懂的~

完了以後,NioSocketChannel綁定的pipeline的處理器就包括 head->LoggingHandler->EchoServerHandler->tail

註冊讀事件

接下來,咱們還剩下這些代碼沒有分析完

AbstractNioChannel.java

private void register0(ChannelPromise promise) {
    // ..
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            beginRead();
        }
    }
}
複製代碼

pipeline.fireChannelRegistered();,其實沒有幹啥有意義的事情,最終無非是再調用一下業務pipeline中每一個處理器的 ChannelHandlerAdded方法處理下回調

isActive()在鏈接已經創建的狀況下返回true,因此進入方法塊,進入到 pipeline.fireChannelActive();,這裏的分析和netty源碼分析之服務端啓動全解析分析中的同樣,在這裏我詳細步驟先省略,直接進入到關鍵環節

AbstractNioChannel.java

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製代碼

你應該還記得前面 register0() 方法的時候,向selector註冊的事件代碼是0,而 readInterestOp對應的事件代碼是 SelectionKey.OP_READ,參考前文中建立 NioSocketChannel 的過程,稍加推理,聰明的你就會知道,這裏其實就是將 SelectionKey.OP_READ事件註冊到selector中去,表示這條通道已經能夠開始處理read事件了

總結

至此,netty中關於新鏈接的處理已經向你展現完了,咱們作下總結

  1. boos reactor線程輪詢到有新的鏈接進入
  2. 經過封裝jdk底層的channel建立 NioSocketChannel以及一系列的netty核心組件
  3. 將該條鏈接經過chooser,選擇一條worker reactor線程綁定上去
  4. 註冊讀事件,開始新鏈接的讀寫

下篇文章將深挖netty中的核心組件 pipeline,敬請期待

若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…

相關文章
相關標籤/搜索