Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理

    相關文章目錄:html

    Java線程池ThreadPoolExecutor使用和分析(一)java

    Java線程池ThreadPoolExecutor使用和分析(二) - execute()原理併發

    Java線程池ThreadPoolExecutor使用和分析(三) - 終止線程池原理less

 

    execute()是 java.util.concurrent.Executor接口中惟一的方法,JDK註釋中的描述是「在將來的某一時刻執行命令command」,即向線程池中提交任務,在將來某個時刻執行,提交的任務必須實現Runnable接口,該提交方式不能獲取返回值。下面是對execute()方法內部原理的分析,分析前先簡單介紹線程池有哪些狀態,在一系列執行過程當中涉及線程池狀態相關的判斷。如下分析基於JDK 1.7dom

 

    如下是本文的目錄大綱:ide

    1、線程池執行流程oop

    2、線程池狀態源碼分析

    3、任務提交內部原理ui

        一、execute()  --  提交任務this

        二、addWorker()  --  添加worker線程

        三、內部類Worker

        四、runWorker()  --  執行任務

        五、getTask()  --  獲取任務

        六、processWorkerExit()  --  worker線程退出

 

    如有不正之處請多多諒解,歡迎批評指正、互相討論。

    請尊重做者勞動成果,轉載請標明原文連接:

    http://www.cnblogs.com/trust-freedom/p/6681948.html

1、線程池的執行流程

一、若是線程池中的線程數量少於corePoolSize,就建立新的線程來執行新添加的任務
二、若是線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到workQueue中
三、若是線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會建立新的線程來處理被添加的任務
四、若是線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕策略

2、線程池狀態

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中ctl這個AtomicInteger的功能很強大,其高3位用於維護線程池運行狀態,低29位維護線程池中線程數量

一、RUNNING:-1<<COUNT_BITS,即高3位爲1,低29位爲0,該狀態的線程池會接收新任務,也會處理在阻塞隊列中等待處理的任務

二、SHUTDOWN:0<<COUNT_BITS,即高3位爲0,低29位爲0,該狀態的線程池不會再接收新任務,但還會處理已經提交到阻塞隊列中等待處理的任務

三、STOP:1<<COUNT_BITS,即高3位爲001,低29位爲0,該狀態的線程池不會再接收新任務,不會處理在阻塞隊列中等待的任務,並且還會中斷正在運行的任務

四、TIDYING:2<<COUNT_BITS,即高3位爲010,低29位爲0,全部任務都被終止了,workerCount爲0,爲此狀態時還將調用terminated()方法

五、TERMINATED:3<<COUNT_BITS,即高3位爲100,低29位爲0,terminated()方法調用完成後變成此狀態

這些狀態均由int型表示,大小關係爲 RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,這個順序基本上也是遵循線程池從 運行 到 終止這個過程。

 

runStateOf(int c)  方法:c & 高3位爲1,低29位爲0的~CAPACITY,用於獲取高3位保存的線程池狀態

workerCountOf(int c)方法:c & 高3位爲0,低29位爲1的CAPACITY,用於獲取低29位的線程數量

ctlOf(int rs, int wc)方法:參數rs表示runState,參數wc表示workerCount,即根據runState和workerCount打包合併成ctl

3、任務提交內部原理

一、execute()  --  提交任務

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 * 在將來的某個時刻執行給定的任務。這個任務用一個新線程執行,或者用一個線程池中已經存在的線程執行
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * 若是任務沒法被提交執行,要麼是由於這個Executor已經被shutdown關閉,要麼是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution                 RejectedExecutionException是一個RuntimeException
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 若是運行的線程少於corePoolSize,嘗試開啓一個新線程去運行command,command做爲這個線程的第一個任務
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 若是任務成功放入隊列,咱們仍須要一個雙重校驗去確認是否應該新建一個線程(由於可能存在有些線程在咱們上次檢查後死了) 或者 從咱們進入這個方法後,pool被關閉了
     * 因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新開啓 一個線程
     * 
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 若是沒法將任務入隊列(可能隊列滿了),須要新開區一個線程(本身:往maxPoolSize發展)
     * 若是失敗了,說明線程池shutdown 或者 飽和了,因此咱們拒絕任務
     */
    int c = ctl.get();
    
    /**
     * 一、若是當前線程數少於corePoolSize(多是因爲addWorker()操做已經包含對線程池狀態的判斷,如此處沒加,而入workQueue前加了)
     */
    if (workerCountOf(c) < corePoolSize) {
    	//addWorker()成功,返回
        if (addWorker(command, true))
            return;
        
        /**
         * 沒有成功addWorker(),再次獲取c(凡是須要再次用ctl作判斷時,都會再次調用ctl.get())
         * 失敗的緣由多是:
         * 一、線程池已經shutdown,shutdown的線程池再也不接收新任務
         * 二、workerCountOf(c) < corePoolSize 判斷後,因爲併發,別的線程先建立了worker線程,致使workerCount>=corePoolSize
         */
        c = ctl.get();
    }
    
    /**
     * 二、若是線程池RUNNING狀態,且入隊列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();//再次校驗位
        
        /**
         * 再次校驗放入workerQueue中的任務是否能被執行
         * 一、若是線程池不是運行狀態了,應該拒絕添加新任務,從workQueue中刪除任務
         * 二、若是線程池是運行狀態,或者從workQueue中刪除任務失敗(恰好有一個線程執行完畢,並消耗了這個任務),確保還有線程執行任務(只要有一個就夠了)
         */
        //若是再次校驗過程當中,線程池不是RUNNING狀態,而且remove(command)--workQueue.remove()成功,拒絕當前command
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //若是當前worker數量爲0,經過addWorker(null, false)建立一個線程,其任務爲null
        //爲何只檢查運行的worker數量是否是0呢?? 爲何不和corePoolSize比較呢??
        //只保證有一個worker線程能夠從queue中獲取任務執行就好了??
        //由於只要還有活動的worker線程,就能夠消費workerQueue中的任務
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);  //第一個參數爲null,說明只爲新建一個worker線程,沒有指定firstTask
                                     //第二個參數爲true表明佔用corePoolSize,false佔用maxPoolSize
    }
    /**
     * 三、若是線程池不是running狀態 或者 沒法入隊列
     *   嘗試開啓新線程,擴容至maxPoolSize,若是addWork(command, false)失敗了,拒絕當前command
     */
    else if (!addWorker(command, false))
        reject(command);
}

execute(Runnable command)

參數:
    command    提交執行的任務,不能爲空
執行流程:
一、若是線程池當前線程數量少於corePoolSize,則addWorker(command, true)建立新worker線程,如建立成功返回,如沒建立成功,則執行後續步驟;
    addWorker(command, true)失敗的緣由多是:
    A、線程池已經shutdown,shutdown的線程池再也不接收新任務
    B、workerCountOf(c) < corePoolSize 判斷後,因爲併發,別的線程先建立了worker線程,致使workerCount>=corePoolSize
二、若是線程池還在running狀態,將task加入workQueue阻塞隊列中,若是加入成功,進行double-check,若是加入失敗(多是隊列已滿),則執行後續步驟;
    double-check主要目的是判斷剛加入workQueue阻塞隊列的task是否能被執行
    A、若是線程池已經不是running狀態了,應該拒絕添加新任務,從workQueue中刪除任務
    B、若是線程池是運行狀態,或者從workQueue中刪除任務失敗(恰好有一個線程執行完畢,並消耗了這個任務),確保還有線程執行任務(只要有一個就夠了)
三、若是線程池不是running狀態 或者 沒法入隊列,嘗試開啓新線程,擴容至maxPoolSize,若是addWork(command, false)失敗了,拒絕當前command

 

二、addWorker()  --  添加worker線程

 

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread#start), we roll back cleanly.
 * 檢查根據當前線程池的狀態和給定的邊界(core or maximum)是否能夠建立一個新的worker
 * 若是是這樣的話,worker的數量作相應的調整,若是可能的話,建立一個新的worker並啓動,參數中的firstTask做爲worker的第一個任務
 * 若是方法返回false,可能由於pool已經關閉或者調用過了shutdown
 * 若是線程工廠建立線程失敗,也會失敗,返回false
 * 若是線程建立失敗,要麼是由於線程工廠返回null,要麼是發生了OutOfMemoryError
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass(繞開) queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
	//外層循環,負責判斷線程池狀態
	retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c); //狀態

        // Check if queue empty only if necessary.
        /**
         * 線程池的state越小越是運行狀態,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
         * 一、若是線程池state已經至少是shutdown狀態了
         * 二、而且如下3個條件任意一個是false
         *   rs == SHUTDOWN         (隱含:rs>=SHUTDOWN)false狀況: 線程池狀態已經超過shutdown,多是stop、tidying、terminated其中一個,即線程池已經終止
         *   firstTask == null      (隱含:rs==SHUTDOWN)false狀況: firstTask不爲空,rs==SHUTDOWN 且 firstTask不爲空,return false,場景是在線程池已經shutdown後,還要添加新的任務,拒絕
         *   ! workQueue.isEmpty()  (隱含:rs==SHUTDOWN,firstTask==null)false狀況: workQueue爲空,當firstTask爲空時是爲了建立一個沒有任務的線程,再從workQueue中獲取任務,若是workQueue已經爲空,那麼就沒有添加新worker線程的必要了
         * return false,即沒法addWorker()
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //內層循環,負責worker數量+1
        for (;;) {
            int wc = workerCountOf(c); //worker數量
            
            //若是worker數量>線程池最大上限CAPACITY(即便用int低29位能夠容納的最大值)
            //或者( worker數量>corePoolSize 或  worker數量>maximumPoolSize ),即已經超過了給定的邊界
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            
            //調用unsafe CAS操做,使得worker數量+1,成功則跳出retry循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            
            //CAS worker數量+1失敗,再次讀取ctl
            c = ctl.get();  // Re-read ctl
            
            //若是狀態不等於以前獲取的state,跳出內層循環,繼續去外層循環判斷
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            // else CAS失敗時由於workerCount改變了,繼續內層循環嘗試CAS對worker數量+1
        }
    }

	/**
	 * worker數量+1成功的後續操做
	 * 添加到workers Set集合,並啓動worker線程
	 */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock; 
        w = new Worker(firstTask); //一、設置worker這個AQS鎖的同步狀態state=-1
                                   //二、將firstTask設置給worker的成員變量firstTask
                                   //三、使用worker自身這個runnable,調用ThreadFactory建立一個線程,並設置給worker的成員變量thread
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
            	//--------------------------------------------這部分代碼是上鎖的
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
            	// 當獲取到鎖後,再次檢查
                int c = ctl.get();
                int rs = runStateOf(c);

                //若是線程池在運行running<shutdown 或者 線程池已經shutdown,且firstTask==null(多是workQueue中仍有未執行完成的任務,建立沒有初始任務的worker線程執行)
                //worker數量-1的操做在addWorkerFailed()
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable   線程已經啓動,拋非法線程狀態異常
                        throw new IllegalThreadStateException();
                    
                    workers.add(w);//workers是一個HashSet<Worker>
                    
                    //設置最大的池大小largestPoolSize,workerAdded設置爲true
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
              //--------------------------------------------
            } 
            finally {
                mainLock.unlock();
            }
            
            //若是往HashSet中添加worker成功,啓動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
    	//若是啓動線程失敗
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker(Runnable firstTask, boolean core)
參數:
    firstTask:    worker線程的初始任務,能夠爲空
    core:           true:將corePoolSize做爲上限,false:將maximumPoolSize做爲上限
addWorker方法有4種傳參的方式:

    一、addWorker(command, true)

    二、addWorker(command, false)

    三、addWorker(null, false)

    四、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行如下分析
    第一個:線程數小於corePoolSize時,放一個須要處理的task進Workers Set。若是Workers Set長度超過corePoolSize,就返回false
    第二個:當隊列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。若是線程池也滿了的話就返回false
    第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task爲空的worker在線程執行的時候會去任務隊列裏拿任務,這樣就至關於建立了一個新的線程,只是沒有立刻分配任務
    第四個:這個方法就是放一個null的task進Workers Set,並且是在小於corePoolSize時,若是此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來爲線程池預先啓動corePoolSize個worker等待從workQueue中獲取任務執行
執行流程:
一、判斷線程池當前是否爲能夠添加worker線程的狀態,能夠則繼續下一步,不能夠return false:
    A、線程池狀態>shutdown,可能爲stop、tidying、terminated,不能添加worker線程
    B、線程池狀態==shutdown,firstTask不爲空,不能添加worker線程,由於shutdown狀態的線程池不接收新任務
    C、線程池狀態==shutdown,firstTask==null,workQueue爲空,不能添加worker線程,由於firstTask爲空是爲了添加一個沒有任務的線程再從workQueue獲取task,而workQueue爲空,說明添加無任務線程已經沒有意義
二、線程池當前線程數量是否超過上限(corePoolSize 或 maximumPoolSize),超過了return false,沒超過則對workerCount+1,繼續下一步
三、在線程池的ReentrantLock保證下,向Workers Set中添加新建立的worker實例,添加完成後解鎖,並啓動worker線程,若是這一切都成功了,return true,若是添加worker入Set失敗或啓動失敗,調用addWorkerFailed()邏輯

 

三、內部類Worker

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 * 
 * Worker類大致上管理着運行線程的中斷狀態 和 一些指標
 * Worker類投機取巧的繼承了AbstractQueuedSynchronizer來簡化在執行任務時的獲取、釋放鎖
 * 這樣防止了中斷在運行中的任務,只會喚醒(中斷)在等待從workQueue中獲取任務的線程
 * 解釋:
 *   爲何不直接執行execute(command)提交的command,而要在外面包一層Worker呢??
 *   主要是爲了控制中斷
 *   用什麼控制??
 *   用AQS鎖,當運行時上鎖,就不能中斷,TreadPoolExecutor的shutdown()方法中斷前都要獲取worker鎖
 *   只有在等待從workQueue中獲取任務getTask()時才能中斷
 * worker實現了一個簡單的不可重入的互斥鎖,而不是用ReentrantLock可重入鎖
 * 由於咱們不想讓在調用好比setCorePoolSize()這種線程池控制方法時能夠再次獲取鎖(重入)
 * 解釋:
 *   setCorePoolSize()時可能會interruptIdleWorkers(),在對一個線程interrupt時會要w.tryLock()
 *   若是可重入,就可能會在對線程池操做的方法中中斷線程,相似方法還有:
 *   setMaximumPoolSize()
 *   setKeppAliveTime()
 *   allowCoreThreadTimeOut()
 *   shutdown()
 * 此外,爲了讓線程真正開始後才能夠中斷,初始化lock狀態爲負值(-1),在開始runWorker()時將state置爲0,而state>=0才能夠中斷
 * 
 * 
 * Worker繼承了AQS,實現了Runnable,說明其既是一個可運行的任務,也是一把鎖(不可重入)
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread; //利用ThreadFactory和 Worker這個Runnable建立的線程對象
    
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
    	//設置AQS的同步狀態private volatile int state,是一個計數器,大於0表明鎖已經被獲取
        setState(-1); // inhibit interrupts until runWorker 
                      // 在調用runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會判斷 getState()>=0
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); //根據當前worker建立一個線程對象
                                                          //當前worker自己就是一個runnable任務,也就是不會用參數的firstTask建立線程,而是調用當前worker.run()時調用firstTask.run()
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this); //runWorker()是ThreadPoolExecutor的方法
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state. 0表明「沒被鎖定」狀態
    // The value 1 represents the locked state. 1表明「鎖定」狀態

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * 嘗試獲取鎖
     * 重寫AQS的tryAcquire(),AQS原本就是讓子類來實現的
     */
    protected boolean tryAcquire(int unused) {
    	//嘗試一次將state從0設置爲1,即「鎖定」狀態,但因爲每次都是state 0->1,而不是+1,那麼說明不可重入
    	//且state==-1時也不會獲取到鎖
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread()); //設置exclusiveOwnerThread=當前線程
            return true;
        }
        return false;
    }

    /**
     * 嘗試釋放鎖
     * 不是state-1,而是置爲0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null); 
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    /**
     * 中斷(若是運行)
     * shutdownNow時會循環對worker線程執行
     * 且不須要獲取worker鎖,即便在worker運行時也能夠中斷
     */
    void interruptIfStarted() {
        Thread t;
        //若是state>=0、t!=null、且t沒有被中斷
        //new Worker()時state==-1,說明不能中斷
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker類
Worker類自己既實現了Runnable,又繼承了AbstractQueuedSynchronizer(如下簡稱AQS),因此其既是一個可執行的任務,又能夠達到鎖的效果
new Worker()
一、將AQS的state置爲-1,在runWoker()前不容許中斷
二、待執行的任務會以參數傳入,並賦予firstTask
三、用Worker這個Runnable建立Thread

之因此Worker本身實現Runnable,並建立Thread,在firstTask外包一層,是由於要經過Worker控制中斷,而firstTask這個工做任務只是負責執行業務
Worker控制中斷主要有如下幾方面:
一、初始AQS狀態爲-1,此時不容許中斷interrupt(),只有在worker線程啓動了,執行了runWoker(),將state置爲0,才能中斷
    不容許中斷體如今:
    A、shutdown()線程池時,會對每一個worker tryLock()上鎖,而Worker類這個AQS的tryAcquire()方法是固定將state從0->1,故初始狀態state==-1時tryLock()失敗,沒發interrupt()
    B、shutdownNow()線程池時,不用tryLock()上鎖,但調用worker.interruptIfStarted()終止worker,interruptIfStarted()也有state>0才能interrupt的邏輯
二、爲了防止某種狀況下,在運行中的worker被中斷,runWorker()每次運行任務時都會lock()上鎖,而shutdown()這類可能會終止worker的操做須要先獲取worker的鎖,這樣就防止了中斷正在運行的線程

Worker實現的AQS爲不可重入鎖,爲了是在得到worker鎖的狀況下再進入其它一些須要加鎖的方法

Worker和Task的區別:
Worker是線程池中的線程,而Task雖然是runnable,可是並無真正執行,只是被Worker調用了run方法,後面會看到這部分的實現。

 

四、runWorker()  --  執行任務

 

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 * 重複的從隊列中獲取任務並執行,同時應對一些問題:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 * 咱們可能使用一個初始化任務開始,即firstTask爲null
 * 而後只要線程池在運行,咱們就從getTask()獲取任務
 * 若是getTask()返回null,則worker因爲改變了線程池狀態或參數配置而退出
 * 其它退出由於外部代碼拋異常了,這會使得completedAbruptly爲true,這會致使在processWorkerExit()方法中替換當前線程
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and
 * clearInterruptsForTaskRun called to ensure that unless pool is
 * stopping, this thread does not have its interrupt set.
 * 在任何任務執行以前,都須要對worker加鎖去防止在任務運行時,其它的線程池中斷操做
 * clearInterruptsForTaskRun保證除非線程池正在stoping,線程不會被設置中斷標示
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 * 每一個任務執行前會調用beforeExecute(),其中可能拋出一個異常,這種狀況下會致使線程die(跳出循環,且completedAbruptly==true),沒有執行任務
 * 由於beforeExecute()的異常沒有cache住,會上拋,跳出循環
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to
 * afterExecute. We separately handle RuntimeException, Error
 * (both of which the specs guarantee that we trap) and arbitrary
 * Throwables.  Because we cannot rethrow Throwables within
 * Runnable.run, we wrap them within Errors on the way out (to the
 * thread's UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 * 假定beforeExecute()正常完成,咱們執行任務
 * 彙總任何拋出的異常併發送給afterExecute(task, thrown)
 * 由於咱們不能在Runnable.run()方法中從新上拋Throwables,咱們將Throwables包裝到Errors上拋(會到線程的UncaughtExceptionHandler去處理)
 * 任何上拋的異常都會致使線程die
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 * 任務執行結束後,調用afterExecute(),也可能拋異常,也會致使線程die
 * 根據JLS Sec 14.20,這個異常(finally中的異常)會生效
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
                // new Worker()是state==-1,此處是調用Worker類的tryRelease()方法,將state置爲0, 而interruptIfStarted()中只有state>=0才容許調用中斷
    boolean completedAbruptly = true; //是否「忽然完成」,若是是因爲異常致使的進入finally,那麼completedAbruptly==true就是忽然完成的
    try {
    	/**
    	 * 若是task不爲null,或者從阻塞隊列中getTask()不爲null
    	 */
        while (task != null || (task = getTask()) != null) {
            w.lock(); //上鎖,不是爲了防止併發執行任務,爲了在shutdown()時不終止正在運行的worker
            
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * clearInterruptsForTaskRun操做
             * 確保只有在線程stoping時,纔會被設置中斷標示,不然清除中斷標示
             * 一、若是線程池狀態>=stop,且當前線程沒有設置中斷狀態,wt.interrupt()
             * 二、若是一開始判斷線程池狀態<stop,但Thread.interrupted()爲true,即線程已經被中斷,又清除了中斷標示,再次判斷線程池狀態是否>=stop
             *   是,再次設置中斷標示,wt.interrupt()
             *   否,不作操做,清除中斷標示後進行後續步驟
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); //當前線程調用interrupt()中斷
            
            try {
            	//執行前(子類實現)
                beforeExecute(wt, task);
                
                Throwable thrown = null;
                try {
                    task.run();
                } 
                catch (RuntimeException x) {
                    thrown = x; throw x;
                } 
                catch (Error x) {
                    thrown = x; throw x;
                } 
                catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } 
                finally {
                	//執行後(子類實現)
                    afterExecute(task, thrown); //這裏就考驗catch和finally的執行順序了,由於要以thrown爲參數
                }
            } 
            finally {
                task = null; //task置爲null
                w.completedTasks++; //完成任務數+1
                w.unlock(); //解鎖
            }
        }
        
        completedAbruptly = false;
    } 
    finally {
    	//處理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker(Worker w)
執行流程:
一、Worker線程啓動後,經過Worker類的run()方法調用runWorker(this)
二、執行任務以前,首先worker.unlock(),將AQS的state置爲0,容許中斷當前worker線程
三、開始執行firstTask,調用task.run(),在執行任務前會上鎖wroker.lock(),在執行完任務後會解鎖,爲了防止在任務運行時被線程池一些中斷操做中斷
四、在任務執行先後,能夠根據業務場景自定義beforeExecute() 和 afterExecute()方法
五、不管在beforeExecute()、task.run()、afterExecute()發生異常上拋,都會致使worker線程終止,進入processWorkerExit()處理worker退出的流程
六、如正常執行完當前task後,會經過getTask()從阻塞隊列中獲取新任務,當隊列中沒有任務,且獲取任務超時,那麼當前worker也會進入退出流程

 

五、getTask()  --  獲取任務

 

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:  如下狀況會返回null
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 *    超過了maximumPoolSize設置的線程數量(由於調用了setMaximumPoolSize())
 * 2. The pool is stopped.
 *    線程池被stop
 * 3. The pool is shutdown and the queue is empty.
 *    線程池被shutdown,而且workQueue空了
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait.
 *    線程等待任務超時
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 *         返回null表示這個worker要結束了,這種狀況下workerCount-1
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    /**
     * 外層循環
     * 用於判斷線程池狀態
     */
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 對線程池狀態的判斷,兩種狀況會workerCount-1,而且返回null
         * 線程池狀態爲shutdown,且workQueue爲空(反映了shutdown狀態的線程池仍是要執行workQueue中剩餘的任務的)
         * 線程池狀態爲stop(shutdownNow()會致使變成STOP)(此時不用考慮workQueue的狀況)
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); //循環的CAS減小worker數量,直到成功
            return null;
        }

        boolean timed;      // Are workers subject to culling?
                            // 是否須要定時從workQueue中獲取
        
        /**
         * 內層循環
         * 要麼break去workQueue獲取任務
         * 要麼超時了,worker count-1
         */
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize; //allowCoreThreadTimeOut默認爲false
                                                                 //若是allowCoreThreadTimeOut爲true,說明corePoolSize和maximum都須要定時
            
            //若是當前執行線程數<maximumPoolSize,而且timedOut 和 timed 任一爲false,跳出循環,開始從workQueue獲取任務
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            
            /**
             * 若是到了這一步,說明要麼線程數量超過了maximumPoolSize(可能maximumPoolSize被修改了)
             * 要麼既須要計時timed==true,也超時了timedOut==true
             * worker數量-1,減一執行一次就好了,而後返回null,在runWorker()中會有邏輯減小worker線程
             * 若是本次減一失敗,繼續內層循環再次嘗試減一
             */
            if (compareAndDecrementWorkerCount(c))
                return null;
            
            //若是減數量失敗,再次讀取ctl
            c = ctl.get();  // Re-read ctl
            
            //若是線程池運行狀態發生變化,繼續外層循環
            //若是狀態沒變,繼續內層循環
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
        	//poll() - 使用  LockSupport.parkNanos(this, nanosTimeout) 掛起一段時間,interrupt()時不會拋異常,但會有中斷響應
        	//take() - 使用 LockSupport.park(this) 掛起,interrupt()時不會拋異常,但會有中斷響應
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    //大於corePoolSize
                workQueue.take();                                        //小於等於corePoolSize
            
            //如獲取到了任務就返回
            if (r != null)
                return r;
            
            //沒有返回,說明超時,那麼在下一次內層循環時會進入worker count減一的步驟
            timedOut = true;
        } 
        /**
			  * blockingQueue的take()阻塞使用LockSupport.park(this)進入wait狀態的,對LockSupport.park(this)進行interrupt不會拋異常,但仍是會有中斷響應
			  * 但AQS的ConditionObject的await()對中斷狀態作了判斷,會報告中斷狀態 reportInterruptAfterWait(interruptMode)
			  * 就會上拋InterruptedException,在此處捕獲,從新開始循環
			  * 若是是因爲shutdown()等操做致使的空閒worker中斷響應,在外層循環判斷狀態時,可能return null
			  */
        catch (InterruptedException retry) { 
            timedOut = false; //響應中斷,從新開始,中斷狀態會被清除
        }
    }
}

getTask()
執行流程:
一、首先判斷是否能夠知足從workQueue中獲取任務的條件,不知足return null
    A、線程池狀態是否知足:
        (a)shutdown狀態 + workQueue爲空 或 stop狀態,都不知足,由於被shutdown後仍是要執行workQueue剩餘的任務,但workQueue也爲空,就能夠退出了
        (b)stop狀態,shutdownNow()操做會使線程池進入stop,此時不接受新任務,中斷正在執行的任務,workQueue中的任務也不執行了,故return null返回
    B、線程數量是否超過maximumPoolSize 或 獲取任務是否超時
        (a)線程數量超過maximumPoolSize多是線程池在運行時被調用了setMaximumPoolSize()被改變了大小,不然已經addWorker()成功不會超過maximumPoolSize
        (b)若是 當前線程數量>corePoolSize,纔會檢查是否獲取任務超時,這也體現了當線程數量達到maximumPoolSize後,若是一直沒有新任務,會逐漸終止worker線程直到corePoolSize
二、若是知足獲取任務條件,根據是否須要定時獲取調用不一樣方法:
    A、workQueue.poll():若是在keepAliveTime時間內,阻塞隊列仍是沒有任務,返回null
    B、workQueue.take():若是阻塞隊列爲空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務
三、在阻塞從workQueue中獲取任務時,能夠被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut爲初始值false,再次執行第1步中的判斷,知足就繼續獲取任務,不知足return null,會進入worker退出的流程

 

六、processWorkerExit()  --  worker線程退出

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
	/**
	 * 一、worker數量-1
	 * 若是是忽然終止,說明是task執行時異常狀況致使,即run()方法執行時發生了異常,那麼正在工做的worker線程數量須要-1
	 * 若是不是忽然終止,說明是worker線程沒有task可執行了,不用-1,由於已經在getTask()方法中-1了
	 */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代碼和註釋正好相反啊
        decrementWorkerCount();

    /**
     * 二、從Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; //把worker的完成任務數加到線程池的完成任務數
        workers.remove(w); //從HashSet<Worker>中移除
    } finally {
        mainLock.unlock();
    }

    /**
     * 三、在對線程池有負效益的操做時,都須要「嘗試終止」線程池
     * 主要是判斷線程池是否知足終止的狀態
     * 若是狀態知足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
     * 沒有線程了,更新狀態爲tidying->terminated
     */
    tryTerminate();

    /**
     * 四、是否須要增長worker線程
     * 線程池狀態是running 或 shutdown
     * 若是當前線程是忽然終止的,addWorker()
     * 若是當前線程不是忽然終止的,但當前線程數量 < 要維護的線程數量,addWorker()
     * 故若是調用線程池shutdown(),直到workQueue爲空前,線程池都會維持corePoolSize個線程,而後再逐漸銷燬這corePoolSize個線程
     */
    int c = ctl.get();
    //若是狀態是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個worker
    if (runStateLessThan(c, STOP)) {
    	//不是忽然完成的,即沒有task任務能夠獲取而完成的,計算min,並根據當前worker數量判斷是否須要addWorker()
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默認爲false,即min默認爲corePoolSize
            
            //若是min爲0,即不須要維持核心線程數量,且workQueue不爲空,至少保持一個線程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            
            //若是線程數量大於最少數量,直接返回,不然下面至少要addWorker一個
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        
        //添加一個沒有firstTask的worker
        //只要worker是completedAbruptly忽然終止的,或者線程數量小於要維護的數量,就新添一個worker線程,即便是shutdown狀態
        addWorker(null, false);
    }
}

processWorkerExit(Worker w, boolean completedAbruptly)
參數:
    worker:                      要結束的worker
    completedAbruptly: 是否忽然完成(是否由於異常退出)
執行流程:
一、worker數量-1
    A、若是是忽然終止,說明是task執行時異常狀況致使,即run()方法執行時發生了異常,那麼正在工做的worker線程數量須要-1
    B、若是不是忽然終止,說明是worker線程沒有task可執行了,不用-1,由於已經在getTask()方法中-1了
二、從Workers Set中移除worker,刪除時須要上鎖mainlock
三、tryTerminate():在對線程池有負效益的操做時,都須要「嘗試終止」線程池,大概邏輯:
    判斷線程池是否知足終止的狀態
    A、若是狀態知足,但還有線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
    B、沒有線程了,更新狀態爲tidying->terminated
四、是否須要增長worker線程,若是線程池尚未徹底終止,仍須要保持必定數量的線程
    線程池狀態是running 或 shutdown
    A、若是當前線程是忽然終止的,addWorker()
    B、若是當前線程不是忽然終止的,但當前線程數量 < 要維護的線程數量,addWorker()
    故若是調用線程池shutdown(),直到workQueue爲空前,線程池都會維持corePoolSize個線程,而後再逐漸銷燬這corePoolSize個線程

 

參考資料:

深刻分析java線程池的實現原理 - 佔小狼

JUC源碼分析-線程池-ThreadPoolExecutor

相關文章
相關標籤/搜索