Executor是一個接口,裏面提供了一個execute方法,該方法接收一個Runable參數,以下java
public interface Executor {
void execute(Runnable command);
}
複製代碼
線程對象就是提交給線程池的任務,能夠實現Runable接口或Callable接口。或許這邊會產生一個疑問,爲何Runable接口和Callable接口沒有任何關聯,卻都能做爲任務來執行?你們能夠思考下,文章的結尾會對此進行說明git
Future接口和FutureTask類是用來接收線程異步執行後返回的結果,能夠看到下方ExecutorService接口的submit方法返回的就是Future。github
接下來咱們來看看繼承了Executor接口的ExecutorService編程
public interface ExecutorService extends Executor {
//正常關閉(再也不接收新任務,執行完隊列中的任務)
void shutdown();
//強行關閉(關閉當前正在執行的任務,返回全部還沒有啓動的任務清單)
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
...
}
複製代碼
在介紹穿件線程池的方法以前要先介紹一個類ThreadPoolExecutor,應爲Executors工廠大部分方法都是返回ThreadPoolExecutor對象,先來看看它的構造函數吧bash
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}
複製代碼
參數介紹併發
參數 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程數 |
maximumPoolSize | int | 最大線程數 |
keepAliveTime | long | 存活時間 |
unit | TimeUnit | 時間單位 |
workQueue | BlockingQueue | 存放線程的隊列 |
threadFactory | ThreadFactory | 建立線程的工廠 |
handler | RejectedExecutionHandler | 多餘的的線程處理器(拒絕策略) |
爲何要講ExecutorService接口呢?是由於咱們使用Executors的方法時返回的大部分都是ExecutorService。 Executors提供了幾個建立線程池方法,接下來我就介紹一下這些方法框架
newFixedThreadPool(int nThreads)
建立一個線程的線程池,若空閒則執行,若沒有空閒線程則暫緩在任務隊列中。
newWorkStealingPool()
建立持有足夠線程的線程池來支持給定的並行級別,並經過使用多個隊列,減小競爭,它須要穿一個並行級別的參數,若是不傳,則被設定爲默認的CPU數量。
newSingleThreadExecutor()
該方法返回一個固定數量的線程池
該方法的線程始終不變,當有一個任務提交時,若線程池空閒,則當即執行,若沒有,則會被暫緩在一個任務隊列只能怪等待有空閒的線程去執行。
newCachedThreadPool()
返回一個可根據實際狀況調整線程個數的線程池,不限制最大線程數量,如有空閒的線程則執行任務,若無任務則不建立線程,而且每個空閒線程會在60秒後自動回收。
newScheduledThreadPool(int corePoolSize)
返回一個SchededExecutorService對象,但該線程池能夠設置線程的數量,支持定時及週期性任務執行。
newSingleThreadScheduledExecutor()
建立一個單例線程池,按期或延時執行任務。
複製代碼
下面講解下幾個經常使用的方法,建立單個的就不說明了異步
該方法建立指定線程數量的線程池,沒有限制可存放的線程數量(無界隊列),適用於線程任務執行較快的場景。ide
看看Executors工廠內部是如何實現的函數
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
能夠看到返回的是一個ThreadPoolExecutor對象,核心線程數和是最大線程數都是傳入的參數,存活時間是0,時間單位是毫秒,阻塞隊列是無界隊列LinkedBlockingQueue。
因爲隊列採用的是無界隊列LinkedBlockingQueue,最大線程數maximumPoolSize和keepAliveTime都是無效參數,拒絕策略也將無效,爲何?
這裏又延伸出一個問題,無界隊列說明任務沒有上限,若是執行的任務比較耗時,那麼新的任務會一直存放在線程池中,線程池的任務會愈來愈多,將會致使什麼後果?下面的代碼能夠試試
public class Main {
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
while (true){
pool.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
複製代碼
示例代碼
public class Main {
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 8; i++) {
int finalI = i + 1;
pool.submit(() -> {
try {
System.out.println("任務"+ finalI +":開始等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println("任務"+ finalI +":結束等待2秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
}
複製代碼
輸出結果
任務4:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-4
任務2:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-2
任務3:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-3
任務1:開始等待2秒,時間:17:13:22.048,當前線程名:pool-1-thread-1
任務2:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-2
任務3:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-3
任務1:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-1
任務4:結束等待2秒,時間:17:13:24.048,當前線程名:pool-1-thread-4
任務6:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-4
任務7:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-1
任務5:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-3
任務8:開始等待2秒,時間:17:13:24.049,當前線程名:pool-1-thread-2
任務5:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-3
任務7:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-1
任務8:結束等待2秒,時間:17:13:26.051,當前線程名:pool-1-thread-2
任務6:結束等待2秒,時間:17:13:26.050,當前線程名:pool-1-thread-4
複製代碼
能夠看出任務1-4在同一時間執行,在2秒後執行完畢,同時開始執行任務5-8。說明方法內部只建立了4個線程,其餘任務存放在隊列中等待執行。
newCachedThreadPool方法建立的線程池會根據須要自動建立新線程。
看看Executors工廠內部是如何實現的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
newCachedThreadPool方法也是返回ThreadPoolExecutor對象,核心線程是0,最大線程數是Integer的最MAX_VALUE,存活時間是60,時間單位是秒,SynchronousQueue隊列。
從傳入的參數能夠得知,在newCachedThreadPool方法中的空閒線程存活時間時60秒,一旦超過60秒線程就會被終止。這邊還隱含了一個問題,若是執行的線程較慢,而提交任務的速度快於線程執行的速度,那麼就會不斷的建立新的線程,從而致使cpu和內存的增加。
代碼和newFixedThreadPool同樣循環添加新的線程任務,個人電腦運行就會出現以下錯誤
An unrecoverable stack overflow has occurred.
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)
Process finished with exit code -1073741571 (0xC00000FD)
複製代碼
關於SynchronousQueue隊列,它是一個沒有容量的阻塞隊列,任務傳遞的示意圖以下
示例代碼
public class Main {
public static void main(String[] args) throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 8; i++) {
int finalI = i + 1;
pool.submit(() -> {
try {
System.out.println("任務"+ finalI +":開始等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName());
Thread.sleep(60000);
System.out.println("任務"+ finalI +":結束等待60秒,時間:"+LocalTime.now()+",當前線程名:"+Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//睡眠10秒
Thread.sleep(10000);
}
pool.shutdown();
}
}
複製代碼
執行結果
任務1:開始等待60秒,時間:17:15:21.570,當前線程名:pool-1-thread-1
任務2:開始等待60秒,時間:17:15:31.553,當前線程名:pool-1-thread-2
任務3:開始等待60秒,時間:17:15:41.555,當前線程名:pool-1-thread-3
任務4:開始等待60秒,時間:17:15:51.554,當前線程名:pool-1-thread-4
任務5:開始等待60秒,時間:17:16:01.554,當前線程名:pool-1-thread-5
任務6:開始等待60秒,時間:17:16:11.555,當前線程名:pool-1-thread-6
任務7:開始等待60秒,時間:17:16:21.555,當前線程名:pool-1-thread-7
任務1:結束等待60秒,時間:17:16:21.570,當前線程名:pool-1-thread-1
任務2:結束等待60秒,時間:17:16:31.554,當前線程名:pool-1-thread-2
任務8:開始等待60秒,時間:17:16:31.556,當前線程名:pool-1-thread-2
任務3:結束等待60秒,時間:17:16:41.555,當前線程名:pool-1-thread-3
任務4:結束等待60秒,時間:17:16:51.556,當前線程名:pool-1-thread-4
任務5:結束等待60秒,時間:17:17:01.556,當前線程名:pool-1-thread-5
任務6:結束等待60秒,時間:17:17:11.555,當前線程名:pool-1-thread-6
任務7:結束等待60秒,時間:17:17:21.556,當前線程名:pool-1-thread-7
任務8:結束等待60秒,時間:17:17:31.557,當前線程名:pool-1-thread-2
複製代碼
示例代碼中每一個任務都睡眠60秒,每次循環添加任務睡眠10秒,從執行結果來看,添加的7個任務都是由不一樣的線程來執行,而此時線程1和2都執行完畢,任務8添加進來由以前建立的pool-1-thread-2執行。
這個線程池主要用來延遲執行任務或者按期執行任務。
看看Executors工廠內部是如何實現的
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
複製代碼
這裏返回的是ScheduledThreadPoolExecutor對象,咱們繼續深刻進去看看
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
這裏調用的是父類的構造函數,ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,因此返回的也是ThreadPoolExecutor對象。核心線程數是傳入的參數corePoolSize,線程最大值是Integer的MAX_VALUE,存活時間時0,時間單位是納秒,隊列是DelayedWorkQueue。
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {}
複製代碼
下面是ScheduledExecutorService的一些方法
public interface ScheduledExecutorService extends ExecutorService {
//delay延遲時間,unit延遲單位,只執行1次,在通過delay延遲時間以後開始執行
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//首次執行時間時而後在initialDelay以後,而後在initialDelay+period 後執行,接着在 initialDelay + 2 * period 後執行,依此類推
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
long period,
TimeUnit unit);
//首次執行時間時而後在initialDelay以後,而後延遲delay時間執行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
複製代碼
那麼就從提交任務入口看看吧
submit方法是由抽象類AbstractExecutorService實現的
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
能夠看出將傳入的Runnable對象和Callable傳入一個newTaskFor方法,而後返回一個RunnableFuture對象
咱們再來看看newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
複製代碼
這裏都是調用FutureTask的構造函數,咱們接着往下看
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
複製代碼
FutureTask類中有個成員變量callable,而傳入的Runnable對象則繼續調用Executors工廠類的callable方法返回一個Callable對象
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//適配器
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
複製代碼
好了,到這裏也就真相大白了,Runnable對象通過一系列的方法調用,最終被RunnableAdapter適配器適配成Callable對象。方法調用圖以下
以爲不錯的點個star
下一篇會介紹下自定義線程池,後續也會更新newWorkStealingPool方法介紹
[1] Java 併發編程的藝術
[2] Java 併發編程實戰