在Java面試中,線程池相關知識,雖不能說是必問提,但出現的頻次也是很是高的。同時又鑑於公衆號「程序新視界」的讀者後臺留言讓寫一篇關於Java線程池的文章,因而就有本篇內容,本篇將基於Java線程池的原理、實現以及相關源碼進行講解等。java
線程池是一種多線程處理形式,處理過程當中將任務提交到線程池,任務的執行交由線程池來管理。面試
爲了充分利用CPU多核資源,應用都會採用多線程並行/併發計算,最大限度的利用多核提高應用程序性能。spring
試想一下,若是每一個請求都執行一遍建立線程、執行任務、銷燬線程,那麼對服務器資源將是一種浪費。在高併發的狀況下,甚至會耗盡服務器資源。apache
線程池的主要做用有兩個:不一樣請求之間重複利用線程,無需頻繁的建立和銷燬線程,下降系統開銷和控制線程數量上限,避免建立過多的線程耗盡進程內存空間,同時減小線程上下文切換次數。數組
在JDK5版本中增長了內置線程池實現ThreadPoolExecutor,同時提供了Executors來建立不一樣類型的線程池。Executors中提供瞭如下常見的線程池建立方法:緩存
雖然在JDK中提供Executors類來支持以上類型的線程池建立,但一般狀況下不建議開發人員直接使用(見《阿里巴巴java開發規範》)。服務器
線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。微信
Executors部分方法的弊端:多線程
同時,阿里巴巴java開發規範中推薦了3種線程池建立方式。併發
方式一,引入commons-lang3包。
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());複製代碼
方式二,引入com.google.guava包。
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown複製代碼
方式三,spring配置線程池方式:自定義線程工廠bean須要實現ThreadFactory,可參考該接口的其它默認實現類,使用方式直接注入bean,調用execute(Runnable task)方法便可。
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
// in code
userThreadPool.execute(thread);複製代碼
除了以上推薦的建立線程池的方法,還能夠經過ThreadPoolExecutor的構造方法,直接建立線程池。本質上來說,以上方法最終也是建立了ThreadPoolExecutor對象,而後堆積進行包裝處理。
ThreadPoolExecutor提供了多個構造方法,咱們最終都調用的構造方法來進行說明。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略代碼
}複製代碼
核心參數做用解析以下:
構造方法的中最後的參數RejectedExecutionHandler用於指定線程池的拒絕策略。當請求任務不斷的過來,而系統此時又處理不過來的時候,咱們就須要採起對應的策略是拒絕服務。
默認有四種類型:
固然,除了默認的4種策略以外,還能夠根據業務需求自定義拒絕策略。經過實現RejectedExecutionHandler接口,在建立ThreadPoolExecutor對象時做爲參數傳入便可。
在spring-integration-core中便自定義了CallerBlocksPolicy,相關代碼以下:
public class CallerBlocksPolicy implements RejectedExecutionHandler {
private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
private final long maxWait;
public CallerBlocksPolicy(long maxWait) {
this.maxWait = maxWait;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
BlockingQueue<Runnable> queue = executor.getQueue();
if (logger.isDebugEnabled()) {
logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
}
if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Max wait time expired to queue task");
} else {
if (logger.isDebugEnabled()) {
logger.debug("Task execution queued");
}
}
} catch (InterruptedException var4) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted", var4);
}
} else {
throw new RejectedExecutionException("Executor has been shut down");
}
}
}複製代碼
建立完成ThreadPoolExecutor以後,當向線程池提交任務時,一般使用execute方法。execute方法的執行流程圖以下:
下面看一下JDK8中ThreadPoolExecutor中execute方法的源代碼實現:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 線程池自己的狀態跟worker數量使用同一個變量ctl來維護
int c = ctl.get();
// 經過位運算得出固然線程池中的worker數量與構造參數corePoolSize進行比較
if (workerCountOf(c) < corePoolSize) {
// 若是小於corePoolSize,則直接新增一個worker,並把固然用戶提交的任務command做爲參數,若是成功則返回。
if (addWorker(command, true))
return;
// 若是失敗,則獲取最新的線程池數據
c = ctl.get();
}
// 若是線程池仍在運行,則把任務放到阻塞隊列中等待執行。
if (isRunning(c) && workQueue.offer(command)) {
// 這裏的recheck思路是爲了處理併發問題
int recheck = ctl.get();
// 當任務成功放入隊列時,若是recheck發現線程池已經再也不運行了則從隊列中把任務刪除
if (! isRunning(recheck) && remove(command))
//刪除成功之後,會調用構造參數傳入的拒絕策略。
reject(command);
// 若是worker的數量爲0(此時隊列中可能有任務沒有執行),則新建一個worker(因爲此時新建woker的目的是執行隊列中堆積的任務,
// 所以入參沒有執行任務,詳細邏輯後面會詳細分析addWorker方法)。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 若是前面的新增woker,放入隊列都失敗,則會繼續新增worker,此時線程池的狀態是woker數量達到corePoolSize,阻塞隊列任務已滿
// 只能基於maximumPoolSize參數新建woker
else if (!addWorker(command, false))
// 若是基於maximumPoolSize新建woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕策略
reject(command);
}複製代碼
下面再看在上述代碼中調用的addWorker方法的源代碼實現及解析:
private boolean addWorker(Runnable firstTask, boolean core) {
// 這裏有一段基於CAS+死循環實現的關於線程池狀態,線程數量的校驗與更新邏輯就先忽略了,重點看主流程。
//...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 把指定任務做爲參數新建一個worker線程
w = new Worker(firstTask);
// 這裏是重點w.thread是經過線程池構造函數參數threadFactory生成的woker對象
// 也就是說這個變量t就是表明woker線程。絕對不是用戶提交的線程任務firstTask。
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加鎖以後仍舊是判斷線程池狀態等一些校驗邏輯。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
// 把新建的woker線程放入集合保存,這裏使用的是HashSet
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 而後啓動woker線程
// 該變量t表明woker線程,會調用woker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 若是woker啓動失敗,則進行一些善後工做,好比說修改當前woker數量等
addWorkerFailed(w);
}
return workerStarted;
}複製代碼
addWorker方法主要作的工做就是新建一個Woker線程,加入到woker集合中。在上述方法中會調用到Worker類的run方法,並最終執行了runWorker方法。
// Woker類實現了Runnable接口
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// task就是Woker構造函數入參指定的任務,即用戶提交的任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//通常狀況下,task都不會爲空(特殊狀況上面註釋中也說明了),所以會直接進入循環體中
//這裏getTask方法是要重點說明的,它的實現跟咱們構造參數設置存活時間有關
//咱們都知道構造參數設置的時間表明瞭線程池中的線程,即woker線程的存活時間,若是到期則回收woker線程,這個邏輯的實現就在getTask中。
//來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,用戶設置的存活時間,就是
//從這個阻塞隊列中取任務等待的最大時間,若是getTask返回null,意思就是woker等待了指定時間仍然沒有
//取到任務,此時就會跳過循環體,進入woker線程的銷燬邏輯。
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//該方法是個空的實現,若是有須要用戶能夠本身繼承該類進行實現
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正的任務執行邏輯
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//該方法是個空的實現,若是有須要用戶能夠本身繼承該類進行實現
afterExecute(task, thrown);
}
} finally {
//這裏設爲null,也就是循環體再執行的時候會調用getTask方法
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裏,作一些善後工做,好比在corePoolSize跟maximumPoolSize之間的woker會進行回收
processWorkerExit(w, completedAbruptly);
}
}複製代碼
woker線程的執行流程就是首先執行初始化時分配給的任務,執行完成之後會嘗試從阻塞隊列中獲取可執行的任務,若是指定時間內仍然沒有任務能夠執行,則進入銷燬邏輯。這裏只會回收corePoolSize與maximumPoolSize直接的那部分woker。
執行任務除了可使用execute方法還可使用submit方法。它們的主要區別是:execute適用於不須要關注返回值的場景,submit方法適用於須要關注返回值的場景。
當執行任務時發生異常,那麼該怎麼處理呢?首先看當Thread線程異常如何處理。
在任務中經過try...catch是能夠捕獲異常並進行處理的,以下代碼:
Thread t = new Thread(() -> {
try {
System.out.println(1 / 0);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
});
t.start();複製代碼
若是不少線程任務默認的異常處理機制都是相同的,能夠經過Thread類的UncaughtExceptionHandler來設置線程默認的異常處理機制。
實現UncaughtExceptionHandler接口,並調用Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)方法。若是想設置爲全局默認異常處理機制,則可調用Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)方法。
ThreadGroup默認提供了異常處理機制以下:
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}複製代碼
ThreadPoolExecutor的異常處理機制與Thread是同樣的。同時,ThreadPoolExecutor提供了uncaughtExceptionHandler方法來設置異常處理。以下示例:
public class ThreadPool {
public static void main(String[] args) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d")
.setUncaughtExceptionHandler(new LogUncaughtExceptionHandler())
.build();
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(() -> {
throw new RuntimeException("測試異常");
});
pool.shutdown();
}
static class LogUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("打印LogUncaughtExceptionHandler中得到的異常信息:" + e.getMessage());
}
}
}複製代碼
但須要注意的是使用UncaughtExceptionHandler的方法只適用於execute方法執行的任務,而對submit方法是無效。submit執行的任務,能夠經過返回的Future對象的get方法接收拋出的異常,再進行處理。這也算是execute方法與submit方法的差異之一。
線程池有如下工做隊列:
關閉線程池能夠調用shutdownNow和shutdown兩個方法來實現。
shutdownNow:對正在執行的任務所有發出interrupt(),中止執行,對還未開始執行的任務所有取消,而且返回還沒開始的任務列表。
shutdown:當咱們調用shutdown後,線程池將再也不接受新的任務,但也不會去強制終止已經提交或者正在執行中的任務。
參考文章:
https://www.jianshu.com/p/5df6e38e4362
https://juejin.im/post/5d1882b1f265da1ba84aa676
原文連接:《面試題-關於Java線程池一篇文章就夠了》