NIO 在Tomcat中的應用

對NIO的理解

我的單方面認爲,NIO與BIO的最大區別在於主動和被動,使用BIO的方式須要等待被調用方返回數據,很明顯此時調用者是被動的。java

舉個例子apache

阻塞IO 假設你是一個膽小又害羞的男孩子,你約了隔壁測試的妹子,但你並不敢主動約會,因此你把本身的手機號碼給她,並暗示她想要約會的時候打電話給你。很明顯此時你陷入了被動,約不約會的結果須要妹子主動告知你,若是她忘了,那麼你要陷入長時間的等待中以及無盡的猜想和自我懷疑中(太慘了)。[若是你是一個膽小害羞又好色的男孩子,那就慘了]緩存

非阻塞IO 咱們知道,渣男一般有不少的備胎,我管這個叫作備胎池(SpareTirePool), 那麼當他想要約會的時候,只要羣發問妹子要不要約會,若是要約會的話就和妹子約會,約會結束以後,處理其餘約會事件,若是沒有繼續下一次詢問。在這個例子中約會能夠視爲IO事件,問妹子的過程能夠視爲備胎池的輪詢。tomcat

若是你要學習NIO,能夠學習網絡

Tomcat 如何使用NIO

既然是網絡通訊的I/O那必然有如下兩個步驟併發

  • SeverSocket的啓動
  • I/O事件的處理

關鍵代碼在 package org.apache.tomcat.util.net.NioEndpoint 中app

P.S. 文章太長,若是不想看能夠直接閱讀結論socket

ServerSocket的啓動

在最開始看代碼,是震驚的,真的,若是你看Reactor模型的話ide

如下bind方法代碼是啓動ServerSocket的流程,主要流程以下高併發

  • 綁定地址
  • 設置接收新鏈接的方式爲阻塞方式(關鍵點)
  • 設置Acceptor和Poller的數量以及初始化SelectorPool
@Override
    public void bind() throws Exception {

        if (!getUseInheritedChannel()) {
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
            serverSock.socket().bind(addr,getAcceptCount());
        } else {
            // Retrieve the channel provided by the OS
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        // 以阻塞的方式來接收鏈接!!
        serverSock.configureBlocking(true); //mimic APR behavior

        // 設置Acceptor和Poller的數量
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            // 顧名思義,Acceptor是用來處理新鏈接的
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            // Poller 用來處理I/O事件
            pollerThreadCount = 1;
        }
        setStopLatch(new CountDownLatch(pollerThreadCount));

        // Initialize SSL if needed
        initialiseSsl();
        // 今後處能夠看出tomcat池化了selector
        selectorPool.open();
    }
複製代碼

Tomcat NIO 如何處理I/O事件

先說結論,Tomcat NIO模型中有如下關鍵角色

  • Acceptor 用於接收新鏈接,每一個Acceptor一個線程,以阻塞的方式接收新鏈接
  • Poller 當Acceptor接收到新鏈接,進行處理以後選擇一個Poller處理該鏈接上的I/O事件。
  • LimitLatch 一個用來限制鏈接數的鎖

Acceptor

Acceptor的主要工做就是不斷接收來自客戶端的鏈接,在簡單處理以後將該鏈接交給Poller處理

接收來自客戶端鏈接, 若是你不想看代碼,如下是其主要流程

  • 接收來自客戶端的鏈接,並將其交給Poller處理
@Override
        public void run() {

            int errorDelay = 0;

            // running的檢測貫穿了Accpetor的處理流程,在每次關鍵操做的時候都會執行檢測
            while (running) {

                // 若是進入暫停狀態則每隔一段時間檢測一下
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                // 再次檢測
                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //檢查是否達到最大鏈接數若是是則陷入等待,若是不是則增長當前鏈接數
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        //接收新鏈接
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // 發生異常,則減小鏈接數
                        countDownConnection();
                        if (running) {
                         handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        //setSocketOptions會致使將該鏈接交給Poller處理
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
複製代碼

再來看看setSocketOptions作了什麼,不想看代碼的話,總結以下

  • 將客戶端socket設置爲非阻塞模式
  • 將客戶端的socket封裝爲NioChannelSecureNioChannel(使用了對象池技術)
  • Poller池中獲取一個Poller,將NioChannel註冊到Poller上
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //設置爲非阻塞模式,以便經過selector進行查詢
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            //從對象池中獲取一個NioChannel,tomcat會複用一切能夠複用的對象以減小建立新對象所帶來的消耗
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
               // 沒有獲取到,那就新建一個唄
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                // SSL這一塊還沒研究
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                //從新設置SocketBufferHandler,將其設置爲可寫和可讀
                channel.reset();
            }
            //從Poller池中獲取一個Poller(按照次序獲取,能夠理解爲一個圓環),並將Channel註冊到上面
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

複製代碼

Poller

從鏈接註冊到Poller提及

不加鎖的獲取一個Poller

具體說明見代碼

關鍵點:對一個數A取餘會將餘數的結果限制在A的範圍內

/** * Return an available poller in true round robin fashion. * 很明顯,取餘的方式揭示了獲取Poller的方法。你能夠理解爲 * Poller會組成一個圓環,這樣咱們就能夠經過不斷遞增獲取 * 下一個Poller,可是數據會溢出因此咱們要取絕對值 * @return The next poller in sequence */
    public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }
複製代碼

channel的註冊

該方法會對新的建的鏈接進行封裝,並以PollerEvent的形式註冊到相應的Poller中

須要注意的是,真正的註冊讀事件並非在此方法註冊的(當前方法調用者爲Acceptor線程),而是在Poller線程中註冊讀事件的

/** * Registers a newly created socket with the poller. * 將新建的socket註冊到Poller上 * @param socket The newly created socket */
        public void register(final NioChannel socket) {
            //如下代碼爲設置各類參數,能夠從方法名進行推測,再也不進行敘述
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getConnectionTimeout());
            ka.setWriteTimeout(getConnectionTimeout());
            //從緩存中獲取一個PollerEvent
            PollerEvent r = eventCache.pop();
            // 註冊讀事件
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            // 若是沒有從緩存中獲取,那麼就新建一個
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }
複製代碼

Poller處理I/O 事件

Poller 處理I/O事件的的代碼較長,並且細節也較多,總結其主要做用以下

  • 檢測是否有Acceptor提交PollerEvent,若是有則調用PolllerEvent的run方法註冊讀事件
  • 在執行關鍵操做的時候檢測該Poller是否被關閉若是是,則執行相應的資源釋放和關閉操做
  • 調用selector.select() 輪詢事件,若是有讀事件則交給processKey處理
@Override
        public void run() {
            // Loop until destroy() is called
            // 一直循環直到destroy方法被調用
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        // events 方法會處理Acceptor註冊到Poller中的PollerEvent
                        // 主要是註冊讀事件
                        hasEvents = events();
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            //if we are here, means we have other stuff to do
                            //do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    // 檢測到關閉,則處理剩餘的事件並關閉selector
                    if (close) {
                        // 處理Acceptors註冊到Poller中的PollerEvent
                        events();
                        //selector time out 或者poller被關閉就會調用timeout方法
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error("",x);
                    continue;
                }
                //either we timed out or we woke up, process events first
                if ( keyCount == 0 ) hasEvents = (hasEvents | events());
                // 執行 select 操做,查詢I/O事件
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        // 處理檢測到的I/O事件
                        processKey(sk, attachment);
                    }
                }//while

                //timeout 會檢查是否關閉,若是已經關閉而且有事件未處理會調用cancelledKey方法
                //cancelledKey:該方法主要是對和該鏈接相關的資源執行關閉操做
                timeout(keyCount,hasEvents);
            }//while

            getStopLatch().countDown();
        }
複製代碼

processKey 處理I/O事件

processKey主要工做以下

  • 再次檢測Poller是否關閉,若是是則釋放資源
  • 檢測查詢到事件是否合法,若是合法則取消已註冊到selector上的事件且被被本次輪詢所查詢到的事件
  • 再調用processSocket處理讀事件,以後處理寫事件
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            try {
                if ( close ) {
                    // 若是Poller關閉則關閉和釋放和此鏈接相關的資源
                    cancelledKey(sk);
                } else if ( sk.isValid() && attachment != null ) {
                    if (sk.isReadable() || sk.isWritable() ) {
                        if ( attachment.getSendfileData() != null ) {
                            processSendfile(sk,attachment, false);
                        } else {
                            // 取消註冊事件
                            // sk.interestOps()& (~readyOps)
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write 先讀後寫
                            if (sk.isReadable()) {
                               // 關鍵代碼,調用processSocket方法處理讀事件
                                if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    //invalid key
                    cancelledKey(sk);
                }
            } catch ( CancelledKeyException ckx ) {
                cancelledKey(sk);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error("",t);
            }
        }
複製代碼

processSocket 真-處理I/O事件

processSocket定義在org.apache.tomcat.util.net.AbstractEndPoint中, 也就是意味着不管你採用的是BIO仍是NIO或者NIO2最終讀寫數據都是調用此方法

從代碼中能夠看出,依然是對象池,依然是再次封裝(套娃),並將其提交到線程池中執行,接下來的內容就再也不本次討論範圍內呢。

public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
複製代碼

總結

Tomcat的NIO模型

NIO模型

手抖了,線不怎麼♂

LimitLatch 爲全部的Acceptor共用,用來限制當前的最大鏈接數

Acceptor 以阻塞的形式來接收新鏈接,並將其封裝成PollerEvent對象提交到Poller中

Poller 接收來自Acceptor的PollerEvent並註冊讀事件,以及輪詢和其綁定的客戶端Socket有無讀事件,若是有則執行進一步操做,將其提交到其餘地方執行處理(解析Http協議)

思想遷移

學習源碼就是爲了學習其設計思想. -- 沃茲及.碩德

對象池化 池化對象、池化鏈接能夠大大下降新建對象以及GC所帶來的消耗,當須要使用從池中取出來從新設置相關值便可

環形隊列 雖然這玩意不新鮮,但配合上原子類,就能夠在高併發的狀況,高效的獲取隊列中的下一個元素(環形隊列中索引溢出的處理在以前我是沒有考慮到的)

阻塞獲取連接,非阻塞處理IO事件 與Reactor模型造成強烈的對比,學習NIO的時候思惟被限制住了,認爲非阻塞的獲取鏈接會得到更高的性能,但如今狀況不必定了(還沒測試,哪位老哥試了告訴我一下)

關鍵操做時,對標誌位進行檢測 若是你要經過一個標誌變量來控制你的線程,且線程循環一次須要相對較長的時間(你代碼太長,操做太多)那麼最好在執行關鍵操做以前對你的標誌變量進行檢查,來決定是否要改變線程的行爲(康康poller和Acceptor的代碼)

初次學習Tomcat的代碼,有理解錯誤的地方還請大佬指出

相關文章
相關標籤/搜索