Netty源碼分析之Selector流程

在Netty啓動後,Netty的線程池會起一個Selector線程處理IO事件和其餘業務事件,下面來看下Selector流程java

流程圖

Selector線程是一個循環線程它一直處理IO事件和其餘業務事件。這裏須要說明Selector線程是處理IO事件和處理其餘業務共享線程,也就是說Selector線程會按用戶配置比例來處理IO接事件和其餘業務事件(如channel註冊事件),可能此次在執行IO事件下次若是有其餘任務來了就忙其餘任務去了。ide

源碼和實現分析

Selector的主流程就是一個run()方法,源碼以下:oop

@Override
    protected void run() {
        for (;;) {
            //wakenUp設置成false,並獲取原來wakenUp狀態
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
             //判斷當前任務隊列是否有任務,若是有任務直接快速select一次以便能處理到任務
                if (hasTasks()) {
                    selectNow();
                } else {
                //若是任務隊列沒有任務則進行阻塞select並讓出cup時間片
                    select(oldWakenUp);
                //若是wakenUp是true,則中斷select一次
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }
                //處理io事件和根據佔時比例配置處理任務
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    //處理IO事件
                    processSelectedKeys();
                    //處理任務
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
 						//處理IO事件
                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    //處理任務
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }
    }

主要有如下幾個流程性能

  • 循環開始先把喚醒標誌wakenUp設置成false,並獲取原來wakenUp狀態。

這裏的wakenUp是控制select的中斷的標誌,若是有其餘任務加入會中斷線程的select,代碼以下fetch

@Override
    public void execute(Runnable task) {
    
        ...
        
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

因此這裏每次循環開始都會把wakenUp標誌清理掉。ui

  • 判斷當前任務隊列是否有任務,若是有任務直接快速select一次以便能處理到任務。this

  • 若是任務隊列沒有任務則進行阻塞select並讓出cup時間片。編碼

  • 若是wakenUp是true,則中斷select一次。.net

這裏會有個迷惑,就是若是上次加入任務中斷了select一次這裏狀態還未清理還會再中斷一次,這樣重複中斷設計的意義是什麼?看官方說法是若是沒有這個操做,第一次select被中斷後等待任務執行過程當中的全部加入的任務都不能改變wakenUp的狀態爲ture,由於改變狀態是用cas方式wakenUp.compareAndSet(false, true),因此下次select會出現沒必要要的阻塞。所以這裏作了重複的喚醒。這樣作在沒任務加入狀況下實際上是浪費的因此官方稱這種作法inefficient。線程

  • 處理io事件和根據佔時比例配置處理任務

這裏配置比例主要是讓用戶協調io事件和任務執行的時間,若是ioRatio配置100,會先執行io事件而後執行所有的任務;默認ioRatio配置50,會先執行io事件而後用io事件50%執行時間處理任務,處理任務的時間計算公式以下:

io事件處理時間 * (100 - ioRatio) / ioRatio

下面來看下select的實現:

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            //select計數器
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            //根據定時任務計算select延遲時間
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
            		//計算select阻塞時間
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                //阻塞式select
                int selectedKeys = selector.select(timeoutMillis);
                //計時器加1
                selectCnt ++;
                //若是發現待處理io事件或老喚醒標記true或最新喚醒標記爲true或隊列中有任務或有定時任務,跳出循環,中斷本次輪詢
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                //若是有線程被中斷也,,跳出循環,中斷本次輪詢
                if (Thread.interrupted()) {
                    
                    selectCnt = 1;
                    break;
                }
                //若是的selet時間大於等於timeoutMillis,說明selet正常,計數器重歸於1
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    selectCnt = 1;
                //若是selet在timeoutMillis時間內返回次數大於配置次數說明可能觸發了jdk的nio bug,則重建selector
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    rebuildSelector();
                    selector = this.selector;
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
            
        } catch (CancelledKeyException e) {
        }
    }

select的操做是個循環,select會一直循環直到出現如下狀況:

  • 發現待處理io事件。
  • 老喚醒標記true,說明處理IO接事件和其餘業務事件期間加入了任務。
  • 最新喚醒標記爲true,說select期間加入了任務。
  • 隊列中有任務或有定時任務。
  • 觸發了jdk的nio bug。

select的操做主要有如下幾個流程:

  • 根據定時任務計算select延遲時間。 這裏計算方式就取定時任務隊列裏的第一個任務(任務根據執行時間從小到大)獲取執行時間加0.5毫秒,若是沒有任務默認1秒加0.5毫秒。

  • 阻塞式select。

  • 若是發現待處理io事件或老喚醒標記true或最新喚醒標記爲true或隊列中有任務或有定時任務,跳出循環,中斷本次輪詢。

  • 若是的select時間大於等於timeoutMillis,說明select正常,計數器重歸於1。

  • 若是select在timeoutMillis時間內返回次數大於配置次數說明可能觸發了jdk的nio bug,則重建selector。 此bug會在沒有IO事件發生時select當即返回,因此會形成無心義的循環最後可能致使cpu飆到100%狀況,因此NIO採用計數器和重建selector方法解決這個bug

    關於該bug的描述見 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055

    重建的過程就是先建立新的selector,將老的selector中監聽key複製到新selector中,而後註冊Channel到新selector中,最後關閉老的selector。

下面看下IO處理的實現

IO處理的實現就是先獲取待處理的key,而後交給processSelectedKey方法去處理,咱們看下processSelectedKey方法的實現

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0){
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

系統會根據SelectionKey處理哪一種IO事件,IO事件共有4種:

  • SelectionKey.OP_READ 讀事件
  • SelectionKey.OP_ACCEPT 接收事件
  • SelectionKey.OP_WRITE 寫事件
  • SelectionKey.OP_CONNECT 鏈接事件

好比客戶端連接後會觸發服務端SelectionKey.OP_ACCEPT的接收事件,而後由服務端的主Channel處理,處理的過程後面會介紹。

處理完IO事件就是處理隊列中的任務若是ioRatio配置成100比較簡單處理方式就是一個一個執行隊列裏的所有任務,ioRatio非100處理比較複雜咱們來下ioRatio非100的處理實現:

protected boolean runAllTasks(long timeoutNanos) {
		 //將須要觸發的定時任務加入到任務隊列
        fetchFromScheduledTaskQueue();
        //獲取一個任務
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }
        //計算任務超時時間
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                //執行任務
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;
            //每64次任務進行一次,超時判斷,若是超時退出執行
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //取下個任務
            task = pollTask();
            //若是沒有任務也退出執行
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

執行任務的流程也是個循環直到超時或者沒任務,執行任務流程以下:

  • 將須要觸發的定時任務加入到任務隊列。

  • 計算任務超時時間,超時時長是根據上面說的公式計算的。

  • 執行任務。

  • 每執行64次任務進行一次超時判斷,若是超時退出執行。

    這裏設計64次的緣由官方給的解釋是每次調用nanoTime()去判斷超時是耗費性能的,因此寫成64,同時官方也表示硬編碼成64是不太合理後面會改爲可配置。

  • 取下個任務,若是沒有任務也退出執行。

相關文章
相關標籤/搜索