若採用"爲每一個任務分配一個線程"的方式會存在一些缺陷,尤爲是當須要建立大量線程時:java
任務是一組邏輯工做單元,線程則是使任務異步執行的機制。當存在大量併發任務時,建立、銷燬線程須要很大的開銷,運用線程池能夠大大減少開銷。緩存
說明:併發
線程池是一個複雜的任務調度工具,它涉及到任務、線程池等的生命週期問題。要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的。框架
JDK中的線程池均由ThreadPoolExecutor類實現。其構造方法以下:less
參數說明:異步
corePoolSize:核心線程數。ide
maximumPoolSize:最大線程數。高併發
keepAliveTime:線程存活時間。當線程數大於core數,那麼超過該時間的線程將會被終結。工具
unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDSthis
workQueue:Runnable的阻塞隊列。若線程池已經被佔滿,則該隊列用於存放沒法再放入線程池中的Runnable。
另外一個構造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
該方法在下面的擴展部分有更深刻的講解。其中handler表示線程池對拒絕任務的處理策略。
ThreadPoolExecutor的使用須要注意如下概念:
JDK內部提供了五種最多見的線程池。由Executors類的五個靜態工廠方法建立。
這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。
返回單線程的Executor,將多個任務交給此Exector時,這個線程處理完一個任務後接着處理下一個任務,若該線程出現異常,將會有一個新的線程來替代。此線程池保證全部任務的執行順序按照任務的提交順序執行。
說明:LinkedBlockingQueue會無限的添加須要執行的Runnable。
每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。
public static ExecutorSevice newFixedThreadPool()
返回一個包含指定數目線程的線程池,若是任務數量多於線程數目,那麼沒有沒有執行的任務必須等待,直到有任務完成爲止。
若是線程池的大小超過了處理任務所須要的線程,那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。
newCachedThreadPool方法建立的線程池能夠自動的擴展線程池的容量。核心線程數量爲0。
SynchronousQueue是個特殊的隊列。 SynchronousQueue隊列的容量爲0。當試圖爲SynchronousQueue添加Runnable,則執行會失敗。只有當一邊從SynchronousQueue取數據,一邊向SynchronousQueue添加數據才能夠成功。SynchronousQueue僅僅起到數據交換的做用,並不保存線程。但newCachedThreadPool()方法沒有線程上限。Runable添加到SynchronousQueue會被馬上取出。
根據用戶的任務數建立相應的線程來處理,該線程池不會對線程數目加以限制,徹底依賴於JVM能建立線程的數量,可能引發內存不足。
建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
例:
public class ScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000, 2000, TimeUnit.MILLISECONDS); } }
此線程池支持定時以及週期性執行任務的需求。
Executor是一個線程執行接口。任務執行的主要抽象不是Thead,而是Executor。
public interface Executor{ void executor(Runnable command); }
Executor將任務的提交過程與執行過程解耦,並用Runnable來表示任務。執行的任務放入run方法中便可,將Runnable接口的實現類交給線程池的execute方法,做爲它的一個參數。若是須要給任務傳遞參數,能夠經過建立一個Runnable接口的實現類來完成。
Executor能夠支持多種不一樣類型的任務執行策略。
Executor基於生產者消費者模式,提交任務的操做至關於生產者,執行任務的線程則至關於消費者。
ExecutorService接口
線程池接口。ExecutorService在Executor的基礎上增長了一些方法,其中有兩個核心的方法:
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
這兩個方法都是向線程池中提交任務,它們的區別在於Runnable在執行完畢後沒有結果,Callable執行完畢後有一個結果。這在多個線程中傳遞狀態和結果是很是有用的。另外他們的相同點在於都返回一個Future對象。Future對象能夠阻塞線程直到運行完畢(獲取結果,若是有的話),也能夠取消任務執行,固然也可以檢測任務是否被取消或者是否執行完畢。
在沒有Future以前咱們檢測一個線程是否執行完畢一般使用Thread.join()或者用一個死循環加狀態位來描述線程執行完畢。如今有了更好的方法可以阻塞線程,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。
ScheduledExecutorService接口
ScheduledExecutorService描述的功能和Timer/TimerTask相似,解決那些須要任務重複執行的問題。這包括延遲時間一次性執行、延遲時間週期性執行以及固定延遲時間週期性執行等。固然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的所有特性。
線程是有多種執行狀態的,一樣管理線程的線程池也有多種狀態。JVM會在全部線程(非後臺daemon線程)所有終止後才退出,爲了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候沒法正確的關閉線程池,將會阻止JVM的結束。
線程池Executor是異步的執行任務,所以任什麼時候刻不可以直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。所以關閉線程池可能出現一下幾種狀況:
另外關閉線程池後對於任務的狀態應該有相應的反饋信息。
啓動線程池
線程池在構造前(new操做)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING。嚴格意義上講線程池構造完成後並無線程被當即啓動,只有進行"預啓動"或者接收到任務的時候纔會啓動線程。
線程池是處於運行狀態,隨時準備接受任務來執行。
關閉線程池
線程池運行中能夠經過shutdown()和shutdownNow()來改變運行狀態。
線程池結束
一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED狀態,即線程池就結束了。
例:使用固定大小的線程池。並將任務添加到線程池。
import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; public class JavaThreadPool { public static void main(String[] args) { // 建立一個可重用固定線程數的線程池 ExecutorService pool = Executors.newFixedThreadPool(2); // 建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口 Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); // 將線程放入池中進行執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); // 關閉線程池 pool.shutdown(); } } class MyThread extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + "正在執行。。。"); } }
ThreadPoolExecutor中定義了三個空方法,用於監控線程的執行狀況。
ThreadPoolExecutor源碼:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
例:使用覆蓋方法,定義新的線程池。
public class ExtThreadPoolTest { static class MyTask implements Runnable { public String name; public MyTask(String name) { super(); this.name = name; } @Override public void run() { try { Thread.sleep(500); System.out.println("執行中:"+this.name); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("執行退出"); } }; for(int i=0;i<5;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); } Thread.sleep(10); // 等待terminated()執行 es.shutdown(); // 若無該方法,主線程不會結束。 } }
線程池不可能處理無限多的線程。因此一旦線程池中中須要執行的任務過多,線程池對於某些任務就沒法處理了。拒絕策略即對這些沒法處理的任務進行處理。可能丟棄掉這些不能處理的任務,也可能用其餘方式。
ThreadPoolExecutor類還有另外一個構造方法。該構造方法中的RejectedExecutionHandler 用於定義拒絕策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ..... }
JDK內部已經提供一些拒絕策略。
AbortPolicy 一旦線程不能處理,則拋出異常。
AbortPolicy源碼:
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy 一旦線程不能處理,則丟棄任務。
DiscardPolicy源碼:
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
CallerRunsPolicy 一旦線程不能處理,則將任務返回給提交任務的線程處理。
CallerRunsPolicy源碼:
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy 一旦線程不能處理,丟棄掉隊列中最老的任務。
DiscardOldestPolicy源碼:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
例:自定義拒絕策略。打印並丟棄沒法處理的任務。
public class RejectedPolicyHandleTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 打印並丟棄。 System.out.println(r.toString()+" is discard"); } }); for(int i=0;i<Integer.MAX_VALUE;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); Thread.sleep(10); } es.shutdown(); // 若無該方法,主線程不會結束。 } }
ThreadPoolExecutor類構造器的參數其中之一即爲ThreadFactory線程工廠。
ThreadFactory用於建立線程池中的線程。
public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadFactory的實現類中通常定義線程了線程組,線程數與線程名稱。
DefaultThreadFactory源碼:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
這裏須要稍微提一下的是CompletionService接口,它是用於描述順序獲取執行結果的一個線程池包裝器。它依賴一個具體的線程池調度,可是可以根據任務的執行前後順序獲得執行結果,這在某些狀況下可能提升併發效率。