NIO 在Jetty中的應用

引子

做爲縱橫情場多年的老手,憲程在把到妹子後一般有如下策略 (假設憲程是影流之主的第1024代傳人而且只剩下了分身的能力)java

  • 將妹子存到隊列中,不時發微信去撩一下,若是有意向的話憲程會使用分身能力再建立一個憲程去把妹git

  • 憲程本身執行把妹的操做,若是期間又有新的妹子看上他咋辦呢,那就將該妹子交給本身的分身憲程去輪詢處理,而且憲程在把完妹子以後會嘗試去把分身憲程的輪詢任務給接過來,畢竟本體老是要掌握主動權的,若是沒有接過來咋辦?只能選擇成爲分身了,畢竟此時分身憲程已經接過了本體的工做,某種意義上他已經成爲了本體github

Jetty NIO 模型

建議在閱讀以前先了解如下Tomcat的NIO模型,沒有對比就沒有傷害,你會發現Jetty NIO模型的有趣之處web

概述

若是時間充足的話,我建議你直接閱讀附錄,瞭解如何Debug Jetty NIO功能算法

既然要了解Jetty的NIO模型,從線程的角度來講能夠分爲如下幾類spring

  • 空閒線程 此角色會根據提交到線程池中的任務,將本身轉變爲I/O線程或者輪詢線程tomcat

  • Acceptor線程 該角色主要負責接收來自客戶端的鏈接並對其進行封裝以後,選擇一個Selector來提交此任務微信

  • 輪詢線程 此角色主要負責輪詢事件,並處理其餘角色提交給此角色的任務,另外此角色能夠根據所設定的策略將輪詢任務交給其餘線程,在執行完I/O任務以後歸還到線程池中成爲空閒線程網絡

主要參與的類有多線程

  • Connector 該角色主要負責JettyNIO模型中各個組件的啓動和,協調工做

  • SelectorManager 此角色主要對ManagedSelector進行管理,想要和Selector進行交互可使用此類

  • ManagedSelector 封裝了JDK原生的selector, 並對外提供對selector執行操做的內部類、接口以及方法

重點 全部線程共用一個線程池

Connector

關鍵類 org.eclipse.jetty.server.ServerConnector

Connector即鏈接器,是Jetty對於網絡I/O模型的一個抽象,主要負責組裝,啓動Jetty NIO模型中所須要用到的組件。所以,咱們主要注意力集中到其實現上也就是ServerConnector上。

初始化Connector鏈接器,咱們須要向其提供如下關鍵參數(隱去了和本文無關的參數,有興趣的可自行了解)

  • 用來執行接收新鏈接、處理I/O、輪詢事件任務的線程池
  • ByteBuffer 對象池, 該對象池能夠回收以及提供ByteBuffer給I/O線程使用
  • 負責執行accept操做線程的數量
  • 負責執行輪詢任務的selector線程數量

可是,大部分的初始化工做並非在ServerConnector中執行的,而是在其父類中執行的操做,所以咱們將目光轉移到 org.eclipse.jetty.server.AbstractConnector

該類的初始化代碼以下,其主要作了如下工做

  • 檢查是否指定線程池,若是沒有則和Server共用一個線程池
  • 檢查是否指定ByteBufferPool,若是沒有則使用ArrayByteBuffer
  • 檢查是否設置Acceptor數量,若是沒有則按照max(1,min(4,CPU核心數÷8))進行計算,也就是說默認的Acceptor數量最少有一個,最多有4個

想象一下,若是ServerSocketChannel被設置爲阻塞狀態以便多個線程同時執行accept操做,那麼大多數狀況下多數線程將會陷入阻塞狀態,而且線程從阻塞態恢復是有線程上下文切換的成本的所以Acceptor線程並非越多越好

public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) {
        _server = server;
        //檢查是否設置線程池,若是沒有則使用Server的
        _executor = executor != null ? executor : _server.getThreadPool();
        if (scheduler == null)
            scheduler = _server.getBean(Scheduler.class);
        _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
        
        // 檢查是否指定ByteBufferPool,若是沒有則本身建立一個
        if (pool == null)
            pool = _server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
        // 將這些對象交給Jetty統一管理(不在本文討論範圍內,不展開)
        addBean(_server, false);
        addBean(_executor);
        if (executor == null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // ConnectionFactory主要使用來處理對應的HTTP協議
        for (ConnectionFactory factory : factories)
        {
            addConnectionFactory(factory);
        }
        // 若是未指定Acceptor的數量則根據CPU核數執行計算
        int cores = ProcessorUtils.availableProcessors();
        if (acceptors < 0)
           //根據此式能夠推出Acceptor數量最大是4最小是1
            acceptors = Math.max(1, Math.min(4, cores / 8));
        // Acceptor數量大於CPU核心數
        // 將會引發大量的線程陷入阻塞狀態
        // 沒有東西能夠accept不就阻塞了嗎
        // 而要激活阻塞的線程則須要切換線程上下文會引發性能的浪費
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
        _acceptors = new Thread[acceptors];
    }

複製代碼

以下圖所示個人電腦爲4核心的i5CPU,那麼默認的Acceptor線程應該只有一個

4核心CPU
在啓動你的Jetty以後咱們能夠用JConsole來驗證一下
正如你所看到的,以qtp開頭的線程用於NIO的線程池,其中一個Acceptor線程阻塞在accept()方法上

Acceptor

Acceptor是一個定義在AbstractConnector中的內部類, 其主要工做不斷調用在子類中實現accept方法,也就是接收鏈接的實現延遲到了子類中。

其代以下,能夠學到很多小技巧, 若是你不想看代碼,其總結以下

  • 獲取執行當前代碼線程,給他起個名字,見上一節JConsole的截圖
  • 將Acceptor線程優先級調至最高(固然,不必定起做用,還得看人操做系統理不理你)
  • 在執行accept操做以前須要等待來自其餘線程的放行信號
  • 不斷循環執行accept操做
public void run() {
           // 給線程起給名字
            final Thread thread = Thread.currentThread();
            String name = thread.getName();
            _name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
            thread.setName(_name);
            // 設置優先級
            int priority = thread.getPriority();
            if (_acceptorPriorityDelta != 0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
            // 保存對此線程的引用
            _acceptors[_id] = thread;
            
            try
            {
                while (isRunning())
                {
                    // 加鎖,等待來自其餘線程的信號說能夠開始幹活了
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        continue;
                    }

                    try
                    {
                       //調用子類的accept方法
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
               // 發生異常了,則將線程的名稱以及優先級調回原來的值
                thread.setName(name);
                if (_acceptorPriorityDelta != 0)
                    thread.setPriority(priority);
                
                //釋放引用
                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping = _stopping;
                if (stopping != null)
                    stopping.countDown();
            }
        }
複製代碼

在子類ServerConnector中,accept主要執行如下操做

  • 阻塞的形式接收來自客戶端的鏈接
  • 設置客戶端SocketChannel非阻塞模式,並禁用nagle算法
  • 交給SelectorManager來處理, 該類會將客戶端SocketChannel封裝成一個Accept事件,交給輪詢線程處理 ServerConnector中的代碼
@Override
    public void accept(int acceptorID) throws IOException {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
            accepted(channel);
        }
    }

    private void accepted(SocketChannel channel) throws IOException {
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket); // socket.setTcpNoDelay(true);
        _manager.accept(channel);
    }
複製代碼

SelectorManager中最終被調用的代碼

public void accept(SelectableChannel channel, Object attachment) {
        final ManagedSelector selector = chooseSelector();
        selector.submit(selector.new Accept(channel, attachment));
    }
複製代碼

輪詢線程

輪詢線程主要負責輪詢I/O事件以及處理其餘線程提交到本線程任務。而且咱們能夠爲輪詢線程指定執行策略, 在後面咱們能夠看到執行策略將如何影響輪詢線程行爲。

首先,咱們須要先明確哪些類會參與到輪詢線程的工做中,也就是說咱們要先理清楚輪詢線程的調用鏈。

如上圖堆棧跟蹤圖紅框所標註的部分所示,參與到輪詢線程主要堆棧結構以下圖所示。

  • ManagedSelector 此類主要封裝了JDK的selector類,並對外暴露操做此Selector的方法和類
  • EatWhatYouKill 此類即輪詢線程執行策略,該類會不斷調用SelectorProducer.produce 方法產生封裝好的I/O任務,並根據其策略來決定執行這個I/O任務的方式
  • SelectorProducer 此類爲ManagedSelector的內部類,實現線程執行策略裏面的ExecutionStrategy.Producer接口,該類專門用於生成供輪詢線程處理的I/O任務

ManagedSelector

Jetty將JDK原生的Selector類封裝成爲ManagedSelector,該類主要功能是對外暴露對其封裝的selector執行操做的接口和內部類. 其關鍵方法和內部類以下

SelectorUpdate接口 若是要對ManagedSelector所管理的selector進行更新(如執行註冊感興趣的I/O事件)能夠實現此接口,該接口定義以下

public interface SelectorUpdate {
        void update(Selector selector);
    }
複製代碼

submit方法 該方法主要用於外界將SelectorUpdate提交到輪詢線程中以便執行對Selector的更新操做,簡單來講此方法會執行如下操做

  • 將update事件加入隊列
  • 檢查Selector是否正在執行select操做,若是是則將其喚醒,使其從阻塞狀態返回以便咱們對其進行更新
public void submit(SelectorUpdate update) {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}", update, this);

        Selector selector = null;
        synchronized (ManagedSelector.this)
        {
            //加事件加入處理隊列
            _updates.offer(update);
            //檢查是否正在輪詢,若是正在輪詢,則會執行喚醒操做
            //所以在此處須要將selecting置爲false
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }

        if (selector != null)
        {
           //執行喚醒操做,以便對selector執行更新操做
            if (LOG.isDebugEnabled())
                LOG.debug("Wakeup on submit {}", this);
            selector.wakeup();
        }
    }
複製代碼

SelectorProducer

SelectorProducerManagedSelector的內部類,該類實現了輪詢線程執行策略的ExecutionStrategy.Producer接口

interface Producer {
        // 返回一個Runnable任務供輪詢線程執行
        Runnable produce();
    }
複製代碼

所以SelectorProducer須要不斷調用selector去輪詢看有無新的I/O事件以供處理,除此以外它還須要處理外部類向ManagedSelector經過調用submit方法提交的SelectorUpdate任務

其向線程執行策略類所提供produce方法代以下所示,總的來講主要完成如下幾項工做

  • 執行一個循環,直到輪詢到感興趣的任務(一次只返回一個,被輪詢到事件會被保存起來供下一次使用)
  • 處理外部類向其提交的任務(調用processUpdates)
  • 更新客戶端SocketChannel感興趣的事件
@Override
        public Runnable produce() {
            while (true)
            {
                //處理以前查詢到事件
                Runnable task = processSelected();
                if (task != null)
                    return task;
                //處理外部類所提交的update任務
                //該方法最終會致使提交的SelectorUpdate.update被調用
                processUpdates();
                //此方法的調用可能會
                //致使客戶端SocketChannel感興趣的事件發生變動
                updateKeys();
                //執行select操做,並將查詢到事件保存起來
                if (!select())
                    return null;
            }
        }
複製代碼

processUpdates 此方法主要是處理外部類提交的SelectorUpdate任務,經過複製引用很是巧妙的避免了併發問題

private void processUpdates() {
            synchronized (ManagedSelector.this)
            {
                //倒騰數據,將要處理隊列的引用保存
                //到另外一個變量上,原有的引用能夠繼續對外提供服務
                //整個數據倒騰過程很是短,性能影響較小
                Deque<SelectorUpdate> updates = _updates;
                _updates = _updateable;
                _updateable = updates;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updateable {}", _updateable.size());
            //遍歷事件隊列,處理update方法
            for (SelectorUpdate update : _updateable)
            {
                if (_selector == null)
                    break;
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("update {}", update);
                    //調用事件的update方法,並傳入selector
                    update.update(_selector);
                }
                catch (Throwable ex)
                {
                    LOG.warn(ex);
                }
            }
            _updateable.clear();

            Selector selector;
            int updates;
            //再次檢查是否有新的事件被提交,若是有則執行喚醒操做
            synchronized (ManagedSelector.this)
            {
               //外部類提交的任務會保存到updates中
                updates = _updates.size();
                _selecting = updates == 0;
                selector = _selecting ? null : _selector;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updates {}", updates);

            if (selector != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("wakeup on updates {}", this);
                selector.wakeup();
            }
        }

複製代碼

select() 該方法主要執行輪詢操做,並將輪詢到事件保存起來以供下一次循環的時候返回,在這個方法中展示jetty如何處理空輪詢事件(空輪詢是指selector在執行select操做時,沒有查詢到任何事件卻返回了,這個BUG一般會形成CPU100%的使用率,從而使系統崩潰)

private boolean select() {
            try
            {
                Selector selector = _selector;
                if (selector != null && selector.isOpen())
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
                    int selected = selector.select();
                    //沒查詢到事件, 空輪詢事件處理
                    if (selected == 0)
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Selector {} woken with none selected", selector);
                        //若是線程被中斷,而且標誌位被設置了不在運行則執行推出邏輯
                        if (Thread.interrupted() && !isRunning())
                            throw new ClosedSelectorException();
                        //開啓了此參數則當即執行一次select操做
                        if (FORCE_SELECT_NOW)
                            selected = selector.selectNow();
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());

                    int updates;
                    synchronized (ManagedSelector.this)
                    {
                        // 完成了select操做則設置標誌位
                        _selecting = false;
                        updates = _updates.size();
                    }

                    _keys = selector.selectedKeys();
                    _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);

                    return true;
                }
            }
            catch (Throwable x)
            {
                _selector = null;
                if (isRunning())
                    LOG.warn(x);
                else
                {
                    LOG.warn(x.toString());
                    LOG.debug(x);
                }
                closeNoExceptions(_selector);
            }
            return false;
        }
複製代碼

與Netty的空輪詢處理策略不一樣,Jetty的處理策略是再select一次並當即返回,但這樣彷佛並不能解決空輪詢的BUG問題詳情

EatWhatYouKill

EatWhatYouKill是線程執行策略的一種,也是Jetty默認的指策略,其思想來源於若是獵人殺死一隻獵物,那麼獵人就應該吃掉它(若是你吃過新鮮的蝦你就會對這種哲學深有體會),換種說法就是輪詢線程若是查詢到一次I/O事件就應該直接處理它(想起引子了嗎)

P.S. 關鍵代碼org.eclipse.jetty.util.thread.strategy.EatWhatYouKill

之因此這樣作的緣由是由於切換線程是一件比較費時操做(相對來講),所以在這種策略下輪詢線程A若是獲取到一個事件會有如下策略

  • 若是此任務被標誌爲非阻塞任務,那麼線程A會當即執行此任務

若是任務阻塞類型未知或者被標記爲阻塞狀態

  • 若是線程池中的線程都處於繁忙狀態,則將其提交到線程池種等待執行

  • 若是線程池種有空閒線程B,則嘗試將線程A負責輪詢功能交給線程B,若是當即獲取到線程B成功,則線程A會直接執行獲取到的任務, 任務執行完成後,線程A會嘗試奪回交給線程B的輪詢任務,若是奪回失敗則變爲空閒線程等待分配任務。(想起引子了嗎?)

  • 除此以外,線程A還會嘗試直接執行任務而且不會交出輪詢工做 (代碼太長,只摘出關鍵代碼)

case BLOCKING:
        synchronized (this)
        {
            if (_pending)
            {
                //輪詢工做陷入了停滯,所以是IDLE狀態
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }
            //tryExecute 若是當即分配到了線程則返回true
            //this的run方法也就是實現輪詢線程核心的方法
            //所以此行代碼至關於將輪詢的工做轉移給了其餘線程
            else if (_tryExecutor.tryExecute(this))
            {
                _pending = true;
                //因爲輪詢工做的轉移
                //所以當前輪詢工做至關於陷入空閒狀態
                //因此須要將此對象的狀態至爲IDLE
                //(輪詢線程和當前線程使用同一個對象)
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }else
            {
               //前二者均不知足則將任務提交到線程池
                mode = Mode.PRODUCE_EXECUTE_CONSUME;
            }
        }
        break;
複製代碼

任務的執行策略

case EXECUTE_PRODUCE_CONSUME:
                _epcMode.increment();
                //直接在當前線程調用
                runTask(task);

                // 嘗試奪回輪詢任務
                synchronized (this)
                {
                   // 若是State還處於空閒狀態
                   // 說明
                   // 線程B還未開始執行輪詢任務,能夠直接奪回
                   // 若是線程B已經開始輪詢
                   // 則選擇離開
                    if (_state == State.IDLE)
                    {
                        // 返回true則繼續輪詢
                        return true;
                    }
                }
                //返回false則結束輪詢任務,變爲空閒線程
                return false;
複製代碼

總結

相較於循規蹈矩的Tomcat,Jetty的設計更爲激進,更富有冒險主義者的精神,從我的角度來講更喜歡Jetty的設計,但從業務的角度來講仍是選擇Tomcat較爲穩妥畢竟穩定是業務的基本需求,而且Tomcat的性能也不會太差。

以線程的類別來進行劃分的話, Jetty的NIO模型以下圖所示

  • Acceptor 線程負責接收來自客戶端的新鏈接,並將其封裝成一個事件提交給輪詢線程處理
  • 輪詢線程 輪詢線程處理負責輪詢I/O事件以外,還須要處理外部線程所提交的selector更新任務,而且根據設定的執行策略,輪詢線程可能會在本線程直接執行I/O任務,並將輪詢任務移交給其餘空閒的線程,或者選擇一個空閒的線程來執行I/O操做
  • I/O線程 主要負責處理I/O操做

從線程類別的角度來看Jetty的NIO模型相對簡單,但其引入的輪詢線程執行策略使得線程之間身份能夠發生轉變, 得益於此Jetty能夠直接輪詢線程直接執行I/O任務減小了線程上下文切換所帶來的性能消耗,提高了性能。

思想遷移

切換線程是有成本的 Jetty經過直接在輪詢線程執行I/O任務來提高性能,來減小線程上下文的切換,除此以外,咱們還能夠實現協程的機制來減小線程上下文切換所帶來的成本(參考Go語言)

Acceptor線程應適量 若是將ServerSocket設置爲阻塞模式,那麼accept操做將致使線程陷入阻塞,從accept方法返回時將引發線程上下的切換,所以並非越多越好

如何Debug Jetty

咱們使用SpringBoot來Debug Jetty,所以咱們須要在pom.xml中引入Jetty,因爲SpringBoot默認使用Tomcat所以咱們須要將其替換掉,依賴以下所示.

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
複製代碼

使用的SpringBoot版本是2.2.0其所依賴的Jetty版本號是9.4.20

  • 若是你要了解Connector是如何工做的請關注如下類 org.eclipse.jetty.server.ServerConnector

  • 若是你想要了解Jetty NIO 如何輪詢以及處理事件,那麼請關注如下類 org.eclipse.jetty.io.ManagedSelector 並在其內部類 SelectorProducerproduce方法打上斷點,以下圖所示,你將瞭解到整個輪詢過程當中都發生了什麼

右鍵小紅點,選擇Thread,以免進入不了斷點的狀況,畢竟咱們調試的是多線程程序

  • 若是你想要了解線程執行的策略,那麼請關注如下類(此類執行機制較爲複雜,若是想Debug到全部的狀況,最好結合必定的策略,如在Controller代碼處阻塞住線程等) org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
相關文章
相關標籤/搜索