Netty 源碼解析(五): Netty 的線程池分析

今天是猿燈塔「365篇原創計劃」第五篇。 接下來的時間燈塔君持續更新Netty系列一共九篇面試

Netty 源碼解析(一): 開始數組

Netty 源碼解析(二): Netty 的 Channel安全

Netty 源碼解析(三): Netty的 Future 和 Promise微信

Netty 源碼解析(四): Netty 的 ChannelPipelineapp

當前:Netty 源碼解析(五): Netty 的線程池分析異步

Netty 源碼解析(六): Channel 的 register 操做ide

Netty 源碼解析(七): NioEventLoop 工做流程函數

Netty 源碼解析(八): 回到 Channel 的 register 操做oop

Netty 源碼解析(九): connect 過程和 bind 過程分析優化

今天呢!燈塔君跟你們講:

Netty 的線程池分析

Netty 中的線程池 EventLoopGroup

接下來,咱們來分析 Netty 中的線程池。Netty 中的線程池比較很差理解,由於它的類比較多,並且它們之間的關係錯綜複雜。看下圖,感覺下 NioEventLoop 類和 NioEventLoopGroup 類的繼承結構:

這張圖我按照繼承關係整理而來,你們仔細看一下就會發現,涉及到的類確實挺多的。本節來給你們理理清楚這部份內容。

首先,咱們說的 Netty 的線程池,指的就是 NioEventLoopGroup 的實例;線程池中的單個線程,指的是右邊 NioEventLoop 的實例。

回顧下咱們第一節介紹的 Echo 例子,客戶端和服務端的啓動代碼中,最開始咱們老是先實例化 NioEventLoopGroup:

// EchoClient 代碼最開始:

EventLoopGroup group = new NioEventLoopGroup();

// EchoServer 代碼最開始:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

下面,咱們就從 NioEventLoopGroup 的源碼開始進行分析。 咱們打開 NioEventLoopGroup 的源碼,能夠看到,NioEventLoopGroup 有多個構造方法用於參數設置,最簡單地,咱們採用無參構造函數,或僅僅設置線程數量就能夠了,其餘的參數採用默認值。

好比上面的代碼中,咱們只在實例化 bossGroup 的時候指定了參數,表明該線程池須要一個線程。
publicNioEventLoopGroup(){
this(0);
}
publicNioEventLoopGroup(intnThreads){
this(nThreads,(Executor)null);
}
...
//參數最全的構造方法
publicNioEventLoopGroup(intnThreads,Executorexecutor,EventExecutorChooserFactorychooserFactory,
finalSelectorProviderselectorProvider,
finalSelectStrategyFactoryselectStrategyFactory,
finalRejectedExecutionHandlerrejectedExecutionHandler){
//調用父類的構造方法
super(nThreads,executor,chooserFactory,selectorProvider,selectStrategyFactory,rejectedExecutionHandler);
}
複製代碼

咱們來稍微看一下構造方法中的各個參數:

  • nThreads:這個最簡單,就是線程池中的線程數,也就是 NioEventLoop 的實例數量。
  • executor:咱們知道,咱們自己就是要構造一個線程池(Executor),爲何這裏傳一個 executor 實例呢?它其實不是給線程池用的,而是給 NioEventLoop 用的,之後再說。
  • chooserFactory:當咱們提交一個任務到線程池的時候,線程池須要選擇(choose)其中的一個線程來執行這個任務,這個就是用來實現選擇策略的。
  • selectorProvider:這個簡單,咱們須要經過它來實例化 JDK 的 Selector,能夠看到每一個線程池都持有一個 selectorProvider 實例。
  • selectStrategyFactory:這個涉及到的是線程池中線程的工做流程,在介紹 NioEventLoop 的時候會說。
  • rejectedExecutionHandler:這個也是線程池的好朋友了,用於處理線程池中沒有可用的線程來執行任務的狀況。在 Netty 中稍微有一點點不同,這個是給 NioEventLoop 實例用的,之後咱們再詳細介紹。

這裏介紹這些參數是但願你們有個印象而已,你們發現沒有,在構造 NioEventLoopGroup 實例時的好幾個參數,都是用來構造 NioEventLoop 用的。下面,咱們從 NioEventLoopGroup 的無參構造方法開始,跟着源碼走:

publicNioEventLoopGroup(){
this(0);
}

publicNioEventLoopGroup(){
this(0);
}

複製代碼

而後一步步走下去,到這個構造方法:

publicNioEventLoopGroup(intnThreads,ThreadFactorythreadFactory,finalSelectorProviderselectorProvider,finalSelectStrategyFactoryselectStrategyFactory){

super(nThreads,threadFactory,selectorProvider,selectStrategyFactory,RejectedExecutionHandlers.reject());
}複製代碼

你們本身要去跟一下源碼,這樣才知道中間設置了哪些默認值,下面這幾個參數都被設置了默認值:

  • selectorProvider = SelectorProvider.provider() 這個沒什麼好說的,調用了 JDK 提供的方法
  • selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE 這個涉及到的是線程在作 select 操做和執行任務過程當中的策略選擇問題,在介紹 NioEventLoop 的時候會用到。
  • rejectedExecutionHandler = RejectedExecutionHandlers.reject() 你們進去看一下 reject() 方法,也就是說,Netty 選擇的默認拒絕策略是:拋出異常

跟着源碼走,咱們會來到父類 MultithreadEventLoopGroup 的構造方法中:

protectedMultithreadEventLoopGroup(intnThreads,ThreadFactorythreadFactory,Object...args){
super(nThreads==0?DEFAULT_EVENT_LOOP_THREADS:nThreads,threadFactory,args);
}複製代碼

這裏咱們發現,若是採用無參構造函數,那麼到這裏的時候,默認地 nThreads 會被設置爲 CPU 核心數 *2。你們能夠看下 DEFAULT_EVENT_LOOP_THREADS 的默認值,以及 static 代碼塊的設值邏輯。咱們繼續往下走:

protectedMultithreadEventExecutorGroup(intnThreads,ThreadFactorythreadFactory,Object...args){
this(nThreads,threadFactory==null?null:newThreadPerTaskExecutor(threadFactory),args);複製代碼

到這一步的時候,new ThreadPerTaskExecutor(threadFactory) 會構造一個 executor。

咱們如今還不知道這個 executor 怎麼用。這裏咱們先看下它的源碼: public final class ThreadPerTaskExecutor implements Executor { private final`ThreadFactorythreadFactory; public T hreadPerTaskExecutor(T hreadFactorythreadFactory) {` `i`f(`threadFactory==null){ th row` n`ewNul`l`PointerException("threadFactory"); } this .threa dFacto ry=threadFactory; } @Override` p`u`bl`ic void e`x`ecute(`R`unna`b`lecommand) { //爲每一個任務新建一個線 程 threadFactory.n ewThread(command).start(); } } Executor 做爲線程池的 接`口, 咱們知道,它只有一個 execute(runnable) 方法,從上面咱們能夠看到,實現類 ThreadPerTaskExecutor 的邏輯就是每來一個任務,新建一個線程。咱們先記住這個,前面也說了,它是給 NioEventLoop 用的,不是給 NioEventLoopGroup 用的。

.一步設置完了 executor,咱們繼續往下看:

protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,Object...args){
this(nThreads,executor,DefaultEventExecutorChooserFactory.INSTANCE,args);
}
這一步設置了 chooserFactory,用來實現從線程池中選擇一個線程的選擇策略。
複製代碼
ChooserFactory 的邏輯比較簡單,咱們看下 DefaultEventExecutorChooserFactory 的實現: Override public`EventExecutorChooserne wChooser(EventExecutor[]executors) { if (is PowerOfTwo(executors.length)){ ret`ur`n newP`o`werOfTwoEventExecutorChooser(executors); }else { retu rn` n`ewGene`r`icEventExecutorChooser(executors); } } 這裏設置 很簡單:一、若是線程池的線程數量是 2^n,採用下面的方式會高效一些:@Override publicEv entExec utornext() { return ex`e`c`u`tors[idx.getAndIncrement()&executors.length-1]; } 二、若是不是,用取模的方式 @ Override publicEventExec utornex t() { returnexecuto rs`\[`M`a`th.abs(idx.getAndIncrement()%executors.length)]; }`

走了這麼久,咱們終於到了一個幹實事的構造方法中了:

protectedMultithreadEventExecutorGroup(intnThreads,Executorexecutor,
EventExecutorChooserFactorychooserFactory,Object...args){
if(nThreads<=0){
thrownewIllegalArgumentException(String.format("nThreads:%d(expected:>0)",nThreads));
}

// executor 若是是 null,作一次和前面同樣的默認設置。
if(executor==null){
executor=newThreadPerTaskExecutor(newDefaultThreadFactory());
}

//這裏的children數組很是重要,它就是線程池中的線程數組,這麼說不太嚴謹,可是就大概這個意思
children=newEventExecutor[nThreads];

//下面這個for循環將實例化children數組中的每個元素
for(inti=0;i<nThreads;i++){
booleansuccess=false;
try{
//實例化!!!!!!
children[i]=newChild(executor,args);
success=true;
}catch(Exceptione){
//TODO:Thinkaboutifthisisagoodexceptiontype
thrownewIllegalStateException("failedtocreateachildeventloop",e);
}finally{
//若是有一個child實例化失敗,那麼success就會爲false,而後進入下面的失敗處理邏輯
if(!success){
//把已經成功實例化的「線程」shutdown,shutdown是異步操做
for(intj=0;j<i;j++){
children[j].shutdownGracefully();
}

//等待這些線程成功shutdown
for(intj=0;j<i;j++){
EventExecutore=children[j];
try{
while(!e.isTerminated()){
e.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
}
}catch(InterruptedExceptioninterrupted){
//把中斷狀態設置回去,交給關心的線程來處理.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//================================================
//===到這裏,就是表明上面的實例化全部線程已經成功結束===
//================================================

//經過以前設置的chooserFactory來實例化Chooser,把線程池數組傳進去,
//這就沒必要再說了吧,實現線程選擇策略
chooser=chooserFactory.newChooser(children);

//設置一個Listener用來監聽該線程池的termination事件
//下面的代碼邏輯是:給池中每個線程都設置這個 listener,當監聽到全部線程都 terminate 之後,這個線程池就算真正的 terminate 了。
finalFutureListener<Object>terminationListener=newFutureListener<Object>(){
@Override
publicvoidoperationComplete(Future<Object>future)throwsException{
if(terminatedChildren.incrementAndGet()==children.length){
terminationFuture.setSuccess(null);
}
}
};
for(EventExecutore:children){
e.terminationFuture().addListener(terminationListener);
}

//設置readonlyChildren,它是隻讀集合,之後用到再說
Set<EventExecutor>childrenSet=newLinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet,children);
readonlyChildren=Collections.unmodifiableSet(childrenSet);
}複製代碼

上面的代碼很是簡單吧,沒有什麼須要特別說的,接下來,咱們來看看 newChild() 這個方法,這個方法很是重要,它將建立線程池中的線程。

我上面已經用過不少次"線程"這個詞了,它可不是 Thread 的意思,而是指池中的個體,後面咱們會看到每一個"線程"在何時會真正建立 Thread 實例。反正每一個 NioEventLoop 實例內部都會有一個本身的 Thread 實例,因此把這兩個概念混在一塊兒也無所謂吧。

newChild(…) 方法在 NioEventLoopGroup 中覆寫了,上面說的"線程"其實就是 NioEventLoop:

@Override
protectedEventLoopnewChild(Executorexecutor,Object...args)throwsException{
returnnewNioEventLoop(this,executor,(SelectorProvider)args[0],
((SelectStrategyFactory)args[1]).newSelectStrategy(),(RejectedExecutionHandler)args[2]);
}

它調用了 NioEventLoop 的構造方法:複製代碼
NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider,
SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){
//調用父類構造器
super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler);
if(selectorProvider==null){
thrownewNullPointerException("selectorProvider");
}
if(strategy==null){
thrownewNullPointerException("selectStrategy");
}
provider=selectorProvider;
//開啓 NIO 中最重要的組件:Selector
finalSelectorTupleselectorTuple=openSelector();
selector=selectorTuple.selector;
unwrappedSelector=selectorTuple.unwrappedSelector;
selectStrategy=strategy;
}複製代碼

咱們先粗略觀察一下,而後再往下看:

  • 在 Netty 中,NioEventLoopGroup 表明線程池,NioEventLoop 就是其中的線程。
  • 線程池 NioEventLoopGroup 是池中的線程 NioEventLoop 的 parent,從上面的代碼中的取名能夠看出。
  • 每一個 NioEventLoop 都有本身的 Selector,上面的代碼也反應了這一點,這和 Tomcat 中的 NIO 模型有點區別。
  • executor、selectStrategy 和 rejectedExecutionHandler 從 NioEventLoopGroup 中一路傳到了 NioEventLoop 中。

這個時候,咱們來看一下 NioEventLoop 類的屬性都有哪些,咱們先忽略它繼承自父類的屬性,單單看它本身的:

privateSelectorselector;
privateSelectorunwrappedSelector;
privateSelectedSelectionKeySetselectedKeys;
privatefinalSelectorProviderprovider;
privatefinalAtomicBooleanwakenUp=newAtomicBoolean();
privatefinalSelectStrategyselectStrategy;
privatevolatileintioRatio=50;
privateintcancelledKeys;
privatebooleanneedsToSelectAgain;複製代碼

結合它的構造方法咱們來總結一下:

  • provider:它由 NioEventLoopGroup 傳進來,前面咱們說了一個線程池有一個 selectorProvider,用於建立 Selector 實例
  • selector:雖然咱們還沒看建立 selector 的代碼,但咱們已經知道,在 Netty 中 Selector 是跟着線程池中的線程走的。也就是說,並不是一個線程池一個 Selector 實例,而是線程池中每個線程都有一個 Selector 實例。 在無參構造過程當中,咱們發現,Netty 設置線程個數是 CPU 核心數的兩倍,假設咱們的機器 CPU 是 4 核,那麼對應的就會有 8 個 Selector 實例。
  • selectStrategy:select 操做的策略,這個不急。
  • ioRatio:這是 IO 任務的執行時間比例,由於每一個線程既有 IO 任務執行,也有非 IO 任務須要執行,因此該參數爲了保證有足夠時間是給 IO 的。這裏也不須要急着去理解什麼 IO 任務、什麼非 IO 任務。

而後咱們繼續走它的構造方法,咱們看到上面的構造方法調用了父類的構造器,它的父類是 SingleThreadEventLoop。

protectedSingleThreadEventLoop(EventLoopGroupparent,Executorexecutor,
booleanaddTaskWakesUp,intmaxPendingTasks,
RejectedExecutionHandlerrejectedExecutionHandler){
super(parent,executor,addTaskWakesUp,maxPendingTasks,rejectedExecutionHandler);

//咱們能夠直接忽略這個東西,之後咱們也不會再介紹它
tailTasks=newTaskQueue(maxPendingTasks);
}

複製代碼

SingleThreadEventLoop 這個名字很詭異有沒有?而後它的構造方法又調用了父類 SingleThreadEventExecutor 的構造方法:

protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor,
booleanaddTaskWakesUp,intmaxPendingTasks,
RejectedExecutionHandlerrejectedHandler){
super(parent);
this.addTaskWakesUp=addTaskWakesUp;
this.maxPendingTasks=Math.max(16,maxPendingTasks);
this.executor=ObjectUtil.checkNotNull(executor,"executor");
//taskQueue,這個東西很重要,提交給NioEventLoop的任務都會進入到這個taskQueue中等待被執行
//這個queue的默認容量是16
taskQueue=newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler=ObjectUtil.checkNotNull(rejectedHandler,"rejectedHandler");
}複製代碼

到這裏就更加詭異了,NioEventLoop 的父類是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父類是 SingleThreadEventExecutor,它的名字告訴咱們,它是一個 Executor,是一個線程池,並且是 Single Thread 單線程的。也就是說,線程池 NioEventLoopGroup 中的每個線程 NioEventLoop 也能夠當作一個線程池來用,只不過它只有一個線程。這種設計雖然看上去很巧妙,不過有點反人類的樣子。上面這個構造函數比較簡單:

  • 設置了 parent,也就是以前建立的線程池 NioEventLoopGroup 實例
  • executor:它是咱們以前實例化的 ThreadPerTaskExecutor,咱們說過,這個東西在線程池中沒有用,它是給 NioEventLoop 用的,立刻咱們就要看到它了。提早透露一下,它用來開啓 NioEventLoop 中的線程(Thread 實例)。
  • taskQueue:這算是該構造方法中新的東西,它是任務隊列。咱們前面說過,NioEventLoop 須要負責 IO 事件和非 IO 事件,一般它都在執行 selector 的 select 方法或者正在處理 selectedKeys,若是咱們要 submit 一個任務給它,任務就會被放到 taskQueue 中,等它來輪詢。該隊列是線程安全的 LinkedBlockingQueue,默認容量爲 16。
  • rejectedExecutionHandler:taskQueue 的默認容量是 16,因此,若是 submit 的任務堆積了到了 16,再往裏面提交任務會觸發 rejectedExecutionHandler 的執行策略。
還記得默認策略嗎:拋出RejectedExecutionException 異常。 在 NioEventLoopGroup 的默認構造中,它的實現是這樣的: private static final`RejectedExecutionHandlerREJECT=newR ejectedExecutionHandler(){ @Ove rride publ ic void rejec ted(Runna bletask,SingleThreadEventExecutorexecutor) { throw` `n`ew`Rejec`t`edExecutionException(); } };`

而後,咱們再回到 NioEventLoop 的構造方法:

NioEventLoop(NioEventLoopGroupparent,Executorexecutor,SelectorProviderselectorProvider,
SelectStrategystrategy,RejectedExecutionHandlerrejectedExecutionHandler){
//咱們剛剛說完了這個
super(parent,executor,false,DEFAULT_MAX_PENDING_TASKS,rejectedExecutionHandler);
if(selectorProvider==null){
thrownewNullPointerException("selectorProvider");
}
if(strategy==null){
thrownewNullPointerException("selectStrategy");
}
provider=selectorProvider;
//建立selector實例
finalSelectorTupleselectorTuple=openSelector();
selector=selectorTuple.selector;
unwrappedSelector=selectorTuple.unwrappedSelector;
selectStrategy=strategy;
}複製代碼

能夠看到,最重要的方法其實就是 openSelector() 方法,它將建立 NIO 中最重要的一個組件 Selector。在這個方法中,Netty 也作了一些優化,這部分咱們就不去分析它了。到這裏,咱們的線程池 NioEventLoopGroup 建立完成了,而且實例化了池中的全部 NioEventLoop 實例。同時,你們應該已經看到,上面並無真正建立 NioEventLoop 中的線程(沒有建立 Thread 實例)。提早透露一下,建立線程的時機在第一個任務提交過來的時候,那麼第一個任務是什麼呢?是咱們立刻要說的 channel 的 register 操做。

365天干貨不斷微信搜索「猿燈塔」第一時間閱讀,回覆【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板
相關文章
相關標籤/搜索