爲何阿里建議你不要使用Executors來建立線程池?

前言

我相信你們在項目中或多或少的都使用過線程,而線程是寶貴的資源,不能頻繁的建立,應當給其餘任務進行復用,因此就有了咱們的線程池。java

線程池的使用

你知道咱們如何建立線程池嗎?程序員

這我固然知道了,JDK主要提供了三種建立線程池的方法web

  1. Executors.newFixedThreadPool(int nThreads) : 建立固定線程數量的線程池
  2. Executors.newSingleThreadExecutor() : 建立單個線程的線程池
  3. Executors.newCachedThreadPool() : 建立一個"無限大小"的線程池

線程池如何使用緩存

ExecutorService threadPool = Executors.newFixedThreadPool(5);

threadPool.execute(() -> {
  System.out.println("執行任務");
});

threadPool.shutdown();
複製代碼

線程池的原理

你給我講講線程池的原理呢?多線程

線程池核心參數以及狀態

上面說的建立線程池的方法實際上都是經過建立ThreadPoolExecutor這個類來實現的,因此咱們直接看這個類的實現原理便可。
首先來看看它的構造方法app

 public ThreadPoolExecutor(int corePoolSize,
       int maximumPoolSize,
       long keepAliveTime,
       TimeUnit unit,
       BlockingQueue<Runnable> workQueue,
       RejectedExecutionHandler handler)

複製代碼

先說下它這幾個核心參數的含義測試

  • corePoolSize : 線程池核心線程數量
  • maximumPoolSize : 線程池最大線程數量
  • keepAliveTime : 非核心線程的超時時長,當系統中非核心線程閒置時間超過keepAliveTime以後,則會被回收。若是ThreadPoolExecutor的allowCoreThreadTimeOut屬性設置爲true,則該參數也表示核心線程的超時時長
  • unit : 超時時長單位
  • workQueue : 線程池中的任務隊列,該隊列主要用來存儲已經被提交可是還沒有執行的任務
  • handler : 當線程池沒法處理任務的時候的處理策略

固然只是知道這幾個參數也沒有什麼太大的做用,咱們仍是要着眼全局來看ThreadPoolExecutor類。this

首先來認識下線程池中定義的狀態,它們一直貫穿在整個主體spa

// ctl存儲了兩個值,一個是線程池的狀態,
// 另外一個是活動線程數(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;

// 線程池最多容許2^29-1個(大概5億)線程存在,
// 固然首先要你的系統能新建這麼多個
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

從上面的成員變量的定義咱們能夠知道,線程池最多容許5億個(2^29-1)個線程活動,那麼爲何不是2^31-1呢?由於設計者以爲這個值已經夠大了,若是未來以爲這是一個瓶頸的話,會把這個換成Long類型。線程

同時線程池這裏也存在了五個狀態,它們解決着線程池的生命週期。

  1. RUNNING : 運行狀態。接受新的task而且處理隊列中的task
  2. SHUTDOWN : 關閉狀態(調用了shutdown方法)。不接受新的task,可是要處理隊列中的task
  3. STOP : 中止狀態(調用了shutdownNow方法)。不接受新的task,也不處理隊列中的task,而且要中斷正在處理的task
  4. TIDYING : 全部的task都已終止了,workerCount (活動終止狀態,當執行 terminated() 後會更新爲這個狀態線程數) 爲0,線程池進入該狀態後會調用 terminated() 方法進入TERMINATED 狀態
  5. TERMINATED : 終止狀態,當執行 terminated() 後會更新爲這個狀態

狀態流轉圖以下:

線程池狀態
線程池狀態

原理

咱們知道當咱們執行一個task的時候,調用的是execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. 若是工做線程數小於核心線程數(corePoolSize),則建立一個工做線程執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 2. 若是當前是running狀態,而且任務隊列可以添加任務
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 若是不處於running狀態了(使用者可能調用了shutdown方法),
        // 則將剛纔添加到任務隊列的任務移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 若是當前沒有工做線程,
       // 則新建一個工做線程來執行任務(任務已經被添加到了任務隊列)
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse);
    }

  // 3. 隊列已經滿了的狀況下,則新啓動一個工做線程來執行任務
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

而在addWorker方法中還存在有一些必要的判斷邏輯,好比當前線程池是不是非running狀態,隊列是否爲空等條件,固然最主要的邏輯仍是判斷當前工做線程數量是否大於maximumPoolSize以及啓動工做線程執行任務。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        for (;;) {
            int wc = workerCountOf(c);
            // 1. 判斷當前工做線程是否知足條件
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 2. 增長工做線程數量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 3. 建立工做線程
    w = new Worker(firstTask);
    final Thread t = w.thread;
    workers.add(w);     
    if (workerAdded) {
        // 4. 運行工做線程
        t.start();
        workerStarted = true;
    }     
    return workerStarted;
}
複製代碼

因此,總結下線程池的工做流程以下:

  1. 提交一個任務,若是線程池中的工做線程數小於corePoolSize,則新建一個工做線程執行任務
  2. 若是線程池當前的工做線程已經等於了corePoolSize,則將新的任務放入到工做隊列中正在執行
  3. 若是工做隊列已經滿了,而且工做線程數小於maximumPoolSize,則新建一個工做線程來執行任務
  4. 若是當前線程池中工做線程數已經達到了maximumPoolSize,而新的任務沒法放入到任務隊列中,則採用對應的策略進行相應的處理(默認是拒絕策略)

若是你以爲上面很差記,我給你講個火鍋店的故事你就更加明白了。

之前有個火鍋店,叫作朱帥帥火鍋,老闆是個剛辭掉程序員工做出來創業的帥小夥子,火鍋店不大,只能擺上10張桌子(corePoolSize),若是吃火鍋的來得早就能夠去店裏面坐(店裏有空調),來晚了,店裏面坐滿了,後面來的人就要排隊了(workQueue)。
排隊的人數愈來愈多,朱帥帥一看不是辦法,就給外面擺了幾張臨時桌子(非核心工做線程),讓客人在外面吃。若是店裏面有人吃完了或者外面臨時桌子吃完了就讓排隊的人去吃。後面時間晚了,沒有排隊的人了,老闆就讓人撤了外面的臨時桌子,畢竟擺在外面也不太好,並且還怕城管來。若是生意特別好,又來了特別多的人,已經超出火鍋店的服務能力了,就只能喊他們去別家了。

上面的故事,你要品,細細的品,最後你會發現,代碼來源於生活。

上面一直說到工做線程,工做線程究竟是個什麼鬼?其實工做線程指的就是咱們的Worker類,它是ThreadPoolExecutor中的私有類

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable 
{
    // 運行worker的線程(new Thread(this))
    final Thread thread;

    // 須要執行的任務
    Runnable firstTask;

    Worker(Runnable firstTask) {
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
  }
}
複製代碼

能夠看到Woker不只繼承了AbstractQueuedSynchronizer(實現獨佔鎖功能),還實現了Runnable接口。

實際上線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象

Worker用本身做爲task構造一個線程,同時把外層任務賦值給本身的task成員變量,至關於對task作了一個包裝。

addWorker()方法中執行了worker.thread.start(),實際上執行的就是Worker的runWorker方法。

final void runWorker(Worker w) {

    // 1. 獲取任務開始執行任務,若是獲取不到任務,當前的worker就會被JVM回收
    while (task != null || (task = getTask()) != null) {
       task.run();
    }     
}

private Runnable getTask() {
    boolean timedOut = false

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 1. 判斷線程池是否關閉
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);

        // 2. 判斷是否須要進行超時控制。
        // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
        // wc > corePoolSize,表示當前線程池中的工做線程數量大於核心線程數量;
        // 對於超過核心線程數量的這些線程,須要進行超時控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 3. wc > maximumPoolSize的狀況是由於可能在此方法執行階段同時執行了setMaximumPoolSize方法;
        // timed && timedOut 若是爲true,表示當前操做須要進行超時控制,而且上次從阻塞隊列中獲取任務發生了超時
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 4. timed若是爲true,則經過阻塞隊列的poll方法進行超時控制,
       //若是在keepAliveTime時間內沒有獲取到任務,則返回null。若是爲false,則直接阻塞
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;

    }
}
複製代碼

上面的代碼中尤爲須要注意的是getTask()中的第3點,它的目的是控制線程池有效的工做線程數量。 從以前的分析咱們能夠知道,若是當前線程池的工做線程數量超過了corePoolSize且小於maximumPoolSize,而且workQueue已滿,則能夠增長工做線程,但這時若是超時沒有獲取到任務,也就是timedOut爲true的狀況,說明workQueue已經爲空了,也就說明了線程池中不須要那麼多線程來執行任務了,能夠把多於corePoolSize數量的線程銷燬掉,保持線程數量在corePoolSize便可。

拒絕策略

你剛纔說到拒絕策略,都有哪些拒絕策略呀?

主要有下面4種拒絕策略

  1. AbortPolicy : 始終拋出RejectedExecutionException
  2. CallerRunsPolicy : 若是線程池未關閉,則交給調用線程池的線程執行
  3. DiscardPolicy : 直接丟棄任務,啥也不作
  4. DiscardOldestPolicy : 丟棄隊列裏最老的任務,而後將當前這個任務從新提交給線程池。

經過Executors建立的線程池不一樣之處

你給我說說你開頭說的經過Executors建立的線程池三者有何不一樣嗎?

newFixedThreadPool

固定線程數量的線程池,corePoolSize等於maximumPoolSize,採用的阻塞隊列是LinkedBlockingQueue,是一個無界隊列,當任務量忽然很大,線程池來不及處理,就會將任務一直添加到隊列中,就有可能致使內存溢出。

newSingleThreadExecutor

建立單個線程的線程池,corePoolSize = maximumPoolSize = 1,也採用的LinkedBlockingQueue這個無界隊列,當任務量很大,線程池來不及處理,就有可能會致使內存溢出。

newCachedThreadPool

建立可緩存的線程池,corePoolSize = 1,maximumPoolSize = Interger.MAX_VALUE;可是使用的是SynousQueue,這個隊列比較特殊,內部沒有結構來存儲任何元素,因此若是任務數很大,而建立的那個線程(corePoolSize=1)遲遲沒有處理完成任務,就會一直建立線程來處理,也有OOM的可能。

cacheThreadPool中的cache其實指的就是SynousQueue,當往這個隊列插入數據的時候,若是沒有任務來取,插入這個過程會被阻塞。

你既然說了都有可能OOM,那麼應該如何建立線程池呢?

實際使用中不建議經過Executors來建立線程池,而是經過 new ThreadPoolExecutor的方式來建立,而隊列也不建議使用無界隊列,而要使用有界隊列,好比ArrayBlockingQueue。而拒絕策略這個就看你本身需求了(系統提供的若是不知足,就本身寫一個)
同時對於核心線程數的設置也不是越大越好,只能說根據你的需求來設置這個值,通常來說能夠根據下面兩點來進行合理配置

  1. 對於I/O密集型任務,能夠將線程數設置大一點,好比 CPU個數 * 2
  2. 對於計算性任務(在內存中進行大量運算),能夠將線程數設置小一點, 好比就等於 CPU的個數

固然啦,這個考慮是不少方面的,不只僅和程序有關,還和硬件等資源有關,總之就是在測試的時候多多調試。

你們不要覺得我上面說的是句廢話,請你自信一點,把覺得去掉。

相關文章
相關標籤/搜索