Java的線程既是工做單元,也是執行機制。從JDK5開始,把工做機單元和執行機制分離開來。工做單元包括Runnable和Callable,而執行機制由Executor框架提供。html
在上層,Java多線程程序一般把應用分解爲若干個任務,而後使用用戶級的調度器(Executor框架)將這些任務映射爲固定數量的線程。java
在底層,操做系統內核將這些線程映射到硬件處理器上。小程序
Executor框架主要由3部分組成:服務器
Executor框架的成員及其關係能夠用一下的關係圖表示:多線程
Executor框架的使用示意圖:框架
使用步驟:異步
Executors.callable(Runnable task)
或Executors.callable(Runnable task, Object result)
)。import java.util.concurrent.*; public class ExecutorDemo { // 建立ThreadPoolExecutor實現類 private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), ); public static void main(String[] args) { // 採用submit()方法提交Callable對象並返回Future對象 Future<String> future = executor.submit(new callableDemo()); try { // get()方法獲取返回值 System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { // 處理異常 e.printStackTrace(); } finally { // 關閉線程池 executor.shutdown(); } } } /** * 建立Callable接口的實現類 */ class callableDemo implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(1000); String s = "return string"; return s; } }
直接建立ThreadPoolExecutor的實例對象,見https://www.cnblogs.com/chiaki/p/13536624.htmlide
ThreadPoolExecutor一般使用工廠類Executors建立,能夠建立3種類型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。工具
FixedThreadPool:適用於爲了知足資源管理的需求,而須要限制當先線程數量的應用場景,適用於負載比較重的服務器。ui
public static ExecutorService es = Executors.newFixedThreadPool(int threadNums); public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
把線程池最大線程數量maxmumPoolSize和核心線程池的數量corePoolSize設置爲threadNums
,將參數keepAliveTime設置爲0L
。使用無界隊列LinkedBlockingQueue
做爲阻塞隊列,所以當任務不能馬上執行時,都會添加到阻塞隊列中,並且maximumPoolSize,keepAliveTime都是無效的。
SingleThreadExecutor:適用於須要保證順序地執行各個任務;而且在任意時間點,不會有多個線程是活動地應用場景。**
public static ExecutorService es = Executors.newSingleThreadExecutor(); public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
由於阻塞隊列使用的是LinkedBlockingQueue
,所以和FixedThreadPool同樣,參數maximumPoolSize以及keepAliveTime都是無效的。corePoolSize爲1
,所以最多隻能建立一個線程。
CachedThreadPool:大小無界的線程池,適用於執行不少的短時間異步任務的小程序,或者是負載較輕的服務器。
public static ExecutorService es = Executors.newCachedThreadPool(); public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory);
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CachedThreadPool使用SynchronizedQueue
做爲阻塞隊列,SynchronizedQueue是不存儲元素的阻塞隊列,實現「一對一的交付」,也就是說,每次向隊列中put一個任務必須等有線程來take這個任務,不然就會一直阻塞該任務,若是一個線程要take一個任務就要一直阻塞知道有任務被put進阻塞隊列。
由於CachedThreadPool的maximumPoolSize爲Integer.MUX_VALUE
,所以CachedThreadPool是無界的線程池,也就是說能夠一直不斷的建立線程,這樣可能會使CPU和內存資源耗盡。corePoolSize爲0
,所以在CachedThreadPool中直接經過阻塞隊列來進行任務的提交。
ScheduledThreadPoolExecutor類繼承了ThreadPoolExecutor並實現了ScheduledExecutorService接口。主要用於在給定的延遲後執行任務或者按期執行任務。
ScheduledThreadPoolExecutor一般使用Executors工廠類來建立,可建立2種類型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。
ScheduledThreadPoolExecutor:適用於若干個(固定)線程延時或者按期執行任務,同時爲了知足資源管理的需求而須要限制後臺線程數量的場景。
public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums); public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);
SingleThreadScheduledExecutor:適用於須要單個線程延時或者按期的執行任務,同時須要保證各個任務順序執行的應用場景。
public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums); public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);
ScheduledThreadPoolExecutor的實現:
ScheduledThreadPoolExecutor的實現主要是經過把任務封裝爲ScheduledFutureTask來實現。經過調用scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞隊列添加一個實現了RunnableScheduledFutureTask接口的ScheduledFutureTask類對象。ScheduledFutureTask主要包括3個成員變量:
// 序列號,用於保存任務添加到阻塞隊列的順序 private final long sequenceNumber; // 用於保存該任務將要被執行的具體時間 private long time; // 週期,用於保存任務直線的間隔週期 private final long period;
ScheduledTreadPoolExecutor的阻塞隊列是用無界隊列DelayQueue
實現的,能夠實現元素延時delayTime後才能獲取元素,在ScheduledThreadPoolExecutor中,DelayQueue內部封裝了一個PriorityQueue,來對任務進行排序,首先對time排序,time小的在前,若是time同樣,則sequence小的在前,也就是說若是time同樣,那麼先被提交的任務先執行。
由於DelayQueue是一個無界的隊列,所以線程池的maximumPoolSize是無效的。ScheduledThreadPoolExecutor的工做流程大體以下:
Future接口和實現Future接口的FutureTask實現類,表明異步計算的結果。
FutureTask除了實現Future接口外還實現了Runnable接口。所以,FutureTask能夠交給Executor執行,也能夠條用線程直接執行(FutureTask.run()
)。根據FutureTask.run()方法被執行的時機,FutureTask可處於如下3種狀態:
FutureTask.cancel()
),或者執行FutureTask.run()時拋出異常而異常結束;狀態遷移示意圖:
FutureTask的get和cancle執行示意圖:
FutureTask是一個基於AQS同步隊列實現的一個自定義同步組件,經過對同步狀態state的競爭實現acquire或者release操做。
FutureTask的內部類Sync實現了AQS接口,經過對tryAcquire等抽象方法的重寫和模板方法的調用來實現內部類Sync的tryAcquireShared等方法,而後聚合Sync的方法來實現FutureTask的get和cancel等方法。
FutureTask的設計示意圖:
FutureTask的get方法最終會調用AQS.acquireSharedInterruptibly(int arg)
方法:
AQS.acquireSharedInterruptibly(int arg)
方法會首先調用tryAcquireShared()
方法判斷acquire操做是否能夠成功,能夠成功的條件是state爲執行完成狀態RAN或者已取消狀態CANCELLED,且runner不爲null;get()
方法當即返回,若是失敗則到線程等待隊列執行release操做;FutureTask.run()
或FutureTask.cancle(...)
),當前線程再次執行tryAcquireShared()
將返回正值1,當前線程離開現場等待隊列並喚醒它的後繼線程(級聯喚醒);用於實現線程要執行的工做單元。
提供了常見配置線程池的方法,由於ThreadPoolExecutor的參數衆多且意義重大,爲了不配置出錯,纔有了Executors工廠類。
FixedThreadPool和SingleThreadExecutor:容許請求的隊列長度爲Integer.MAX_VALUE(無界的阻塞隊列),可能堆積大量的請求,從而致使OOM。
CachedThreadPool和ScheduledThreadPool:容許建立的線程數量爲Integer.MAX_VALUE(無界的阻塞隊列),可能會建立大量線程,從而致使OOM。