立刻就要過年了,還在崗位上堅守「swimming」的小夥伴們頂住。博主給你們帶來一篇線程池的基本使用解解悶。java
一、減小線程建立與切換的開銷
編程
二、控制線程的數量
設計模式
重複利用有限的線程
緩存
其實經常使用Java線程池本質上都是由ThreadPoolExecutor
或者ForkJoinPool
生成的,只是其根據構造函數傳入不一樣的實參來實例化相應線程池而已。併發
Executors
是一個線程池工廠類,該工廠類包含以下集合靜態工廠方法來建立線程池:dom
newFixedThreadPool()
:建立一個可重用的、具備固定線程數的線程池newSingleThreadExecutor()
:建立只有一個線程的線程池newCachedThreadPool()
:建立一個具備緩存功能的線程池newWorkStealingPool()
:建立持有足夠線程的線程池來支持給定的並行級別的線程池newScheduledThreadPool()
:建立具備指定線程數的線程池,它能夠在指定延遲後執行任務線程對設計模式有了解過的同窗都會知道,咱們儘可能面向接口編程,這樣對程序的靈活性是很是友好的。Java線程池也採用了面向接口編程的思想,能夠看到ThreadPoolExecutor
和ForkJoinPool
全部都是ExecutorService
接口的實現類。在ExecutorService
接口中定義了一些經常使用的方法,而後再各類線程池中均可以使用ExecutorService
接口中定義的方法,經常使用的方法有以下幾個:ide
向線程池提交線程
Future<?> submit()
:將一個Runnable對象交給指定的線程池,線程池將在有空閒線程時執行Runnable對象表明的任務,該方法既能接收Runnable對象也能接收Callable對象,這就意味着sumbit()方法能夠有返回值。void execute(Runnable command)
:只能接收Runnable對象,意味着該方法沒有返回值。關閉線程池
void shutdown()
:阻止新來的任務提交,對已經提交了的任務不會產生任何影響。(等待全部的線程執行完畢才關閉)List<Runnable> shutdownNow()
: 阻止新來的任務提交,同時會中斷當前正在運行的線程,另外它還將workQueue中的任務給移除,並將這些任務添加到列表中進行返回。(立馬關閉)檢查線程池的狀態
boolean isShutdown()
:調用shutdown()或shutdownNow()方法後返回爲true。boolean isTerminated()
:當調用shutdown()方法後,而且全部提交的任務完成後返回爲true;當調用shutdownNow()方法後,成功中止後返回爲true。線程池中的線程數目是固定的,無論你來了多少的任務。函數
示例代碼性能
public class MyFixThreadPool {
public static void main(String[] args) throws InterruptedException {
// 建立一個線程數固定爲5的線程池
ExecutorService service = Executors.newFixedThreadPool(5);
System.out.println("初始線程池狀態:" + service);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("線程提交完畢以後線程池狀態:" + service);
service.shutdown();//會等待全部的線程執行完畢才關閉,shutdownNow:立馬關閉
System.out.println("是否所有線程已經執行完畢:" + service.isTerminated());//全部的任務執行完了,就會返回true
System.out.println("是否已經執行shutdown()" + service.isShutdown());
System.out.println("執行完shutdown()以後線程池的狀態:" + service);
TimeUnit.SECONDS.sleep(5);
System.out.println("5秒鐘事後,是否所有線程已經執行完畢:" + service.isTerminated());
System.out.println("5秒鐘事後,是否已經執行shutdown()" + service.isShutdown());
System.out.println("5秒鐘事後,線程池狀態:" + service);
}
}
複製代碼
運行結果:this
初始線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
線程提交完畢以後線程池狀態:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否所有線程已經執行完畢:false
是否已經執行shutdown():true
執行完shutdown()以後線程池的狀態:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒鐘事後,是否所有線程已經執行完畢:true
5秒鐘事後,是否已經執行shutdown():true
5秒鐘事後,線程池狀態:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
程序分析
Running
狀態了,可是pool size
(線程池線程的數量)、active threads
(當前活躍線程) queued tasks
(當前排隊線程)、completed tasks
(已完成的任務數)都是0pool size = 5
:由於咱們建立的是一個固定線程數爲5的線程池(注意:若是這個時候咱們只提交了3個任務,那麼pool size = 3
,說明線程池也是經過懶加載的方式去建立線程)。active threads = 5
:雖然咱們向線程池提交了6個任務,可是線程池的固定大小爲5,因此活躍線程只有5個queued tasks = 1
:雖然咱們向線程池提交了6個任務,可是線程池的固定大小爲5,只能有5個活躍線程同時工做,因此有一個任務在等待shutdown()
的時候,因爲任務尚未所有執行完畢,因此isTerminated()
返回false
,shutdown()
返回true,而線程池的狀態會由Running
變爲Shutting down
pool-1-thread-2
執行了兩次任務,證實線程池中的線程確實是重複利用的。isTerminated()
返回true
,shutdown()
返回true
,證實全部的任務都執行完了,線程池也關閉了,咱們再次檢查線程池的狀態[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
,狀態已經處於Terminated
了,而後已完成的任務顯示爲6從頭至尾整個線程池都只有一個線程在工做。
實例代碼
public class SingleThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
service.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
}
}
複製代碼
運行結果
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
程序分析 能夠看到只有pool-1-thread-1
一個線程在工做。
來多少任務,就建立多少線程(前提是沒有空閒的線程在等待執行任務,不然仍是會複用以前舊(緩存)的線程),直接你電腦能支撐的線程數的極限爲止。
實例代碼
public class CachePool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println("初始線程池狀態:" + service);
for (int i = 0; i < 12; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("線程提交完畢以後線程池狀態:" + service);
TimeUnit.SECONDS.sleep(50);
System.out.println("50秒後線程池狀態:" + service);
TimeUnit.SECONDS.sleep(30);
System.out.println("80秒後線程池狀態:" + service);
}
}
複製代碼
運行結果
初始線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
線程提交完畢以後線程池狀態:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒後線程池狀態:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒後線程池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]
程序分析
能夠在指定延遲後或週期性地執行線程任務的線程池。
ScheduledThreadPoolExecutor
newScheduledThreadPool()
方法返回的實際上是一個ScheduledThreadPoolExecutor
對象,ScheduledThreadPoolExecutor
定義以下:public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
複製代碼
ThreadPoolExecutor
並實現了ScheduledExecutorService
接口,而ScheduledExecutorService
也是繼承了ExecutorService
接口,因此咱們也能夠像使用以前的線程池對象同樣使用,只不過是該對象會額外多了一些方法用於控制延遲與週期:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit)
:指定callable任務將在delay延遲後執行public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
:指定的command任務將在delay延遲後執行,並且已設定頻率重複執行。(一開始並不會執行)public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit)
:建立並執行一個在給定初始延遲後首期啓用的按期操做,隨後在每個執行終止和下一次執行開始之間都存在給定的延遲。示例代碼
下面代碼每500毫秒打印一次當前線程名稱以及一個隨機數字。
public class MyScheduledPool {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
複製代碼
每一個線程維護着本身的隊列,執行完本身的任務以後,會去主動執行其餘線程隊列中的任務。
示例代碼
public class MyWorkStealingPool {
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool(4);
System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
//因爲產生的是精靈線程(守護線程、後臺線程),主線程不阻塞的話,看不到輸出
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
複製代碼
運行結果
cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1
程序分析 ForkJoinPool-1-worker-1
任務的執行時間是1秒,它會最早執行完畢,而後它會去主動執行其餘線程隊列中的任務。
ForkJoinPool
能夠將一個任務拆分紅多個「小任務」並行計算,再把多個「小任務」的結果合併成總的計算結果。ForkJoinPool
提供了以下幾個方法用於建立ForkJoinPool
實例對象:
ForkJoinPool(int parallelism)
:建立一個包含parallelism個並行線程的ForkJoinPool
,parallelism的默認值爲Runtime.getRuntime().availableProcessors()
方法的返回值ForkJoinPool commonPool()
:該方法返回一個通用池,通用池的運行狀態不會受shutdown()
或shutdownNow()
方法的影響。建立了ForkJoinPool
示例以後,就能夠調用ForkJoinPool
的submit(ForkJoinTask task)
或invoke(ForkJoinTask task)
方法來執行指定任務了。其中ForkJoinTask
(實現了Future接口)表明一個能夠並行、合併的任務。ForkJoinTask
是一個抽象類,他還有兩個抽象子類:RecursiveAction
和RecursiveTask
。其中RecursiveTask
表明有返回值的任務,而RecursiveAction
表明沒有返回值的任務。
示例代碼
下面代碼演示了使用ForkJoinPool
對1000000個隨機整數進行求和。
public class MyForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random random = new Random();
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = random.nextInt(1000);
}
System.out.println(Arrays.stream(nums).sum());
}
// static class AddTask extends RecursiveAction {
//
// int start, end;
//
// AddTask(int start, int end) {
// this.start = start;
// this.end = end;
// }
//
// @Override
// protected void compute() {
// if (end - start <= MAX_NUM) {
// long sum = 0L;
// for (int i = 0; i < end; i++) sum += nums[i];
// System.out.println("from:" + start + " to:" + end + " = " + sum);
// } else {
// int middle = start + (end - start) / 2;
//
// AddTask subTask1 = new AddTask(start, middle);
// AddTask subTask2 = new AddTask(middle, end);
// subTask1.fork();
// subTask2.fork();
// }
// }
// }
static class AddTask extends RecursiveTask<Long> {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 當end與start之間的差大於MAX_NUM,將大任務分解成兩個「小任務」
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
} else {
int middle = start + (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
// 並行執行兩個「小任務」
subTask1.fork();
subTask2.fork();
// 把兩個「小任務」累加的結果合併起來
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
forkJoinPool.execute(task);
long result = task.join();
System.out.println(result);
forkJoinPool.shutdown();
}
}
複製代碼
上面咱們說到過:其實經常使用Java線程池都是由
ThreadPoolExecutor
或者ForkJoinPool
兩個類生成的,只是其根據構造函數傳入不一樣的實參來生成相應線程池而已。那咱們如今一塊兒來看看Executors中幾個建立線程池對象的靜態方法相關的源碼:
ThreadPoolExecutor構造函數原型
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
複製代碼
參數說明
corePoolSize
:核心運行的poolSize,也就是當超過這個範圍的時候,就須要將新的Runnable放入到等待隊列workQueue中了。maximumPoolSize
:線程池維護線程的最大數量,當大於了這個值就會將任務由一個丟棄處理機制來處理(固然也存在永遠不丟棄任務的線程池,具體得看策略)。keepAliveTime
:線程空閒時的存活時間(當線程數大於corePoolSize時該參數纔有效)[java doc
中的是這樣寫的 :when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.]unit
:keepAliveTime的單位。workQueue
:用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口。執行任務的過程
newFixedThreadPool
poolSize 和 maximumPoolSize 相等,使用無界隊列存儲,不管來多少任務,隊列都能塞的下,因此線程池中的線程數老是 poolSize。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
newSingleThreadExecutor
poolSize 和 maximumPoolSize 都爲1,使用無界隊列存儲,不管來多少任務,隊列都能塞的下,因此線程池中的線程數老是 1。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
newCachedThreadPool
poolSize 爲 0,來一個任務直接扔到隊列中,使用SynchronousQueue存儲(沒有容量的隊列),因此來來一個任務就得新建一個線程,maximumPoolSize 爲 Integer.MAX_VALUE,能夠當作是容許建立無限的線程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
newScheduledThreadPool
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼
newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
複製代碼
以爲文章寫得不錯的朋友能夠點贊、轉發、加關注呀!大家的支持就是我最大的動力,筆芯!