ThreadPoolExecutor源碼解析(一)

1.ThreadPoolExcuter原理說明

  首先咱們要知道爲何要使用ThreadPoolExcuter,具體能夠看看文檔中的說明:   線程池能夠解決兩個不一樣問題:因爲減小了每一個任務的調用開銷,在執行大量的異步任務時,它一般可以提供更好的性能,而且還能夠提供綁定和管理資源(包括執行集合任務時使用的線程)的方法。每一個 ThreadPoolExecutor還維護着一些基本的統計數據,如完成的任務數。   線程池作的其實能夠看得很簡單,其實就是把你提交的任務(task)進行調度管理運行,但這個調度的過程以及其中的狀態控制是比較複雜的。java

2.初始化參數介紹

能夠直接看最完整的ThreadPoolExcuter的初始化函數:緩存

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}

逐個介紹以下: **corePoolSize:**核心線程數,在ThreadPoolExcutor中有一個與它相關的配置:allowCoreThreadTimeOut(默認爲false),當allowCoreThreadTimeOut爲false時,核心線程會一直存活,哪怕是一直空閒着。而當allowCoreThreadTimeOut爲true時核心線程空閒時間超過keepAliveTime時會被回收。併發

**maximumPoolSize:**最大線程數,線程池能容納的最大線程數,當線程池中的線程達到最大時,此時添加任務將會採用拒絕策略,默認的拒絕策略是拋出一個運行時錯誤(RejectedExecutionException)。值得一提的是,當初始化時用的工做隊列爲LinkedBlockingDeque時,這個值將無效。異步

**keepAliveTime:**存活時間,當非核心空閒超過這個時間將被回收,同時空閒核心線程是否回收受allowCoreThreadTimeOut影響。ide

**unit:**keepAliveTime的單位。函數

**workQueue:**任務隊列,經常使用有三種隊列,即SynchronousQueue,LinkedBlockingDeque(無界隊列),ArrayBlockingQueue(有界隊列)。oop

**threadFactory:**線程工廠,ThreadFactory是一個接口,用來建立worker。經過線程工廠能夠對線程的一些屬性進行定製。默認直接新建線程。性能

**RejectedExecutionHandler:**也是一個接口,只有一個方法,當線程池中的資源已經所有使用,添加新線程被拒絕時,會調用RejectedExecutionHandler的rejectedExecution法。 默認是拋出一個運行時異常。ui

  這麼多參數看起來好像很複雜,因此Java貼心得爲咱們準備了便捷的API,便可以直接用Executors建立各類線程池。分別是:this

//建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
//經過設置corePoolSize爲0,而maximumPoolSize爲Integer.Max_VALUE(Int型數據最大值)實現。
ExecutorService cache = Executors.newCachedThreadPool();
//建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
//經過將corePoolSize和maximumPoolSize的值設置爲同樣的值來實現。
ExecutorService fixed = Executors.newFixedThreadPool(num);
//建立一個定長線程池,支持定時及週期性任務執行。
//經過將隊列參數workQueue設置爲DelayWorkQueue來實現。
ExecutorService schedule = Executors.newScheduledThreadPool(5);
//建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
 //經過將corePoolSize和maximumPoolSize都設置爲1來實現。
ExecutorService single = Executors.newSingleThreadExecutor();

   這幾個API會根據具體的狀況而使用預設定好默認的初始化參數去建立一個ThreadPoolExecutor。

  這裏須要作一個額外說明,在ThreadPoolExcuter中,worker和task是有區別的,task是用戶提交的任務,而worker則是用來執行task的線程。在初始化參數中,corePoolSize和maximumPoolSize都是針對worker的,而workQueue是用來存放task的。

3.worker介紹

  前面有介紹了一下worker和task的區別,其中task是用戶提交的線程任務,而worker則是ThreadPoolExecutor本身內部實現的一個類了。

  具體源碼以下:

 /**
     * Woker主要維護着運行task的worker的中斷控制信息,以及其餘小記錄。這個類拓展AbstractQueuedSynchronizer
     * 而來簡化獲取和釋放每個任務執行中的鎖。這能夠防止中斷那些打算喚醒正在等待其餘線程任務的任務,而不是
     * 中斷正在運行的任務。咱們實現一個簡單的不可重入鎖而不是ReentrantLo,由於咱們不想當其調用setCorePoolSize
     * 這樣的方法的時候能得到鎖。
     */
    //worker主要是對進行中的任務進行中斷控制,順帶着對其餘行爲進行記錄
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //正在跑的線程,若是是null標識factory失敗
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        //初始化一個任務以運行
        Runnable firstTask;
        /** Per-thread task counter */
        //每一個線程計數
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         * 用給定的first task和從threadFactory建立
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


        /** Delegates main run loop to outer runWorker  */
        //主要調用了runWorker
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        //鎖方法

        //
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        //嘗試獲取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

   Worker其實能夠看做高級一點的線程。其中繼承AbstractQueuedSynchronizer主要是爲了實現鎖控制。ThreadPoolExecutor會持有並管理Worker,在Worker中firstTask其實就是存放task的,而thread則是存放當前Worker自己的線程。

其中比較重要的就是run方法了,但這個方法其實又是去調用ThreadPoolExecutor裏面的runWorker()方法,具體能夠看下一節的介紹。

4.ctl介紹以及運行狀態說明

  首先須要介紹線程池有五種運行狀態: RUNNING(狀態值-1): 接收新任務並處理隊列中的任務 SHUTDOWN(狀態值0): 不接收新任務但會處理隊列中的任務。 STOP(狀態值1): 不接收新任務,不處理隊列中的任務,並中斷正在處理的任務 TIDYING(狀態值2): 全部任務已終止,workerCount爲0,處於TIDYING狀態的線程將調用鉤子方法terminated()。 TERMINATED(狀態值3): terminated()方法完成。

  而後咱們能夠看看ThreadPoolExcuter中的ctl這個變量。   ctl是ThreadPoolExcuter中比較有意思的一個實現,它是一個AtomicInteger,這裏不對AtomicInteger多作討論,只要知道能夠把它當作有原子性的Integer就夠了,其實它具備原子性的原理是使用了CAS的技術,這是一種樂觀鎖的具體實現。   ThreadPoolExcuter是將兩個內部值打包成一個值,即將workerCount和runState(運行狀態)這兩個值打包在一個ctl中,由於runState有5個值,須要3位,因此有3位表示 runState,而其餘29位表示爲workerCount。   而運行時要獲取其餘數據時,只須要對ctl進行拆包便可。具體這部分代碼以下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl  
  //拆包ctl,分別獲取runState和WorkerCount
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
  //打包操做
    private static int ctlOf(int rs, int wc) { return rs | wc; }

5.拒絕策略

當執行器(Executor)處於終止狀態,或者執行器在max threads和工做隊列都是有界而且處於飽和的時候,新提交的任務會被拒絕。在任一狀況下,執行的任務將調用RejectedExecutionHandler的方法rejectedExecution(Runnable, ThreadPoolExecutor)。有如下四種拒絕策略:

1.默認的是ThreadPoolExecutor.AbortPolicy,在這種策略下,處理器會在拒絕後拋出一個運行異常RejectedExecutionException。

2.在ThreadPoolExecutor.CallerRunsPolicy的策略下,線程會調用它直接的execute來運行這個任務。這種方式提供簡單的反饋控制機制來減緩新任務提交的速度。

3.在ThreadPoolExecutor.DiscardPolicy策略下,沒法執行的任務將被簡單得刪除掉。

4.在ThreadPoolExecutor.DiscardOldestPolicy策略下,若是executor沒有處於終止狀態,在工做隊列頭的任務將被刪除,而後會從新執行(可能會再次失敗,這會致使重複這個過程)。

總結:本篇初步介紹了ThreadPoolExcuter的基本原理,解決了什麼問題。然後說明了ThreadPoolExcuter中的初始化參數,對其中的各個參數作初步介紹。再以後介紹ctl變量的做用,並初步介紹了任務提交失敗後的拒絕策略。

相關文章
相關標籤/搜索