ThreadPoolExecutor是怎麼去執行一個任務的?

@TOCjava

前言

前面一遍文章 咱們看了下FutureTask的源碼,知道了怎麼樣去獲取一個任務的返回值,今天咱們看下ThreadPoolExecutor。面試

ThreadPoolExecutor 看名詞 咱們就能夠 看作是ThreadPool 和Executor的結合,大概意思咱們也能知道就是線程池執行器,哈哈這翻譯 真棒。這篇博文 會從源碼的角度去分析下 一個線程任務 加入的線程池之後 是怎麼被執行的~編程

線程池

上面 說線程的時候 咱們也說過 線程是系統中極其珍貴的資源,那咱們要合理的使用他,因此有了線程池的出現,那線程池能帶來哪些好處呢安全

  • 下降資源的消耗:經過重複利用已經建立的線程來下降線程建立和銷燬帶來的消耗
  • 提供響應速度:當咱們建立人物到達的時候,任務能夠不須要等待線程的建立就能當即執行
  • 提升線程可管理性:線程是稀缺資源,不能無限建立,因此要使用線程池對線程進行同一的管理和分配,調優和監控等等。

源碼分析

繼承結構

首先 咱們看下ThreadPoolExecutor 的繼承關係多線程

public class ThreadPoolExecutor extends AbstractExecutorService{}

public abstract class AbstractExecutorService implements ExecutorService{}

public interface ExecutorService extends Executor {
    <!--中止線程池,狀態設置爲SHUTDOWN,而且不在接受新的任務,已經提交的任務會繼續執行-->
    void shutdown();
    <!--中止線程池,狀態設置爲STOP,不在接受先任務,嘗試中斷正在執行的任務,返回還未執行的任務-->
    List<Runnable> shutdownNow();
    <!--是不是SHUTDOWN狀態-->
    boolean isShutdown();
    <!--是否全部任務都已經終止-->
    boolean isTerminated();
    <!--超時時間內,去等待任務執行任務-->
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;   
    <!--Callable 去提交任務-->
    <T> Future<T> submit(Callable<T> task);
    <!--Runnable 去提交任務-->
    <T> Future<T> submit(Runnable task, T result);
    <!--Runnable 去提交任務-->
    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

public interface Executor {
    void execute(Runnable command);
}
複製代碼

咱們先從最下面的接口Executor 來看,這個接口就是一個實現,就是執行execute方法,這個接口就是線程執行的入口併發

ExecutorService接口繼承了Executor接口,裏面的的方法比較多,咱們常見的shutdownNow,shutdown 就是在這個接口裏面的,還有就是咱們常見往線程池裏面提交任務的時候submit方法。ExecutorService豐富了對任務執行和管理的功能函數

AbstractExecutorService是一個抽象類,實現了ExecutorService接口,這邊順帶說下,爲何java 源碼裏面存在大量 抽象類實現接口,而後類再繼承抽象類,爲何類不直接實現接口呢?還要套一層呢,以前我也不明白,後來我才清楚,抽象類去實現接口,就是去實現一些公共的接口方法,這樣類再次去實現接口的時候,只要關心我不一樣的實現就行了,由於 咱們知道接口的實現類不止一個,抽象類就是把這些要實現接口的類的公共的實現再次抽取出來,避免了大量的重複實現,尤爲List,Set 接口 你看下 幾乎都有響應的抽象類實現!高併發

主要的變量

<!--ctl 存儲了線程池狀態和線程的數量-->
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//2的29次方-1

    // runState is stored in the high-order bits
    <!--表示線程池正在運行,能夠接受任務 處理線程池中任務-->
    private static final int RUNNING    = -1 << COUNT_BITS;
    <!--不接受新的任務,可是任然會處理隊列中的任務-->
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    <!--不接受新的任務,不會處理隊裏中任務,對正在執行的任務進行中斷-->
    private static final int STOP       =  1 << COUNT_BITS;
    <!--任務被中斷,正在處理整理狀態-->
    private static final int TIDYING    =  2 << COUNT_BITS;
    <!--表示終結狀態-->
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    <!--獲取當前線程池的運行狀態-->
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    <!--獲取當前線程池中工做線程的數量->
    private static int workerCountOf(int c) { return c & CAPACITY; }
     <!--獲取ctl的值 ->
    private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

關於Ctl是怎麼處理線程狀態和線程數的數量的,能夠看下個人另一篇博文:blog.csdn.net/zxlp520/art…oop

構造函數

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

這個構造函數 是全部構造函數最終調用的方法,那咱們就說下 這些具體的參數源碼分析

  1. int corePoolSize 核心的線程數量
  2. int maximumPoolSize 最大的線程數量
  3. long keepAliveTime 線程存活的最大時間設置
  4. TimeUnit unit 設置時間的單位 和keepAliveTime是對應的
  5. BlockingQueue workQueue 阻塞隊裏,存儲要執行的任務
  6. ThreadFactory threadFactory 建立執行線程的工廠 默認值:Executors.defaultThreadFactory()
  7. RejectedExecutionHandler handler 任務的拒絕Hander方法
    • 默認的是AbortPolicy就是拋出異常,
    • 還有三種策略是DiscardPolicy丟棄策略,DiscardOldestPolicy丟棄隊列中等待時間最長的任務策略,CallerRunsPolicy這個是讓調用的線程去處理的策略

Worker

爲何要先講worker呢?由於咱們提交的任務Runnabale是以Worker這個對象去包裝後運行的,這個後面我我講addWorker方法的時候在細聊

先看下Worker的代碼:

/** Worker 繼承了AQS 和實現了Runnable接口 */
 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        /** worker 運行的主體線程 就是在哪一個線程裏面運行任務的 */
        final Thread thread;
        /** 須要運行的任務 */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//這邊的this 就是當前的Worker 對象 
        }

        /** 運行 當前的任務 runWorker是ThreadPoolExecutor裏面的方法 */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        // 0 表示 沒有鎖住狀態
        // 1 表示 鎖住狀態
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        
        <!--這個方法咱們應該很熟悉 我在將AQS的時候聊過這個方法,這邊作的就是嘗試修改state的狀態,這樣就是表示加鎖的意思,表示這個worker 是鎖住狀態,別的線程不能執行,-->
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {//CAS 去修改State的值,1表示 已經被上鎖
                setExclusiveOwnerThread(Thread.currentThread());設置當前鎖的佔用者線程是當前線程
                return true;
            }
            return false;
        }
        <!--釋放鎖,也就是修改State的值 爲0 unused這個字段命名也挺有意思,意思是說 沒用的意思-->
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);//設置當前鎖的佔用者線程是null
            setState(0);
            return true;
        }
        <!--給當前的Worker加鎖,若是獲取不到 就加入等待隊裏中,阻塞當前執行線程-->
        public void lock() { acquire(1); }
        <!--這邊至關於一個非公平鎖的實現  去嘗試下加鎖-->
        public boolean tryLock() { return tryAcquire(1); }
        <!--釋放鎖-->
        public void unlock() { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        
        <!--嘗試去中斷運行的線程任務,就是咱們調用shutdownNow 的時候-->
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
複製代碼

首先看下 這個Worker的繼承結構,首先是實現了Runnable,又了這樣的關係,Worker就能夠被Thread去執行了,另一個還有一個繼承了一個抽象類AbstractQueuedSynchronizer,簡稱AQS,這個類 哈哈 真的是好久不見了,我以前花了5篇文章解釋了這個AQS,可想而知其重要性,JUC 中不少實現都是 基於這個去作的,仍是不清楚的小夥伴能夠去到個人博客裏面去找下。

這邊又一行代碼 咱們須要留意下,挺有意思的,this.thread = getThreadFactory().newThread(this);這邊 的this 就是咱們構建的Worker,thread 就是用ThreadFactory去建立的一個線程而且執行的任務就是Worker,也就是調用thread.start()就能夠執行Worker了

execute

execute是實現Executor接口的方法,就是執行的任務的入口方法,咱們看下一個任務的提交進來是怎麼作的

public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
       int c = ctl.get();//獲取當前的ctl 值
       /* * workerCountOf方法我上面也講過,就是獲取當前的工做線程數 * 若是當前的工做線程數小於設置的核心線程數量,就調用addWorker去新增一個工做線程,ture是表示要添加核心工做線程 * addWorker 若是添加成功就直接返回,若是添加失敗就繼續後去下ctl,這邊重寫獲取是爲了 防止在addWorker過程當中 ctl發生了改變 */
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       /* * 走到這步 說明當前的工做線程數大於核心線程數或者是addWorker發生了失敗 * 首先去判斷了下 當前的線程狀態是不是Running 而後把當前任務加入到阻塞隊列workQueue中 * 若是都成功了 那就再次獲取下ctl,由於咱們在offer Runnable的時候可能ctl也會發生變化 *這邊的多重驗證 考慮到高併發的狀況,代碼邏輯很是的嚴謹 * 繼續走下去的邏輯是 再次判斷下線程池狀態 若是是非Running,那就移除當前的任務,最後執行reject方法 根據不一樣的拒絕策略,作不一樣的行爲 * 最後走到 判斷當前線程數量若是是0,仍是回去調用addWorker方法,傳入一個空的Runnalbe,false 是表示建立一個非核心的工做線程 */
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           if (! isRunning(recheck) && remove(command))
               reject(command);
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       /* * 走到這個判斷 說明當前線程池狀態是非Running或者入隊任務失敗,隊列多是滿了 * 這邊是去建立非核心線程去處理任務,若是建立失敗 就執行拒絕策略 */
       else if (!addWorker(command, false))
           reject(command);
   }
複製代碼

這邊的英文註釋 我沒捨得刪除,讀者能夠去本身翻譯下 描述的可能比我準確,我相信 你們能看的懂,而後再對比下 我下面的中文註釋,我相信能清楚 一個任務新增進來 是怎麼個處理流程!

看完本身再回想下,何時去建立核心線程?何時去建立非核心線程?何時任務會加入的阻塞隊列中?最後執行拒絕策略 有那幾種狀況?知道這些答案 那麼execute方法你應該瞭然於心了!

addWorker

下面咱們看下一個重點的方法,這個方法 調用的頻次很高,咱們進入去看下

private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       //這個是一個自旋 套了一個自旋 其目的就是CAS 新增線程池的數量
       for (;;) {
           int c = ctl.get();//獲取ctl的值
           int rs = runStateOf(c);//獲取當前的線程狀態

           // 這邊這個條件看上去很繞頭,可是仔細看看就能知道
           // 第一個條件rs >= SHUTDOWN 說明線程池狀態不正常
           // 後面有一個非的判斷 其實就是括號裏面的條件有一個不成立 整個條件就是false
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
               // 下面是獲取線程裏面的工做線程 若是大於最大值或者設置的閾值,就返回直接返回false 方法結束 
               int wc = workerCountOf(c);
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))
                   return false;
               //這個的意思是 若是CAS修改workerCount成功 整個最外層的自旋就結束
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               // 這邊爲何要用2個自旋 主要是這邊又判斷了下 當前這個自旋CAS修改WorkerCount失敗後,ctl會發生變化
               //若是和外層的不相等,就要返回外層的自旋 去重寫作
               這邊就是爲何用的是   continue retry 
               c = ctl.get();  // Re-read ctl
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       boolean workerStarted = false;//worker是否開始執行了
       boolean workerAdded = false;//worker 是否添加成功
       Worker w = null;
       try {
           w = new Worker(firstTask);//將Runnable 傳入到worker的構造函數中,上面也講過,其實就是用firstTask去構造了先的Thread
           final Thread t = w.thread;//當前的t就是執行Runnable的線程,在worker中建立
           if (t != null) {
               final ReentrantLock mainLock = this.mainLock;//重入鎖
               mainLock.lock();//保證添加workder時候的線程安全
               try {
                   int rs = runStateOf(ctl.get());
                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);//添加worker到一個工做worker集合中HashSet存儲的
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();//釋放鎖
               }
               if (workerAdded) {//若是添加成功
                   t.start();//這個是真正執行Worker的地方 就是這兒
                   workerStarted = true;
               }
           }
       } finally {
           if (! workerStarted)
               addWorkerFailed(w);//若是最終Worker沒有運行,那就清理掉他 修改對應的WorkerCount 
       }
       return workerStarted;
   }
複製代碼

方法最開始的地方 用了2個自旋去解決併發狀況下的CAS修改workerCount失敗的狀況,這邊每一個細節,每種狀況都考慮的很到位,狀態判斷的特別的嚴謹,真正看明白,感受多線程狀況下的編程是多麼的麻煩,辛虧幫咱們作了封裝!

咱們看下 t.Start() 這邊方法,咱們知道t就是Worker裏面建立線程主體,是以本身爲任務傳入到Thread中的,咱們知道start是開始運行線程,最終是會調用到run方法的,那麼就是說會調到Worker裏面的run 方法,咱們在回看下Worker裏面的run方法

public void run() {
   runWorker(this);//ThreadPoolExecutor裏面的方法
}
複製代碼

runWorker

上面我也說了 線程start後會調用run方法,那麼也就是調用 runWorker方法,咱們在看下這個裏面寫的時候什麼

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//獲取Worker裏面的任務
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //一直while循環
            while (task != null || (task = getTask()) != null) {
                w.lock();//鎖住Worker
                //判斷若是當前的線程池狀態是stop 而且檢測當前線程的中斷狀態若是是false 就幫助當前線程執行中斷調用interrupt()
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//執行任務的前置Action
                    Throwable thrown = null;
                    try {
                        task.run();//執行最終的Runnable任務 
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);///執行任務的後置Action
                    }
                } finally {
                    task = null;
                    w.completedTasks++;//Worker完成的任務+1
                    w.unlock();//釋放鎖
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//Worker執行結束後退出
        }
    }
複製代碼

RunWorker方法是整個線程池運行任務的核心方法,線程會使用While循環 不斷的從阻塞隊裏裏面去獲取任務,而後去執行任務,若是阻塞隊列裏面沒有任務,這個時候 getTask() 方法就會阻塞線程,直至新任務的到來,因此咱們在作單元測試的時候,用到線程池,若是你不調用Shutdown 方法 ,你的debug 小紅點就一直在運行,就是這個緣由!

getTask

這個方法就是從阻塞隊列中取獲取任務

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //判斷線程池的狀態若是是SHUTDOWN而且隊列爲空 或者直接狀態就是null 就不會從阻塞隊列中 取出任務 直接返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            
            //timed 就是用來控制 獲取阻塞隊列中的任務 是否有等待時間,咱們設置的keepAliveTime值就會在這邊用到,若是一個工做線程在等待任務超過了設置的值就會退出等待,回收線程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))//工做線程數減1
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();// 獲取任務
                if (r != null)
                    return r;
                timedOut = true;//設置等待超時標誌 應該在自旋中,下次判斷會用到此值
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

咱們都知道 當咱們調用shutdown的時候 線程池狀態是ShUTDOWN,調用shutdownnow的時候線程狀態是Stop,那麼這2種狀態是怎麼處理阻塞隊列裏面的任務的呢,看了上文咱們應該能找到答案,當狀態是stop的時候,咱們獲取隊列中的任務是直接返回的null的也就是說隊列中的任務不會在執行了,可是當狀態是shutdown的時候 只有 隊列爲空的時候 纔會返回null,也就是隊列不空 仍是能夠獲取隊列中的任務的,這種問題 在面試題中常常出現,若是要正在知道答案,仍是要經過從源碼中去真正理解,光是被答案我相信你很快仍是會忘記的!

submit

掌握了execute方法 在看submit方法 其實就很簡單了,submit通常是用於添加 帶返回值的任務,咱們看下 代碼

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);//將Runnable 包裝成FutureTask任務 去讓線程執行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);//將Runnable 包裝成FutureTask任務 去讓線程執行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
複製代碼

看到這邊的代碼,應該有點兒熟悉的味道,應該上篇文章聊FutureTask的時候 不少已經將過了,包括Runnable和Callable怎麼轉換的,Future是怎麼獲取返回值的? 不清楚的小夥伴 能夠去看下我以前的文章!blog.csdn.net/zxlp520/art…

上面三個構造函數,就是對應着FutureTask的構造函數,說白了就是咱們使用execute的時候都是用FutureTask去傳入的,由於FutureTask也是實現了Runable接口的

執行流程圖

最後 用一張流程圖,來描述下一個任務從添加到運行結束,經歷了哪些方法!

線程Task 執行流程圖

總結

ThreadPoolExecutor 雖然裏面執行方法不少,可是你若是掌握了常見的邏輯運算符,AQS,線程,FutureTask 等相關知識的基礎前提下 去看源碼,也不會那麼的累。最後我畫的流程圖,就是一個任務在新增到線程池中執行的整個流程!

最後分享下最近看到的一段話: 什麼是危機?

真正的危機,來源於在正確的時間作不正確的事。沒有在正確的時間,爲下一步作出積累,這纔是危機的根源。

若是你正在這條成長路上的朋友,晚醒不如早醒,這就是我想說的。千萬別等到中年才發現本身沒有創建好本身的護城河,這個時候才知道努力。在本身努力的階段,不只不努力反了選擇了縱容本身,這纔是危機的根源。

但願你們會有所收穫,不負時光,不負卿!

相關文章
相關標籤/搜索