netty源碼分析之服務端啓動全解析

background

netty 是一個異步事件驅動的網絡通訊層框架,其官方文檔的解釋爲html

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.java

咱們在新美大消息推送系統sailfish(日均推送消息量50億),新美大移動端代理優化系統shark(日均吞吐量30億)中,均選擇了netty做爲底層網絡通訊框架。react

既然兩大如此重要的系統底層都使用到了netty,因此必然要對netty的機制,甚至源碼瞭若指掌,因而,便催生了netty源碼系列文章。後面,我會經過一系列的主題把我從netty源碼裏所學到的毫無保留地介紹給你,源碼基於4.1.6.Final編程

why netty

netty底層基於jdk的NIO,咱們爲何不直接基於jdk的nio或者其餘nio框架?下面是我總結出來的緣由bootstrap

1.使用jdk自帶的nio須要瞭解太多的概念,編程複雜 2.netty底層IO模型隨意切換,而這一切只須要作微小的改動 3.netty自帶的拆包解包,異常檢測等機制讓你從nio的繁重細節中脫離出來,讓你只須要關心業務邏輯 4.netty解決了jdk的不少包括空輪訓在內的bug 5.netty底層對線程,selector作了不少細小的優化,精心設計的reactor線程作到很是高效的併發處理 6.自帶各類協議棧讓你處理任何一種通用協議都幾乎不用親自動手 7.netty社區活躍,遇到問題隨時郵件列表或者issue 8.netty已經歷各大rpc框架,消息中間件,分佈式通訊中間件線上的普遍驗證,健壯性無比強大promise

dive into netty

瞭解了這麼多,今天咱們就從一個例子出來,開始咱們的netty源碼之旅。服務器

本篇主要講述的是netty是如何綁定端口,啓動服務。啓動服務的過程當中,你將會了解到netty各大核心組件,我先不會細講這些組件,而是會告訴你各大組件是怎麼串起來組成netty的核心微信

example

下面是一個很是簡單的服務端啓動代碼網絡

public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }
}
複製代碼

簡單的幾行代碼就能開啓一個服務端,端口綁定在8888,使用nio模式,下面講下每個步驟的處理細節併發

EventLoopGroup 已經在個人其餘文章中詳細剖析過,說白了,就是一個死循環,不停地檢測IO事件,處理IO事件,執行任務

ServerBootstrap 是服務端的一個啓動輔助類,經過給他設置一系列參數來綁定端口啓動服務

group(bossGroup, workerGroup) 咱們須要兩種類型的人幹活,一個是老闆,一個是工人,老闆負責從外面接活,接到的活分配給工人幹,放到這裏,bossGroup的做用就是不斷地accept到新的鏈接,將新的鏈接丟給workerGroup來處理

.channel(NioServerSocketChannel.class) 表示服務端啓動的是nio相關的channel,channel在netty裏面是一大核心概念,能夠理解爲一條channel就是一個鏈接或者一個服務端bind動做,後面會細說

.handler(new SimpleServerHandler() 表示服務器啓動過程當中,須要通過哪些流程,這裏SimpleServerHandler最終的頂層接口爲ChannelHander,是netty的一大核心概念,表示數據流通過的處理器,能夠理解爲流水線上的每一道關卡

childHandler(new ChannelInitializer<SocketChannel>)...表示一條新的鏈接進來以後,該怎麼處理,也就是上面所說的,老闆如何給工人配活

ChannelFuture f = b.bind(8888).sync(); 這裏就是真正的啓動過程了,綁定8888端口,等待服務器啓動完畢,纔會進入下行代碼

f.channel().closeFuture().sync(); 等待服務端關閉socket

bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); 關閉兩組死循環

上述代碼能夠很輕鬆地再本地跑起來,最終控制檯的輸出爲:

handlerAdded
channelRegistered
channelActive
複製代碼

關於爲何會順序輸出這些,深刻分析以後其實很easy

深刻細節

ServerBootstrap 一系列的參數配置其實沒啥好講的,無非就是使用method chaining的方式將啓動服務器須要的參數保存到filed。咱們的重點落入到下面這段代碼

b.bind(8888).sync();
複製代碼

這裏說一句:咱們剛開始看源碼,對細節沒那麼清楚的狀況下能夠藉助IDE的debug功能,step by step,one step one test或者二分test的方式,來肯定哪行代碼是最終啓動服務的入口,在這裏,咱們已經肯定了bind方法是入口,咱們跟進去,分析

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 
複製代碼

經過端口號建立一個 InetSocketAddress,而後繼續bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
複製代碼

validate() 驗證服務啓動須要的必要參數,而後調用doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
複製代碼

這裏,我去掉了細枝末節,讓咱們專一於核心方法,其實就兩大核心一個是 initAndRegister(),以及doBind0()

其實,從方法名上面咱們已經能夠略窺一二,init->初始化,register->註冊,那麼到底要註冊到什麼呢?聯繫到nio裏面輪詢器的註冊,多是把某個東西初始化好了以後註冊到selector上面去,最後bind,像是在本地綁定端口號,帶着這些猜想,咱們深刻下去

initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // ...
    channel = channelFactory.newChannel();
    //...
    init(channel);
    //...
    ChannelFuture regFuture = config().group().register(channel);
    //...
    return regFuture;
}
複製代碼

咱們仍是專一於核心代碼,拋開邊角料,咱們看到 initAndRegister() 作了幾件事情 1.new一個channel 2.init這個channel 3.將這個channel register到某個對象

咱們逐步分析這三件事情

1.new一個channel

咱們首先要搞懂channel的定義,netty官方對channel的描述以下

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

這裏的channel,因爲是在服務啓動的時候建立,咱們能夠和普通Socket編程中的ServerSocket對應上,表示服務端綁定的時候通過的一條流水線

咱們發現這條channel是經過一個 channelFactory new出來的,channelFactory 的接口很簡單

public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
    /** * Creates a new channel. */
    @Override
    T newChannel();
}
複製代碼

就一個方法,咱們查看channelFactory被賦值的地方

AbstractBootstrap.java

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;
}
複製代碼

在這裏被賦值,咱們層層回溯,查看該函數被調用的地方,發現最終是在這個函數中,ChannelFactory被new出

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
複製代碼

這裏,咱們的demo程序調用channel(channelClass)方法的時候,將channelClass做爲ReflectiveChannelFactory的構造函數建立出一個ReflectiveChannelFactory

demo端的代碼以下:

.channel(NioServerSocketChannel.class);
複製代碼

而後回到本節最開始

channelFactory.newChannel();
複製代碼

咱們就能夠推斷出,最終是調用到 ReflectiveChannelFactory.newChannel() 方法,跟進

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}
複製代碼

看到clazz.newInstance();,咱們明白了,原來是經過反射的方式來建立一個對象,而這個class就是咱們在ServerBootstrap中傳入的NioServerSocketChannel.class

結果,繞了一圈,最終建立channel至關於調用默認構造函數new出一個 NioServerSocketChannel對象

這裏提一下,讀源碼細節,有兩種讀的方式,一種是回溯,好比用到某個對象的時候能夠逐層追溯,必定會找到該對象的最開始被建立的代碼區塊,還有一種方式就是自頂向下,逐層分析,通常用在分析某個具體的方法,庖丁解牛,最後拼接出完整的流程

接下來咱們就能夠將重心放到 NioServerSocketChannel的默認構造函數

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
複製代碼
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    //...
    return provider.openServerSocketChannel();
}
複製代碼

經過SelectorProvider.openServerSocketChannel()建立一條server端channel,而後進入到如下方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製代碼

這裏第一行代碼就跑到父類裏面去了,第二行,new出來一個 NioServerSocketChannelConfig,其頂層接口爲 ChannelConfig,netty官方的描述以下

A set of configuration properties of a Channel.

基本能夠斷定,ChannelConfig 也是netty裏面的一大核心模塊,初次看源碼,看到這裏,咱們大可沒必要深挖這個對象,而是在用到的時候再回來深究,只要記住,這個對象在建立NioServerSocketChannel對象的時候被建立便可

咱們繼續追蹤到 NioServerSocketChannel 的父類

AbstractNioMessageChannel.java

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
複製代碼

繼續往上追

AbstractNioChannel.java

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    //...
    ch.configureBlocking(false);
    //...
}
複製代碼

這裏,簡單地將前面 provider.openServerSocketChannel(); 建立出來的 ServerSocketChannel 保存到成員變量,而後調用ch.configureBlocking(false);設置該channel爲非阻塞模式,標準的jdk nio編程的玩法

這裏的 readInterestOp 即前面層層傳入的 SelectionKey.OP_ACCEPT,接下來重點分析 super(parent);(這裏的parent實際上是null,由前面寫死傳入)

AbstractChannel.java

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
複製代碼

到了這裏,又new出來三大組件,賦值到成員變量,分別爲

id = newId();
protected ChannelId newId() {
    return DefaultChannelId.newInstance();
}
複製代碼

id是netty中每條channel的惟一標識,這裏不細展開,接着

unsafe = newUnsafe();
protected abstract AbstractUnsafe newUnsafe();
複製代碼

查看Unsafe的定義

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

成功捕捉netty的又一大組件,咱們能夠先不用管TA是幹嗎的,只須要知道這裏的 newUnsafe方法最終屬於類NioServerSocketChannel

最後

pipeline = newChannelPipeline();

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
}

複製代碼

初次看這段代碼,可能並不知道 DefaultChannelPipeline 是幹嗎用的,咱們仍然使用上面的方式,查看頂層接口ChannelPipeline的定義

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel

從該類的文檔中能夠看出,該接口基本上又是netty的一大核心模塊

到了這裏,咱們總算把一個服務端channel建立完畢了,將這些細節串起來的時候,咱們順帶提取出netty的幾大基本組件,先總結以下

  • Channel
  • ChannelConfig
  • ChannelId
  • Unsafe
  • Pipeline
  • ChannelHander

初次看代碼的時候,咱們的目標是跟到服務器啓動的那一行代碼,咱們先把以上這幾個組件記下來,等代碼跟完,咱們就能夠自頂向下,逐層分析,我會放到後面源碼系列中去深刻到每一個組件

總結一下,用戶調用方法 Bootstrap.bind(port) 第一步就是經過反射的方式new一個NioServerSocketChannel對象,而且在new的過程當中建立了一系列的核心組件,僅此而已,並沒有他,真正的啓動咱們還須要繼續跟

2.init這個channel

到了這裏,你最好跳到文章最開始的地方回憶一下,第一步newChannel完畢,這裏就對這個channel作init,init方法具體幹啥,咱們深刻

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

複製代碼

初次看到這個方法,可能會以爲,哇塞,老長了,這可這麼看?還記得咱們前面所說的嗎,庖丁解牛,逐步拆解,最後歸一,下面是個人拆解步驟

1.設置option和attr

final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
複製代碼

經過這裏咱們能夠看到,這裏先調用options0()以及attrs0(),而後將獲得的options和attrs注入到channelConfig或者channel中,關於option和attr是幹嗎用的,其實你如今不用瞭解得那麼深刻,只須要查看最頂層接口ChannelOption以及查看一下channel的具體繼承關係,就能夠了解,我把這兩個也放到後面的源碼分析系列再講

2.設置新接入channel的option和attr

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
複製代碼

這裏,和上面相似,只不過不是設置當前channel的這兩個屬性,而是對應到新進來鏈接對應的channel,因爲咱們這篇文章只關心到server如何啓動,接入鏈接放到下一篇文章中詳細剖析

3.加入新鏈接處理器

p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
複製代碼

到了最後一步,p.addLast()向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor,從名字上就能夠看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件循環器,咱們先不作過多分析

來,咱們總結一下,咱們發現其實init也沒有啓動服務,只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器,用來專門接受新鏈接,咱們還得繼續往下跟

3.將這個channel register到某個對象

這一步,咱們是分析以下方法

ChannelFuture regFuture = config().group().register(channel);
複製代碼

調用到 NioEventLoop 中的register

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
複製代碼
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
複製代碼

好了,到了這一步,還記得這裏的unsafe()返回的應該是什麼對象嗎?不記得的話能夠看下前面關於unsafe的描述,或者最快的方式就是debug到這邊,跟到register方法裏面,看看是哪一種類型的unsafe

咱們跟進去以後發現是

AbstractUnsafe.java

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
    AbstractChannel.this.eventLoop = eventLoop;
    // ...
    register0(promise);
}
複製代碼

這裏咱們依然只須要focus重點,先將EventLoop事件循環器綁定到該NioServerSocketChannel上,而後調用 register0()

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
複製代碼

這一段其實也很清晰,先調用 doRegister();,具體幹啥待會再講,而後調用invokeHandlerAddedIfNeeded(), 因而乎,控制檯第一行打印出來的就是

handlerAdded
複製代碼

關於最終是如何調用到的,咱們後面詳細剖析pipeline的時候再講

而後調用 pipeline.fireChannelRegistered(); 調用以後,控制檯的顯示爲

handlerAdded
channelRegistered
複製代碼

繼續往下跟

if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}
複製代碼

讀到這,你可能會想固然地覺得,控制檯最後一行

pipeline.fireChannelActive();
複製代碼

由這行代碼輸出,咱們不妨先看一下 isActive() 方法

@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}
複製代碼

最終調用到jdk中

ServerSocket.java

/** * Returns the binding state of the ServerSocket. * * @return true if the ServerSocket succesfuly bound to an address * @since 1.4 */
    public boolean isBound() {
        // Before 1.3 ServerSockets were always bound during creation
        return bound || oldImpl;
    }
複製代碼

這裏isBound()返回false,可是從目前咱們跟下來的流程看,咱們並無將一個ServerSocket綁定到一個address,因此 isActive() 返回false,咱們沒有成功進入到pipeline.fireChannelActive();方法,那麼最後一行究竟是誰輸出的呢,咱們有點抓狂,其實,只要熟練運用IDE,要定位函數調用棧,無比簡單

下面是我用intellij定位函數調用的具體方法

Intellij函數調用定位

咱們先在最終輸出文字的這一行代碼處打一個斷點,而後debug,運行到這一行,intellij自動給咱們拉起了調用棧,咱們惟一要作的事,就是移動方向鍵,就能看到函數的完整的調用鏈

若是你看到方法的最近的發起端是一個線程Runnable的run方法,那麼就在提交Runnable對象方法的地方打一個斷點,去掉其餘斷點,從新debug,好比咱們首次debug發現調用棧中的最近的一個Runnable以下

if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}
複製代碼

咱們停在了這一行pipeline.fireChannelActive();, 咱們想看最初始的調用,就得跳出來,斷點打到 if (!wasActive && isActive()),由於netty裏面不少任務執行都是異步線程即reactor線程調用的(具體能夠看reactor線程三部曲中的最後一曲),若是咱們要查看最早發起的方法調用,咱們必須得查看Runnable被提交的地方,逐次遞歸下去,就能找到那行"消失的代碼"

最終,經過這種方式,終於找到了 pipeline.fireChannelActive(); 的發起調用的代碼,不巧,恰好就是下面的doBind0()方法

doBind0()

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
複製代碼

咱們發現,在調用doBind0(...)方法的時候,是經過包裝一個Runnable進行異步化的,關於異步化task,能夠看下我前面的文章,netty源碼分析之揭開reactor線程的面紗(三)

好,接下來咱們進入到channel.bind()方法

AbstractChannel.java

@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}
複製代碼

發現是調用pipeline的bind方法

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}
複製代碼

相信你對tail是什麼不是很瞭解,能夠翻到最開始,tail在建立pipeline的時候出現過,關於pipeline和tail對應的類,我後面源碼系列會詳細解說,這裏,你要想知道接下來代碼的走向,惟一一個比較好的方式就是debug 單步進入,篇幅緣由,我就不詳細展開

最後,咱們來到了以下區域

HeadContext.java

@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}
複製代碼

這裏的unsafe就是前面提到的 AbstractUnsafe, 準確點,應該是 NioMessageUnsafe

咱們進入到它的bind方法

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    boolean wasActive = isActive();
    // ...
    doBind(localAddress);

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
複製代碼

顯然按照正常流程,咱們前面已經分析到 isActive(); 方法返回false,進入到 doBind()以後,若是channel被激活了,就發起pipeline.fireChannelActive();調用,最終調用到用戶方法,在控制檯打印出了最後一行,因此到了這裏,你應該清楚爲何最終會在控制檯按順序打印出那三行字了吧

doBind()方法也很簡單

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //noinspection Since15
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
複製代碼

最終調到了jdk裏面的bind方法,這行代碼事後,正常狀況下,就真正進行了端口的綁定。

另外,經過自頂向下的方式分析,在調用pipeline.fireChannelActive();方法的時候,會調用到以下方法

HeadContext.java

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}
複製代碼

進入 readIfIsAutoRead

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
複製代碼

分析isAutoRead方法

private volatile int autoRead = 1;
public boolean isAutoRead() {
    return autoRead == 1;
}
複製代碼

因而可知,isAutoRead方法默認返回true,因而進入到如下方法

public Channel read() {
    pipeline.read();
    return this;
}
複製代碼

最終調用到

AbstractNioUnsafe.java

protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製代碼

這裏的this.selectionKey就是咱們在前面register步驟返回的對象,前面咱們在register的時候,註冊測ops是0

回憶一下注冊

AbstractNioChannel

selectionKey = javaChannel().register(eventLoop().selector, 0, this)
複製代碼

這裏至關於把註冊過的ops取出來,經過了if條件,而後調用

selectionKey.interestOps(interestOps | readInterestOp);
複製代碼

而這裏的 readInterestOp 就是前面newChannel的時候傳入的SelectionKey.OP_ACCEPT,又是標準的jdk nio的玩法,到此,你須要瞭解的細節基本已經差很少了,就這樣結束吧!

summary

最後,咱們來作下總結,netty啓動一個服務所通過的流程 1.設置啓動類參數,最重要的就是設置channel 2.建立server對應的channel,建立各大組件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等 3.初始化server對應的channel,設置一些attr,option,以及設置子channel的attr,option,給server的channel添加新channel接入器,並出發addHandler,register等事件 4.調用到jdk底層作端口綁定,並觸發active事件,active觸發的時候,真正作服務端口綁定

另外,文章中閱讀源碼的思路詳細或許也能夠給你帶來一些幫助。

若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…

相關文章
相關標籤/搜索