肝了一個月的Netty知識點(上)

有情懷,有乾貨,微信搜索【三太子敖丙】關注這個不同的程序員。java

本文 GitHub github.com/JavaFamily 已收錄,有一線大廠面試完整考點、資料以及個人系列文章。git

高能預警,本文是我一個月前就開始寫的,因此內容會很是長,固然也很是硬核,dubbo源碼系列結束以後我就想着寫一下netty系列的,可是netty的源碼概念又很是多,因此才寫到了如今。程序員

我相信90%的讀者都不會一口氣看完的,由於實在太長了,長到我如今頂配的mbp打字編輯框都是卡的,可是我但願你們往後想看netty或者在面試前須要瞭解的朋友回頭翻一下就夠了,那我寫這個文章的意義也就有了。github

也很少BB,直接開整。面試

NIO 基本概念

阻塞(Block)與非阻塞(Non-Block)

阻塞和非阻塞是進程在訪問數據的時候,數據是否準備就緒的一種處理方式,當數據沒有準備的時候。編程

阻塞:每每須要等待緩衝區中的數據準備好事後才處理其餘的事情,不然一直等待在那裏。bootstrap

非阻塞:當咱們的進程訪問咱們的數據緩衝區的時候,若是數據沒有準備好則直接返回,不會等待。若是數據已經準備好,也直接返回。數組

阻塞 IO :promise

非阻塞 IO :服務器

同步(Synchronous)與異步(Asynchronous)

同步和異步都是基於應用程序和操做系統處理 IO 事件所採用的方式。好比

**同步:**是應用程序要直接參與 IO 讀寫的操做。

**異步:**全部的 IO 讀寫交給操做系統去處理,應用程序只須要等待通知。

同步方式在處理 IO 事件的時候,必須阻塞在某個方法上面等待咱們的 IO 事件完成(阻塞 IO 事件或者經過輪詢 IO事件的方式),對於異步來講,全部的 IO 讀寫都交給了操做系統。這個時候,咱們能夠去作其餘的事情,並不須要去完成真正的 IO 操做,當操做完成 IO 後,會給咱們的應用程序一個通知。

因此異步相比較於同步帶來的直接好處就是在咱們處理IO數據的時候,異步的方式咱們能夠把這部分等待所消耗的資源用於處理其餘事務,提高咱們服務自身的性能。

同步 IO :

異步 IO :

Java BIO與NIO對比

BIO(傳統IO):

BIO是一個同步並阻塞的IO模式,傳統的 java.io 包,它基於流模型實現,提供了咱們最熟知的一些 IO 功能,好比File抽象、輸入輸出流等。交互方式是同步、阻塞的方式,也就是說,在讀取輸入流或者寫入輸出流時,在讀、寫動做完成以前,線程會一直阻塞在那裏,它們之間的調用是可靠的線性順序。

NIO(Non-blocking/New I/O)

NIO 是一種同步非阻塞的 I/O 模型,於 Java 1.4 中引入,對應 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 能夠理解爲 Non-blocking,不單純是 New。它支持面向緩衝的,基於通道的 I/O 操做方法。 NIO 提供了與傳統 BIO 模型中的 SocketServerSocket 相對應的 SocketChannelServerSocketChannel 兩種不一樣的套接字通道實現,兩種通道都支持阻塞和非阻塞兩種模式。對於高負載、高併發的(網絡)應用,應使用 NIO 的非阻塞模式來開發

BIO與NIO的對比

IO模型 BIO NIO
通訊 面向流 面向緩衝
處理 阻塞 IO 非阻塞 IO
觸發 選擇器

NIO 的 Server 通訊的簡單模型:

BIO 的 Server 通訊的簡單模型:

NIO的特色:

  1. 一個線程能夠處理多個通道,減小線程建立數量;
  2. 讀寫非阻塞,節約資源:沒有可讀/可寫數據時,不會發生阻塞致使線程資源的浪費

Reactor 模型

單線程的 Reactor 模型

多線程的 Reactor 模型

多線程主從 Reactor 模型

Netty 基礎概念

Netty 簡介

Netty 是一個 NIO 客戶端服務器框架,可快速輕鬆地開發網絡應用程序,例如協議服務器和客戶端。它極大地簡化和簡化了網絡編程,例如 TCP 和 UDP 套接字服務器。

「快速簡便」並不意味着最終的應用程序將遭受可維護性或性能問題的困擾。Netty 通過精心設計,結合了許多協議(例如FTP,SMTP,HTTP 以及各類基於二進制和文本的舊式協議)的實施經驗。結果,Netty 成功地找到了一種無需妥協便可輕鬆實現開發,性能,穩定性和靈活性的方法。

Netty 執行流程

Netty 核心組件

Channel

​ Channel是 Java NIO 的一個基本構造。能夠看做是傳入或傳出數據的載體。所以,它能夠被打開或關閉,鏈接或者斷開鏈接。

EventLoop 與 EventLoopGroup

​ EventLoop 定義了Netty的核心抽象,用來處理鏈接的生命週期中所發生的事件,在內部,將會爲每一個Channel分配一個EventLoop。

​ EventLoopGroup 是一個 EventLoop 池,包含不少的 EventLoop。

​ Netty 爲每一個 Channel 分配了一個 EventLoop,用於處理用戶鏈接請求、對用戶請求的處理等全部事件。EventLoop 自己只是一個線程驅動,在其生命週期內只會綁定一個線程,讓該線程處理一個 Channel 的全部 IO 事件。

​ 一個 Channel 一旦與一個 EventLoop 相綁定,那麼在 Channel 的整個生命週期內是不能改變的。一個 EventLoop 能夠與多個 Channel 綁定。即 Channel 與 EventLoop 的關係是 n:1,而 EventLoop 與線程的關係是 1:1。

ServerBootstrap 與 Bootstrap

​ Bootstarp 和 ServerBootstrap 被稱爲引導類,指對應用程序進行配置,並使他運行起來的過程。Netty處理引導的方式是使你的應用程序和網絡層相隔離。

​ Bootstrap 是客戶端的引導類,Bootstrap 在調用 bind()(鏈接UDP)和 connect()(鏈接TCP)方法時,會新建立一個 Channel,僅建立一個單獨的、沒有父 Channel 的 Channel 來實現全部的網絡交換。

​ ServerBootstrap 是服務端的引導類,ServerBootstarp 在調用 bind() 方法時會建立一個 ServerChannel 來接受來自客戶端的鏈接,而且該 ServerChannel 管理了多個子 Channel 用於同客戶端之間的通訊。

ChannelHandler 與 ChannelPipeline

​ ChannelHandler 是對 Channel 中數據的處理器,這些處理器能夠是系統自己定義好的編解碼器,也能夠是用戶自定義的。這些處理器會被統一添加到一個 ChannelPipeline 的對象中,而後按照添加的順序對 Channel 中的數據進行依次處理。

ChannelFuture

​ Netty 中全部的 I/O 操做都是異步的,即操做不會當即獲得返回結果,因此 Netty 中定義了一個 ChannelFuture 對象做爲這個異步操做的「代言人」,表示異步操做自己。若是想獲取到該異步操做的返回值,能夠經過該異步操做對象的addListener() 方法爲該異步操做添加監 NIO 網絡編程框架 Netty 聽器,爲其註冊回調:當結果出來後立刻調用執行。

​ Netty 的異步編程模型都是創建在 Future 與回調概念之上的。

Netty 源碼閱讀

源碼閱讀,最好能夠再 Debug 的狀況下進行,這樣更容易幫助理解,所以在分析 Netty 前的我準備一個客戶端和服務端的代碼。

Netty - Server 代碼

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeSocketServerHandler());
                         }
                    });

            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服務器已啓動。。。");

            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
複製代碼

Server 端 Handler:

public class DemoSocketServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
        ctx.fireChannelActive();
        TimeUnit.MILLISECONDS.sleep(500);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
複製代碼

Netty - Client 代碼

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new DemoSocketClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            if(eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
複製代碼

Client 端 Handler :

public class DemoSocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
        TimeUnit.MILLISECONDS.sleep(5000);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("from client:begin talking");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
複製代碼

NioEventLoopGroup 初始化分析

首先根據 Server 服務端代碼,分析 NioEventLoopGroup 的初始化過程。而在分析 NioEventLoopGroup 以前,有必要簡單的說一說 NioEventLoopGroup 與 NioEventLoop ,方便後續源碼的理解。

NioEventLoop 源碼分析前瞭解

NioEventLoop 的繼承體系

從 NioEventLoop 的繼承體系中能夠看到,NioEventLoop 自己就是一個 Executor,而且仍是一個 單線程的 Executor。Executor 必然擁有一個 execute(Runnable command) 的實現方法,而 NioEventLoop 的 execute() 實現方法在其父類 SingleThreadEventExecutor 中,找到具體代碼:

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

複製代碼

這裏不細說,可是貼出這段代碼主要爲了引出 startThread(); 這句代碼,在跟這句代碼會發現,它最終調用了 NioEventLoop 的一個成員 Executor 執行了當前成員的 execute() 方法。對應的成員 io.netty.util.concurrent.SingleThreadEventExecutor#executor

而 executor 成員的初始化也是在當前代碼執行時建立的匿名 Executor ,也就是執行到即新建而且執行當前 匿名 executr() 方法。

總結:

  1. NioEventLoop 自己就是一個 Executor。
  2. NioEventLoop 內部封裝這一個新的線程 Executor 成員。
  3. NioEventLoop 有兩個 execute 方法,除了自己的 execute() 方法對應的還有成員屬性 Executor 對應的 execute() 方法。

備註: 由於這裏出現了四個 Executor,爲了區分,咱們給其新的名稱:

NioEventLoop 自己 Executor:NioEventLoop

NioEventLoop 的成員 Executor:子 Executor

NioEventLoopGroup 自己 Executor :NioEventLoopGroup

NioEventLoopGroup 的構造參數 Executor :總Executor

NioEventLoopGroup 的繼承體系

看到繼承體系能夠直接知道 NioEventLoopGroup 也是一個 Executor,而且是一個線程池的 Executor,因此他也有 execute() 方法。對應的實現再其父類之中:io.netty.util.concurrent.AbstractEventExecutorGroup#execute

而這裏還須要說到的一點是:在 NioEventLoopGroup 的構造中,再其父類 MultithreadEventExecutorGroup 的構造再次引入了一個新的 Executor,

之因此這裏提到這個 Executor,是由於這個 Executor 是對應的 execute() 就是在 NioEventLoop 中的成員 Executor 的 execute() 執行時調用的。也就是下面對應的代碼調用。io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)

到這若是不明白,不要緊,由於只是爲了引入 NioEventLoopGroup 和 NioEventLoop 的對應的兩個 Executor,和兩個 Executor 對應的兩個 execute() 方法。這個後面還會有詳細分析。

總結:

  1. NioEventLoopGroup 是一個線程池線程 Executor。
  2. NioEventLoopGroup 也封裝了一個線程 Executor。
  3. NioEventLoopGroup 也有兩個 execute()方法。

NioEventLoopGroup 初始化代碼分析

上面說了基本的瞭解內容,下面具體分析,從 NioEventLoopGroup 的初始化進入源碼分析。

入口咱們直接找 NioEventLoopGroup 的無參構造。

public NioEventLoopGroup() {
        this(0);
    }
複製代碼
public NioEventLoopGroup(int nThreads) {
        // 第二個參數是這個group所包含的executor
        this(nThreads, (Executor) null);
    }
複製代碼
public NioEventLoopGroup(int nThreads, Executor executor) {
        // 第三個參數是provider,其用於提供selector及selectable的channel,
        // 這個provider是當前JVM中惟一的一個單例的provider
        this(nThreads, executor, SelectorProvider.provider());
    }
複製代碼
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        // 第四個參數是一個選擇策略工廠實例
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
複製代碼
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
複製代碼
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
複製代碼
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        // 第三個參數是選擇器工廠實例
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
複製代碼

跟到此,能夠發現無參構造的基本參數被初始化, nThreads :DEFAULT_EVENT_LOOP_THREADS//默認當前CPU邏輯核心數的兩倍selectorProvide:SelectorProvider.provider()//當前JVM中惟一的一個單例的providerSelectStrategyFactory:DefaultSelectStrategyFactory.INSTANCE//默認選擇策略工廠實例chooserFactory:DefaultEventExecutorChooserFactory.INSTANCE//選擇器工廠實例。到這裏只是基本的初始化參數,重點方法爲MultithreadEventExecutorGroup 的構造方法。下面重點分析:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // 這個executor是group所包含的executor,其未來會爲其所包含的每一個eventLoop建立一個線程
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 建立eventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 在建立這些eventLoop過程當中,只要有一個建立失敗,則關閉以前全部已經建立好的eventLoop
                if (!success) {
                    // 關閉以前全部已經建立好的eventLoop
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    // 終止全部eventLoop上所執行的任務
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 建立一個選擇器
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
複製代碼

根據無參構造直接往下跟,能夠看到核心部分在最後一個父類的構造裏。也就是 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

再這裏完成整個 NioEventLoopGroup 的實例初始化,這裏分析下,而後再畫個圖回顧下。

初始化構造參數中的 Executor 參數,當其爲空時,將其初始化

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
複製代碼

首先 newDefaultThreadFactory()) 建立默認的線程工廠,有興趣能夠跟進去看看。而後再建立ThreadPerTaskExecutor線程 Executor 對象。(PS:這裏建立的 Executor 就是 NioEventLoopGroup 內的 Executor 對象,並非當前 NioEventLoopGroup 自身,能夠稱其爲 總 Executor)。

而後能夠看到這裏建立了一個 children 數組,根據須要建立的線程數建立對應數量的數組。

children = new EventExecutor[nThreads];
複製代碼

由於每一個 NioEventLoopGroup 都是 NioEventLoop 的集合,因此這裏的 children 數組就是當前 NioEventLoopGroup 的 NioEventLoop。因此 NioEventLoop 的建立的實在 NioEventLoopGroup 初始化的時候。下面看 NioEventLoop 的初始化:

// 逐個建立nioEventLoop實例
for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        // 建立eventLoop
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        // 在建立這些eventLoop過程當中,只要有一個建立失敗,則關閉以前全部已經建立好的eventLoop
        if (!success) {
            // 閉以前全部已經建立好的eventLoop
            for (int j = 0; j < i; j ++) {
                children[j].shutdownGracefully();
            }

            // 終止全部eventLoop上所執行的任務
            for (int j = 0; j < i; j ++) {
                EventExecutor e = children[j];
                try {
                    while (!e.isTerminated()) {
                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException interrupted) {
                    // Let the caller handle the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
複製代碼

先總體看這段 NioEventLoop 的建立代碼,能夠看到整個過程當中存在一個成功標誌,catch 每一個 NioEventLoop 建立完成過程,若是發生異常則將全部已經建立的 NioEventLoop 關閉。重點的代碼也就在 NioEventLoop 的建立了。因此咱們繼續跟:children[i] = newChild(executor, args);往下走,直接找到 io.netty.channel.nio.NioEventLoopGroup#newChild ,由於當前是 NioEventLoopGroup 的建立,因此知道找到子類的 newChild 實現。

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼

又將以前合併的 args 參數強轉回來,繼續跟進 NioEventLoop 構造:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 建立一個selector的二元組
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
複製代碼

這裏咱們先總體看下,將以前的默認參數初始化到 NioEventLoop 屬性中。其中有兩處:openSelector()super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler)。這裏先看父類構造:

往下跟,直接就是 SingleThreadEventLoop -> SingleThreadEventExecutor 的初始化,這些也能夠在 NioEventLoop 的繼承體系能夠看到:

// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    // 建立一個收尾隊列
    tailTasks = newTaskQueue(maxPendingTasks);
}

// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    // 這是當前NioEventLoop所包含的executor
    this.executor = ThreadExecutorMap.apply(executor, this);
    // 建立一個任務隊列
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
複製代碼

這裏首先建立的是 SingleThreadEventExecutor ,這裏重點須要關注的代碼是:

this.executor = ThreadExecutorMap.apply(executor, this);
複製代碼

這裏this 是 NioEventLoop ,因此this.executor 就是前面說的 NioEventLoop 裏的 Executor,這裏咱們先稱爲 子 Executor(子:對應的就是 NioEventLoop ,前面說的 總:對應的是 NioEventLoopGroup )。

而這裏 子 Executor 的初始化是由一個 executor 參數的,這個就是前面 NioEventLoopGroup 構造方法一直帶入的 總 Executor。那咱們繼續往下跟,看看這個子 Executor 是如何完成的初始化的。

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        // 這裏建立的executor是子executor
        return new Executor() {
            // 這個execute()是子executor的execute()
            @Override
            public void execute(final Runnable command) {
                // 這裏調用了NioEventLoopGroup所包含的executor的execute()
                // 即調用了「總的executor」的execute()
                executor.execute(apply(command, eventExecutor));
            }
        };
    }
複製代碼

這段代碼細看就會明白,這裏建立的 子 Executor的建立也就是一個線程的建立,可是重點卻在這個線程 Executor 的 execute()方法實現,只作了一件事情:就是調用 傳入的 總 Executorexecute()方法。因此這裏 子 Executor 作的事情就是調用 總 Executorexecute()。不要以爲這裏繞,由於這還只是初始化,後面這裏執行會更繞。[手動捂臉哭]

其實這裏的 apply(command, eventExecutor),這裏再執行 總 Executorexecute() 時仍是會記錄當前正在執行的線程,而且再執行完成時將當前記錄值刪除。

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}
複製代碼

這裏再 NioEventLoop 的屬性 Executor 建立完成時,又去建立了一個普通任務隊列taskQueue = newTaskQueue(this.maxPendingTasks);而且還建立了一個收尾任務隊列tailTasks = newTaskQueue(maxPendingTasks);。這幾個隊列後面會說到。這裏繼續跟 NioEventLoop 主流程初始化。

到這咱們再回去看看 openSelector(),這裏咱們要先知道 SelectorTuple :

private static final class SelectorTuple {
    final Selector unwrappedSelector;  // NIO原生selector
    final Selector selector;  // 優化過的selector

    SelectorTuple(Selector unwrappedSelector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = unwrappedSelector;
    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }
}
複製代碼

SelectorTuple 只是一個包含兩個 Selector 的內部類,用於封裝優化先後的 Selector。而 openSelector() 方法就是爲了返回 Selector 而且根據配置判斷是否須要優化當前 Selector 。下面看具體代碼:

而具體的優化過程有興趣的能夠本身去看看,這裏只要知道,如果禁用了優化則 SelectorTuple 的優化後的 Selector 和爲優化的 Selector 均爲 Nio 原生的 Selector。

而這io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)後面還有在 NioEventLoop 數組建立完成後,還有選擇器建立和關閉監聽器綁定等,感興趣能夠本身看看,這裏再也不介紹。

到這一個 NioEventLoop 的建立過程的代碼也所有看完了。我想若是隻看這個確定仍是有點懵,源碼這個東西須要本身跟進去去看,debug 一點點的跟,跟着運行的代碼去想爲什麼這麼實現,不過這裏我也畫個圖,讓你們更直觀的瞭解到 NioEventLoopGroup 的建立流程以及主要操做。

我想你們結合這個圖,再結合上面的分析過程,最好能夠本身找到源碼,跟一遍,應該能夠理解 NioEvnetLoopGroup 的建立。

ServerBootstrap與 ServerBootstrap 屬性配置分析

繼承體系:

入口代碼:

//2.建立服務端啓動引導/輔助類:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.給引導類配置兩大線程組,肯定了線程模型
b.group(bossGroup, workerGroup)
    // (非必備)打印日誌
    .handler(new LoggingHandler(LogLevel.INFO))
    // 4.指定 IO 模型
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline p = ch.pipeline();
            //5.能夠自定義客戶端消息的業務處理邏輯
            p.addLast(new HelloServerHandler());
        }
    });

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
            pipeline.addLast(new SomeSocketClientHandler());
        }
    });

複製代碼

ServerBootstrap與 Bootstrap 都是啓動配置類,惟一不一樣的是,ServerBootstrap是服務端的啓動配置類,Bootstrap 則是客戶端的啓動配置類,主要用於綁定咱們建立的 EventLoopGroup,指定 Channel 的類型以及綁定 Channel 處理器等操做,主要作的都是給 ServerBootstrap與 Bootstrap 的屬性賦值操做,因此稱其爲配置類。能夠進入 group() 方法裏看一眼:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
複製代碼

其餘的方法也是同樣,感興趣能夠本身進去看看。這裏只是初始化,都是爲了後面的操做作準備。

服務端 bind 方法 ServerBootstrap.bind() 源碼解析

這裏咱們從這裏進入:

b.bind(port).sync();
複製代碼

直接從 bind() 方法跟進去:

// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

// 繼續跟進
public ChannelFuture bind(SocketAddress localAddress) {
    // 驗證group與channelFactory是否爲null
    validate(); 
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    // 這裏是一處重點邏輯
    return doBind(localAddress);
}
複製代碼

這裏顯示校驗了 Bootstrap 的 group 與 channelFactory 是否綁定成功。而後繼續跟進 doBind() 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 建立、初始化channel,並將其註冊到selector,返回一個異步結果
    final ChannelFuture regFuture = initAndRegister();
    // 從異步結果中獲取channel
    final Channel channel = regFuture.channel();
    // 若異步操做執行過程當中出現了異常,則直接返回異步對象(直接結束)
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 處理異步操做完成的狀況(多是正常結束,或發生異常,或任務取消,這些狀況都屬於有結果的狀況)
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // 綁定指定的端口
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {  // 處理異步操做還沒有有結果的狀況
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 爲異步操做添加監聽
        regFuture.addListener(new ChannelFutureListener() {
            // 若異步操做具備告終果(即完成),則觸發該方法的執行
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) { // 異步操做執行過程當中出現了問題
                    promise.setFailure(cause);
                } else {  // 異步操做正常結果
                    promise.registered();
                    // 綁定指定的端口
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
複製代碼

首先再這裏,咱們先把這個方法總體的邏輯搞清楚,而後再再去研究他的每一步具體的操做,畫個圖,先理解這個方法作了什麼:

能夠在圖中結合代碼,找到整個 dobind() 的大局處理思路,而後呢,到這裏咱們還有不少重點細節須要繼續跟進,也就是圖中標記的 Tag 一、Tag 2。爲了方便後面跟進去代碼以後方便回來,這裏以此標記,而後下面在具體分析 Tag 標記的源碼:

補充 Tag 0 :

ChannelPromise 與 ChannelFuture 瞭解。

Tag 1 :

異步建立、初始化channel,並將其註冊到selector

final ChannelFuture regFuture = initAndRegister();

Tag 2 :

綁定指定的端口號:

doBind0(regFuture, channel, localAddress, promise);

補充 Tag 0:ChannelPromise 與 ChannelFuture

ChannelPromise 是一個特殊的 ChannelFuture,是一個可修改的 ChannelFuture。內部提供了修改當前 Future 狀態的方法。在 ChannelFuture 的基礎上實現了設置最終狀態的修改方法。

而 ChannelFuture 只能夠查詢當前異步操做的結果,不能夠修改當前異步結果的 Future 。這裏須要知道的就是 ChannelPromise 能夠修改當前異步結果的狀態,而且在修改狀態是會觸發監聽器。在 doBind 方法中主要用於在處理異步執行一直未結束的的操做,將異步結果存在異常的時,將異常賦值給 ChannelPromise 並返回。

Tag 1 : initAndRegister() 初始化並註冊 Channel

先找到代碼:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 建立channel
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 將channel註冊到selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
複製代碼

嗯?!代碼意一看,咋就這麼點,也就作了三件事,但是這三件事作的每個都不是一句代碼的能夠完成的。這裏咱們一個一個分析,除了這三件事情,其餘的也就是異常後的處理邏輯,因此主流程就是下面的三句代碼,也爲了跟進繼續打上標記吧:

Tag 1.1 建立channel channel = channelFactory.newChannel();

Tag 1.2 初始化channel init(channel);

Tag 1.3 將channel註冊到selector ChannelFuture regFuture = config().group().register(channel);

針對這三處,仍是要一處一處分析。

Tag 1.1 channelFactory.newChannel() 建立 Channel

找到對應的代碼:io.netty.channel.ReflectiveChannelFactory#newChannel

@Override
public T newChannel() {
    try {
        // 調用無參構造器建立channel
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
複製代碼

這裏爲何直接找到 ReflectiveChannelFactory ,須要提一下,在分析 ServerBootstrap與 Bootstrap 啓動配置類的時候,設置 channel 的方法,跟進去能夠找到針對屬性 channelFactory 的賦值代碼:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
複製代碼

能夠看到這裏 new 的就是 ReflectiveChannelFactory 工廠類,而後再看 ReflectiveChannelFactory 的構造:

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        // 將NioServerSocketChannel的無參構造器初始化到constructor
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                                           " does not have a public non-arg constructor", e);
    }
}
複製代碼

看到的是 ReflectiveChannelFactory 在建立時初始化了 constructor 屬性,將傳入的 channel 類 clazz 中獲取構造賦值給了 ReflectiveChannelFactory 反射工廠的 constructor 屬性。

而咱們再 Server 端傳入的 channel 類爲NioServerSocketChannel.class ,因此上面看的 constructor.newInstance(); 對應的也就是 NioServerSocketChannel 的無參構造。這樣咱們就繼續跟進 NioServerSocketChannel :

// NIO中的provider,其用於建立selector與channel。而且是單例的
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
    // DEFAULT_SELECTOR_PROVIDER 靜態變量
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
複製代碼

繼續跟進 newSocket()

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try { 
        // 建立NIO原生的channel => ServerSocketChannel
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
            "Failed to open a server socket.", e);
    }
}
複製代碼

就是返回了一個 Java NIO 原生的 Channel,最後將 NIO 原生的Channel 包裝成 NioServerSocketChannel,繼續跟進 this(newSocket(DEFAULT_SELECTOR_PROVIDER)) 找到有參構造具體代碼:

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 參數1:父channel
    // 參數2:NIO原生channel
    // 參數3:指定當前channel所關注的事件爲 接受鏈接
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 用於對channel進行配置的屬性集合
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製代碼

這裏主要作了兩件事情,1. 調用父類構造,2. 對 channel 進行配置屬性集合。

這裏先說下 new NioServerSocketChannelConfig(),這部操做就是給當前 Channel 的 config 進行賦值,用來保存當前 Channel 的屬性配置的集合。好了,這個說了咱們繼續跟主線:super(null, channel, SelectionKey.OP_ACCEPT)

// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    // 這裏的this.ch爲NIO原生channel
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // NIO,非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                    "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
複製代碼

直接找到 AbstractNioChannel 父類構造,這也第一步也是調用父類構造 super(parent); 先記着,先看除了調用父類構造還作了什麼事情:

  1. 調用父類構造 super(parent);
  2. 將前面建立的原生 Channel 複製給屬性保存 this.ch = ch;
  3. 當前 channel 的關注事件屬性賦值 this.readInterestOp = readInterestOp; // SelectionKey.OP_ACCEPT 接受事件
  4. 將 NIO 原生 Channel 設置爲非阻塞 ch.configureBlocking(false);

在 AbstractNioChannel 構造中就作了這麼四件事情,主要須要說的仍是其調用父類構造又作了什麼事情,找到代碼:

// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 爲channel生成id,由五部分構成
    id = newId();
    // 生成一個底層操做對象unsafe
    unsafe = newUnsafe();
    // 建立與這個channel相綁定的channelPipeline
    pipeline = newChannelPipeline();
}
複製代碼

在 AbstractChannel 構造中主要作了三件事:

  1. 爲當前 Channel 生成 id newId(),感興趣能夠跟進去看看。
  2. 生成一個底層操做對象 unsafe,用於 I/O 線程調用傳輸時使用,用戶代碼沒法調用。newUnsafe()
  3. 建立與這個channel相綁定的channelPipeline,這也是一個重點操做,不過在這裏先不展開細說,後面會單獨細跟 channelPipeline 的代碼。

因此到此 **Tag 1 : initAndRegister() ** 中的 **Tag 1.1 newChannel() ** 建立 Channel 纔算跟完。針對 Tag 1.1 newChannel() 咱們也畫圖簡圖整理下思路:

根據圖,在結合上面代碼的分析,最好本身再能夠跟一遍代碼,我想這一塊的理解仍是沒什麼問題的。到這也只是建立了 Channel。Tag 1.1 的 Channel 建立結束,接着跟進 Tag 1.2 init(channel).

Tag 1.2 init(channel) 初始化 Channel

這裏咱們是從 ServerBootstrap 中的doBind 進入的,因此這裏直接找到 io.netty.bootstrap.ServerBootstrap#init

void init(Channel channel) throws Exception {
    // 獲取serverBootstrap中的options屬性
    final Map<ChannelOption<?>, Object> options = options0();
    // 將options屬性設置到channel
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 獲取serverBootstrap中的attrs屬性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        // 遍歷attrs屬性
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            // 將當前遍歷的attr初始化到channel
            channel.attr(key).set(e.getValue());
        }
    }

    // 獲取channel的pipeline
    ChannelPipeline p = channel.pipeline();

    // 將serverBootstrap中全部以child開頭的屬性寫入到局部變量,
    // 而後將它們初始化到childChannel中
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 將ServerBootstrapAcceptor處理器添加到pipeline
                    // ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
                    // 咱們一般稱其爲鏈接處理器
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
複製代碼

這裏作的事情仍是不少的,基本操做我在上面註釋上也標註出來,還有一些須要繼續跟下去的主要操做,仍是先標記 Tag 而後繼續跟下去。這裏說一下這裏的 options 與 attrs 屬性的賦值,其實就是講咱們 ServerBootstrap 與 Bootstrap 在調用 doBind() 以前經過 option()attr() 設置的參數值,其中 options 屬性設置到了 Channel 的 config 屬性中,attrs 是直接被設置在了 Channel 上的。

在設置完 options 屬性與 attrs 屬性時,接着獲取了當前 channel 的 pipeline,接下來仍是獲取咱們在 doBind() 以前設置的屬性值,以 child 開頭的方法 childOption()childAttr() 設置的屬性值。

這裏使用局部變量記錄了全部 Child 相關的值 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs 主要用於初始化 childChannel 的屬性,new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)) 主要是建立 鏈接處理器。

p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 將ServerBootstrapAcceptor處理器添加到pipeline
                    // ServerBootstrapAcceptor處理器用於接收ServerBootstrap中的屬性值,
                    // 咱們一般稱其爲鏈接處理器
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
複製代碼

首先這裏想作的事情是:將當前 channel 的 pipeline 中綁定一個初始化處理器 ChannelInitializer ,由於是抽象類,因此須要匿名實現 initChannel方法。 而這些主要的操做是處理 childGroup 裏面的 channel 的初始化操做。這裏我只想主要講一下這個鏈接處理器 ServerBootstrapAcceptor 主要作了什麼,其餘的具體會在後面的 handler 和 pipeline 的時候細說。

**補充:**這裏由於 ServerBootstrap 服務端是對用的有兩個 EventLoopGroup,在服務端,parentGroup 是用於接收客戶端的鏈接,在 parentGroup 接收到鏈接以後是將只是將當前轉給了 childGroup去處理後續操做,而 childGroup 是用來專門處理鏈接後的操做的,不關心 channel 的鏈接任務。這個其實就是 Netty-Server 的 Reactor 線程池模型的處理邏輯。

這裏主要往下說一下這個鏈接處理器: ServerBootstrapAcceptor 。

ServerBootstrapAcceptor(
    final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
    Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    // See https://github.com/netty/netty/issues/1328
    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
            channel.config().setAutoRead(true);
        }
    };
}
複製代碼

ServerBootstrapAcceptor 構造只是將 ServerBootstrap 中配置的 Child 屬性設置保存下來。而這裏一直說這是鏈接處理器,是由於當客戶端鏈接發送到服務端時,這個處理器會接收客戶端的鏈接並處理。主要是處理方法是 channelRead 中的實現:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg爲客戶端發送來的數據,其爲NioSocketChannel,即子channel,childChannel
    final Channel child = (Channel) msg;

    // 未來自於ServerBootstrap的child開頭屬性初始化到childChannel中(childHandler、childOptions、childAttrs)
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 將childChannel註冊到selector 須要注意的是,這裏的selector與父channel所註冊的selector不是同一個
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
複製代碼

這裏主要就作了兩件事情:

  1. 初始化 childChannel
  2. 將成功從 client 鏈接過來的 channel 註冊到 selector 上。

這裏一直說子channel,就是由於這裏註冊的是兩個 EventLoopGroup,在 Server 端的處理上 netty 線程模型採用「服務端監聽線程」和「IO線程」分離的方式。因此這裏 channelRead 方法就是在 client 端請求鏈接到 server 端時,用於將當前鏈接的 IO 線程綁定到 childChannel 同時註冊到 ChildGroup 中的 Selector 中。線程,模型能夠參考下面的圖:

好了,到這裏 **Tag 1.2 initChannel ** 代碼也分析完了,有些關於 pipeline 、handler、selector 的部分沒有細說由於後面會單獨說,在這裏沒有直接展開。

這裏也畫個圖:到時候將這些圖在整合到一塊兒,如今是的分析過程就像是化整爲零,最後在整合到一塊兒化零爲整。

這裏除了 init(channel) 方法以外,還主要說了下 ServerBootstrapAcceptor 鏈接處理器。其實主要是 netty-server 的線程模型與代碼的結合理解。

本文太長了,致使會超出大部分博客網站的字數限制,因此我分上下發了

我是敖丙,你知道的越多,你不知道的越多,感謝各位人才的:點贊收藏評論,咱們下期見!


文章持續更新,能夠微信搜一搜「 三太子敖丙 」第一時間閱讀,回覆【資料】有我準備的一線大廠面試資料和簡歷模板,本文 GitHub github.com/JavaFamily 已經收錄,有大廠面試完整考點,歡迎Star。

相關文章
相關標籤/搜索