當面試官問線程池時,你應該知道些什麼?

Java面試中,線程池也算是一個高頻的問題,其實就JDK源碼來看線程池這一塊的實現代碼應該算是寫的清晰易懂的,經過這篇文章,咱們就來盤點一下線程池的知識點。java

本文基於JDK1.8源碼進行分析面試

首先看下線程池構造函數:併發

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { //忽略賦值與校驗邏輯 } 

構造參數比較多,一個一個說下:函數

  • corePoolSize線程池中的核心線程數
  • maximumPoolSize線程池中的最大線程數
  • keepAliveTime線程池中的線程存活時間(準確來講應該是沒有任務執行時的回收時間,後面會分析)
  • unit時間單位
  • workQueue來不及執行的任務存放的阻塞隊列
  • threadFactory新建woker線程(注意不是咱們提交的任務)是進行一些屬性設置,好比線程名,優先級等等,有默認實現。
  • handler 任務拒絕策略,當運行線程數已達到maximumPoolSize,隊列也已經裝滿時會調用該參數拒絕任務,有默認實現。

當咱們向線程池提交任務時,一般使用execute方法,接下來就先從該方法開始分析。
在分析execute代碼以前,須要先說明下,咱們都知道線程池是維護了一批線程來處理用戶提交的任務,達到線程複用的目的,線程池維護的這批線程被封裝成了Workerthis

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //JDK8的源碼中,線程池自己的狀態跟worker數量使用同一個變量ctl來維護 int c = ctl.get(); //經過位運算得出固然線程池中的worker數量與構造參數corePoolSize進行比較 if (workerCountOf(c) < corePoolSize) { //若是小於corePoolSize,則直接新增一個worker,並把固然用戶提交的任務command做爲參數,若是成功則返回。 if (addWorker(command, true)) return; //若是失敗,則獲取最新的線程池數據 c = ctl.get(); } //若是線程池仍在運行,則把任務放到阻塞隊列中等待執行。 if (isRunning(c) && workQueue.offer(command)) { //這裏的recheck思路是爲了處理併發問題 int recheck = ctl.get(); //當任務成功放入隊列時,若是recheck發現線程池已經再也不運行了則從隊列中把任務刪除 if (! isRunning(recheck) && remove(command)) //刪除成功之後,會調用構造參數傳入的拒絕策略。 reject(command); //若是worker的數量爲0(此時隊列中可能有任務沒有執行),則新建一個worker(因爲此時新建woker的目的是執行隊列中堆積的任務, //所以入參沒有執行任務,詳細邏輯後面會詳細分析addWorker方法)。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //若是前面的新增woker,放入隊列都失敗,則會繼續新增worker,此時線程池的狀態是woker數量達到corePoolSize,阻塞隊列任務已滿 //只能基於maximumPoolSize參數新建woker else if (!addWorker(command, false)) //若是基於maximumPoolSize新建woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕策略 reject(command); } 

源碼裏我增長了不少註釋,須要多讀幾遍才能徹底理解,總結一下用戶向線程池提交任務之後,線程池的執行邏輯:spa

  • 若是當前woker數量小於corePoolSize,則新建一個woker並把當前任務分配給該woker線程,成功則返回。
  • 若是第一步失敗,則嘗試把任務放入阻塞隊列,若是成功則返回。
  • 若是第二步失敗,則判斷若是當前woker數量小於maximumPoolSize,則新建一個woker並把當前任務分配給該woker線程,成功則返回。
  • 若是第三步失敗,則調用拒絕策略處理該任務。

從execute的源碼能夠看出addWorker方法是重中之重,立刻來看下它的實現。
addWorker方法:線程

private boolean addWorker(Runnable firstTask, boolean core) { //這裏有一段基於CAS+死循環實現的關於線程池狀態,線程數量的校驗與更新邏輯就先忽略了,重點看主流程。 //... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //把指定任務做爲參數新建一個worker線程 w = new Worker(firstTask); //這裏是重點,咋一看,必定覺得w.thread就是咱們傳入的firstTask //實際上是經過線程池構造函數參數threadFactory生成的woker對象 //也就是說這個變量t就是表明woker線程。絕對不是用戶提交的線程任務firstTask!!! final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //加鎖以後仍舊是判斷線程池狀態等一些校驗邏輯。 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //把新建的woker線程放入集合保存,這裏使用的是HashSet workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //而後啓動woker線程 //這裏再強調一遍上面說的邏輯,該變量t表明woker線程,也就是會調用woker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //若是woker啓動失敗,則進行一些善後工做,好比說修改當前woker數量等等 addWorkerFailed(w); } return workerStarted; } 

addWorker方法主要作的工做就是新建一個Woker線程,加入到woker集合中,而後啓動該線程,那麼接下來的重點就是Woker類的run方法了。code

worker執行方法:對象

//Woker類實現了Runnable接口 public void run() { runWorker(this); } //最終woker執行邏輯走到了這裏 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //task就是Woker構造函數入參指定的任務,即用戶提交的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //通常狀況下,task都不會爲空(特殊狀況上面註釋中也說明了),所以會直接進入循環體中 //這裏getTask方法是要重點說明的,它的實現跟咱們構造參數設置存活時間有關 //咱們都知道構造參數設置的時間表明瞭線程池中的線程,即woker線程的存活時間,若是到期則回收woker線程,這個邏輯的實現就在getTask中。 //來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,用戶設置的存活時間,就是 //從這個阻塞隊列中取任務等待的最大時間,若是getTask返回null,意思就是woker等待了指定時間仍然沒有 //取到任務,此時就會跳過循環體,進入woker線程的銷燬邏輯。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //該方法是個空的實現,若是有須要用戶能夠本身繼承該類進行實現 beforeExecute(wt, task); Throwable thrown = null; try { //真正的任務執行邏輯 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //該方法是個空的實現,若是有須要用戶能夠本身繼承該類進行實現 afterExecute(task, thrown); } } finally { //這裏設爲null,也就是循環體再執行的時候會調用getTask方法 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裏,作一些善後工做,好比在corePoolSize跟maximumPoolSize之間的woker會進行回收 processWorkerExit(w, completedAbruptly); } } 

woker線程的執行流程就是首先執行初始化時分配給的任務,執行完成之後會嘗試從阻塞隊列中獲取可執行的任務,若是指定時間內仍然沒有任務能夠執行,則進入銷燬邏輯。
注:這裏只會回收corePoolSize與maximumPoolSize直接的那部分woker繼承

理解了整個線程池的運行原理之後,再來看下JDK默認提供的線程池類型就會一目瞭然了:

public static ExecutorService newFixedThreadPool(int nThreads) { //corePoolSize跟maximumPoolSize值同樣,同時傳入一個無界阻塞隊列 //根據上面分析的woker回收邏輯,該線程池的線程會維持在指定線程數,不會進行回收 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 
public static ExecutorService newSingleThreadExecutor() { //線程池中只有一個線程進行任務執行,其餘的都放入阻塞隊列 //外面包裝的FinalizableDelegatedExecutorService類實現了finalize方法,在JVM垃圾回收的時候會關閉線程池 return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 
public static ExecutorService newCachedThreadPool() { //這個線程池corePoolSize爲0,maximumPoolSize爲Integer.MAX_VALUE,意思也就是說來一個任務就建立一個woker,回收時間是60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 

最後再說說初始化線程池時線程數的選擇:

  • 若是任務是IO密集型,通常線程數須要設置2倍CPU數以上,以此來儘可能利用CPU資源。
  • 若是任務是CPU密集型,通常線程數量只須要設置CPU數加1便可,更多的線程數也只能增長上下文切換,不能增長CPU利用率。

上述只是一個基本思想,若是真的須要精確的控制,仍是須要上線之後觀察線程池中線程數量跟隊列的狀況來定。

做者:凌風郎少 連接:https://www.jianshu.com/p/5df6e38e4362 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索