ExecutorService提供了兩種基礎線程池的選擇,ScheduledThreadPoolExecutor(定時任務的)和ThreadPoolExecutor(普通的),本文主要介紹ThreadPoolExecutor。java
《阿里巴巴java編程手冊》併發處理部分,講了兩個原則:編程
1. 線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。併發
使用線程池的好處是減小在建立和銷燬線程上所消耗的時間及系統資源,解決資源不足的問題。若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。app
咱們知道,有繼承Thread,implements Runnable, implements Callable<T>這些種方法建立線程,但推薦使用線程池來建立線程。ide
2. 線程池不容許使用Executors建立,而是經過ThreadPoolExecutor的方式建立,這樣的處理方式能讓編寫代碼的工程師更加明確線程池的運行規則,規避資源耗盡的風險。
性能
Executors返回的線程池對象的弊端以下:this
1)FixedThreadPool和SingleThreadPool:容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM(無參的LinkedBlockingQueue的長度默認是Integer.MAX_VALUE)spa
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
2)CachedThreadPool和ScheduledThreadPool:容許的建立線程數量爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM線程
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ThreadPoolExecutor類介紹code
ThreadPoolExecutor.class構造方法
//這個是ThreadPoolExecutor完整的構造器,其餘的構造器其實也是在內部調用這個.
ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下咱們以ThreadPoolExecutor爲例展現下線程池的工做流程圖
AbortPolicy
public class Demo1 { public static void main(String[] args) { BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10); RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, queue, handler); for (int i = 0; i < 20; i ++){ final int temp = i; pool.execute(() -> { System.out.println("客戶" + temp + "來了......."); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); } }
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.kco.test17.demo1.Demo1$$Lambda$1/15497079@ca494b rejected from java.util.concurrent.ThreadPoolExecutor@1a4f24f[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.kco.test17.demo1.Demo1.main(Demo1.java:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-2客戶3來了.......
pool-1-thread-1客戶4來了.......
pool-1-thread-5客戶5來了.......
pool-1-thread-3客戶6來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-2客戶9來了.......
pool-1-thread-1客戶8來了.......
pool-1-thread-3客戶10來了.......
pool-1-thread-5客戶11來了.......
pool-1-thread-4客戶12來了.......
從結果看出來,能夠看出線程是重複被使用的,並且當執行的任務超過工做隊列的容量時,線程確實拋出了異常.
DiscardPolicy
將 RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
改成
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
運行結果以下:
pool-1-thread-1客戶0來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-3客戶3來了.......
pool-1-thread-4客戶4來了.......
pool-1-thread-1客戶5來了.......
pool-1-thread-5客戶6來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶7來了.......
pool-1-thread-4客戶8來了.......
pool-1-thread-5客戶9來了.......
pool-1-thread-1客戶10來了.......
pool-1-thread-2客戶11來了.......
pool-1-thread-4客戶12來了.......
如今線程池正確退出了,並且也不拋出異常了,可是超過工做隊列容量的任務所有被忽略了.
DiscardOldestPolicy
RejectedExecutionHandler
改成 RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-4客戶8來了.......
pool-1-thread-1客戶11來了.......
pool-1-thread-5客戶10來了.......
pool-1-thread-3客戶9來了.......
pool-1-thread-2客戶12來了.......
pool-1-thread-1客戶15來了.......
pool-1-thread-4客戶16來了.......
pool-1-thread-5客戶17來了.......
pool-1-thread-2客戶19來了.......
pool-1-thread-3客戶18來了.......
從以上結果,咱們能夠看出除了客戶0到客戶2恰好是3個核心線程被執行後,客戶3到客戶7直接被忽略掉了.
CallerRunsPolicy
一樣講拒絕策略改成RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
運行程序,結果以下:
pool-1-thread-1客戶0來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-4客戶13來了.......
main客戶15來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-2客戶3來了.......
pool-1-thread-1客戶4來了.......
main客戶18來了.......
pool-1-thread-3客戶5來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-5客戶6來了.......
pool-1-thread-5客戶8來了.......
pool-1-thread-1客戶9來了.......
pool-1-thread-4客戶10來了.......
pool-1-thread-3客戶12來了.......
pool-1-thread-2客戶11來了.......
pool-1-thread-1客戶16來了.......
pool-1-thread-5客戶19來了.......
pool-1-thread-3客戶17來了.......
結果,咱們能夠發現全部的任務都被執行,並且居然還有兩個是在主線程執行的.如今明白我以前說的則直接運行任務的run
方法的意思了吧,沒錯是直接調用run
方法,而不是開啓線程去執行任務.
如今咱們本身寫一個拒絕策略,要求全部的任務都必須被線程池執行,並且都要在線程池中執行.
public class Demo5 {
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
RejectedExecutionHandler handler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()){
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS, queue, handler);
for (int i = 0; i < 20; i ++){
final int temp = i;
pool.execute(() -> {
String name = Thread.currentThread().getName();
System.out.println(name + "客戶" + temp + "來了.......");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
}
運行結果:
pool-1-thread-1客戶0來了.......
pool-1-thread-3客戶2來了.......
pool-1-thread-5客戶14來了.......
pool-1-thread-4客戶13來了.......
pool-1-thread-2客戶1來了.......
pool-1-thread-1客戶3來了.......
pool-1-thread-3客戶4來了.......
pool-1-thread-5客戶5來了.......
pool-1-thread-2客戶6來了.......
pool-1-thread-4客戶7來了.......
pool-1-thread-1客戶8來了.......
pool-1-thread-3客戶9來了.......
pool-1-thread-5客戶10來了.......
pool-1-thread-4客戶11來了.......
pool-1-thread-2客戶12來了.......
pool-1-thread-1客戶15來了.......
pool-1-thread-3客戶16來了.......
pool-1-thread-5客戶17來了.......
pool-1-thread-4客戶19來了.......
pool-1-thread-2客戶18來了.......
ok.全部任務都被線程池執行了.並且咱們自定義的拒絕策略也很簡單,就是讓工做隊列調用put
讓其一直等待,直到有可用的容量存聽任務.
動手本身寫一個線程池:
import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.locks.ReentrantLock; public class MyThreadPool { /**存放線程的集合*/ private ArrayList<MyThread> threads; /**任務隊列*/ private ArrayBlockingQueue<Runnable> taskQueue; /**線程池初始限定大小*/ private int threadNum; /**已經工做的線程數目*/ private int workThreadNum; private final ReentrantLock mianLock = new ReentrantLock(); public MyThreadPool(int initPoolNum) { this.threadNum = initPoolNum; this.threads = new ArrayList<>(initPoolNum); //任務隊列初始化爲線程池線程數的四倍 this.taskQueue = new ArrayBlockingQueue<>(initPoolNum*4); this.workThreadNum = 0; } public void execute(Runnable runnable) { try { mianLock.lock(); //線程池未滿,每加入一個任務則開啓一個線程 if (this.workThreadNum < this.threadNum) { MyThread myThread = new MyThread(runnable); myThread.start(); threads.add(myThread); this.workThreadNum++; } else { //線程池已滿,放入任務隊列,等待有空閒線程時執行 //隊列已滿,沒法添加時,拒絕任務 if (!taskQueue.offer(runnable)) { rejectTask(); } } } finally { mianLock.unlock(); } } private void rejectTask() { System.out.println("任務隊列已滿,沒法繼續添加,請擴大您的初始化線程池!"); } public static void main(String[] args) { MyThreadPool myThreadPool = new MyThreadPool(5); Runnable task = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"執行中"); } }; for (int i = 0; i < 20; i++) { myThreadPool.execute(task); } } class MyThread extends Thread { private Runnable task; public MyThread(Runnable runnable) { this.task = runnable; } @Override public void run() { //該線程一直啓動着,不斷從任務隊列取出任務執行 while(true) { //若是初始化任務不爲空,則執行初始化任務 if(task != null) { task.run(); task = null; }else { //不然去任務隊列取任務並執行 Runnable queueTask = taskQueue.poll(); if(queueTask != null) queueTask.run(); } } } } }