EventLoop和EventLoopGroup

Netty框架的主要線程就是I/O線程,線程模型設計的好壞,決定了系統的吞吐量、併發性和安全性等架構質量屬性。Netty的線程模型被精心地設計,既提高了框架的併發性能,又能在很大程度避免鎖,局部實現了無鎖化設計。java

線程模型

通常首先會想到的是經典的Reactor線程模型,儘管不一樣的NIO框架對於Reactor模式的實現存在差別,但本質上仍是遵循了Reactor的基礎線程模型。react

Reactor單線程模型

Reactor單線程模型,是指全部的I/O操做都在同一個NIO線程上面完成。數據庫

NIO線程的職責以下:編程

  1. 做爲NIO服務端,接收客戶端的TCP鏈接;
  2. 做爲NIO客戶端,向服務端發起TCP鏈接;
  3. 讀取通訊對端的請求或者應答消息;
  4. 向通訊對端發送消息請求或者應答消息。

因爲Reactor模式使用的是異步非阻塞I/O,全部的I/O操做都不會致使阻塞,理論上一個線程能夠獨立處理全部I/O相關的操做。從架構層面看,一個NIO線程確實能夠完成其承擔的職責。例如,經過Acceptor類接收客戶端的TCP鏈接請求消息,當鏈路創建成功以後,經過Dispatch將對應的ByteBuffer派發到指定的Handler上,進行消息解碼。用戶線程消息編碼後經過NIO線程將消息發送給客戶端。後端

在一些小容量應用場景下,可使用單線程模型。可是這對於高負載、大併發的應用場景卻不合適,主要緣由以下:安全

  1. 一個NIO線程同時處理成百上千的鏈路,性能上沒法支撐,即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送。
  2. 當NIO線程負載太重以後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,這更加劇了NIO線程的負載,最終會致使大量消息積壓和處理超時,成爲系統的性能瓶頸。
  3. 可靠性問題:一旦NIO線程意外跑飛,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障。

Reactor多線程模型

Rector多線程模型與單線程模型最大的區別就是有一組NIO線程來處理I/O操做。網絡

Reactor多線程模型的特色以下:多線程

  1. 有專門一個NIO線程——Acceptor線程用於監聽服務端,接收客戶端的TCP鏈接請求。
  2. 網絡I/O操做——讀、寫等由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送。
  3. 一個NIO線程能夠同時處理N條鏈路,可是一個鏈路只對應一個NIO線程,防止發生併發操做問題。

在絕大多數場景下,Reactor多線程模型能夠知足性能需求。可是,在個別特殊場景中,一個NIO線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如併發百萬客戶端鏈接,或者服務端須要對客戶端握手進行安全認證,可是認證自己很是損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足的問題,爲了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。架構

主從Reactor多線程模型

主從Reactor線程模型的特色是:服務端用於接收客戶端鏈接的再也不是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP鏈接請求並處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工做。Acceptor線程池僅僅用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端subReactor線程池的I/O線程上,由I/O線程負責後續的I/O操做。併發

利用主從NIO線程模型,能夠解決一個服務端監聽線程沒法有效處理全部客戶端鏈接的性能不足問題。所以,在Netty的官方demo中,推薦使用該線程模型。

Netty的線程模型

Netty的線程模型並非一成不變的,它實際取決於用戶的啓動參數配置。經過設置不一樣的啓動參數,Netty能夠同時支持Reactor單線程模型、多線程模型和主從Reactor多線層模型。

下面讓咱們經過一張原理圖(圖18-4)來快速瞭解Netty的線程模型。

能夠經過Netty服務端啓動代碼來了解它的線程模型:

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer() {
                    @Override
                    public void initChannel(Channel ch)throws IOException{
                        ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler());
                    }
                });

        // 綁定端口,同步等待成功
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();

服務端啓動的時候,建立了兩個NioEventLoopGroup,它們實際是兩個獨立的Reactor線程池。一個用於接收客戶端的TCP鏈接另外一個用於處理I/O相關的讀寫操做,或者執行系統Task、定時任務Task等

Netty用於接收客戶端請求的線程池職責以下。

(1)接收客戶端TCP鏈接,初始化Channel參數;

(2)將鏈路狀態變動事件通知給ChannelPipeline。

Netty處理I/O操做的Reactor線程池職責以下。

(1)異步讀取通訊對端的數據報,發送讀事件到ChannelPipeline;

(2)異步發送消息到通訊對端,調用ChannelPipeline的消息發送接口;

(3)執行系統調用Task;

(4)執行定時任務Task,例如鏈路空閒狀態監測定時任務。

經過調整線程池的線程個數、是否共享線程池等方式,Netty的Reactor線程模型能夠在單線程、多線程和主從多線程間切換,這種靈活的配置方式能夠最大程度地知足不一樣用戶的個性化定製。

爲了儘量地提高性能,Netty在不少地方進行了無鎖化的設計,例如在I/O線程內部進行串行操做,避免多線程競爭致使的性能降低問題。表面上看,串行化設計彷佛CPU利用率不高,併發程度不夠。可是,經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列—多個工做線程的模型性能更優。

它的設計原理如圖:

Netty的NioEventLoop讀取到消息以後,直接調用ChannelPipeline的fireChannelRead (Object msg)。只要用戶不主動切換線程,一直都是由NioEventLoop調用用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程操做致使的鎖的競爭,從性能角度看是最優的。

最佳實踐

Netty的多線程編程最佳實踐以下。

(1)建立兩個NioEventLoopGroup,用於邏輯隔離NIO Acceptor和NIO I/O線程。

(2)儘可能不要在ChannelHandler中啓動用戶線程(解碼後用於將POJO消息派發到後端業務線程的除外)。

(3)解碼要放在NIO線程調用的解碼Handler中進行,不要切換到用戶線程中完成消息的解碼。

(4)若是業務邏輯操做很是簡單,沒有複雜的業務邏輯計算,沒有可能會致使線程被阻塞的磁盤操做、數據庫操做、網路操做等,能夠直接在NIO線程上完成業務邏輯編排,不須要切換到用戶線程。

(5)若是業務邏輯處理複雜,不要在NIO線程上完成,建議將解碼後的POJO消息封裝成Task,派發到業務線程池中由業務線程執行,以保證NIO線程儘快被釋放,處理其餘的I/O操做。

NioEventLoop

Netty的NioEventLoop並非一個純粹的I/O線程,它除了負責I/O的讀寫以外,還兼顧處理如下兩類任務:

  1. 系統Task:經過調用NioEventLoop的execute(Runnable task)方法實現,Netty有不少系統Task,建立它們的主要緣由是:I/O線程和用戶線程同時操做網絡資源時,爲了防止併發操做致使的鎖競爭,將用戶線程的操做封裝成Task放入消息隊列中,由I/O線程負責執行,這樣就實現了局部無鎖化。
  2. 定時任務:經過調用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)方法實現。

正是由於NioEventLoop具有多種職責,因此它的實現比較特殊,它並非個簡單的Runnable。

它實現了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是由於這種設計,致使NioEventLoop和其父類功能實現很是複雜。

NioEventLoop源碼分析

Selector的初始化

做爲NIO框架的Reactor線程,NioEventLoop須要處理網絡I/O讀寫事件,所以它必須聚合一個多路複用器對象。

Selector的初始化很是簡單,直接調用Selector.open()方法就能建立並打開一個新的Selector。Netty對Selector的selectedKeys進行了優化,用戶能夠經過io.netty.noKeySetOptimization開關決定是否啓用該優化項。默認不打開selectedKeys優化功能。

    Selector selector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
        super(parent, executor, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }

    private Selector openSelector() {
        final Selector selector;
        try {
            //經過provider.openSelector()建立並打開多路複用器
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        //若是沒有開啓selectedKeys優化開關,就當即返回。
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
        //若是開啓了優化開關,須要經過反射的方式從Selector實例中獲取selectedKeys和publicSelectedKeys,
        //將上述兩個成員變量設置爲可寫,經過反射的方式使用Netty構造的selectedKeys包裝類selectedKeySet將原JDK的selectedKeys替換掉。
        try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable t) {
            selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
        }

        return selector;
    }

run方法的實現

    @Override
    protected void run() {
        //全部的邏輯操做都在for循環體內進行,只有當NioEventLoop接收到退出指令的時候,才退出循環,不然一直執行下去,這也是通用的NIO線程實現方式。
        for (;;) {
            //首先須要將wakenUp還原爲false,並將以前的wakeup狀態保存到oldWakenUp變量中。
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                //經過hasTasks()方法判斷當前的消息隊列中是否有消息還沒有處理
                if (hasTasks()) {
                    //若是有則調用selectNow()方法當即進行一次select操做,看是否有準備就緒的Channel須要處理。
                    //Selector的selectNow()方法會當即觸發Selector的選擇操做,若是有準備就緒的Channel,則返回就緒Channel的集合,不然返回0。
                    //選擇完成以後,再次判斷用戶是否調用了Selector的wakeup方法,若是調用,則執行selector.wakeup()操做。
                    selectNow();
                } else {
                    //執行select()方法,由Selector多路複用器輪詢,看是否有準備就緒的Channel。
                    //取當前系統的納秒時間,調用delayNanos()方法計算得到NioEventLoop中定時任務的觸發時間。
                    //計算下一個將要觸發的定時任務的剩餘超時時間,將它轉換成毫秒,爲超時時間增長0.5毫秒的調整值。
                    //對剩餘的超時時間進行判斷,若是須要當即執行或者已經超時,則調用selector.selectNow()進行輪詢操做,將selectCnt設置爲1,並退出當前循環。
                    //將定時任務剩餘的超時時間做爲參數進行select操做,每完成一次select操做,對select計數器selectCnt加1。
                    //Select操做完成以後,須要對結果進行判斷,若是存在下列任意一種狀況,則退出當前循環:
                    //if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {break}
                    //1.有Channel處於就緒狀態,selectedKeys不爲0,說明有讀寫事件須要處理;
                    //2.oldWakenUp爲true;
                    //3.系統或者用戶調用了wakeup操做,喚醒當前的多路複用器;
                    //4.消息隊列中有新的任務須要處理。
                    //若是本次Selector的輪詢結果爲空,也沒有wakeup操做或是新的消息須要處理,則說明是個空輪詢.
                    //有可能觸發了JDK的epoll bug,它會致使Selector的空輪詢,使I/O線程一直處於100%狀態。
                    //截止到當前最新的JDK7版本,該bug仍然沒有被徹底修復。因此Netty須要對該bug進行規避和修正。
                    //該Bug的修復策略以下:
                    //(1)對Selector的select操做週期進行統計;
                    //(2)每完成一次空的select操做進行一次計數;
                    //(3)在某個週期(例如100ms)內若是連續發生N次空輪詢,說明觸發了JDK NIO的epoll()死循環bug。
                    //監測到Selector處於死循環後,須要經過重建Selector的方式讓系統恢復正常,重建步驟以下:
                    //(1)首先經過inEventLoop()方法判斷是不是其餘線程發起的rebuildSelector,
                    //若是由其餘線程發起,爲了不多線程併發操做Selector和其餘資源,須要將rebuildSelector封裝成Task,
                    //放到NioEventLoop的消息隊列中,由NioEventLoop線程負責調用,這樣就避免了多線程併發操做致使的線程安全問題。
                    //(2)調用openSelector方法建立並打開新的Selector
                    //(3)經過循環,將原Selector上註冊的SocketChannel從舊的Selector上去註冊,從新註冊到新的Selector上,並將老的Selector關閉。
                    //過銷燬舊的、有問題的多路複用器,使用新建的Selector,就能夠解決空輪詢Selector致使的I/O線程CPU佔用100%的問題。
                    select();
                    //判斷用戶是否調用了Selector的wakeup方法
                    if (wakenUp.get()) {
                        //若是調用,則執行selector.wakeup()操做。
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;

                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                //若是輪詢到了處於就緒狀態的SocketChannel,則須要處理網絡I/O事件
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    //因爲默認未開啓selectedKeys優化功能,因此會進入processSelectedKeysPlain分支執行。
                    //1.對SelectionKey進行保護性判斷,若是爲空則返回。
                    //2.獲取SelectionKey的迭代器進行循環操做,經過迭代器獲取SelectionKey和SocketChannel的附件對象,
                    //3.將已選擇的選擇鍵從迭代器中刪除,防止下次被重複選擇和處理
                    //4.對SocketChannel的附件類型進行判讀,
                    //若是是AbstractNioChannel類型,說明它是NioServerSocketChannel或者NioSocketChannel,須要進行I/O讀寫相關的操做
                    //步驟以下:
                    //--首先從NioServerSocketChannel或者NioSocketChannel中獲取其內部類Unsafe,判斷當前選擇鍵是否可用,
                    //--若是不可用,則調用Unsafe的close方法,釋放鏈接資源。
                    //--若是選擇鍵可用,則繼續對網絡操做位進行判斷,以下:
                    //----若是是讀或者鏈接操做,則調用Unsafe的read方法。此處Unsafe的實現是個多態
                    //對於NioServerSocketChannel,它的讀操做就是接收客戶端的TCP鏈接。
                    //對於NioSocketChannel,它的讀操做就是從SocketChannel中讀取ByteBuffer。
                    //----若是網絡操做位爲寫,則說明有半包消息還沒有發送完成,須要繼續調用flush方法進行發送
                    //----若是網絡操做位爲鏈接狀態,則須要對鏈接結果進行判讀,在進行finishConnect判斷以前,須要將網絡操做位進行修改,註銷掉SelectionKey.OP_CONNECT。
                    //若是它是NioTask,則對其進行類型轉換,調用processSelectedKey進行處理。因爲Netty自身沒實現NioTask接口,因此一般狀況下系統不會執行該分支,除非用戶自行註冊該Task到多路複用器。
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                //因爲NioEventLoop須要同時處理I/O事件和非I/O任務,爲了保證二者都能獲得足夠的CPU時間被執行,Netty提供了I/O比例供用戶定製。
                //若是I/O操做多於定時任務和Task,則能夠將I/O比例調大,反之則調小,默認值爲50%。
                //Task的執行時間根據本次I/O操做的執行時間計算得來。
                final long ioTime = System.nanoTime() - ioStartTime;
                final int ioRatio = this.ioRatio;//50%
                //處理完I/O事件以後,NioEventLoop須要執行非I/O操做的系統Task和定時任務
                //首先從定時任務消息隊列中彈出消息進行處理,若是消息隊列爲空,則退出循環。
                //根據當前的時間戳進行判斷,若是該定時任務已經或者正處於超時狀態,則將其加入到執行Task Queue中,同時從延時隊列中刪除。
                //定時任務若是沒有超時,說明本輪循環不須要處理,直接退出便可。
                //執行Task Queue中原有的任務和從延時隊列中複製的已經超時或者正處於超時狀態的定時任務
                //因爲獲取系統納秒時間是個耗時的操做,每次循環都獲取當前系統納秒時間進行超時判斷會下降性能。
                //爲了提高性能,每執行60次循環判斷一次,若是當前系統時間已經到了分配給非I/O操做的超時時間,則退出循環。
                //這是爲了防止因爲非I/O任務過多致使I/O操做被長時間阻塞。
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                //判斷系統是否進入優雅停機狀態,若是處於關閉狀態。
                if (isShuttingDown()) {
                    //調用closeAll方法,釋放資源。遍歷獲取全部的Channel,調用它的Unsafe.close()方法關閉全部鏈路,釋放線程池、ChannelPipeline和ChannelHandler等資源。
                    closeAll();
                    //並讓NioEventLoop線程退出循環,結束運行。
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
相關文章
相關標籤/搜索