歡迎關注公衆號:【 愛編程】
若是有須要後臺回覆 2019贈送 1T的學習資料哦!!
Netty框架的主要線程就是I/O線程,線程模型的設計決定了系統的吞吐量、併發性和安全性等架構質量屬性。因此瞭解一下NioEventLoop。java
基本上全部的網絡處理程序都有如下基本的處理過程:
Read request
Decode request
Process service
Encode reply
Send replygit
這是最簡單的單Reactor線程模型,它負責多路分離套接字,Accept新鏈接,並分派請求處處理器鏈中。該模型適用於處理器鏈中業務處理組件能快速完成的場景。但這種模型並不能充分利用多核資源,實際使用少。github
相比上一種模型,該模型在處理器鏈部分採用了多線程(線程池),也就是後端程序常見的模型。但Reactor仍爲單個線程。編程
主從Reactor多線程:多個acceptor的NIO線程池用於接受客戶端的鏈接。將Reactor分紅兩部分,mainReactor負責監聽Server socket,accpet新鏈接,並將簡歷的socket分派給subReactor。subReactor負責多路分離已鏈接的socket,讀寫網絡數據,將業務處理功能扔給worker線程池完成。一般subReactor個數上與CPU個數等同。後端
以上就是對Reactor線程模型的學習。更加詳細能夠參考Doug Lea大神的PPT
http://gee.cs.oswego.edu/dl/c...數組
netty的線程模型是能夠經過設置啓動類的參數來配置的,設置不一樣的啓動參數,netty支持Reactor單線程模型、多線程模型和主從Reactor多線程模型。 安全
Boss線程池職責以下:
(1)接收客戶端的鏈接,初始化Channel參數
(2)將鏈路狀態變動時間通知給ChannelPipeline網絡
worker線程池做用是:
(1)異步讀取通訊對端的數據報,發送讀事件到ChannelPipeline
(2)異步發送消息到通訊對端,調用ChannelPipeline的消息發送接口
(3)執行系統調用Task;
(4)執行定時任務Task;數據結構
經過配置boss和worker線程池的線程個數以及是否共享線程池等方式,netty的線程模型能夠在單線程、多線程、主從線程之間切換。多線程
爲了提高性能,netty在不少地方都進行了無鎖設計。好比在IO線程內部進行串行操做,避免多線程競爭形成的性能問題。表面上彷佛串行化設計彷佛CPU利用率不高,可是經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖串行線程設計性能更優。
基於Netty4.1.36
問題:
1.默認狀況下,netty服務端起多少線程?什麼時候啓動?
2.Netty是如何解決jdk空輪詢bug的?
3.Netty如何保證異步串行無鎖化?
大體來講,從new NioEventLoopGroup()入手,而後到MultithreadEventLoopGroup的構造中明確的寫明瞭默認爲CPU的2倍的線程,接着new ThreadPerTaskExecutor()[線程建立器],而後就是一個死循環newChild()構造NioEventLoop,最後就是newChooser()[線程選擇器]爲後面的啓動和執行作準備。
NioEventLoop啓動從客戶端bind()入手,而後跟蹤到doBind0(),接着到SingleThreadEventExecutor中execute(),該方法主要是添加任務addTask(task)和運行線程startThread(),而後在startThread()-->doStartThread()-->SingleThreadEventExecutor.this.run();開始執行NioEventLoop運行邏輯。
NioEventLoop啓動後主要的工做
1.select() -- 檢測IO事件,輪詢註冊到selector上面的io事件
2.processSelectedKeys() -- 處理io事件
3.runAllTasks() -- 處理外部線程扔到TaskQueue裏面的任務
1.select() -- 檢測IO事件
檢測IO事件主要有三個部分:
deadline以及任務穿插邏輯處理:
計算本次執行select截止時間(根據NioEventLoop當時是否有定時任務處理)以及判斷在select的時候是否有任務要處理。
阻塞式select:
未到截止時間或者任務隊列爲空進行一次阻塞式select操做
避免JDK空輪詢的Bug:
判斷此次select操做是否阻塞timeoutMillis時間,未阻塞timeoutMillis時間表示觸發JDK空輪詢;判斷觸發JDK空輪詢的次數是否超過閾值,達到閾值調用rebuildSelector()方法替換原來的selector操做方式避免下次JDK空輪詢繼續發生
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { /** 1.deadline以及任務穿插邏輯處理-- 開始**/ long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } /** 1.deadline以及任務穿插邏輯處理-- 結束**/ /**2.阻塞select--開始**/ int selectedKeys = selector.select(timeoutMillis); selectCnt ++; /**2.阻塞select--結束**/ if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } /**3.避免jdk空輪詢的bug -- 開始 **/ long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The code exists in an extra method to ensure the method is not too big to inline as this // branch is not very likely to get hit very frequently. selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; } currentTimeNanos = time; } /**3.避免jdk空輪詢的bug -- 結束**/ if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
2. processSelectedKeys()-- 處理IO事件
selected keySet優化
select操做每次把已就緒狀態的io事件添加到底層HashSet(時間複雜度爲O(n))數據結構,經過反射方式將HashSet替換成數組的實現.
NioEventLoop.openSelector()
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
processSelectedKeysOptimized()
遍歷SelectionKey數組獲取SelectionKey的attachment即NioChannel;
SelectionKey合法獲取SelectionKey的io事件進行事件處理
NioEventLoop.processSelectedKeysOptimized()
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
3. runAllTasks()
Task的分類和添加
MpscQueue建立NioEventLoop構造,外部線程使用addTask()方法添加task;
ScheduledTaskQueue調用schedule()封裝ScheduledFutureTask添加到普通任務隊列
普通任務Task
SingleThreadEventExecutor.execute()-->addTask()
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } }
定時任務Task
將線程外的任務是經過加入隊列實現,從而保證了線程安全。
AbstractScheduledEventExecutor.schedule() -->ScheduledFuture
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
任務的聚合
將定時任務隊列任務聚合到普通任務隊列
SingleThreadEventExecutor.fetchFromScheduledTaskQueue()
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } scheduledTask = pollScheduledTask(nanoTime); } return true; }
ScheduledFutureTask中能夠看到任務Task是先按照截止時間排序,而後按照id進行排序的。
public int compareTo(Delayed o) { if (this == o) { return 0; } ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } }
任務的執行
獲取普通任務隊列待執行任務,使用safeExecute()方法執行任務,每次當累計任務數量達到64判斷當前時間是否超過截止時間中斷執行後續任務
NioEventLoop.runAllTasks()
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { afterRunningAllTasks(); return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
主要學習了NioEventLoop的基本知識,若是有更多知識歡迎各位分享,我仍是個小菜鳥。
若是對 Java、大數據感興趣請長按二維碼關注一波,我會努力帶給大家價值。以爲對你哪怕有一丁點幫助的請幫忙點個贊或者轉發哦。
關注公衆號【愛編碼】,回覆2019有相關資料哦。