Java是天生就支持併發的語言,支持併發意味着多線程,線程的頻繁建立在高併發及大數據量是很是消耗資源的,由於java提供了線程池。在jdk1.5之前的版本中,線程池的使用是及其簡陋的,可是在JDK1.5後,有了很大的改善。JDK1.5以後加入了java.util.concurrent包,java.util.concurrent包的加入給予開發人員開發併發程序以及解決併發問題很大的幫助。這篇文章主要介紹下併發包下的Executor接口,Executor接口雖然做爲一個很是舊的接口(JDK1.5 2004年發佈),可是不少程序員對於其中的一些原理仍是不熟悉,所以寫這篇文章來介紹下Executor接口,同時鞏固下本身的知識。若是文章中有出現錯誤,歡迎你們指出。java
對於數據庫鏈接,咱們常常聽到數據庫鏈接池這個概念。由於創建數據庫鏈接時很是耗時的一個操做,其中涉及到網絡IO的一些操做。所以就想出把鏈接經過一個鏈接池來管理。須要鏈接的話,就從鏈接池裏取一個。當使用完了,就「關閉」鏈接,這不是正在乎義上的關閉,只是把鏈接放回到咱們的池裏,供其餘人在使用。因此對於線程,也有了線程池這個概念,其中的原理和數據庫鏈接池是差很少的,所以java jdk中也提供了線程池的功能。程序員
線程池的做用:線程池就是限制系統中使用線程的數量以及更好的使用線程數據庫
根據系統的運行狀況,能夠自動或手動設置線程數量,達到運行的最佳效果:配置少了,將影響系統的執行效率,配置多了,又會浪費系統的資源。用線程池配置數量,其餘線程排隊等候。當一個任務執行完畢後,就從隊列中取一個新任務運行,若是沒有新任務,那麼這個線程將等待。若是來了一個新任務,可是沒有空閒線程的話,那麼把任務加入到等待隊列中。緩存
爲何要用線程池:網絡
Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。Executors類,提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService接口。多線程
比較重要的幾個類:併發
ExecutorService高併發 |
真正的線程池接口。工具 |
ScheduledExecutorService大數據 |
能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。 |
ThreadPoolExecutor |
ExecutorService的默認實現。 |
ScheduledThreadPoolExecutor |
繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現。 |
要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的,所以在Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池。
1. newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。
2. newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,在提交新任務,任務將會進入等待隊列中等待。若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
3. newCachedThreadPool
public static ExecutorService newCachedThreadPool() public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,
那麼就會回收部分空閒(60秒處於等待任務到來)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池的最大值是Integer的最大值(2^31-1)。
4. newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
實例:
注:使用了java8的lambda表達式以及stream
1.newSingleThreadExecutor
public class SingleThreadExecutorTest { public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
IntStream.range(0, 5).forEach(i -> executor.execute(() -> { String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); })); try { //close pool executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (!executor.isTerminated()) { executor.shutdownNow(); } } } }
輸出結果:
finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-1
線程名都同樣,說明是同一個線程
2.newFixedThreadPool
public class FixedThreadExecutorTest { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); } catch (InterruptedException e) { e.printStackTrace(); } })); //close pool 同上... } }
輸出結果:
finished: pool-1-thread-2 finished: pool-1-thread-3 finished: pool-1-thread-1 finished: pool-1-thread-1 finished: pool-1-thread-2 finished: pool-1-thread-3
只建立了三個線程
3.newCachedThreadPool
public class CachedThreadExecutorTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); IntStream.range(0, 6).forEach(i -> executor.execute(() -> { try { TimeUnit.SECONDS.sleep(1); String threadName = Thread.currentThread().getName(); System.out.println("finished: " + threadName); } catch (InterruptedException e) { e.printStackTrace(); } }));
//close pool 同上... } }
輸出結果:
finished: pool-1-thread-1 finished: pool-1-thread-2 finished: pool-1-thread-3 finished: pool-1-thread-6 finished: pool-1-thread-5 finished: pool-1-thread-4
建立了6個線程
4.newScheduledThreadPool
public class ScheduledThreadExecutorTest { public static void main(String[] args) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); executor.scheduleAtFixedRate(() -> System.out.println(System.currentTimeMillis()), 1000, 2000, TimeUnit.MILLISECONDS); //close pool 同上 } }
輸出結果:
1457967177735 1457967179736 1457967181735
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); ... }
從上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。
ThreadPoolExecutor的完整構造方法的簽名是:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小時 TimeUnit.MINUTES; //分鐘 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //納秒
ArrayBlockingQueue; //有界隊列 LinkedBlockingQueue; //無界隊列 SynchronousQueue; //特殊的一個隊列,只有存在等待取出的線程時才能加入隊列,能夠說容量爲0,是無界隊列
ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
ThreadPoolExecutor是Executors類的底層實現。
在JDK幫助文檔中,有如此一段話:
「強烈建議程序員使用較爲方便的 Executors
工廠方法 Executors.newCachedThreadPool()
(無界線程池,能夠進行自動線程回收)、Executors.newFixedThreadPool(int)
(固定大小線程池)Executors.newSingleThreadExecutor()
(單個後臺線程)
它們均爲大多數使用場景預約義了設置。」
下面介紹一下幾個類的源碼:
ExecutorService newFixedThreadPool (int nThreads):固定大小線程池。
能夠看到,corePoolSize和maximumPoolSize的大小是同樣的(實際上,後面會介紹,若是使用無界queue的話 maximumPoolSize參數是沒有意義的),keepAliveTime和unit的設值代表什麼?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特色,他是無界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():單線程
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():無界線程池,能夠進行自動線程回收
這個實現就有意思了。首先是無界的線程池,因此咱們能夠發現maximumPoolSize爲big big。其次BlockingQueue的選擇上使用SynchronousQueue。可能對於該BlockingQueue有些陌生,簡單說:該 QUEUE中,每一個插入操做必須等待另外一個線程的對應移除操做。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從上面給出的ThreadPoolExecutor類的代碼能夠知道,ThreadPoolExecutor繼承了AbstractExecutorService,AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。
public abstract class AbstractExecutorService implements ExecutorService
而ExecutorService又是繼承了Executor接口
public interface ExecutorService extends Executor
咱們看一下Executor接口的實現:
public interface Executor { void execute(Runnable command); }
到這裏,你們應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。
Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;
而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法;
而後ThreadPoolExecutor繼承了類AbstractExecutorService。
在ThreadPoolExecutor類中有幾個很是重要的方法:
public void execute(Runnable command) public <T> Future<T> submit(Callable<T> task) public void shutdown() public List<Runnable> shutdownNow() //返回未執行的任務
execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中 並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的 實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()和shutdownNow()是用來關閉線程池的。