今天是猿燈塔「365篇原創計劃」第五篇。 接下來的時間燈塔君持續更新Netty系列一共九篇面試
Netty 源碼解析(一): 開始數組
Netty 源碼解析(二): Netty 的 Channel安全
Netty 源碼解析(三): Netty的 Future 和 Promise微信
Netty 源碼解析(四): Netty 的 ChannelPipelineapp
當前:Netty 源碼解析(五): Netty 的線程池分析異步
Netty 源碼解析(六): Channel 的 register 操做ide
Netty 源碼解析(七): NioEventLoop 工做流程函數
Netty 源碼解析(八): 回到 Channel 的 register 操做oop
Netty 源碼解析(九): connect 過程和 bind 過程分析優化
今天呢!燈塔君跟你們講:
Netty 的線程池分析
接下來,咱們來分析 Netty 中的線程池。Netty 中的線程池比較很差理解,由於它的類比較多,並且它們之間的關係錯綜複雜。看下圖,感覺下 NioEventLoop 類和 NioEventLoopGroup 類的繼承結構:
這張圖我按照繼承關係整理而來,你們仔細看一下就會發現,涉及到的類確實挺多的。本節來給你們理理清楚這部份內容。
首先,咱們說的 Netty 的線程池,指的就是 NioEventLoopGroup 的實例;線程池中的單個線程,指的是右邊 NioEventLoop 的實例。
回顧下咱們第一節介紹的 Echo 例子,客戶端和服務端的啓動代碼中,最開始咱們老是先實例化 NioEventLoopGroup:
// EchoClient 代碼最開始:EventLoopGroup group = new NioEventLoopGroup();
// EchoServer 代碼最開始:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
下面,咱們就從 NioEventLoopGroup 的源碼開始進行分析。 咱們打開 NioEventLoopGroup 的源碼,能夠看到,NioEventLoopGroup 有多個構造方法用於參數設置,最簡單地,咱們採用無參構造函數,或僅僅設置線程數量就能夠了,其餘的參數採用默認值。
好比上面的代碼中,咱們只在實例化 bossGroup 的時候指定了參數,表明該線程池須要一個線程。
publicNioEventLoopGroup(){ this(0); } publicNioEventLoopGroup(intnThreads){ this(nThreads,(Executor)null); } ... //參數最全的構造方法 publicNioEventLoopGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory, finalSelectorProviderselectorProvider, finalSelectStrategyFactoryselectStrategyFactory, finalRejectedExecutionHandlerrejectedExecutionHandler){ //調用父類的構造方法 super(nThreads,executor,chooserFactory,selectorProvider,selectStrategyFactory,rejectedExecutionHandler); } 複製代碼
咱們來稍微看一下構造方法中的各個參數:
這裏介紹這些參數是但願你們有個印象而已,你們發現沒有,在構造 NioEventLoopGroup 實例時的好幾個參數,都是用來構造 NioEventLoop 用的。下面,咱們從 NioEventLoopGroup 的無參構造方法開始,跟着源碼走:
publicNioEventLoopGroup(){ this(0); } publicNioEventLoopGroup(){ this(0); } 複製代碼
而後一步步走下去,到這個構造方法:
publicNioEventLoopGroup(intnThreads,ThreadFactorythreadFactory,finalSelectorProviderselectorProvider,finalSelectStrategyFactoryselectStrategyFactory){ super(nThreads,threadFactory,selectorProvider,selectStrategyFactory,RejectedExecutionHandlers.reject()); }複製代碼
你們本身要去跟一下源碼,這樣才知道中間設置了哪些默認值,下面這幾個參數都被設置了默認值:
跟着源碼走,咱們會來到父類 MultithreadEventLoopGroup 的構造方法中:
protectedMultithreadEventLoopGroup(intnThreads,ThreadFactorythreadFactory,Object...args){ super(nThreads==0?DEFAULT_EVENT_LOOP_THREADS:nThreads,threadFactory,args); }複製代碼
這裏咱們發現,若是採用無參構造函數,那麼到這裏的時候,默認地 nThreads 會被設置爲 CPU 核心數 *2。你們能夠看下 DEFAULT_EVENT_LOOP_THREADS 的默認值,以及 static 代碼塊的設值邏輯。咱們繼續往下走:
protectedMultithreadEventExecutorGroup(intnThreads,ThreadFactorythreadFactory,Object...args){ this(nThreads,threadFactory==null?null:newThreadPerTaskExecutor(threadFactory),args);複製代碼
到這一步的時候,new ThreadPerTaskExecutor(threadFactory)
會構造一個 executor。
咱們如今還不知道這個 executor 怎麼用。這裏咱們先看下它的源碼:public
final
class
ThreadPerTaskExecutor
implements
Executor
{
private
final
`ThreadFactorythreadFactory;pu
blic Th
readPerTaskExecutor(ThreadFactorythreadFactory) {` `i`f(`th
readFactory==null){th
row` n`ewNul`l`Poi
nterException("threadFactory"); } this.
threadF
actory=threadFactory; } @Override` p`u`bl`ic void e`x`ecute(`R`unna`b`lecomma
nd) { //爲每一個任務新建一個線程 t
hreadFactory.ne
wThread(command).start(); } } Executor 做爲線程池的最
頂層
接`口, 咱們知道,它只有一個 execute(runnable) 方法,從上面咱們能夠看到,實現類 ThreadPerTaskExecutor 的邏輯就是每來一個任務,新建一個線程。咱們先記住這個,前面也說了,它是給 NioEventLoop 用的,不是給 NioEventLoopGroup 用的。
.一步設置完了 executor,咱們繼續往下看:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,Object...args){ this(nThreads,executor,DefaultEventExecutorChooserFactory.INSTANCE,args); } 這一步設置了 chooserFactory,用來實現從線程池中選擇一個線程的選擇策略。 複製代碼
ChooserFactory 的邏輯比較簡單,咱們看下 DefaultEventExecutorChooserFactory 的實現:Override
public
`EventExecutorChoosernewChooser(E
ventExecutor[]executors) {i
f(
isPowerOfTwo(executors.length)){ ret`ur`n newP`o`wer
OfTwoEventExecutorChooser(executors); }else{
return` n`ewGene`r`icE
ventExecutorChooser(executors); } } 這裏設置的
策略
也很簡單:一、若是線程池的線程數量是 2^n,採用下面的方式會高效一些:@Override
publicEve
ntExecutornext() { re
turnex`e`c`u`tors[i
dx.getAndIncrement()&executors.length-1]; } 二、若是不是,用取模的方式:
@Override public
EventExecu
tornext() { returnexe
cutors`\[`M`a`th.abs
(idx.getAndIncrement()%executors.length)]; }`
走了這麼久,咱們終於到了一個幹實事的構造方法中了:
protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor, EventExecutorChooserFactorychooserFactory,Object...args){ if(nThreads<=0){ thrownewIllegalArgumentException(String.format("nThreads:%d(expected:>0)",nThreads)); } // executor 若是是 null,作一次和前面同樣的默認設置。 if(executor==null){ executor=newThreadPerTaskExecutor(newDefaultThreadFactory()); } //這裏的children數組很是重要,它就是線程池中的線程數組,這麼說不太嚴謹,可是就大概這個意思 children=newEventExecutor[nThreads]; //下面這個for循環將實例化children數組中的每個元素 for(inti=0;i<nThreads;i++){ booleansuccess=false; try{ //實例化!!!!!! children[i]=newChild(executor,args); success=true; }catch(Exceptione){ //TODO:Thinkaboutifthisisagoodexceptiontype thrownewIllegalStateException("failedtocreateachildeventloop",e); }finally{ //若是有一個child實例化失敗,那麼success就會爲false,而後進入下面的失敗處理邏輯 if(!success){ //把已經成功實例化的「線程」shutdown,shutdown是異步操做 for(intj=0;j<i;j++){ children[j].shutdownGracefully(); } //等待這些線程成功shutdown for(intj=0;j<i;j++){ EventExecutore=children[j]; try{ while(!e.isTerminated()){ e.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS); } }catch(InterruptedExceptioninterrupted){ //把中斷狀態設置回去,交給關心的線程來處理. Thread.currentThread().interrupt(); break; } } } } } //================================================ //===到這裏,就是表明上面的實例化全部線程已經成功結束=== //================================================ //經過以前設置的chooserFactory來實例化Chooser,把線程池數組傳進去, //這就沒必要再說了吧,實現線程選擇策略 chooser=chooserFactory.newChooser(children); //設置一個Listener用來監聽該線程池的termination事件 //下面的代碼邏輯是:給池中每個線程都設置這個 listener,當監聽到全部線程都 terminate 之後,這個線程池就算真正的 terminate 了。 finalFutureListener<Object>terminationListener=newFutureListener<Object>(){ @Override publicvoidoperationComplete(Future<Object>future)throwsException{ if(terminatedChildren.incrementAndGet()==children.length){ terminationFuture.setSuccess(null); } } }; for(EventExecutore:children){ e.terminationFuture().addListener(terminationListener); } //設置readonlyChildren,它是隻讀集合,之後用到再說 Set<EventExecutor>childrenSet=newLinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet,children); readonlyChildren=Collections.unmodifiableSet(childrenSet); }複製代碼
上面的代碼很是簡單吧,沒有什麼須要特別說的,接下來,咱們來看看 newChild() 這個方法,這個方法很是重要,它將建立線程池中的線程。
我上面已經用過不少次"線程"這個詞了,它可不是 Thread 的意思,而是指池中的個體,後面咱們會看到每一個"線程"在何時會真正建立 Thread 實例。反正每一個 NioEventLoop 實例內部都會有一個本身的 Thread 實例,因此把這兩個概念混在一塊兒也無所謂吧。
newChild(…)
方法在 NioEventLoopGroup 中覆寫了,上面說的"線程"其實就是 NioEventLoop:
@Override protectedEventLoopnewChild(Executorexecutor,Object...args)throwsException{ returnnewNioEventLoop(this,executor,(SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(),(RejectedExecutionHandler)args[2]); } 它調用了 NioEventLoop 的構造方法:複製代碼
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider, SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){ //調用父類構造器 super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler); if(selectorProvider==null){ thrownewNullPointerException("selectorProvider"); } if(strategy==null){ thrownewNullPointerException("selectStrategy"); } provider=selectorProvider; //開啓 NIO 中最重要的組件:Selector finalSelectorTupleselectorTuple=openSelector(); selector=selectorTuple.selector; unwrappedSelector=selectorTuple.unwrappedSelector; selectStrategy=strategy; }複製代碼
咱們先粗略觀察一下,而後再往下看:
這個時候,咱們來看一下 NioEventLoop 類的屬性都有哪些,咱們先忽略它繼承自父類的屬性,單單看它本身的:
privateSelectorselector; privateSelectorunwrappedSelector; privateSelectedSelectionKeySetselectedKeys; privatefinalSelectorProviderprovider; privatefinalAtomicBooleanwakenUp=newAtomicBoolean(); privatefinalSelectStrategyselectStrategy; privatevolatileintioRatio=50; privateintcancelledKeys; privatebooleanneedsToSelectAgain;複製代碼
結合它的構造方法咱們來總結一下:
而後咱們繼續走它的構造方法,咱們看到上面的構造方法調用了父類的構造器,它的父類是 SingleThreadEventLoop。
protectedSingleThreadEventLoop(EventLoopGroupparent,Executorexecutor, booleanaddTaskWakesUp,intmaxPendingTasks, RejectedExecutionHandlerrejectedExecutionHandler){ super(parent,executor,addTaskWakesUp,maxPendingTasks,rejectedExecutionHandler); //咱們能夠直接忽略這個東西,之後咱們也不會再介紹它 tailTasks=newTaskQueue(maxPendingTasks); } 複製代碼
SingleThreadEventLoop 這個名字很詭異有沒有?而後它的構造方法又調用了父類 SingleThreadEventExecutor 的構造方法:
protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor, booleanaddTaskWakesUp,intmaxPendingTasks, RejectedExecutionHandlerrejectedHandler){ super(parent); this.addTaskWakesUp=addTaskWakesUp; this.maxPendingTasks=Math.max(16,maxPendingTasks); this.executor=ObjectUtil.checkNotNull(executor,"executor"); //taskQueue,這個東西很重要,提交給NioEventLoop的任務都會進入到這個taskQueue中等待被執行 //這個queue的默認容量是16 taskQueue=newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler=ObjectUtil.checkNotNull(rejectedHandler,"rejectedHandler"); }複製代碼
到這裏就更加詭異了,NioEventLoop 的父類是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父類是 SingleThreadEventExecutor,它的名字告訴咱們,它是一個 Executor,是一個線程池,並且是 Single Thread 單線程的。也就是說,線程池 NioEventLoopGroup 中的每個線程 NioEventLoop 也能夠當作一個線程池來用,只不過它只有一個線程。這種設計雖然看上去很巧妙,不過有點反人類的樣子。上面這個構造函數比較簡單:
還記得默認策略嗎:拋出RejectedExecutionException 異常。 在 NioEventLoopGroup 的默認構造中,它的實現是這樣的:private
static
final
`RejectedExecutionHandlerREJECT=newReje
ctedExecutionHandler(){ @Overr
ide public
voidr
eject
ed(Runnabletask,SingleThreadEventExecutorexecutor) { throw` `n`ew`Rejec`t`edE
xecutionException(); } };`
而後,咱們再回到 NioEventLoop 的構造方法:
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider, SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){ //咱們剛剛說完了這個 super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler); if(selectorProvider==null){ thrownewNullPointerException("selectorProvider"); } if(strategy==null){ thrownewNullPointerException("selectStrategy"); } provider=selectorProvider; //建立selector實例 finalSelectorTupleselectorTuple=openSelector(); selector=selectorTuple.selector; unwrappedSelector=selectorTuple.unwrappedSelector; selectStrategy=strategy; }複製代碼
能夠看到,最重要的方法其實就是 openSelector() 方法,它將建立 NIO 中最重要的一個組件 Selector。在這個方法中,Netty 也作了一些優化,這部分咱們就不去分析它了。到這裏,咱們的線程池 NioEventLoopGroup 建立完成了,而且實例化了池中的全部 NioEventLoop 實例。同時,你們應該已經看到,上面並無真正建立 NioEventLoop 中的線程(沒有建立 Thread 實例)。提早透露一下,建立線程的時機在第一個任務提交過來的時候,那麼第一個任務是什麼呢?是咱們立刻要說的 channel 的 register 操做。
365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回覆【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板