本文主要討論Netty NioEventLoop原理及實踐,關於Netty NioEventLoop,首先要知道NioEventLoop是什麼,爲何它會是Netty核心Reactor處理器,實現原理是什麼,進而再討論Netty對其的實現及使用上咱們能夠學到哪些。java
EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理客戶端請求和內部任務,內部任務如ServerSocketChannel註冊、ServerSocket綁定和延時任務處理等操做。安全
EventLoop是由事件驅動的,好比IO事件和任務等,IO事件即selectionKey中ready的事件,如accept、connect、read、write
等,由processSelectedKeys方法觸發。處理完請求時間以後,會處理內部添加到taskQueue中的任務,如register0、bind0
等任務,由runAllTasks方法觸發。注意NioEventLoop在Linux中默認底層是基於epoll機制。網絡
上圖是EventLoop的核心流程圖,若是從Netty總體視角看EventLoop的事件流轉,下圖來的更直觀:併發
注意:bossGroup和WorkerGroup中的NioEventLoop流程是一致的,只不過前者處理Accept事件以後將鏈接註冊到後者,由後者處理該鏈接上後續的讀寫事件。less
大體瞭解了NioEventLoop以後,不知道有沒有小夥伴有這樣的疑問,爲何Netty要這樣實現呢,這種實現方案對於咱們後續開發如何借鑑呢?關於這些疑問,本文最後討論哈 :)oop
EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理IO事件和內部任務。IO事件和內部任務執行時間百分比經過ioRatio來調節,ioRatio表示執行IO時間所佔百分比。任務包括普通任務和已經到時的延遲任務,延遲任務存放到一個優先級隊列PriorityQueue中,執行任務前從PriorityQueue讀取全部到時的task,而後添加到taskQueue中,最後統一執行task。fetch
EventLoop是由事件驅動的,好比IO事件即selectionKey中ready的事件,如accept、connect、read、write
等,處理的核心邏輯主要是在NioEventLoop.run
方法中,流程以下:ui
protected void run() { for (;;) { /* 若是hasTasks,則調用selector.selectNow(),非阻塞方式獲取channel事件,沒有channel事件時可能返回爲0。這裏用非阻塞方式是爲了儘快獲取鏈接事件,而後處理鏈接事件和內部任務。*/ switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; /* ioRatio調節鏈接事件和內部任務執行事件百分比 * ioRatio越大,鏈接事件處理佔用百分比越大 */ final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } }
從代碼上,在執行select()
前有一個hasTasks()
的操做,這個hasTasks()
方法判斷當前taskQueue是否有元素。若是taskQueue中有元素,執行 selectNow() 方法,最終執行selector.selectNow()
,該方法會當即返回,保證了EventLoop在有任務執行時不會由於IO事件遲遲不來形成延後處理,這裏優先處理IO事件,而後再處理任務。this
若是當前taskQueue沒有任務時,就會執行select(wakenUp.getAndSet(false))
方法,代碼以下:線程
/* 這個方法解決了Nio中臭名昭著的bug:selector的select方法致使空輪詢 cpu100% */ private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); /* delayNanos(currentTimeNanos):計算延遲任務隊列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),默認返回1s。每一個SingleThreadEventExecutor都持有一個延遲執行任務的優先隊列PriorityQueue,啓動線程時,往隊列中加入一個任務。*/ long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { /* 若是延遲任務隊列中第一個任務的最晚還能延遲執行的時間小於500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和標識是否執行過selector.selectNow()),則執行selector.selectNow()方法並當即返回。*/ long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // 超時阻塞select int selectedKeys = selector.select(timeoutMillis); selectCnt ++; System.out.println(selectCnt); // 有事件到來 | 被喚醒 | 有內部任務 | 有定時任務時,會返回 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // 阻塞超時後沒有事件到來,重置selectCnt selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // Selector重建 rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } } catch (CancelledKeyException e) { // Harmless exception - log anyway } }
當java NIO bug觸發時,進行Selector重建,rebuildSelector過程以下:
Netty的鏈接處理就是IO事件的處理,IO事件包括讀事件、ACCEPT事件、寫事件和OP_CONNECT事件:
任務處理也就是處理內部任務,這裏也包括延時任務,延時任務到時後會移動到taskQueue而後被執行。任務處理是在IO事件處理以後進行的,IO事件和內部任務執行時間百分比能夠經過ioRatio來調節,ioRatio表示執行IO時間所佔百分比。
/* timeoutNanos:任務執行花費最長耗時/ protected boolean runAllTasks(long timeoutNanos) { // 把scheduledTaskQueue中已經超過延遲執行時間的任務移到taskQueue中等待被執行。 fetchFromScheduledTaskQueue(); // 非阻塞方式pollTask Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { // 執行task safeExecute(task); runTasks ++; // 依次從taskQueue任務task執行,每執行64個任務,進行耗時檢查。 // 若是已執行時間超過預先設定的執行時間,則中止執行非IO任務,避免非IO任務太多,影響IO任務的執行。 if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
注意,任務的處理過程當中有個執行必定量任務後的執行時間耗時檢查動做,這裏是爲了不任務的處理時間過長,影響Netty網絡IO的處理效率,畢竟Netty是要處理大量網絡IO的。
EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理網絡IO請求和內部任務,這裏的selector和taskQueue是線程內部的。
Netty的BossGroup和WorkerGroup可能包含多個EventLoop,BossGroup接收到請求以後輪詢交給WorkerGroup中的其中一個線程(對應一個NioEventLoop)來處理,也就是鏈接之間的處理是線程獨立的,這也就是NioEventLoop流程的無鎖化設計。
從EventLoop「無鎖化」設計和常見的鎖機制對比來看,要實現線程併發安全,有兩種實現策略:
對於數據隔離和數據分配來講,兩者都有優缺點及適用場景。對於數據隔離來講,通常「鎖」交互少成本較低,而且其隔離性較好,線程內部若是有新數據產生還繼續由該線程來處理,可是可能形成數據負載不均衡;對於數據分配來講,「鎖」交互較多,可是因爲數據處理線程都是從同一數據容器消費數據,因此不會出現數據處理負載不均衡問題。
若是想實現相似EventLoop中單個線程對應一個處理隊列的方案,可使用只配置一個線程的Java線程池,達到相似的實現效果。
推薦閱讀
歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。