咱們都是經過new Thread來建立一個線程,因爲線程的建立和銷燬都須要消耗必定的CPU資源,因此在高併發下這種建立線程的方式將嚴重影響代碼執行效率。而線程池的做用就是讓一個線程執行結束後不立刻銷燬,繼續執行新的任務,這樣就節省了不斷建立線程和銷燬線程的開銷。編程
建立Java線程池最爲核心的類爲ThreadPoolExecutor:併發
它提供了四種構造函數來建立線程池,其中最爲核心的構造函數以下所示:dom
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
複製代碼
這7個參數的含義以下:ide
corePoolSize 線程池核心線程數。即線程池中保留的線程個數,即便這些線程是空閒的,也不會被銷燬,除非經過ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法開啓了核心線程的超時策略;函數
maximumPoolSize 線程池中容許的最大線程個數;高併發
keepAliveTime 用於設置那些超出核心線程數量的線程的最大等待時間,超過這個時間尚未新任務的話,超出的線程將被銷燬;this
unit 超時時間單位;spa
workQueue 線程隊列。用於保存經過execute方法提交的,等待被執行的任務;線程
threadFactory 線程建立工程,即指定怎樣建立線程;3d
handler 拒絕策略。即指定當線程提交的數量超出了maximumPoolSize後,該使用什麼策略處理超出的線程。
在經過這個構造方法建立線程池的時候,這幾個參數必須知足如下條件,不然將拋出IllegalArgumentException異常:
corePoolSize不能小於0;
keepAliveTime不能小於0;
maximumPoolSize 不能小於等於0;
maximumPoolSize不能小於corePoolSize;
此外,workQueue、threadFactory和handler不能爲null,不然將拋出空指針異常。
下面舉些例子來深刻理解這幾個參數的含義。
使用上面的構造方法建立一個線程池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("線程池建立完畢");
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活躍線程個數 " + threadPoolExecutor.getActiveCount());
System.out.println("核心線程個數 " + threadPoolExecutor.getCorePoolSize());
System.out.println("隊列線程個數 " + threadPoolExecutor.getQueue().size());
System.out.println("最大線程數 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
複製代碼
上面的代碼建立了一個核心線程數量爲1,容許最大線程數量爲2,最大活躍時間爲10秒,線程隊列長度爲1的線程池。
假如咱們經過execute方法向線程池提交1個任務,看看結果如何:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("線程池建立完畢");
threadPoolExecutor.execute(() -> sleep(100));
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活躍線程個數 " + threadPoolExecutor.getActiveCount());
System.out.println("核心線程個數 " + threadPoolExecutor.getCorePoolSize());
System.out.println("隊列線程個數 " + threadPoolExecutor.getQueue().size());
System.out.println("最大線程數 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
複製代碼
ThreadPoolExecutor的execute和submit方法均可以向線程池提交任務,區別是,submit方法可以返回執行結果,返回值類型爲Future
sleep方法代碼:
private static void sleep(long value) {
try {
System.out.println(Thread.currentThread().getName() + "線程執行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製代碼
啓動程序,控制檯輸出以下:
線程池核心線程數量爲1,經過execute提交了一個任務後,因爲核心線程是空閒的,因此任務被執行了。因爲這個任務的邏輯是休眠100秒,因此在這100秒內,線程池的活躍線程數量爲1。此外,由於提交的任務被核心線程執行了,因此並無線程須要被放到線程隊列裏等待,線程隊列長度爲0。
假如咱們經過execute方法向線程池提交2個任務,看看結果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼
線程池核心線程數量爲1,經過execute提交了2個任務後,一開始核心線程是空閒的,Thread-0被執行。因爲這個任務的邏輯是休眠100秒,因此在這100秒內,線程池的活躍線程數量爲1。由於核心線程數量爲1,因此另一個任務在這100秒內不能被執行,因而被放到線程隊列裏等待,線程隊列長度爲1。
假如咱們經過execute方法向線程池提交3個任務,看看結果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼
這三個任務都是休眠100秒,因此核心線程池中第一個任務正在被執行,第二個任務被放入到了線程隊列。而當第三個任務被提交進來時,線程隊列滿了(咱們定義的長度爲1),因爲該線程池容許的最大線程數量爲2,因此線程池還能夠再建立一個線程來執行另一個任務,因而乎以前在線程隊列裏的線程被取出執行(FIFO),第三個任務被放入到了線程隊列。
改變第二個和第三個任務的睡眠時間,觀察輸出:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
複製代碼
第二個任務提交5秒後,任務執行完畢,因此線程隊列裏的任務被執行,因而隊列線程個數爲0,活躍線程數量爲2(第一個和第三個任務)。再過5秒後,第三個任務執行完畢,因而活躍線程數量爲1(第一個100秒還沒執行完畢)。
在第三個任務結束的瞬間,咱們觀察線程快照:
能夠看到,線程池中有兩個線程,Thread-0在執行第一個任務(休眠100秒,還沒結束),Thread-1執行完第三個任務後並無立刻被銷燬。過段時間後(10秒鐘後)再觀察線程快照:
能夠看到,Thread-1這個線程被銷燬了,由於咱們在建立線程池的時候,指定keepAliveTime 爲10秒,10秒後,超出核心線程池線程外的那些線程將被銷燬。
假如一次性提交4個任務,看看會怎樣:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製代碼
由於咱們設置的拒絕策略爲AbortPolicy,因此最後提交的那個任務直接被拒絕了。更多拒絕策略下面會介紹到。
線程池包含如下幾個狀態:
當線程池中全部任務都處理完畢後,線程並不會本身關閉。咱們能夠經過調用shutdown和shutdownNow方法來關閉線程池。二者的區別在於:
shutdown方法將線程池置爲shutdown狀態,拒絕新的任務提交,但線程池並不會立刻關閉,而是等待全部正在折行的和線程隊列裏的任務都執行完畢後,線程池纔會被關閉。因此這個方法是平滑的關閉線程池。
shutdownNow方法將線程池置爲stop狀態,拒絕新的任務提交,中斷正在執行的那些任務,而且清除線程隊列裏的任務並返回。因此這個方法是比較「暴力」的。
舉兩個例子觀察下二者的區別:
shutdown例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
System.out.println("已經執行了線程池shutdown方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
啓動程序,控制檯輸出以下:
能夠看到,雖然在任務都被提交後立刻執行了shutdown方法,可是並不會立刻關閉線程池,而是等待全部被提交的任務都執行完了才關閉。
shutdownNow例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 立刻關閉,並返回還未被執行的任務
System.out.println(runnables);
System.out.println("已經執行了線程池shutdownNow方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
啓動程序,控制檯輸出以下:
能夠看到,在執行shutdownNow方法後,線程池立刻就被關閉了,正在執行中的兩個任務被打斷,而且返回了線程隊列中等待被執行的兩個任務。
經過上面兩個例子咱們還能夠看到shutdown和shutdownNow方法都不是阻塞的。常與shutdown搭配的方法有awaitTermination。
awaitTermination方法接收timeout和TimeUnit兩個參數,用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,不然返回false。該方法是阻塞的:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
if (isShutdown) {
System.out.println("線程池在3秒內成功關閉");
} else {
System.out.println("等了3秒還沒關閉,不等了╰(‵□′)╯");
}
System.out.println("------------");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
啓動程序輸出以下:
當線程池沒法再接收新的任務的時候,可採起以下四種策略:
CallerRunsPolicy CallerRunsPolicy策略:由調用線程處理該任務:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
上面的線程池最多隻能一次性提交4個任務,第5個任務提交後會被拒絕策略處理。啓動程序輸出以下:
能夠看到,第5個提交的任務由調用線程(即main線程)處理該任務。
AbortPolicy AbortPolicy策略:丟棄任務,並拋出RejectedExecutionException異常。前面的例子就是使用該策略,因此再也不演示。
DiscardOldestPolicy DiscardOldestPolicy策略:丟棄最先被放入到線程隊列的任務,將新提交的任務放入到線程隊列末端:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
啓動程序輸出以下:
能夠看到最後提交的任務被執行了,而第3個任務是第一個被放到線程隊列的任務,被丟棄了。
DiscardPolicy DiscardPolicy策略:直接丟棄新的任務,不拋異常:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程當中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程當中被打斷" + e.getMessage());
}
}
}
複製代碼
啓動程序,輸出以下:
第5個任務直接被拒絕丟棄了,而沒有拋出任何異常。
除了使用ThreadPoolExecutor的構造方法建立線程池外,咱們也可使用Executors提供的工廠方法來建立不一樣類型的線程池:
newFixedThreadPool 查看newFixedThreadPool方法源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
能夠看到,經過newFixedThreadPool建立的是一個固定大小的線程池,大小由nThreads參數指定,它具備以下幾個特色:
由於corePoolSize和maximumPoolSize的值都爲nThreads,因此線程池中線程數量永遠等於nThreads,不可能新建除了核心線程數的線程來處理任務,即keepAliveTime實際上在這裏是無效的。
LinkedBlockingQueue是一個無界隊列(最大長度爲Integer.MAX_VALUE),因此這個線程池理論是能夠無限的接收新的任務,這就是爲何上面沒有指定拒絕策略的緣由。
newCachedThreadPool 查看newCachedThreadPool方法源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
這是一個理論上無限大小的線程池:
核心線程數爲0,SynchronousQueue隊列是沒有長度的隊列,因此當有新的任務提交,若是有空閒的還未超時的(最大空閒時間60秒)線程則執行該任務,不然新增一個線程來處理該任務。
由於線程數量沒有限制,理論上能夠接收無限個新任務,因此這裏也沒有指定拒絕策略。
newSingleThreadExecutor 查看newSingleThreadExecutor源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
核心線程數和最大線程數都爲1,每次只能有一個線程處理任務。
LinkedBlockingQueue隊列能夠接收無限個新任務。
newScheduledThreadPool 查看newScheduledThreadPool源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
因此newScheduledThreadPool理論是也是能夠接收無限個任務,DelayedWorkQueue也是一個無界隊列。
使用newScheduledThreadPool建立的線程池除了能夠處理普通的Runnable任務外,它還具備調度的功能:
1.延遲指定時間後執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲5秒執行
executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);
複製代碼
2.按指定的速率執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲1秒執行,而後每5秒執行一次
executorService.scheduleAtFixedRate(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
複製代碼
3.按指定的時延執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);
複製代碼
乍一看,scheduleAtFixedRate和scheduleWithFixedDelay沒啥區別,實際它們仍是有區別的:
scheduleAtFixedRate按照固定速率執行任務,好比每5秒執行一個任務,即便上一個任務沒有結束,5秒後也會開始處理新的任務;
scheduleWithFixedDelay按照固定的時延處理任務,好比每延遲5秒執行一個任務,不管上一個任務處理了1秒,1分鐘仍是1小時,下一個任務老是在上一個任務執行完畢後5秒鐘後開始執行。
對於這些線程池工廠方法的使用,阿里巴巴編程規程指出:
由於這幾個線程池理論是均可以接收無限個任務,因此這就有內存溢出的風險。實際上只要咱們掌握了ThreadPoolExecutor構造函數7個參數的含義,咱們就能夠根據不一樣的業務來建立出符合需求的線程池。通常線程池的建立能夠參考以下規則:
IO密集型任務,線程池線程數量能夠設置爲2 X CPU核心數;
計算密集型任務,線程池線程數量能夠設置爲CPU核心數 + 1。
一些API的用法 ThreadPoolExecutor提供了幾個判斷線程池狀態的方法:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
System.out.println("線程池爲shutdown狀態:" + threadPoolExecutor.isShutdown());
System.out.println("線程池正在關閉:" + threadPoolExecutor.isTerminating());
System.out.println("線程池已經關閉:" + threadPoolExecutor.isTerminated());
threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
System.out.println("線程池已經關閉" + threadPoolExecutor.isTerminated());
}
複製代碼
程序輸出以下:
前面咱們提到,線程池核心線程即便是空閒狀態也不會被銷燬,除非使用allowCoreThreadTimeOut設置了容許核心線程超時:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任務執行完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
複製代碼
程序輸出以下所示:
5秒後任務執行完畢,核心線程處於空閒的狀態。由於經過allowCoreThreadTimeOut方法設置了容許核心線程超時,因此3秒後(keepAliveTime設置爲3秒),核心線程被銷燬。核心線程被銷燬後,線程池也就沒有做用了,因而就自動關閉了。
值得注意的是,若是一個線程池調用了allowCoreThreadTimeOut(true)方法,那麼它的keepAliveTime不能爲0。
ThreadPoolExecutor提供了一remove方法,查看其源碼:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
複製代碼
可看到,它刪除的是線程隊列中的任務,而非正在被執行的任務。舉個例子:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任務執行完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Runnable r = () -> System.out.println("看看我是否會被刪除");
threadPoolExecutor.execute(r);
threadPoolExecutor.remove(r);
threadPoolExecutor.shutdown();
}
複製代碼
執行程序,輸出以下:
可看到任務並無被執行,已經被刪除,由於惟一一個核心線程已經在執行任務了,因此後提交的這個任務被放到了線程隊列裏,而後經過remove方法刪除。
默認狀況下,只有當往線程池裏提交了任務後,線程池纔會啓動核心線程處理任務。咱們能夠經過調用prestartCoreThread方法,讓核心線程即便沒有任務提交,也處於等待執行任務的活躍狀態:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍線程數: " + threadPoolExecutor.getActiveCount());
}
複製代碼
程序輸出以下所示:
該方法返回boolean類型值,若是因此核心線程都啓動了,返回false,反之返回true。
還有一個和它相似的prestartAllCoreThreads方法,它的做用是一次性啓動全部核心線程,讓其處於活躍地等待執行任務的狀態。
ThreadPoolExecutor的invokeAny方法用於隨機執行任務集合中的某個任務,並返回執行結果,該方法是同步方法:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
// 任務集合
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
// 隨機執行結果
Integer result = threadPoolExecutor.invokeAny(tasks);
System.out.println("-------------------");
System.out.println(result);
threadPoolExecutor.shutdownNow();
}
複製代碼
啓動程序,輸出以下:
ThreadPoolExecutor的invokeAll則是執行任務集合中的全部任務,返回Future集合:
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
futureList.stream().map(f->{
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).forEach(System.out::println);
threadPoolExecutor.shutdownNow();
}
複製代碼
輸出以下:
方法 | 描述 |
---|---|
allowCoreThreadTimeOut(boolean value) | 是否容許核心線程空閒後超時,是的話超時後核心線程將銷燬,線程池自動關閉 |
awaitTermination(long timeout, TimeUnit unit) | 阻塞當前線程,等待線程池關閉,timeout用於指定等待時間。 |
execute(Runnable command) | 向線程池提交任務,沒有返回值 |
submit(Runnable task) | 向線程池提交任務,返回Future |
isShutdown() | 判斷線程池是否爲shutdown狀態 |
isTerminating() | 判斷線程池是否正在關閉 |
isTerminated() | 判斷線程池是否已經關閉 |
remove(Runnable task) | 移除線程隊列中的指定任務 |
prestartCoreThread() | 提早讓一個核心線程處於活躍狀態,等待執行任務 |
prestartAllCoreThreads() | 提早讓全部核心線程處於活躍狀態,等待執行任務 |
getActiveCount() | 獲取線程池活躍線程數 |
getCorePoolSize() | 獲取線程池核心線程數 |
threadPoolExecutor.getQueue() | 獲取線程池線程隊列 |
getMaximumPoolSize() | 獲取線程池最大線程數 |
shutdown() | 讓線程池處於shutdown狀態,再也不接收任務,等待全部正在運行中的任務結束後,關閉線程池。 |
shutdownNow() | 讓線程池處於stop狀態,再也不接受任務,嘗試打斷正在運行中的任務,並關閉線程池,返回線程隊列中的任務。 |