第7節初步學習了一下Java本來的線程池是如何工做的,以及Future的爲何可以達到其效果,這些知識對於理解本章有很大的幫助,不瞭解的能夠先看上一節。java
Netty爲何會高效?回答就是良好的線程模型,和內存管理。在Java的NIO例子中就我將客戶端的操做單獨放在一個線程中處理了,這麼作的緣由在於若是將客戶端鏈接串起來,後來的鏈接就要等前一個處理完,固然這並不意味着多線程比單線程有優點,而是在於每一個客戶端都須要進行讀取準備好的緩存數據,再執行一些業務邏輯。若是業務邏輯耗時好久,那麼順序執行的方式沒有多線程優點大。另外一個方面目前多核CPU很常見了,多線程是個不錯的選擇。這些在第一節就說明過,也提到過NIO並非提高了IO操做的速度,而是減小了CPU的浪費時間,這些概念不能搞混。數組
本節不涉及內存管理,只介紹相關的線程模型。promise
上圖就是咱們須要關注的體系內容了,主要從EventExecutorGroup開始往下看,再上層的父接口是JDK提供的併發包內的內容,基礎是線程池中能夠執行週期任務的線程池服務。因此從這咱們能夠知道Netty能夠實現週期任務,好比心跳檢測。接口定義下面將逐一介紹。緩存
isShuttingDown():是否正在關閉,或者是已經關閉。安全
shutdownGracefully():優雅停機,等待全部執行中的任務執行完成,並再也不接收新的任務。多線程
terminationFuture():返回一個該線程池管理的全部線程都terminated的時候觸發的future。併發
shutdown():廢棄了的關閉方法,被shutdownGracefully取代。app
next():返回一個被該Group管理的EventExecutor。ide
iterator():全部管理的EventExecutor的迭代器。oop
submit():提交一個線程任務。
schedule():週期執行一個任務。
上述方法基本上是對週期線程池的一個封裝,可是擴展了EventExecuotr概念,即分了若干個小組,處理事件。另一個比較實用的就是優雅停機了。
EventLoopGroup中的方法不多,其主要是和channel結合了,就多了一個將channel註冊到線程池中的方法。
EventExecutor繼承自EventExecutorGroup,這個以前也提到過該類,至關於Group中的一個子集。
next():就是找group中下一個子集
parent():就是所屬group
inEventLoop():當前線程是不是在該子集中
newXXX():這個是下一節內容,此處不介紹。
該接口就一個方法,就是parent();EventLoop和EventLoopGroup與EventExecutor和EventExecutorGroup是一組類似的概念。瞭解這些就能夠了。
EventLoop和EventLoopGroup的實現十分簡單,簡單看下就能夠了,這裏介紹幾個重要的實現類。
該類繼承自上節說過的AbstractExecutorService,其最重要的是execute方法未實現。該類是對AbstractExecutorService的一個進一步加工,添加了group的概念,和不一樣的Future建立方法。這裏不要被以前的Java線程池模型所幹擾,其不必定是線程池。回到上一節線程池的介紹,最終的樣子都是Execute方法決定的。
該類是對AbstractEventExecutor的一個進一步實現,其實現了週期任務的執行。原理是內部持有一個優先隊列ScheduledFutureTask。全部週期任務都添加到這個隊列中,也實現了取出週期任務的方法,可是該抽象類並無具體執行週期任務的實現。
該類是對AbstractScheduledEventExecutor的一個實現,其基本上是咱們最終的一個EventLoop的雛形了,不少不一樣協議的EventLoop都是基於它實現的。
雖然名字叫作單線程執行器,可是其不必定是單個線程。Executor默認使用的是ThreadPerTaskExecutor,其executor會爲每個任務建立一個線程並執行,固然你也能夠傳入本身的executor。Queue使用的是LinkedBlockingQueue,無容量限制的任務隊列。其提供了添加任務到任務隊列,從任務隊列中獲取任務的方法。
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; } protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task); task = pollTaskFrom(taskQueue); if (task == null) { return true; } } }
執行過程如上:1.先獲取全部的週期任務,放入taskQueue;2.不斷的執行taskQueue中的任務;3.afterRunningAllTasks就是一個自由發揮的方法。safeExecute就是直接執行run方法。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
上面是該Executor初始化過程,run方法又是交給子類進行初始化了。
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } if (unit == null) { throw new NullPointerException("unit"); } if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_TERMINATED); terminationFuture.tryFailure(cause); if (!(cause instanceof Exception)) { // Also rethrow as it may be an OOME for example PlatformDependent.throwException(cause); } return terminationFuture; } } if (wakeup) { wakeup(inEventLoop); } return terminationFuture(); }
上面是一個優雅停機的過程,改變該Executor的狀態成ST_SHUTTING_DOWN,這裏要注意addTask的時候只有shutdown狀態纔會拒絕,因此此時這裏的邏輯還不會拒絕新任務添加,而後返回了一個terminationFuture,這裏不作介紹。
此類繼承自上面講解的SingleThreadEventExecutor,這裏多了一個tailTask隊列,用於每次事件循環後置任務處理,暫且無論。重要的在於很早提到了register方法,將channel註冊到線程中。
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
實際上就是生成了一個DefaultChannelPromise,將channel和線程綁定,最後都放入了unsafe對象中。
上面講了一些雜亂無章的內容,這裏藉助NioEventLoop好好梳理一下整個設計流程。NioEventLoop繼承自SingleThreadEventLoop,先對以前的相關研究進行總結:
1.EventExecutorGroup接口是繼承自Java的週期任務接口,是一個事件處理器組的概念,其相關方法有:
是否正在關閉;優雅停機;獲取一個事件處理器;提交一個任務;提交一個週期任務
2.EventExecutor接口是事件處理器,其繼承自EventExecutorGroup,目的並非說每個事件處理器都是一個事件處理器組,而是爲了複用接口定義的方法。每一個處理器都應該具有優雅停機,提交任務,判斷是否關閉的方法,其它方法有:
獲取處理器組;獲取下一個處理器(覆蓋了父接口的next方法);判斷是否在事件循環內;建立Promise、ProgressivePromise、SucceededFuture、FailedFuture
3.EventLoopGroup繼承自EventExecutorGroup,更新了父接口的含義,EventExecutor的定位是處理單個事件,group就是處理事件組了。EventLoop的定位是處理一個鏈接的生命週期過程當中的週期事件,group是多個EventLoop的集合了。這裏又有一個尷尬的地方,group按照定義本不須要定義其它方法,可是因爲Server端的設計(以前說過服務端的channel也是一個線程),使用的是group,因此group必須承擔單個EventLoop的職責。最終添加了額外的方法:
獲取下一個EventLoop;註冊channel;
4.EventLoop,事件循環,其也是一個處理器,最終繼承自EventExecutor和EventLoopGroup,方法只有一個:
獲取父事件循環組EventLoopGroup。
上述接口光看名稱很容易陷入誤解,實際上定義是想將單個loop和group分離,可是實現上因爲Server端包含一個服務端監聽鏈接線程,一個客戶端鏈接線程,其Group承擔了單個的職責,因此定義了一些本該由單個執行器處理的方法,又爲了複用方法,致使loop繼承了group,這樣看起來怪怪的,接口理解起來就混亂了。結合上面的描述,再看一遍繼承圖就更清楚了:
理解了上面的設定,咱們再來看看客戶端的事件處理是如何設計的,即總結上訴抽象類作了哪些事情。
1. AbstractEventExecutor:入參就一個parent,該類完成了一個基本處理:
a.將next設置成本身(上面說過繼承的group,這個操做就和group區分開了)。
b.優雅停機調用的是帶有超時的停機方案,超時爲15秒
c.覆蓋了Java提供的newTask包裝成FutureTask的方法,使用了本身的PromiseTask
d.提供安全執行方法:safeExecute,直接調用的run方法
該類是最基礎的一個抽象類,基本做用就是與group在定義混亂上作了一個區分。提供了執行器與Future關聯方法和一個基本的執行任務的方法。
2.AbstractScheduledEventExecutor:入參也是一個parent,該類對AbstractEventExecutor未處理的週期任務提供了具體的完成方法:
a.提供計算當前距服務啓動的時間
b.提供存儲ScheduledFutureTask的優先隊列
c.提供了取消全部週期任務的方法
d.提供了獲取一個符合的週期任務的方法,要知足時間,並獲取後移除
e.提供了獲取距最近一個週期任務的時間多久
f.提供了移除一個週期任務的方法
g.提供添加週期任務的方法
該類提供了週期任務執行的一些基本方法,涉及添加週期任務,移除,獲取等方法。
3.SingleThreadEventExecutor:入參包括parent,addTaskWakesUp標誌,maxPendingTasks最大任務隊列數(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)參數更大的那個值),executor執行器(默認是ThreadPerTaskExecutor,每一個任務建立一個線程執行),taskQueue任務隊列(默認是LinkedBlockingQueue),rejectedExecutionHandler拒絕任務的處理類(默認直接拋出RejectedExecutionException)。該類主要完成了一個單線程的EventExecutor的基本操做:
a.建立一個taskQueue
b.中斷線程
c.從任務隊列中獲取一個任務,takeTask連同週期任務也會獲取
d.添加任務到任務隊列
e.移除任務隊列中指定任務
f.運行全部任務,會先將週期任務存入taskQueue,再使用safeExecute方法執行任務
g.實現了execute方法,會添加任務到任務隊列,若是當前線程不是事件循環線程,開啓一個線程。經過的就是持有的executor來開啓的線程任務。execute方法調用了run方法,該類沒有實現run方法。任務的添加都不是經過execute直接執行了,而是走的添加任務到taskQueue,由未實現的run線程來處理這些事件。
h.優雅停機
這樣就有了一個基礎的單線程模型了,開啓線程,保存,取出任務的方法都有了,只有在開啓線程中執行任務的run()方法還未實現。
4.SingleThreadEventLoop:入參和SingleThreadEventExecutor一致,不一樣的是多了一個tailTasks。該類主要是針對netty自身的事件循環的定義來實現方法了:
a.註冊channel,其實是生成了一個DefaultChannelPromise對象,持有了channel,和運行該channel的EventExecutor,而後將該對象交給最底層的unsafe處理。
b.添加一個事件週期結束後執行的尾任務tailTasks
c.執行尾任務
d.刪除指定尾任務
該類就很簡單,沒有過多的內容,只是增長了一個每一個事件週期後執行的任務而已。
回顧完了,上面4個父類構建了一個基本的帶定時任務,普通任務,事件循環後置任務的EventLoop,每一個channel綁定了一個線程執行器,經過DefaultChannelPromise持有二者,最終交給Unsafe操做。子類只須要實現run方法,處理任務隊列中的任務。下面就是重頭戲NioEventLoop這個客戶端的線程是如何設計的了:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
調用的就是父類方法,不過在建立EventLoop的時候建立了selector,這個是NIO中也提到過的。該EventLoop是在Group中newChild建立的。
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
上面是咱們須要關注的run方法,該方法被單獨的線程執行。經過一個策略來判斷執行哪一個,默認的策略是任務隊列中有任務,select執行一下,返回當前的事件數,沒任務返回SelectStrategy.SELECT。簡單的說就是有任務就補充新到來的事件執行全部的任務,沒任務就執行新到的事件。先處理IO讀寫任務,再處理其餘任務,ioRation設置的耗時比例是IO任務佔一個執行週期的百分比,默認50,意思是IO執行了50秒,其餘任務也會獲得50秒的執行時間。後續操做就是獲取全部select的key,執行全部的任務了。這裏就有一個判斷若是是停機狀態,就會closeAll(),以前說優雅停機的時候就是設置了一個這個標誌,最後是在執行任務以後判斷。processSelectedKey環節都交給unsafe類完成了,這裏就掛上了handler相關觸發,handler的執行也就說明都在該線程內了。
上面的描述雖然把整個過程都關聯上了,可是最主要的問題仍是混亂的:如何作到一個channel建立一個線程的?上面只是說明了channel和EventExecutor是綁定在DefaultChannelPromise並交給了Unsafe類,並無看到是如何建立線程的。並且另外一個問題在於,processSelectedKey是選擇了全部的key,這不是全部的channel共享了一個線程嗎?
要解決該問題要回到Bootstrap的channel創建過程:initAndRegister()方法中,經過channelFactory建立了一個channel對象,然後ChannelFuture regFuture = config().group().register(channel);主要就是觀察register方法,該類是設置的線程池NioEventLoopGroup提供的方法,其繼承的是MultithreadEventLoopGroup,是調用了next方法獲取的EventLoop,最後接上上面channel和eventloop綁定的內容。next中獲取的EventLoop早在類初始化的時候就生成了,在構造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不過是挑選了一個線程池而已,默認數量是CPU核數的2倍。這個也就是前面說的,線程數量不是越多越好。
這樣咱們明白了,客戶端註冊的時候是分配了一個線程給它。客戶端並不須要多線程,可是仍是繼續看後面的內容:AbstractUnsafe的register方法給出了相關解答。channel持有了該EventLoop,此時線程仍是未運行狀態,只是有了這麼一個對象而已。
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
這裏execute方法,這個是以前咱們講過的一個方法,其會判斷當前線程是不是該channel的線程,目前都沒有初始化線程確定不是,將任務放入任務隊列,開啓一個線程(線程處於未啓動狀態纔會開啓,不然不執行),這個設計使得execute的時候只會開啓一次線程,而全部的任務都會被放入任務隊列,由這個線程執行。再回到run方法,這個是channel線程執行的方法,目前每一個NioEventLoop都執行了Select等方法啊,這不是處理了全部的channel的工做嗎?並無達到一個channel生命週期控制在一個線程中啊。
這裏其實是JAVA NIO的例子帶來的誤解,認爲必須一個線程來使用select,而後遍歷事件分配線程給channel執行讀寫操做。實際上在Netty中不同,Netty全部線程都在執行select並獲取相關事件,可是實際上其並無執行全部的事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }
看這裏,if (eventLoop != this || eventLoop == null) ,若是channel的eventLoop不是當前的eventLoop就不執行,這個就一小段代碼,可是就直接決定了EventLoop只對本身所綁定的channel感興趣,最終達到了只處理本身相關的任務的目的。
該類沒有太多須要說明的內容,前面已經講解了不少。
1.AbstractEventExecutorGroup,實現了基本的方法,全部方法都是調用next()即挑選出一個線程執行器完成的。
2.MultithreadEventExecutorGroup,實現了一個基本的線程池,持有子線程,主要工做是初始化了子線程數組,提供了next方法。
3.MultithreadEventLoopGroup,實現了Netty的channel線程池,提供了register方法,雖然也是調用next的register方法。
4.NioEventLoopGroup,實現了建立子線程數組時newChild方法,全部的EventLoop都是這個方法建立的。
group就是兩個重點,一個next()挑選事件執行器,一個newChild()建立線程執行器對象。
本節耗費了大量的篇幅講解了Netty的線程模型的設計思路,主要看點以下:
1.解釋了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup這四者之間的關係,和這麼複雜混亂的繼承關係的緣由。
2.解釋了group是如何初始化線程,並綁定channel的,next().register()
3.解釋了eventloop爲何和channel綁定了,execute()開啓線程,以及每一個eventloop都在獲取IO事件,可是經過channel的eventloop是否等於當前過濾掉其它的事件,只處理本身綁定的channel事件。
因爲每一個線程都在獲取IO事件,因此這段邏輯變的很是複雜,這也就是我以前說的寫好很IO很困難。
最後附上一張對前面全部內容總結的一個圖,清醒一下頭腦,從複雜的代碼中脫身:
這個圖就是一個基本的執行過程圖了,可能有遺漏的地方,可是大致狀況如圖所示。