我的博客項目地址java
但願各位幫忙點個star,給我加個小星星✨git
在java中,使用線程時經過new Thread實現很簡單,可是若是併發數量不少時,頻繁地建立線程就會大大下降系統的效率。github
因此能夠經過線程池,使得線程能夠複用,每執行完一個任務,並非被銷燬,而是能夠繼續執行其餘任務。編程
花了兩天時間去看了高洪巖寫的《JAVA併發編程》,是想要知其然,知其因此然,在使用的狀況下,瞭解學習了一下原理記錄下java.util.concurrent併發包下的ThreadPoolExecutor特性和實現緩存
粗暴點,咱們直接看如何使用吧多線程
簡單舉個🌰:
Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //建立容量爲1的緩衝池
Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
具體實現邏輯:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
經過該Executors的靜態方法進行線程池的建立,並且從具體實現來看,仍是調用了new ThreadPoolExecutor(),只是內部參數已經幫咱們配置好了。併發
既然真正實現都是用ThreadPoolExecutor,那就本身設定好方法的參數吧。ide
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());
for(int i=0;i<10;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在執行task "+taskNum);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"執行完畢");
}
}
複製代碼
打印效果以下:函數
正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
task 2執行完畢
task 0執行完畢
task 3執行完畢
task 1執行完畢
正在執行task 8
task 4執行完畢
正在執行task 7
正在執行task 6
正在執行task 5
正在執行task 9
task 8執行完畢
task 6執行完畢
task 7執行完畢
task 5執行完畢
task 9執行完畢
複製代碼
任務Task提交以後,因爲是多線程狀態下,因此打印效果並非同步的,能夠看出任務都已經順利執行。學習
我這個實現參數是5個corePoolSize核心線程數和5個maximumPoolSize最大線程數,當線程池中的線程數超過5個的時候,將新來的任務放進緩存隊列中,小夥伴能夠試下把任務數(for循環的個數)提升一點,讓緩存等待的任務數超過5個,看看默認的任務拒絕策略(AbortPolicy)會拋出什麼錯誤hhh
下面來看看ThreadPoolExecutor的廬山真面目吧~
它有如下四個構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
從構造方法能夠看出,前三個方法最終都是調用第四個構造器進行初始化工做的。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
複製代碼
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
複製代碼
從源碼構造函數能夠看到,不傳參數的時候,默認阻塞隊列中的大小是Integer.MAX_VALUE;
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
複製代碼
Array和Linked在傳入大小小於0時將會報錯,比較經常使用的是LinkedBlockingDeque和SynchronousQueue,線程池的排隊策略與BlockingQueue有關。
主要用來建立線程,能夠在newThread()方法中自定義線程名字和設置線程異常狀況的處理邏輯。
舉個🌰:
static class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread();
thread.setName("JingQ" + new Date());
thread.setUncaughtExceptionHandler((t, e) -> {
doSomething();
e.printStackTrace();
});
return thread;
}
}
複製代碼
有如下四種:
能夠看出,實際上ThreadPoolExecutor是繼承了AbstractExecutorService類和引用了ExecutorService、Executor接口。
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
複製代碼
AbstarctExecutorService是一個抽象類,它實現的是ExecutorService接口
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼
接口ExecutorService引用了Executor接口,Executor接口比較簡單,只有一個execute方法定義
public interface Executor {
void execute(Runnable command);
}
複製代碼
小結:
Executor是一個頂級接口,定義了一個execute方法,返回值爲空,參數爲Runnable。
ExecutorService繼承了Executor而且定義了其它一些方法,結果以下圖:
抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法。
最後ThreadPoolExecutor繼承了AbstractExecutorService,咱們最經常使用到它兩個方法,submit和execute,下面介紹一下這二者:
execute():
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * (如下是我的渣翻譯,有誤請輕噴~) * 有如下三步流程: * * 1. 若是少於核心池大小的線程正在運行, * 那麼嘗試以給定的命令做爲它的第一個任務啓動一個新線程。 * 調用添加worker原子性檢查運行狀態和workder的數量, * 這樣能夠防止錯誤警報在不該該返回的狀況下添加線程,返回false。 * * 2. 若是一個任務能夠成功地排隊,那麼咱們仍然須要再次檢查是否應該添加一個線程 * (由於現有的線程在上次檢查後死亡),或者是在該方法進入後關閉了池。 * 所以,咱們從新檢查狀態,若是必要的話,若是中止的話,須要回滾隊列。 * 若是沒有新的線程,就去啓動它 * * 3. 若是咱們不能排隊任務,那麼咱們嘗試添加一個新線程。 * 若是失敗了,咱們知道任務隊列已經被關閉或飽和,因此拒絕這個任務。 */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
複製代碼
submit:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
execute()方法在ThreadPoolExecutor中進行了重寫,submit()方法是在AbstractExecutorService實現的,ThreadPoolExecutor並無重寫,而且execute方法是沒有返回結果的,submit的返回類型是Future,可以得到任務的結果,可是實際執行的仍是execute方法。
固然,還有例如shutdown、getQueue、getActiveCount、getPoolSize等方法沒有介紹到,推薦胖友們打開IDE進行查看吧~
ps:關於線程池的原理並未深刻記錄,有關它的任務拒絕策略、線程初始化、ThreadPoolExecutor構造以後,當任務超過設定值,它的執行策略等原理都值得去深刻學習,下回記錄~