線程池源碼也是面試常常被提問到的點,我會將全局源碼作一分析,而後告訴你面試考啥,怎麼答。java
簡潔的答兩點就行。面試
下降系統資源消耗。緩存
提升線程可控性。安全
1.建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。網絡
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
2.(JDK8新增)會根據所需的併發數來動態建立和關閉線程。可以合理的使用CPU進行對任務進行併發操做,因此適合使用在很耗時的任務。併發
注意返回的是ForkJoinPool對象。框架
1 public static ExecutorService newWorkStealingPool(int parallelism) { 2 return new ForkJoinPool 3 (parallelism, 4 ForkJoinPool.defaultForkJoinWorkerThreadFactory, 5 null, true); 6 }
什麼是ForkJoinPool:async
1 public ForkJoinPool(int parallelism, 2 ForkJoinWorkerThreadFactory factory, 3 UncaughtExceptionHandler handler, 4 boolean asyncMode) { 5 this(checkParallelism(parallelism), 6 checkFactory(factory), 7 handler, 8 asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 9 "ForkJoinPool-" + nextPoolId() + "-worker-"); 10 checkPermission(); 11 }
使用一個無限隊列來保存須要執行的任務,能夠傳入線程的數量;不傳入,則默認使用當前計算機中可用的cpu數量;使用分治法來解決問題,使用fork()和join()來進行調用。oop
3.建立一個可緩存的線程池,可靈活回收空閒線程,若無可回收,則新建線程。源碼分析
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
4.建立一個單線程的線程池。
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
5.建立一個定長線程池,支持定時及週期性任務執行。
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
Executor結構:
一個運行新任務的簡單接口
擴展了Executor接口。添加了一些用來管理執行器生命週期和任務生命週期的方法
對ExecutorService接口的抽象類實現。不是咱們分析的重點。
Java線程池的核心實現。
1 // AtomicInteger是原子類 ctlOf()返回值爲RUNNING; 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 // 高3位表示線程狀態 4 private static final int COUNT_BITS = Integer.SIZE - 3; 5 // 低29位表示workerCount容量 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 7 8 // runState is stored in the high-order bits 9 // 能接收任務且能處理阻塞隊列中的任務 10 private static final int RUNNING = -1 << COUNT_BITS; 11 // 不能接收新任務,但能夠處理隊列中的任務。 12 private static final int SHUTDOWN = 0 << COUNT_BITS; 13 // 不接收新任務,不處理隊列任務。 14 private static final int STOP = 1 << COUNT_BITS; 15 // 全部任務都終止 16 private static final int TIDYING = 2 << COUNT_BITS; 17 // 什麼都不作 18 private static final int TERMINATED = 3 << COUNT_BITS; 19 20 // 存聽任務的阻塞隊列 21 private final BlockingQueue<Runnable> workQueue;
值的注意的是狀態值越大線程越不活躍。
1 public ThreadPoolExecutor(int corePoolSize,//線程池初始啓動時線程的數量 2 int maximumPoolSize,//最大線程數量 3 long keepAliveTime,//空閒線程多久關閉? 4 TimeUnit unit,// 計時單位 5 BlockingQueue<Runnable> workQueue,//聽任務的阻塞隊列 6 ThreadFactory threadFactory,//線程工廠 7 RejectedExecutionHandler handler// 拒絕策略) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); 18 this.corePoolSize = corePoolSize; 19 this.maximumPoolSize = maximumPoolSize; 20 this.workQueue = workQueue; 21 this.keepAliveTime = unit.toNanos(keepAliveTime); 22 this.threadFactory = threadFactory; 23 this.handler = handler; 24 }
在向線程池提交任務時,會經過兩個方法:execute和submit。
本文着重講解execute方法。submit方法放在下次和Future、Callable一塊兒分析。
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 // clt記錄着runState和workerCount 5 int c = ctl.get(); 6 //workerCountOf方法取出低29位的值,表示當前活動的線程數 7 //而後拿線程數和 核心線程數作比較 8 if (workerCountOf(c) < corePoolSize) { 9 // 若是活動線程數<核心線程數 10 // 添加到 11 //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷 12 if (addWorker(command, true)) 13 // 若是成功則返回 14 return; 15 // 若是失敗則從新獲取 runState和 workerCount 16 c = ctl.get(); 17 } 18 // 若是當前線程池是運行狀態而且任務添加到隊列成功 19 if (isRunning(c) && workQueue.offer(command)) { 20 // 從新獲取 runState和 workerCount 21 int recheck = ctl.get(); 22 // 若是不是運行狀態而且 23 if (! isRunning(recheck) && remove(command)) 24 reject(command); 25 else if (workerCountOf(recheck) == 0) 26 //第一個參數爲null,表示在線程池中建立一個線程,但不去啓動 27 // 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize 28 addWorker(null, false); 29 } 30 //再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize 31 else if (!addWorker(command, false)) 32 //若是失敗則拒絕該任務 33 reject(command); 34 }
總結一下它的工做流程:
當workerCount < corePoolSize
,建立線程執行任務。
當workerCount >= corePoolSize
&&阻塞隊列workQueue
未滿,把新的任務放入阻塞隊列。
當workQueue
已滿,而且workerCount >= corePoolSize
,而且workerCount < maximumPoolSize
,建立線程執行任務。
當workQueue已滿,workerCount >= maximumPoolSize
,採起拒絕策略,默認拒絕策略是直接拋異常。
經過上面的execute方法能夠看到,最主要的邏輯仍是在addWorker方法中實現的,那咱們就看下這個方法:
主要工做是在線程池中建立一個新的線程並執行
參數定義:
firstTask
the task the new thread should run first (or null if none). (指定新增線程執行的第一個任務或者不執行任務)
core
if true use corePoolSize as bound, else maximumPoolSize.(core若是爲true則使用corePoolSize綁定,不然爲maximumPoolSize。 (此處使用布爾指示符而不是值,以確保在檢查其餘狀態後讀取新值)。)
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 5 int c = ctl.get(); 6 // 獲取運行狀態 7 int rs = runStateOf(c); 8 9 // Check if queue empty only if necessary. 10 // 若是狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務) 11 // 而且 若是 !(rs爲SHUTDOWN 且 firsTask爲空 且 阻塞隊列不爲空) 12 if (rs >= SHUTDOWN && 13 ! (rs == SHUTDOWN && 14 firstTask == null && 15 ! workQueue.isEmpty())) 16 // 返回false 17 return false; 18 19 for (;;) { 20 //獲取線程數wc 21 int wc = workerCountOf(c); 22 // 若是wc大與容量 || core若是爲true表示根據corePoolSize來比較,不然爲maximumPoolSize 23 if (wc >= CAPACITY || 24 wc >= (core ? corePoolSize : maximumPoolSize)) 25 return false; 26 // 增長workerCount(原子操做) 27 if (compareAndIncrementWorkerCount(c)) 28 // 若是增長成功,則跳出 29 break retry; 30 // wc增長失敗,則再次獲取runState 31 c = ctl.get(); // Re-read ctl 32 // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回從新執行 33 if (runStateOf(c) != rs) 34 continue retry; 35 // else CAS failed due to workerCount change; retry inner loop 36 } 37 } 38 39 boolean workerStarted = false; 40 boolean workerAdded = false; 41 Worker w = null; 42 try { 43 // 根據firstTask來建立Worker對象 44 w = new Worker(firstTask); 45 // 根據worker建立一個線程 46 final Thread t = w.thread; 47 if (t != null) { 48 // new一個鎖 49 final ReentrantLock mainLock = this.mainLock; 50 // 加鎖 51 mainLock.lock(); 52 try { 53 // Recheck while holding lock. 54 // Back out on ThreadFactory failure or if 55 // shut down before lock acquired. 56 // 獲取runState 57 int rs = runStateOf(ctl.get()); 58 // 若是rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null) 59 // firstTask == null證實只新建線程而不執行任務 60 if (rs < SHUTDOWN || 61 (rs == SHUTDOWN && firstTask == null)) { 62 // 若是t活着就拋異常 63 if (t.isAlive()) // precheck that t is startable 64 throw new IllegalThreadStateException(); 65 // 不然加入worker(HashSet) 66 //workers包含池中的全部工做線程。僅在持有mainLock時訪問。 67 workers.add(w); 68 // 獲取工做線程數量 69 int s = workers.size(); 70 //largestPoolSize記錄着線程池中出現過的最大線程數量 71 if (s > largestPoolSize) 72 // 若是 s比它還要大,則將s賦值給它 73 largestPoolSize = s; 74 // worker的添加工做狀態改成true 75 workerAdded = true; 76 } 77 } finally { 78 mainLock.unlock(); 79 } 80 // 若是worker的添加工做完成 81 if (workerAdded) { 82 // 啓動線程 83 t.start(); 84 // 修改線程啓動狀態 85 workerStarted = true; 86 } 87 } 88 } finally { 89 if (! workerStarted) 90 addWorkerFailed(w); 91 } 92 // 返回線啓動狀態 93 return workerStarted;
由於workers是HashSet類型的,不能保證線程安全。
那w = new Worker(firstTask);
如何理解呢
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable
能夠看到它繼承了AQS併發框架還實現了Runnable。證實它仍是一個線程任務類。那咱們調用t.start()事實上就是調用了該類重寫的run方法。
tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其它鎖重入進來的。
線程只須要兩個狀態,一個是獨佔鎖,代表正在執行任務;一個是不加鎖,代表是空閒狀態。
1 public void run() { 2 runWorker(this); 3 }
run方法又調用了runWorker方法:
1 final void runWorker(Worker w) { 2 // 拿到當前線程 3 Thread wt = Thread.currentThread(); 4 // 拿到當前任務 5 Runnable task = w.firstTask; 6 // 將Worker.firstTask置空 而且釋放鎖 7 w.firstTask = null; 8 w.unlock(); // allow interrupts 9 boolean completedAbruptly = true; 10 try { 11 // 若是task或者getTask不爲空,則一直循環 12 while (task != null || (task = getTask()) != null) { 13 // 加鎖 14 w.lock(); 15 // If pool is stopping, ensure thread is interrupted; 16 // if not, ensure thread is not interrupted. This 17 // requires a recheck in second case to deal with 18 // shutdownNow race while clearing interrupt 19 // return ctl.get() >= stop 20 // 若是線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷 21 // 其實就是保證兩點: 22 // 1. 線程池沒有中止 23 // 2. 保證線程沒有中斷 24 if ((runStateAtLeast(ctl.get(), STOP) || 25 (Thread.interrupted() && 26 runStateAtLeast(ctl.get(), STOP))) && 27 !wt.isInterrupted()) 28 // 中斷當前線程 29 wt.interrupt(); 30 try { 31 // 空方法 32 beforeExecute(wt, task); 33 Throwable thrown = null; 34 try { 35 // 執行run方法(Runable對象) 36 task.run(); 37 } catch (RuntimeException x) { 38 thrown = x; throw x; 39 } catch (Error x) { 40 thrown = x; throw x; 41 } catch (Throwable x) { 42 thrown = x; throw new Error(x); 43 } finally { 44 afterExecute(task, thrown); 45 } 46 } finally { 47 // 執行完後, 將task置空, 完成任務++, 釋放鎖 48 task = null; 49 w.completedTasks++; 50 w.unlock(); 51 } 52 } 53 completedAbruptly = false; 54 } finally { 55 // 退出工做 56 processWorkerExit(w, completedAbruptly); 57 }
總結一下runWorker方法的執行過程:
while循環中,不斷地經過getTask()方法從workerQueue中獲取任務
若是線程池正在中止,則中斷線程。不然調用3.
調用task.run()執行任務;
若是task爲null則跳出循環,執行processWorkerExit()方法,銷燬線程workers.remove(w);
這個流程圖很是經典:
除此以外,ThreadPoolExector
還提供了tryAcquire
、tryRelease
、shutdown
、shutdownNow
、tryTerminate
、等涉及的一系列線程狀態更改的方法有興趣能夠本身研究。大致思路是同樣的,這裏不作介紹。
tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其它鎖重入進來的。
線程只須要兩個狀態,一個是獨佔鎖,代表正在執行任務;一個是不加鎖,代表是空閒狀態。
shutdown方法與getTask方法存在競態條件.(這裏不作深刻,建議本身深刻研究,對它比較熟悉的面試官通常會問)
建立線程池的五個方法。
線程池的五個狀態
execute執行過程。
runWorker執行過程。(把兩個流程圖記下,理解後說個大該就行。)
比較深刻的問題就是我在文中插入的問題。
…指望你們能在評論區補充。
聲明:圖片來源於網絡,侵刪。