淺析ThreadPoolExecutor源碼

這是我參與更文挑戰的第9天,活動詳情查看:更文挑戰markdown

execute()原理

執行步驟以下:oop

若是workerCount小於corePoolSize,將任務做爲first Task新建線程來執行任務。 將任務添加到隊列中,若是添加成功依然須要檢查,在進入任務以前,post

若是線程池被關閉,那麼將任務從隊列中移除;ui

若是當前線程池中沒有工做線程,而剛剛在隊列中加入了任務,要保證線程池中至少有一個工做線程能夠處理任務。this

若是不能將任務加入隊列中,嘗試新加一個線程來執行任務,可是並不定會成功,多是線程池被shut down或者線程池達到了飽和(maximumPoolSize),若是失敗了執行拒絕策略。spa

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        // 高3位表示狀態,低29位任務數量。
    //工做線程小於核心線程數,建立新的線程。
        if (workerCountOf(c) < corePoolSize) {
        //建立新的worker當即執行command,輪詢workQueue處理task。
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //線程池在運行狀態且能夠將task插入隊列
    //第一次校驗線程池在運行狀態
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //第二次校驗,防止在第一次校驗經過後線程池關閉。若是線程池關閉,在隊列中刪除task並拒絕task
            if (! isRunning(recheck) && remove(command))
                reject(command);
                //若是線程數=0(線程都死掉了,好比:corePoolSize=0),新建線程且未指定firstTask,僅僅去輪訓workQueue
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //線程隊列已滿,嘗試建立新線程執行task,建立失敗後拒絕task,建立失敗緣由:1.線程池關閉;2.線程數已經達到maxPoolSize
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

addWorker方法

addWorker方法主要有兩部分線程

一:判斷是否能夠建立worker。根據自旋、CAS、ctl等,判斷繼續建立仍是返回false,自旋週期通常很短。code

二:同步建立workder,啓動線程。 addWorker會根據當前線程池的工做狀態和給定的界限限制(corePoolSize 和maximumPoolSize)判斷是否能夠添加新的工做線程。 其中每個Worker對象都是一個AQS隊列。orm

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
         //外層循環判斷線程池的狀態
         for (;;) {
            //在進行獲取一次上下文操做機制
            int c = ctl.get();
            int rs = runStateOf(c);
             //線程池狀態
            // Check if queue empty only if necessary.
          // 線程池狀態:RUNNING = -一、SHUTDOWN = 0、 STOP = 一、TIDYING = 二、TERMINATED = 3 
        //線程池至少是shutdown狀態
        if (rs >= SHUTDOWN &&
          // 除了線程池正在關閉(shutdown),
          // 隊列裏還有未處理的task的狀況,其餘都不能添加
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //內層循環判斷是否到達容量上限,worker+1
            for (;;) {
                int wc = workerCountOf(c);//worker數量
                //worker大於Integer最大上限
                //或到達邊界上限
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS worker+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;//成功了跳出循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) 
                //若是線程池狀態發生變化,重試外層循環
                    continue retry;
                // else CAS failed due to workerCount change;
                // retry inner loop 
         // CAS失敗workerCount被其餘線程改變,
         // 從新嘗試內層循環CAS對workerCount+1

            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); 
            //1.state置爲-1,Worker繼承了AbstractQueuedSynchronizer.
            //2.設置firstTask屬性.
            //3.Worker實現了Runable接口,將this做爲入參建立線程.
            final Thread t = w.thread;
            if (t != null) {
          //addWorker須要加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//workers是HashSet<Worker>
              //設置最大線程池大小
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

Worker

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

複製代碼

Worker方法實現了Runnable,它將構造的Thread賦值給thread。第一步中的setState(-1),即將AQS中的同步狀態設置爲-1,線程池使用AQS中的同步狀態來判斷該工做線程是否能夠被中斷。-1:初始化值,此時工做線程尚未啓動,也沒有中斷的必要;0:表示接受中斷,此時工做線程爲空閒狀態;1:表示此時工做線程正在執行任務。它實現了非重入互斥鎖,非重入是爲了不線程池的一些控制方法得到重入鎖。注意Worker實現鎖的目的與傳統鎖的意義不太同樣。其主要是爲了控制線程是否可interrupt,以及其餘的監控,如線程是否active(正在執行任務)。對象

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    //每一個worker有本身的內部線程,ThreadFactory建立失敗時是null
    //封裝任務線程機制
    final Thread thread;
    //初始化任務,多是null
    Runnable firstTask;
    //每一個worker的完成任務數
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // inhibit interrupts until runWorker //狀態置爲-1,若是中斷線程須要CAS將state 從0-
        >1,以此來保證能只中斷從workerQueue getTask的線程
        setState(-1); // 禁止線程在啓動前被打斷
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    //核心方法
    public void run() {
    //首先執行w.unlock,就是把state置爲0,對該線程的中斷就能夠進行了
        runWorker(this);
    }

    // state = 0 表明未鎖;state = 1 表明已鎖

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 在setCorePoolSize/shutdown等方法中斷worker線程時須要調用該方法,
    // 確保中斷的是從workerQueue getTask的線程
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    //調用tryRelease修改state=0,LockSupport.unpark(thread) 下一個等待鎖的線程
    public boolean isLocked() { return isHeldExclusively(); }
    // interrupt已啓動線程
    void interruptIfStarted() {
        Thread t;
        // 初始化是 state = -1,不會被interrupt
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
複製代碼

Worker的run()方法實際調用的是runWorker(this),啓動工做線程(注意這裏不是直接啓動的任務),在工做線程中執行任務。工做線程在這循環,反覆的從隊列中獲取任務並執行它們。

在運行全部任務以前,須要獲取鎖,來保證當任務在運行的時候不會被中斷,除非線程池正在中止(Stop) 總結以下: 1.Worker類主要負責運行線程狀態的控制。 2.Worker繼承了AQS實現了簡單的獲取鎖和釋放所的操做。來避免中斷等待執行任務的線 程時,中斷正在運行中的線程(線程剛啓動,還沒開始執行任務)。 3.本身實現不可重入鎖,是爲了不在實現線程池控狀態控制的方法,例如:setCorePoolSize的時候中斷正在開始運行的線程。setCorePoolSize可能會調用interruptIdleWorkers(),該方法中會調用worker的tryLock()方法中斷線程,本身實現鎖能夠確保工做線程啓動以前不會被中斷

runWorker的主要任務就是一直loop循環處理任務,沒有任務就去getTask(), 代碼以下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 容許被 interrupt
        boolean completedAbruptly = true;
        try {
            //循環獲取任務直至 task = null (線程池關閉、超時等)
            // 注意這裏的getTask()方法,咱們配置的阻塞隊列會在這裏起做用
            while (task != null || (task = getTask()) != null) {
                w.lock();  // 執行任務前上鎖               
                // 若是線程池中止(處於STOP狀態或者TIDYING、TERMINATED狀態時),設置當前線程處於中斷狀態若是不是(線程就處於RUNNING或者SHUTDOWN狀態),確保線程不中斷,從新檢查當前線程池的狀態是否大於等於STOP狀態
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //在線程運行前調用
                    beforeExecute(wt, task); 
                    Throwable thrown = null;
                    try {
                        //這裏執行run方法
                        task.run(); 
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //在線程運行後調用
                        afterExecute(task, thrown); 
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // 線程退出工做
        }
    }
複製代碼

getTask()

從隊列中獲取Task,大概分爲如下幾個步驟。

判斷線程池以及隊列的狀態,若是線程池狀態在STOP以上,此時線程池不處理隊列中的任務;或者線程池處於SHUTDOWN可是隊列爲空(SHUTDOWN再也不接受新的任務),workerCount減1,返回null,注意此時只是將變量減1,其實工做線程並無終止真正的終止在 processWorkerExit(w, completedAbruptly);中。 若是經過了狀態檢查,判斷是否要進行線程回收,若是須要workerCount數量減1,成功後返回null。 根據timed(timed表示須要進行超時閒置線程回收),選擇是限時等待仍是阻塞的方式從隊列中獲取任務。

方法依據配置的workQueue來工做,其阻塞原理與超時原理基於阻塞隊列實現,再也不詳述。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
	//循環
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //線程線程池狀態和隊列是否爲空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
		//線程數量
        int wc = workerCountOf(c);

        
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //(當前線程數是否大於最大線程數或者)
        //且(線程數大於1或者任務隊列爲空)
        //這裏有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //獲取任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

總結

ThreadPoolExecutor的核心是Worker,Worker實現了AbstractQueuedSynchronizer並繼承了Runnable。aqs鎖運用的極爲巧妙。

相關文章
相關標籤/搜索