從Netty EventLoop實現上能夠學到什麼

本文主要討論Netty NioEventLoop原理及實踐,關於Netty NioEventLoop,首先要知道NioEventLoop是什麼,爲何它會是Netty核心Reactor處理器,實現原理是什麼,進而再討論Netty對其的實現及使用上咱們能夠學到哪些。java

EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理客戶端請求和內部任務,內部任務如ServerSocketChannel註冊、ServerSocket綁定和延時任務處理等操做。安全

EventLoop是由事件驅動的,好比IO事件和任務等,IO事件即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。處理完請求時間以後,會處理內部添加到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。注意NioEventLoop在Linux中默認底層是基於epoll機制。網絡

上圖是EventLoop的核心流程圖,若是從Netty總體視角看EventLoop的事件流轉,下圖來的更直觀:併發

注意:bossGroup和WorkerGroup中的NioEventLoop流程是一致的,只不過前者處理Accept事件以後將鏈接註冊到後者,由後者處理該鏈接上後續的讀寫事件。less

大體瞭解了NioEventLoop以後,不知道有沒有小夥伴有這樣的疑問,爲何Netty要這樣實現呢,這種實現方案對於咱們後續開發如何借鑑呢?關於這些疑問,本文最後討論哈 :)oop

EventLoop實現原理

EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理IO事件和內部任務。IO事件和內部任務執行時間百分比經過ioRatio來調節,ioRatio表示執行IO時間所佔百分比。任務包括普通任務和已經到時的延遲任務,延遲任務存放到一個優先級隊列PriorityQueue中,執行任務前從PriorityQueue讀取全部到時的task,而後添加到taskQueue中,最後統一執行task。fetch

事件處理機制

EventLoop是由事件驅動的,好比IO事件即selectionKey中ready的事件,如accept、connect、read、write等,處理的核心邏輯主要是在NioEventLoop.run方法中,流程以下:ui

protected void run() {
    for (;;) {
        /* 若是hasTasks,則調用selector.selectNow(),非阻塞方式獲取channel事件,沒有channel事件時可能返回爲0。這裏用非阻塞方式是爲了儘快獲取鏈接事件,而後處理鏈接事件和內部任務。*/
      switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
          continue;
        case SelectStrategy.SELECT:
          select(wakenUp.getAndSet(false));
          if (wakenUp.get()) {
            selector.wakeup();
          }
        default:
      }

      cancelledKeys = 0;
      needsToSelectAgain = false;
      /* ioRatio調節鏈接事件和內部任務執行事件百分比
       * ioRatio越大,鏈接事件處理佔用百分比越大 */
      final int ioRatio = this.ioRatio;
      if (ioRatio == 100) {
        try {
          processSelectedKeys();
        } finally {
          runAllTasks();
        }
      } else {
        final long ioStartTime = System.nanoTime();
        try {
          processSelectedKeys();
        } finally {
          final long ioTime = System.nanoTime() - ioStartTime;
          runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
      }
    }
}

從代碼上,在執行select()前有一個hasTasks()的操做,這個hasTasks()方法判斷當前taskQueue是否有元素。若是taskQueue中有元素,執行 selectNow() 方法,最終執行selector.selectNow(),該方法會當即返回,保證了EventLoop在有任務執行時不會由於IO事件遲遲不來形成延後處理,這裏優先處理IO事件,而後再處理任務。this

若是當前taskQueue沒有任務時,就會執行select(wakenUp.getAndSet(false))方法,代碼以下:線程

/* 這個方法解決了Nio中臭名昭著的bug:selector的select方法致使空輪詢 cpu100% */
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
 
        /* delayNanos(currentTimeNanos):計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),默認返回1s。每一個SingleThreadEventExecutor都持有一個延遲執行任務的優先隊列PriorityQueue,啓動線程時,往隊列中加入一個任務。*/
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            /* 若是延遲任務隊列中第一個任務的最晚還能延遲執行的時間小於500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和標識是否執行過selector.selectNow()),則執行selector.selectNow()方法並當即返回。*/
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
             
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
 
            // 超時阻塞select
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            System.out.println(selectCnt);
 
            // 有事件到來 | 被喚醒 | 有內部任務 | 有定時任務時,會返回
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
 
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // 阻塞超時後沒有事件到來,重置selectCnt
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // Selector重建
                rebuildSelector();
                selector = this.selector;
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
    } catch (CancelledKeyException e) {
        // Harmless exception - log anyway
    }
}

當java NIO bug觸發時,進行Selector重建,rebuildSelector過程以下:

  1. 經過方法openSelector建立一個新的selector。
  2. 將old selector的selectionKey執行cancel。
  3. 將old selector的channel從新註冊到新的selector中。

Netty的鏈接處理就是IO事件的處理,IO事件包括讀事件、ACCEPT事件、寫事件和OP_CONNECT事件:

  • ACCEPT事件:鏈接創建好以後將該鏈接的channel註冊到workGroup中某個NIOEventLoop的selector中;
  • READ事件:從channel中讀取數據,存放到byteBuf中,觸發後續的ChannelHandler來處理數據;
  • WRITE事件:正常狀況下通常是不會註冊寫事件的,若是Socket發送緩衝區中沒有空閒內存時,在寫入會致使阻塞,此時能夠註冊寫事件,當有空閒內存(或者可用字節數大於等於其低水位標記)時,再響應寫事件,並觸發對應回調。
  • CONNECT事件:該事件是client觸發的,由主動創建鏈接這一側觸發的。

任務處理機制

任務處理也就是處理內部任務,這裏也包括延時任務,延時任務到時後會移動到taskQueue而後被執行。任務處理是在IO事件處理以後進行的,IO事件和內部任務執行時間百分比能夠經過ioRatio來調節,ioRatio表示執行IO時間所佔百分比。

/* timeoutNanos:任務執行花費最長耗時/
protected boolean runAllTasks(long timeoutNanos) {
    // 把scheduledTaskQueue中已經超過延遲執行時間的任務移到taskQueue中等待被執行。
    fetchFromScheduledTaskQueue();
 
    // 非阻塞方式pollTask
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
 
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 執行task
        safeExecute(task);
        runTasks ++;
        // 依次從taskQueue任務task執行,每執行64個任務,進行耗時檢查。
        // 若是已執行時間超過預先設定的執行時間,則中止執行非IO任務,避免非IO任務太多,影響IO任務的執行。
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
 
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

注意,任務的處理過程當中有個執行必定量任務後的執行時間耗時檢查動做,這裏是爲了不任務的處理時間過長,影響Netty網絡IO的處理效率,畢竟Netty是要處理大量網絡IO的。

對於NioEventLoop實現的思考

EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理網絡IO請求和內部任務,這裏的selector和taskQueue是線程內部的。

Netty的BossGroup和WorkerGroup可能包含多個EventLoop,BossGroup接收到請求以後輪詢交給WorkerGroup中的其中一個線程(對應一個NioEventLoop)來處理,也就是鏈接之間的處理是線程獨立的,這也就是NioEventLoop流程的無鎖化設計。

從EventLoop「無鎖化」設計和常見的鎖機制對比來看,要實現線程併發安全,有兩種實現策略:

  • 數據隔離:數據隔離就是數據產生後就提交給不一樣的線程來處理,線程內部通常有一個數據容器來保存待處理的數據,這裏的提交動做須要保證是安全的,好比Netty的BossGroup將創建好的鏈接註冊到WorkerGroup時,是由內核來保證線程安全的(好比Linux就是經過epoll_ctl方法,該方法是線程安全的);
  • 數據分配:數據產生後統一放在數據容器中,由數據消費線程本身來獲取數據進行處理,這裏的獲取動做須要保證是安全的,通常經過鎖機制來保護,好比Java線程池中線程從阻塞隊列中獲取任務進行執行,就是由阻塞隊列保證線程安全。

對於數據隔離和數據分配來講,兩者都有優缺點及適用場景。對於數據隔離來講,通常「鎖」交互少成本較低,而且其隔離性較好,線程內部若是有新數據產生還繼續由該線程來處理,可是可能形成數據負載不均衡;對於數據分配來講,「鎖」交互較多,可是因爲數據處理線程都是從同一數據容器消費數據,因此不會出現數據處理負載不均衡問題。

若是想實現相似EventLoop中單個線程對應一個處理隊列的方案,可使用只配置一個線程的Java線程池,達到相似的實現效果。

推薦閱讀

歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。

img

相關文章
相關標籤/搜索