首先咱們要知道爲何要使用ThreadPoolExcuter,具體能夠看看文檔中的說明: 線程池能夠解決兩個不一樣問題:因爲減小了每一個任務的調用開銷,在執行大量的異步任務時,它一般可以提供更好的性能,而且還能夠提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每一個 ThreadPoolExecutor還維護着一些基本的統計數據,如完成的任務數。 線程池作的其實能夠看得很簡單,其實就是把你提交的任務(task)進行調度管理運行,但這個調度的過程以及其中的狀態控制是比較複雜的。java
能夠直接看最完整的ThreadPoolExcuter的初始化函數:緩存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
逐個介紹以下: **corePoolSize:**核心線程數,在ThreadPoolExcutor中有一個與它相關的配置:allowCoreThreadTimeOut(默認爲false),當allowCoreThreadTimeOut爲false時,核心線程會一直存活,哪怕是一直空閒着。而當allowCoreThreadTimeOut爲true時核心線程空閒時間超過keepAliveTime時會被回收。併發
**maximumPoolSize:**最大線程數,線程池能容納的最大線程數,當線程池中的線程達到最大時,此時添加任務將會採用拒絕策略,默認的拒絕策略是拋出一個運行時錯誤(RejectedExecutionException)。值得一提的是,當初始化時用的工做隊列爲LinkedBlockingDeque時,這個值將無效。異步
**keepAliveTime:**存活時間,當非核心空閒超過這個時間將被回收,同時空閒核心線程是否回收受allowCoreThreadTimeOut影響。ide
**unit:**keepAliveTime的單位。函數
**workQueue:**任務隊列,經常使用有三種隊列,即SynchronousQueue,LinkedBlockingDeque(無界隊列),ArrayBlockingQueue(有界隊列)。oop
**threadFactory:**線程工廠,ThreadFactory是一個接口,用來建立worker。經過線程工廠能夠對線程的一些屬性進行定製。默認直接新建線程。性能
**RejectedExecutionHandler:**也是一個接口,只有一個方法,當線程池中的資源已經所有使用,添加新線程被拒絕時,會調用RejectedExecutionHandler的rejectedExecution法。 默認是拋出一個運行時異常。ui
這麼多參數看起來好像很複雜,因此Java貼心得爲咱們準備了便捷的API,便可以直接用Executors建立各類線程池。分別是:this
//建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。 //經過設置corePoolSize爲0,而maximumPoolSize爲Integer.Max_VALUE(Int型數據最大值)實現。 ExecutorService cache = Executors.newCachedThreadPool(); //建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。 //經過將corePoolSize和maximumPoolSize的值設置爲同樣的值來實現。 ExecutorService fixed = Executors.newFixedThreadPool(num); //建立一個定長線程池,支持定時及週期性任務執行。 //經過將隊列參數workQueue設置爲DelayWorkQueue來實現。 ExecutorService schedule = Executors.newScheduledThreadPool(5); //建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。 //經過將corePoolSize和maximumPoolSize都設置爲1來實現。 ExecutorService single = Executors.newSingleThreadExecutor();
這幾個API會根據具體的狀況而使用預設定好默認的初始化參數去建立一個ThreadPoolExecutor。
這裏須要作一個額外說明,在ThreadPoolExcuter中,worker和task是有區別的,task是用戶提交的任務,而worker則是用來執行task的線程。在初始化參數中,corePoolSize和maximumPoolSize都是針對worker的,而workQueue是用來存放task的。
前面有介紹了一下worker和task的區別,其中task是用戶提交的線程任務,而worker則是ThreadPoolExecutor本身內部實現的一個類了。
具體源碼以下:
/** * Woker主要維護着運行task的worker的中斷控制信息,以及其餘小記錄。這個類拓展AbstractQueuedSynchronizer * 而來簡化獲取和釋放每個任務執行中的鎖。這能夠防止中斷那些打算喚醒正在等待其餘線程任務的任務,而不是 * 中斷正在運行的任務。咱們實現一個簡單的不可重入鎖而不是ReentrantLo,由於咱們不想當其調用setCorePoolSize * 這樣的方法的時候能得到鎖。 */ //worker主要是對進行中的任務進行中斷控制,順帶着對其餘行爲進行記錄 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ //正在跑的線程,若是是null標識factory失敗 final Thread thread; /** Initial task to run. Possibly null. */ //初始化一個任務以運行 Runnable firstTask; /** Per-thread task counter */ //每一個線程計數 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) * 用給定的first task和從threadFactory建立 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //主要調用了runWorker public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. //鎖方法 // protected boolean isHeldExclusively() { return getState() != 0; } //嘗試獲取鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker其實能夠看做高級一點的線程。其中繼承AbstractQueuedSynchronizer主要是爲了實現鎖控制。ThreadPoolExecutor會持有並管理Worker,在Worker中firstTask其實就是存放task的,而thread則是存放當前Worker自己的線程。
其中比較重要的就是run方法了,但這個方法其實又是去調用ThreadPoolExecutor裏面的runWorker()方法,具體能夠看下一節的介紹。
首先須要介紹線程池有五種運行狀態: RUNNING(狀態值-1): 接收新任務並處理隊列中的任務 SHUTDOWN(狀態值0): 不接收新任務但會處理隊列中的任務。 STOP(狀態值1): 不接收新任務,不處理隊列中的任務,並中斷正在處理的任務 TIDYING(狀態值2): 全部任務已終止,workerCount爲0,處於TIDYING狀態的線程將調用鉤子方法terminated()。 TERMINATED(狀態值3): terminated()方法完成。
而後咱們能夠看看ThreadPoolExcuter中的ctl這個變量。 ctl是ThreadPoolExcuter中比較有意思的一個實現,它是一個AtomicInteger,這裏不對AtomicInteger多作討論,只要知道能夠把它當作有原子性的Integer就夠了,其實它具備原子性的原理是使用了CAS的技術,這是一種樂觀鎖的具體實現。 ThreadPoolExcuter是將兩個內部值打包成一個值,即將workerCount和runState(運行狀態)這兩個值打包在一個ctl中,由於runState有5個值,須要3位,因此有3位表示 runState,而其餘29位表示爲workerCount。 而運行時要獲取其餘數據時,只須要對ctl進行拆包便可。具體這部分代碼以下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //拆包ctl,分別獲取runState和WorkerCount private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } //打包操做 private static int ctlOf(int rs, int wc) { return rs | wc; }
當執行器(Executor)處於終止狀態,或者執行器在max threads和工做隊列都是有界而且處於飽和的時候,新提交的任務會被拒絕。在任一狀況下,執行的任務將調用RejectedExecutionHandler的方法rejectedExecution(Runnable, ThreadPoolExecutor)。有如下四種拒絕策略:
1.默認的是ThreadPoolExecutor.AbortPolicy,在這種策略下,處理器會在拒絕後拋出一個運行異常RejectedExecutionException。
2.在ThreadPoolExecutor.CallerRunsPolicy的策略下,線程會調用它直接的execute來運行這個任務。這種方式提供簡單的反饋控制機制來減緩新任務提交的速度。
3.在ThreadPoolExecutor.DiscardPolicy策略下,沒法執行的任務將被簡單得刪除掉。
4.在ThreadPoolExecutor.DiscardOldestPolicy策略下,若是executor沒有處於終止狀態,在工做隊列頭的任務將被刪除,而後會從新執行(可能會再次失敗,這會致使重複這個過程)。
總結:本篇初步介紹了ThreadPoolExcuter的基本原理,解決了什麼問題。然後說明了ThreadPoolExcuter中的初始化參數,對其中的各個參數作初步介紹。再以後介紹ctl變量的做用,並初步介紹了任務提交失敗後的拒絕策略。