深刻理解線程池原理篇

如今CPU都是有多個核心,並行已經成爲事實,一方面咱們但願最大限度利用機器性能(利用多線程提升吞吐率),另外一方面機器的硬件資源是有限的,咱們也不能無限制的去申請,幸運的是,JDK已經爲咱們提供了ExecutorService的實現,還提供了Executors工廠類方便咱們生成模板線程池,可是簡單背後必定是複雜,這篇文章就是深刻線程池的源碼去一探究竟。安全

開始以前,須要明確幾個概念,方便後面理解線程池的運行原理。bash

核心線程(corePool):線程池最終執行任務的角色確定仍是線程,同時咱們也會限制線程的數量,因此咱們能夠這樣理解核心線程,有新任務提交時,首先檢查覈心線程數,若是核心線程都在工做,並且數量也已經達到最大核心線程數,則不會繼續新建核心線程,而會將任務放入等待隊列多線程

等待隊列 (workQueue):等待隊列用於存儲當核心線程都在忙時,繼續新增的任務,核心線程在執行完當前任務後,也會去等待隊列拉取任務繼續執行,這個隊列通常是一個線程安全的阻塞隊列,它的容量也能夠由開發者根據業務來定製。併發

非核心線程當等待隊列滿了,若是當前線程數沒有超過最大線程數,則會新建線程執行任務,那麼核心線程和非核心線程到底有什麼區別呢?說出來你可能不信,本質上它們沒有什麼區別,建立出來的線程也根本沒有標識去區分它們是核心仍是非核心的,線程池只會去判斷已有的線程數(包括核心和非核心)去跟核心線程數和最大線程數比較,來決定下一步的策略高併發

線程活動保持時間 (keepAliveTime):線程空閒下來以後,保持存活的持續時間,超過這個時間尚未任務執行,該工做線程結束。oop

飽和策略 (RejectedExecutionHandler):當等待隊列已滿,線程數也達到最大線程數時,線程池會根據飽和策略來執行後續操做,默認的策略是拋棄要加入的任務。性能

一圖剩千言,上一張圖歸納線程池的基本運做流程。 ui

線程池運做概覽.png

按個人習慣,先提出幾個問題,而後帶着問題去尋找答案。
  1. 線程池的線程是如何作到複用的。
  2. 線程池是如何作到高效併發的。
  3. 從線程池的設計中,咱們能學到什麼?

ThreadPoolExecutor

JDK中線程池的核心實現類是ThreadPoolExecutor,先看這個類的第一個成員變量ctl,AtomicInteger這個類能夠經過CAS達到無鎖併發,效率比較高,這個變量有雙重身份,它的高三位表示線程池的狀態,低29位表示線程池中現有的線程數,這也是Doug Lea一個天才的設計,用最少的變量來減小鎖競爭,提升併發效率。this

//CAS,無鎖併發
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //表示線程池線程數的bit數
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //最大的線程數量,數量是徹底夠用了
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    //1110 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //0000 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //0010 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
    private static final int STOP       =  1 << COUNT_BITS;
    //0100 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
    private static final int TIDYING    =  2 << COUNT_BITS;
    //0110 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    //獲取線程池的狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取線程的數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //組裝狀態和數量,成爲ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. * 判斷狀態c是否比s小,下面會給出狀態流轉圖 */ private static boolean runStateLessThan(int c, int s) { return c < s; } //判斷狀態c是否不小於狀態s private static boolean runStateAtLeast(int c, int s) { return c >= s; } //判斷線程是否在運行 private static boolean isRunning(int c) { return c < SHUTDOWN; } 複製代碼

關於線程池的狀態,有5種,spa

  1. RUNNING, 運行狀態,值也是最小的,剛建立的線程池就是此狀態。
  2. SHUTDOWN,停工狀態,再也不接收新任務,已經接收的會繼續執行
  3. STOP,中止狀態,再也不接收新任務,已經接收正在執行的,也會中斷
  4. 清空狀態,全部任務都中止了,工做的線程也所有結束了
  5. TERMINATED,終止狀態,線程池已銷燬

它們的流轉關係以下:

線程狀態流轉.png

execute/submit

向線程池提交任務有這2種方式,execute是ExecutorService接口定義的,submit有三種方法重載都在AbstractExecutorService中定義,都是將要執行的任務包裝爲FutureTask來提交,使用者能夠經過FutureTask來拿到任務的執行狀態和執行最終的結果,最終調用的都是execute方法,其實對於線程池來講,它並不關心你是哪一種方式提交的,由於任務的狀態是由FutureTask本身維護的,對線程池透明

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
複製代碼

重點看execute的實現

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //第一步,獲取ctl
        int c = ctl.get();
        //檢查當前線程數是否達到核心線程數的限制,注意線程自己是不區分核心仍是非核心,後面會進一步驗證
        if (workerCountOf(c) < corePoolSize) {
            //若是核心線程數未達到,會直接添加一個核心線程,也就是說在線程池剛啓動預熱階段,
            //提交任務後,會優先啓動核心線程處理
            if (addWorker(command, true))
                return;
            //若是添加任務失敗,刷新ctl,進入下一步
            c = ctl.get();
        }
        //檢查線程池是不是運行狀態,而後將任務添加到等待隊列,注意offer是不會阻塞的
        if (isRunning(c) && workQueue.offer(command)) {
           //任務成功添加到等待隊列,再次刷新ctl
            int recheck = ctl.get();
           //若是線程池不是運行狀態,則將剛添加的任務從隊列移除並執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //判斷當前線程數量,若是線程數量爲0,則添加一個非核心線程,而且不指定首次執行任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //添加非核心線程,指定首次執行任務,若是添加失敗,執行異常策略
        else if (!addWorker(command, false))
            reject(command);
    }
    
    /*
     * addWorker方法申明
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
    //.....
    }
複製代碼

這裏有2個細節,能夠深挖一下。

  1. 能夠看到execute方法中沒有用到重量級鎖,ctl雖然能夠保證自己變化的原子性,可是不能保證方法內部的代碼塊的原子性,是否會有併發問題?
  2. 上面提到過,addWorker方法能夠添加工做線程(核心或者非核心),線程自己沒有核心或者非核心的標識,core參數只是用來肯定 當前線程數的比較對象是線程池設置的核心線程數仍是最大線程數,真實狀況是否是這樣?

addWorker

添加線程的核心方法,直接看源碼

private boolean addWorker(Runnable firstTask, boolean core) {
       //至關於goto,雖然不建議濫用,但這裏使用又以爲沒一點問題
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //若是線程池的狀態到了SHUTDOWN或者之上的狀態時候,只有一種狀況還須要繼續添加線程,
            //那就是線程池已經SHUTDOWN,可是隊列中還有任務在排隊,並且不接受新任務(因此firstTask必須爲null)
           //這裏還繼續添加線程的初衷是,加快執行等待隊列中的任務,儘快讓線程池關閉
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
               //傳入的core的參數,惟一用到的地方,若是線程數超過理論最大容量,若是core是true跟最大核心線程數比較,不然跟最大線程數比較
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //經過CAS自旋,增長線程數+1,增長成功跳出雙層循環,繼續往下執行
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               //檢測當前線程狀態若是發生了變化,則繼續回到retry,從新開始循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //走到這裏,說明咱們已經成功的將線程數+1了,可是真正的線程尚未被添加
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //添加線程,Worker是繼承了AQS,實現了Runnable接口的包裝類
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
               //到這裏開始加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //檢查線程狀態,仍是跟以前同樣,只有當線程池處於RUNNING,或者處於SHUTDOWN而且firstTask==null的時候,這時候建立Worker來加速處理隊列中的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                       //線程只能被start一次
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                      //workers是一個HashSet,添加咱們新增的Worker
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  //啓動Worker
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

分析完addWorker的源碼實現,咱們能夠回答上面留下的二個疑問,

  1. execute方法雖然沒有加鎖,可是在addWorker方法內部,加鎖了,這樣能夠保證不會建立超過咱們預期的線程數,大師在設計的時候,作到了在最小的範圍內加鎖,儘可能減小鎖競爭,
  2. 能夠看到,core參數,只是用來判斷當前線程數是否超量的時候跟corePoolSize仍是maxPoolSize比較,Worker自己無核心或者非核心的概念。 ####繼續看Worker是怎麼工做的
//Worker的run方法調用的是ThreadPoolExecutor的runWorker方法
    public void run() {
          runWorker(this);
    }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出須要執行的任務,
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //若是task不是null,或者去隊列中取任務,注意這裏會阻塞,後面會分析getTask方法
            while (task != null || (task = getTask()) != null) {
               //這個lock在這裏是爲了若是線程被中斷,那麼會拋出InterruptedException,而退出循環,結束線程
                w.lock();
                //判斷線程是否須要中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任務開始執行前的hook方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       ////任務開始執行後的hook方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //Worker退出
            processWorkerExit(w, completedAbruptly);
        }
    }

   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
           //檢查線程池的狀態,若是已是STOP及以上的狀態,或者已經SHUTDOWN,隊列也是空的時候,直接return null,並將Worker數量-1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

           // 注意這裏的allowCoreThreadTimeOut參數,字面意思是否容許核心線程超時,即若是咱們設置爲false,那麼只有當線程數wc大於corePoolSize的時候纔會超時
           //更直接的意思就是,若是設置allowCoreThreadTimeOut爲false,那麼線程池在達到corePoolSize個工做線程以前,不會讓閒置的工做線程退出
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
          //確認超時,將Worker數-1,而後返回
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //從隊列中取任務,根據timed選擇是有時間期限的等待仍是無時間期限的等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
複製代碼

如今咱們能夠回答文章一開始提出的三個問題中的前2個了

  1. 線程池的線程是如何作到複用的。 線程池中的線程在循環中嘗試取任務執行,這一步會被阻塞,若是設置了allowCoreThreadTimeOut爲true,則線程池中的全部線程都會在keepAliveTime時間超時後還未取到任務而退出。或者線程池已經STOP,那麼全部線程都會被中斷,而後退出。
  2. 線程池是如何作到高效併發的。 看整個線程池的工做流程,有如下幾個須要特別關注的併發點. ①: 線程池狀態和工做線程數量的變動。這個由一個AtomicInteger變量 ctl來解決原子性問題。 ②: 向工做Worker容器workers中添加新的Worker的時候。這個線程池自己已經加鎖了。 ③: 工做線程Worker從等待隊列中取任務的時候。這個由工做隊列自己來保證線程安全,好比LinkedBlockingQueue等。

怎麼用好Executors

JDK已經給咱們提供了很方便的線程池工廠類Executors, 方便咱們快速建立線程池,可能在閱讀源碼以前,咱們在面對具體的業務場景時,到底該選擇哪一種線程池配置是有疑問的,咱們來看一下.

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
複製代碼

newFixedThreadPool, 能夠看到咱們須要傳入一個線程數量的參數nThreads,這樣線程池的核心線程數和最大線程數都會設成nThreads, 而它的等待隊列是一個LinkedBlockingQueue,它的容量限制是Integer.MAX_VALUE, 能夠認爲是沒有邊界的。核心線程keepAlive時間0,allowCoreThreadTimeOut默認false。因此這個方法建立的線程池適合能估算出須要多少核心線程數量的場景。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
複製代碼

newSingleThreadExecutor, 有且只有一個線程在工做,適合任務順序執行,缺點可是不能充分利用CPU多核性能

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

newCachedThreadPool, 核心線程數0,最大線程數Integer.MAX_VALUE, 線程keepAlive時間60s,用的隊列是SynchronousQueue,這種隊列自己不會存任務,只作轉發,因此newCachedThreadPool適合執行大量的,輕量級任務。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
複製代碼

newScheduledThreadPool, 執行週期性任務,相似定時器。

最後的問題:從線程池的設計中,咱們能學到什麼?

以個人我的體會,大概有下面四點

  1. 清楚實現原理,能夠指導咱們更好的使用。
  2. 在寫併發程序的時候,儘量的縮小鎖的範圍,提升代碼的吞吐率。
  3. goto,不是必定不能用,而不是濫用,有些場景有奇效。
  4. 若是你須要多個線程安全的int型變量,考慮利用位運算把它們合併爲一個。

全文完,水平有限,有疑問,歡迎交流!

相關文章
相關標籤/搜索