<!-- more -->java
本文原創地址,
個人博客
:
https://jsbintask.cn/2019/03/10/jdk/jdk8-threadpool/(食用效果最佳),轉載請註明出處!
在實際工做中,線程
是一個咱們常常要打交道的角色,它能夠幫咱們靈活利用資源,提高程序運行效率。可是咱們今天不是探討線程
!咱們今天來聊聊另外一個與線程息息相關的角色:線程池
.本篇文章的目的就是全方位的解析線程池的做用,以及jdk中的接口,實現以及原理,另外對於某些重要概念,將從源碼
的角度探討。tip:本文較長,建議先碼後看。
編程
首先咱們看一段建立線程而且運行的經常使用代碼:數組
for (int i = 0; i < 100; i++) { new Thread(() -> { System.out.println("run thread->" + Thread.currentThread().getName()); //to do something, send email, message, io operator, network... }).start(); }
上面的代碼很容易理解,咱們爲了異步,或者效率考慮,將某些耗時操做放入一個新線程去運行,可是這樣的代碼卻存在這樣的問題:安全
時間
,資源
,對於線程的銷燬一樣須要系統資源。手動建立執行線程存在以上問題,而線程池就是用來解決這些問題的。怎麼解決呢?咱們能夠先粗略的定義一下線程池:多線程
線程池是一組已經建立好的,一直在等待任務執行的線程的集合。
由於線程池中線程是已經建立好的,因此對於任務的執行不會消耗掉額外的資源,線程池中線程個數由咱們自定義添加,可相對於資源,資源任務作出調整,對於某些任務,若是線程池還沒有執行,可手動取消,線程任務變得可以管理!
因此,線程池的做用以下:併發
上面咱們已經知道了線程池的做用,而對於這樣一個好用,重要的工具,jdk
固然已經爲咱們提供了實現,這也是本篇文章的重點。
在jdk中,關於線程池的接口,類都定義在juc
(java.util.concurrent)包中,這是jdk專門爲咱們提供用於併發編程的包,固然,本篇文章咱們只介紹與線程池有關的接口和類,首先咱們看下重點要學習的接口和類:
如圖所示,咱們將一一講解這6個類的做用而且分析。異步
首先咱們須要瞭解就是Executor
接口,它有一個方法,定義以下:
Executor自jdk1.5引入,這個接口只有一個方法execute
聲明,它的做用以及定義以下:接收一個任務(Runnable
)而且執行。注意:同步執行仍是異步執行都可
!
由它的定義咱們就知道,它是一個線程池最基本的做用。可是在實際使用中,咱們經常使用的是另一個功能更多的子類ExecutorService
。工具
這個接口繼承自Executor,它的方法定義就豐富多了,能夠關閉,提交Future任務,批量提交任務,獲取執行結果等,咱們一一講解下每一個方法做用聲明:學習
void shutdown()
: 「優雅地」關閉線程池,爲何是「優雅地」呢?由於這個線程池在關閉前會先等待線程池中已經有的任務執行完成,通常會配合方法awaitTermination
一塊兒使用,調用該方法後,線程池中不能再加入新的任務。List<Runnable> shutdownNow();
: 「嘗試」終止正在執行的線程,返回在正在等待的任務列表,調用這個方法後,會調用正在執行線程的interrupt()
方法,因此若是正在執行的線程若是調用了sleep,join,await
等方法,會拋出InterruptedException
異常。boolean awaitTermination(long timeout, TimeUnit unit)
: 該方法是一個阻塞方法,參數分別爲時間和時間單位。這個方法通常配合上面兩個方法以後調用。若是先調用shutdown
方法,全部任務執行完成返回true,超時返回false,若是先調用的是shutdownNow
方法,正在執行的任務所有完成true,超時返回false。boolean isTerminated();
: 調用方法1或者2後,若是全部人物所有執行完畢則返回true,也就是說,就算全部任務執行完畢,可是不是先調用1或者2,也會返回false。<T> Future<T> submit(Callable<T> task);
: 提交一個可以返回結果的Callable
任務,返回任務結果抽象對象是Future
,調用Future.get()
方法能夠阻塞等待獲取執行結果,例如:result = exec.submit(aCallable).get();
,提交一個任務而且一直阻塞知道該任務執行完成獲取到返回結果。this
<T> Future<T> submit(Runnable task, T result);
: 提交一個Runnable
任務,執行成功後調用Future.get()方法返回的是result(這是什麼騷操做?)。Future<?> submit(Runnable task);
:和6不一樣的是調用Future.get()
方法返回的是null
(這又是什麼操做?)。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
: 提交一組任務,而且返回每一個任務執行結果的抽象對象List<Future<T>>
,Future做用同上,值得注意的是:當調用其中任一Future.isDone()
(判斷任務是否完成,正常,異常終止都算)方法時,必須等到全部任務都完成時才返回true,簡單說:所有任務完成纔算完成
。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
: 同方法8,多了一個時間參數,不一樣的是:若是超時,Future.isDone()一樣返回true。<T> T invokeAny(Collection<? extends Callable<T>> tasks)
:這個看名字和上面對比就容易理解了,返回第一個正常完成的任務地執行結果,後面沒有完成的任務將被取消。<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:同10相比,多了一個超時參數。不一樣的是:在超時時間內,一個任務都沒有完成,將拋出TimeoutException
。到如今,咱們已經知道了一個線程池基本的全部方法,知道了每一個方法的做用,接下來咱們就來看看具體實現,首先咱們研究下ExecutorService的具體實現抽象類:AbstractExecutorService
。
AbstractExecutorService
是一個抽象類,繼承自ExecutorService
,它實現了ExecutorService接口的submit, invokeAll, invokeAny
方法,主要用於將ExecutorService的公共實現封裝,方便子類更加方便使用,接下來咱們看看具體實現:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
execute方法交由子類實現了,這裏咱們主要分析newTaskFor
方法,看它是如何構建Future對象的:
首先,RunnableFuture
接口定義以下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
他就是Future和Runnable的組合,它的實現是FutureTask
:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; // ① try { for (Callable<T> t : tasks) { // ② RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); // ③ if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // ④ return futures; } finally { if (!done) // ⑤ for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
execute
方法添加每個任務。Future.cancel()
(實際是調用執行線程的interrupt
方法。上面代碼分析和咱們一開始講解ExecutorService
的invokeAll
一致。
invokeAny
實際調用doInvokeAny
:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = // ① new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); futures.add(ecs.submit(it.next())); // ② --ntasks; int active = 1; for (;;) { Future<T> f = ecs.poll(); // ③ if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) break; else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else // ④ f = ecs.take(); } if (f != null) { // ⑤ --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { for (int i = 0, size = futures.size(); i < size; i++) // ⑥ futures.get(i).cancel(true); } }
ExecutorCompletionService
ecs,這個對象實際是一個任務執行結果阻塞隊列和線程池的結合,因此它能夠加入任務,執行任務,將任務執行結果加入阻塞隊列。上面代碼分析和咱們一開始講解ExecutorService
的invokeAny
一致。 到如今,咱們已經分析完了AbstractExecutorService
中的公共的方法,接下來就該研究最終的具體實現了:ThreadPoolExecutor
ThreadPoolExecutor
繼承自AbstractExecutorService
,它是線程池的具體實現:
咱們首先分析下構造方法:`public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)`。
corePoolSize
:核心線程數,maximumPoolSize
:線程池最大容許線程數,workQueue
:任務隊列,threadFactory
:線程建立工廠,handler
: 任務拒絕策,keepAliveTime, unit
:等待時長,它們的具體做用以下:提交一個task(Runnable)後(執行execute方法),檢查總線程數是否小於corePoolSize,小於等於則使用threadFactory直接建立一個線程執行任務,大於則再次檢查線程數量是否等於maximumPoolSize,等於則直接執行handler拒絕策略,小於則判斷workQueue是否已經滿了,沒滿則將任務加入等待線程執行,滿了則使用threadFactory建立新線程執行隊頭任務。
經過流程圖咱們知道每一個參數做用,這裏值得注意的是,若是咱們將某些參數特殊化,則能夠獲得特殊的線程池:
allowCoreThreadTimeOut
使這個參數對線程池中全部線程都有效果。RejectedExecutionException
,這是線程池中的默認實現最後,咱們再分析下ThreadPoolExecutor核心方法execute
:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // ① if (workerCountOf(c) < corePoolSize) { // ② if (addWorker(command, true)) return; c = ctl.get(); // ③ } if (isRunning(c) && workQueue.offer(command)) { // ④ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // ⑤ reject(command); else if (workerCountOf(recheck) == 0) // ⑥ addWorker(null, false); } else if (!addWorker(command, false)) // ⑦ reject(command); }
好了,到如今jdk中的線程池核心的實現,策略,分析咱們已經分析完成了。接下來我咱們就來看看關於線程池的另外的一些擴展,也就是圖上的剩下的接口和類:
ScheduledExecutorService
繼承自ExecutorService
,ExecutorService的分析上面咱們已經知道了,咱們來看看它擴展了哪些方法:
這個接口做爲線程池的定義主要增長了能夠定時執行任務
(執行一次)和按期執行任務
(重複執行),咱們來一一簡述下每一個方法的做用。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
: 這個方法用於定時執行任務command,延遲的時間爲delay*unit,它返回一個ScheduledFuture
對象用於獲取執行結果或者剩餘延時,調用Future.get()方法將阻塞當前線程最後返回null。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
:同上,不一樣的是,調用Future.get()方法將返回執行的結果,而不是null。public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,TimeUnit unit);
: 重複執行任務command,第一次執行時間爲initialDelay延遲後,之後的執行時間將在initialDelay + period * n
,unit表明時間單位,值得注意的是,若是某次執行出現異常,後面該任務就不會再執行。或者經過返回對象Future手動取消,後面也將再也不執行。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);
: 效果同上,不一樣點:若是command耗時爲 y,則上面的計算公式爲initialDelay + period * n + y
,也就是說,它的定時時間會加上任務耗時,而上面的方法則是一個固定的頻率,不會算上任務執行時間!這是它擴展的四個方法,其中須要注意的是scheduleAtFixedRate和scheduleWithFixedDelay的細微差異,最後,咱們來看下它的實現類:ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
繼承自ThreadPoolExecutor
類,實現了ScheduledExecutorService
接口,上面均已經分析。
它的構造器以下:
看起來比它的父類構造器簡潔,主要由於它的任務隊列workQueue是默認的(DelayedWorkQueue
),而且最大的線程數爲最大值。接着咱們看下DelayedWorkQueue實現:
它內部使用數組維護了一個二叉樹,提升了任務查找時間,而之因此ScheduledThreadPoolExecutor可以實現延時的關鍵也在於DelayedWorkQueue的take()
方法:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // ① RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
好了,到目前爲止jdk中關於線程池的6個核心類已經所有分析完畢了。接下來還有最後一個小問題,咱們手動建立線程池參數也太了,不論是ThreadPoolExecutor
仍是ScheduledThreadPoolExecutor
,這對於用戶來講彷佛並不太友好,固然,jdk已經想到了這個問題,因此,咱們最後再介紹一個建立這些線程池的工具類:Executors
:
它的主要工具方法以下:
比起手動建立,它幫咱們加了不少默認值,用起來固然就方便多了,好比說newFixedThreadPool
建立一個線程數固定的線程池,其實就是核心線程數等於最大線程數,和咱們一開始分析的結果同樣。
值得注意的是:爲了咱們的程序安全可控性考慮,咱們應該儘可能考慮手動建立線程池,知曉每個參數的做用,下降不穩定性!
本次,咱們首先從代碼出發,分析了線程池給咱們帶來的好處以及直接使用線程的弊端,接着引出了jdk中的已經實現了的線程池。而後重點分析了jdk中關於線程池的六個最重要的接口和類,而且從源碼角度講解了關鍵點實現,最後,處於方便考慮,咱們還知道jdk給咱們留了一個建立線程池的工具類,簡化了手動建立線程池的步驟。
真正作到了知其然,知其因此然
。
關注我,這裏只有乾貨!