開篇一張圖(圖片來自阿里巴巴Java開發手冊(詳盡版)),後面全靠編java
好了要開始編了,從圖片中就能夠看到這篇博文的主題了,ThreadPoolExecutor自定義線程池。git
在介紹穿件線程池的方法以前要先介紹一個類ThreadPoolExecutor,由於Executors工廠大部分方法都是返回ThreadPoolExecutor對象,先來看看它的構造函數吧github
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}
複製代碼
參數介紹編程
參數 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程數 |
maximumPoolSize | int | 最大線程數 |
keepAliveTime | long | 存活時間 |
unit | TimeUnit | 時間單位 |
workQueue | BlockingQueue | 存放線程的隊列 |
threadFactory | ThreadFactory | 建立線程的工廠 |
handler | RejectedExecutionHandler | 多餘的的線程處理器(拒絕策略) |
這個參數表示線程池中的基本線程數量也就是核心線程數量。bash
這個參數是線程池中容許建立的最大線程數量,當使用有界隊列時,且隊列存放的任務滿了,那麼線程池會建立新的線程(最大不會超過這個參數所設置的值)。須要注意的是,當使用無界隊列時,這個參數是無效的。併發
這個就是線程空閒時能夠存活的時間,一旦超過這個時間,線程就會被銷燬。dom
線程存活的時間單位,有NANOSECONDS(納秒)、MICROSECONDS(微秒)、MILLISECONDS(毫秒)、SECONDS(秒)、MINUTES(分鐘)、HOURS(小時)、DAYS(天)。TimeUnit代碼以下ide
public enum TimeUnit {
NANOSECONDS {...},
MICROSECONDS {...},
MILLISECONDS {...},
SECONDS {...},
MINUTES {...},
HOURS {...},
DAYS {...};
}
複製代碼
建立線程的工廠,通常都是採用Executors.defaultThreadFactory()方法返回的DefaultThreadFactory,固然也能夠用其餘的來設置更有意義的名稱。函數
DefaultThreadFactory類以下測試
/** * The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
複製代碼
分爲有界隊列和無界隊列,用於存放等待執行的任務的阻塞隊列。有SynchronousQueue、ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、LinkedTransferQueue、DelayedWorkQueue、LinkedBlockingDeque。下面將介紹有界和無界兩種經常使用的隊列。BlockingQueue類圖以下
當使用有界隊列時,若是有新的任務須要添加進來時,若是線程池實際線程數小於corePoolSize(核心線程數),則優先建立線程,若是線程池實際線程數大於corePoolSize(核心線程數),則會將任務加入隊列,若隊列已滿,則在中現場數不大於maximumPoolSize(最大線程數)的前提下,建立新的線程,若線程數大於maximumPoolSize(最大線程數),則執行拒絕策略。
當使用無界隊列時,maximumPoolSize(最大線程數)和拒絕策略便會失效,由於隊列是沒有限制的,因此就不存在隊列滿的狀況。和有界隊列相比,當有新的任務添加進來時,都會進入隊列等待。可是這也會出現一些問題,例如線程的執行速度比任務提交速度慢,會致使無界隊列快速增加,直到系統資源耗盡。
當使用有界隊列時,且隊列任務被填滿後,線程數也達到最大值時,拒絕策略開始發揮做用。ThreadPoolExecutor默認使用AbortPolicy拒絕策略。RejectedExecutionHandler類圖以下
咱們來看看ThreadPoolExecutor是如何調用RejectedExecutionHandler的,能夠直接查看execute方法
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
//拒絕線程
reject(command);
}
}
複製代碼
能夠看到通過一系列的操做,不符合條件的會調用reject方法,那我麼接着來看看reject方法
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
複製代碼
能夠看到調用了RejectedExecutionHandler接口的rejectedExecution方法。好了,如今來看看jdk提供的幾個拒絕策略。
注:後續會寫一篇ThreadPoolExecutor源碼解析,專門介紹ThreadPoolExecutor各個流程
從下面代碼能夠看到直接拋出異常信息,可是線程池仍是能夠正常工做的。
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
複製代碼
示例代碼
線程類
public class Task implements Runnable{
private int id ;
public Task(int id){
this.id = id;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public void run() {
//
System.out.println(LocalTime.now()+" 當前線程id和名稱爲:" + this.id);
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
public String toString(){
return "當前線程的內容爲:{ id : " + this.id + "}";
}
}
複製代碼
測試代碼
public class TestAbortPolicy {
public static void main(String[] args) {
//定義了1個核心線程數,最大線程數1個,隊列長度2個
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new ThreadPoolExecutor.AbortPolicy());
//直接提交4個線程
executor.submit(new Task(1));
executor.submit(new Task(2));
executor.submit(new Task(3));
//提交第四個拋異常
executor.submit(new Task(4));
}
}
複製代碼
執行結果
當前線程id和名稱爲:1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1540e19d rejected from java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.learnConcurrency.executor.customThreadPool.testRejectedExecutionHandler.TestAbortPolicy.main(TestAbortPolicy.java:25)
當前線程id和名稱爲:2
當前線程id和名稱爲:3
複製代碼
能夠看到添加第四個線程是拋出異常
首先判斷線程池是否關閉,若是未關閉,則直接執行該線程。關閉則不作任何事情。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
複製代碼
代碼和上面的差很少就不貼了,想要查看的能夠到github上查看TestCallerRunsPolicy,執行結果以下
14:58:19.462 當前線程id和名稱爲:4
14:58:19.462 當前線程id和名稱爲:1
14:58:20.464 當前線程id和名稱爲:5
14:58:20.464 當前線程id和名稱爲:2
14:58:21.464 當前線程id和名稱爲:3
14:58:22.464 當前線程id和名稱爲:6
複製代碼
能夠看到裏面沒有任何代碼,也就是這個被拒絕的線程任務被丟棄了,不做任何處理。
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
複製代碼
首先判斷線程池是否關閉,若是未關閉,丟棄最老的一個請求,嘗試再次提交當前任務。 關閉則不作任何事情。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
複製代碼
代碼和上面的差很少就不貼了,想要查看的能夠到github上查看TestDiscardOldestPolicy,執行結果以下
15:02:28.484 當前線程id和名稱爲:1
15:02:29.486 當前線程id和名稱爲:5
15:02:30.487 當前線程id和名稱爲:6
複製代碼
能夠看到線程二、三、4都被替換了
實現RejectedExecutionHandle接口便可,以下MyRejected
public class MyRejected implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("自定義處理:開始記錄日誌");
System.out.println(r.toString());
System.out.println("自定義處理:記錄日誌完成");
}
}
複製代碼
測試代碼
public class TestCustomeRejectedPolicy {
public static void main(String[] args) {
//定義了1個核心線程數,最大線程數1個,隊列長度2個
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
new MyRejected());
executor.execute(new Task(1));
executor.execute(new Task(2));
executor.execute(new Task(3));
executor.execute(new Task(4));
executor.execute(new Task(5));
executor.execute(new Task(6));
executor.shutdown();
}
}
複製代碼
輸出結果
自定義處理:開始記錄日誌
當前線程的內容爲:{ id : 4}
自定義處理:記錄日誌完成
自定義處理:開始記錄日誌
當前線程的內容爲:{ id : 5}
自定義處理:記錄日誌完成
自定義處理:開始記錄日誌
當前線程的內容爲:{ id : 6}
自定義處理:記錄日誌完成
15:12:39.267 當前線程id和名稱爲:1
15:12:40.268 當前線程id和名稱爲:2
15:12:41.268 當前線程id和名稱爲:3
Process finished with exit code 0
複製代碼
這裏若是有仔細觀察的你可能會有所好奇,爲何這裏用execute方法而不是用submit?
這時由於用submit方法後,傳入的線程會被封裝成RunnableFuture,而我寫的MyRejected有調用到toString方法,Task類有重寫toString方法,可是被封裝成RunnableFuture會輸入以下內容
自定義處理:開始記錄日誌
java.util.concurrent.FutureTask@1540e19d
自定義處理:記錄日誌完成
自定義處理:開始記錄日誌
java.util.concurrent.FutureTask@677327b6
自定義處理:記錄日誌完成
自定義處理:開始記錄日誌
java.util.concurrent.FutureTask@14ae5a5
自定義處理:記錄日誌完成
15:18:17.262 當前線程id和名稱爲:1
15:18:18.263 當前線程id和名稱爲:2
15:18:19.264 當前線程id和名稱爲:3
Process finished with exit code 0
複製代碼
ThreadPoolExecutor類中有三個方法是空方法,能夠經過繼承來重寫這三個方法對線程進行監控。經過重寫beforeExecute和afterExecute方法,能夠添加日誌、計時、監控等等功能。terminated方法是在線程關閉時調用的,能夠在這裏面進行通知、日誌等操做。
//任務執行前
protected void beforeExecute(Thread t, Runnable r) { }
//任務執行後
protected void afterExecute(Runnable r, Throwable t) { }
//線程池關閉
protected void terminated() { }
複製代碼
示例代碼
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor pool = new MyThreadPoolExecutor(
2, //coreSize
4, //MaxSize
60, //60
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4));
for (int i = 0; i < 8; i++) {
int finalI = i + 1;
pool.submit(() -> {
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
static class MyThreadPoolExecutor extends ThreadPoolExecutor{
private final AtomicInteger tastNum = new AtomicInteger();
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTime.set(System.nanoTime());
System.out.println(LocalTime.now()+" 執行以前-任務:"+r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
long endTime = System.nanoTime();
long time = endTime - startTime.get();
tastNum.incrementAndGet();
System.out.println(LocalTime.now()+" 執行以後-任務:"+r.toString()+",花費時間(納秒):"+time);
super.afterExecute(r, t);
}
@Override
protected void terminated() {
System.out.println("線程關閉,總共執行線程數:"+tastNum.get());
super.terminated();
}
}
}
複製代碼
執行結果
15:43:23.329 執行以前-任務:java.util.concurrent.FutureTask@469dad33
15:43:23.329 執行以前-任務:java.util.concurrent.FutureTask@1446b68c
15:43:23.329 執行以前-任務:java.util.concurrent.FutureTask@5eefc31e
15:43:23.329 執行以前-任務:java.util.concurrent.FutureTask@33606b2
15:43:23.513 執行以後-任務:java.util.concurrent.FutureTask@33606b2,花費時間(納秒):216399556
15:43:23.513 執行以前-任務:java.util.concurrent.FutureTask@236e71ad
15:43:23.601 執行以後-任務:java.util.concurrent.FutureTask@1446b68c,花費時間(納秒):304505594
15:43:23.601 執行以前-任務:java.util.concurrent.FutureTask@107920dc
15:43:23.733 執行以後-任務:java.util.concurrent.FutureTask@5eefc31e,花費時間(納秒):436283680
15:43:23.733 執行以前-任務:java.util.concurrent.FutureTask@502826b3
15:43:23.808 執行以後-任務:java.util.concurrent.FutureTask@469dad33,花費時間(納秒):512242583
15:43:23.808 執行以前-任務:java.util.concurrent.FutureTask@96741ab
15:43:23.924 執行以後-任務:java.util.concurrent.FutureTask@107920dc,花費時間(納秒):322900976
15:43:24.059 執行以後-任務:java.util.concurrent.FutureTask@236e71ad,花費時間(納秒):546324680
15:43:24.498 執行以後-任務:java.util.concurrent.FutureTask@502826b3,花費時間(納秒):765309335
15:43:24.594 執行以後-任務:java.util.concurrent.FutureTask@96741ab,花費時間(納秒):785868205
線程關閉,總共執行線程數:8
複製代碼
以爲不錯的點個star
[1] Java 併發編程的藝術
[2] Java 併發編程實戰