【初識】-JUC·ThreadPoolExecutor 線程池

ThreadPoolExecutor算是JUC中最經常使用的類之一了。ThreadPoolExecutor,顧名思義,thread-pool-executor,硬翻譯就是「線程-池-執行者」;java中,經過ThreadPoolExecutor能夠很容易的建立一個線程池。可是咱們爲何要使用線程池?呢?它可以帶來什麼樣的優點呢?它又是怎麼實現的呢?OK,帶着這幾個問題,咱們來學習一下JAVA中的線程池技術。java

爲何要使用線程池?

關於這個問題其實有點雞肋,我以爲再問這個問題以前更應該問爲何要有線程池。那爲何呢?數組


this is a 例子:bash

快遞行業最近兩年發展的灰常火熱,據說工資也很是的高,搞得我一每天的都沒有心思去好好寫代碼了...微信

以前的小快遞公司都是沒有固定的快遞員的,就是說,每次去送一件快遞,站點負責人就須要去找一我的來幫忙送,送完以後就沒有而後了(固然,錢仍是要給的)。框架

可是後來隨着貨愈來愈多,找人給錢成本太大,並且農忙時還須要花很長時間去找人,因此就僱用了5我的,簽了合同,長期爲站點配送。ide

之前都是隨時用隨時找,如今不是,如今是成立了一個物流公司,開了一個配送部,配送部門規定正式配送員最多隻能有五我的。函數

以前配送的缺點是什麼:oop

  • 每次有貨,我都會去臨時找一我的,而後簽定臨時合同,送完以後解除合同。很麻煩。 這也是不用線程池的缺點,就是任務來了,咱們須要頻繁的去建立新的線程,用完以後還須要釋放線程資源,對於系統的消耗是很大的。
  • 由於配送的貨車只有那麼幾個,若是臨時簽定的人多了,車子不夠用,其餘人只能等着車子送完以後才能用。

成立配送部以後解決的問題post

  • 成立配送部以後呢,由於簽定的是勞務合同,咱們能夠重複的讓配送員配送不一樣的貨物。達到線程資源的複用。
  • 由於限定了最多招聘的人數,能夠很好的避免招過多無用的人。

OK,咱們以上述例子來對應理解線程池的基本原理學習

先來看下,JAVA對ThreadPoolExecutor的類申明:

public class ThreadPoolExecutor extends AbstractExecutorService 複製代碼

【初識】-JUC·Executor框架中給出了Executor的繼承體系。ThreadPoolExecutor就是具有線程池功能的集成者。

構造方法

//構造方法一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                          
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
         
 }
 //構造方法二
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
//構造方法三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
//構造方法四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
複製代碼

從上面的代碼能夠看出,構造方法(1、2、三)都是經過調用(四)來作具體屬性初始化的。那麼咱們直接來看構造方法四;在構造方法四中總共須要7個參數,先來看下每一個參數的具體含義:

  • corePoolSize

    核心線程數大小。那麼什麼是核心線程數呢,咱們能夠類比於上面例子中的配送部中籤定勞動合同的人的個數。

  • maximumPoolSize

    最大線程數。加入說如今是雙十一期間,快遞異常的多,配送部的5我的徹底忙不過來,並且倉庫也滿了,怎麼辦呢?這個時候就須要再招聘一些臨時配送員,假設maximumPoolSize爲10,那麼也就是說,臨時招聘能夠招5我的,配送部簽定正式勞動合同的人和簽定臨時合同的人加一塊不能超過配送部規定的最大人數(10人)。因此說,maximumPoolSize就是線程池可以容許的存在的最大線程的數量。

  • keepAliveTime

    存活時間。爲何要有這個呢?想一下,雙十一過去了,貨物已經配送的差很少了。臨時合同寫的是若是臨時配送員2天沒有配送了,那配送部就有權利終止臨時合同,如今已經達到2天這個點了,須要開除這些臨時配送專員了。對於線程池來講,keepAliveTime就是用來表示,當除核心線程池以外的線程超過keepAliveTime時間以後,就須要被系統回收了。

  • unit

    keepAliveTime的時間單位。

  • workQueue

    工做隊列。這個就至關於一個倉庫,如今配送部5我的都在配送,可是還不斷的有新的快遞達到,這個時候就須要一個倉庫來存放這些快遞。對於線程池來講,當核心線程都有本身的任務處理,而且還有任務進來的時候,就會將任務添加到工做隊列中去。

  • threadFactory

    線程工廠。就是用來建立線程的。能夠類比成招聘組,會給每一個線程分配名字或者編號這樣。

  • handler

    RejectedExecutionHandler 用來描述拒絕策略的。假設如今個人倉庫也知足,而且配送部已經達到10我的了。怎麼辦呢,那麼只能採用一些策略來拒絕任務了。

線程池的狀態

// runState is stored in the high-order bits
//RUNNING;該狀態的線程池接收新任務,而且處理阻塞隊列中的任務
private static final int RUNNING    = -1 << COUNT_BITS;
//SHUTDOWN;該狀態的線程池不接收新任務,但會處理阻塞隊列中的任務;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//STOP;不接收新任務,也不處理阻塞隊列中的任務,而且會中斷正在運行的任務;
private static final int STOP       =  1 << COUNT_BITS;
//全部的任務已終止,ctl記錄的」任務數量」爲0,線程池會變爲TIDYING狀態
private static final int TIDYING    =  2 << COUNT_BITS;
//線程池完全終止,就變成TERMINATED狀態。 
private static final int TERMINATED =  3 << COUNT_BITS;
複製代碼

下面是在網上發現的一位大牛的圖;感受能夠較爲直觀的描述狀態的變動

工做原理

線程池執行原理

有幾個點須要注意。

一、如何提交一個任務到線程池?

public void execute(Runnable command) {
    //任務爲null,直接拋出空指針異常
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //若是線程數大於等於基本線程數,將任務加入隊列
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼
  • 若是少於corePoolSize線程正在運行,請嘗試使用給定命令啓動一個新線程做爲其第一個任務。 對addWorker的調用會自動檢查runState和workerCount,從而防止錯誤報警,在不該該的時候經過返回false來添加線程。
  • 若是一個任務可以成功排隊,那麼咱們仍然須要再次檢查是否應該添加一個線程(由於現有的線程自上次檢查以來已經死掉)或者自從進入這個方法以來,池關閉了。因此咱們從新檢查狀態,若是當前command已經stop了,那麼就退出工做隊列,若是沒有的話就開始一個新的線程。
  • 若是隊列滿了,會想嘗試去建立一個新的線程去執行,若是建立不了,那就執行拒絕策略。

二、如何建立一個線程去處理任務?

經過實現這個接口去建立一個新的線程

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
複製代碼

三、如何將任務添加到隊列?

經過addWorker方法來添加,其實在excute中只是做爲一個提交任務的入口,實際的處理邏輯都是在addWorker這個方法裏來完成的。addWorker有兩個參數:

  • firstTask 當前任務
  • core 用來標註當前須要建立的線程是不是核心線程,若是core爲true,則代表建立的是核心線程,也就是說當前尚未達到最大核心線程數。

先來看下這個方法的前半部分:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //自旋方式
    for (;;) {
        //獲取當前線程池的狀態
        int c = ctl.get();
        int rs = runStateOf(c);
    
        //若是狀態是STOP,TIDYING,TERMINATED狀態的話,則會返回false
        //若是狀態是SHUTDOWN,可是firstTask不爲空或者workQueue爲空的話,那麼直接返回false。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //經過自旋的方式,判斷要添加的worker是否爲corePool範疇以內的
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
複製代碼

//若是超過CAPACITY限制了則直接返回false

wc >= CAPACITY
複製代碼

//判斷當前的workerCount是否大於corePoolsize,不然則判斷是否大於maximumPoolSize //具體的比較取決於入參core是true仍是false。

wc >= (core ? corePoolSize : maximumPoolSize)
複製代碼

若是上面兩個有一個知足了,則直接返回false。

下面是判斷WorkerCount經過CAS操做增長1是否成功,成功的話就到此結束

if (compareAndIncrementWorkerCount(c))
    break retry;
複製代碼

若是不成功,則再次判斷當前線程池的狀態,若是如今獲取到的狀態與進入自旋的狀態不一致的話,那麼則經過continue retry從新進行狀態的判斷。

c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
    continue retry;
複製代碼

再來看下這個方法的後面半個部分:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    final ReentrantLock mainLock = this.mainLock;
    //建立一個新的Worker對象
    w = new Worker(firstTask);
    final Thread t = w.thread;
    //
    if (t != null) {
    //加鎖
        mainLock.lock();
        try {
            // 在鎖定的狀況下從新檢查。
            // 在一下狀況退出:ThreadFactory 建立失敗或者在獲取鎖以前shut down了
            int c = ctl.get();
            int rs = runStateOf(c);
           //狀態校驗
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // 預先檢查t是能夠啓動的
                    throw new IllegalThreadStateException();
                //添加至workers中
                workers.add(w);
                int s = workers.size();
                //若是超過了歷史最大線程數,則將當前池數量設置爲歷史最大線程記錄數
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //標識添加工做線程成功
                workerAdded = true;
            }
        } finally {
        //解鎖
            mainLock.unlock();
        }
        //若是添加成功則啓動當前工做線程
        if (workerAdded) {
            t.start();
            //並將當前線程狀態設置爲已啓動
            workerStarted = true;
        }
    }
} finally {
//添加失敗
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
}
複製代碼

拒絕策略有哪些?

  • 一、AbortPolicy:直接拋出異常,默認策略;
  • 二、CallerRunsPolicy:使用調用者本身的當前線程來執行任務;
  • 三、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
  • 四、DiscardPolicy:直接丟棄任務;

固然咱們也能夠自定義拒絕策略。

經常使用工做隊列類型

一、ArrayBlockingQueue

基於數組的阻塞隊列,長度有限

二、LinkedBlockingQuene

基於鏈表的阻塞隊列,長度無限,使用這個可能會致使咱們的拒絕策略失效。由於能夠無限的建立新的工做線程。

三、PriorityBlockingQueue

具備優先級的無界阻塞隊列;

三、SynchronousQuene

SynchronousQuene是一個是一個不存儲元素的BlockingQueue;每個put操做必需要等待一個take操做,不然不能繼續添加元素。因此這個比較特殊,它不存咱們的任務,也就說說它的每一個put操做必須等到另外一個線程調用take操做,不然put操做一直處於阻塞狀態。

Worker

這個是ThreadPoolExecutor的一個內部類,表示一個工做線程。重要的是這個內部類實現了AbstractQueuedSynchronizer(AQS:抽象隊列同步器)抽象類。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
    private static final long serialVersionUID = 6138294804551838833L;

    /** 當前work持有的線程 */
    final Thread thread;
    /** 運行的初始任務。 可能爲空。*/
    Runnable firstTask;
    /** 每一個線程完成任務的計數器 */
    volatile long completedTasks;

    /** * 構造函數 */
    Worker(Runnable firstTask) {
    // 禁止中斷,直到runWorker
        setState(-1); 
        //想提交的任務交給當前工做線程
        this.firstTask = firstTask;
        //經過線程工廠建立一個新的線程
        this.thread = getThreadFactory().newThread(this);
    }

    /** 將run方法的執行委託給外部runWorker */
    public void run() {
        runWorker(this);
    }

    // 是否鎖定
    //
    // 0表明解鎖狀態。
    // 1表明鎖定狀態。

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //嘗試獲取鎖(重寫AQS的方法)
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //嘗試釋放鎖(重寫AQS的方法)
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //加鎖
    public void lock() { acquire(1); }
    //嘗試加鎖
    public boolean tryLock() { return tryAcquire(1); }
    //解鎖
    public void unlock() { release(1); }
    //是否鎖定
    public boolean isLocked() { return isHeldExclusively(); }
    //若是啓動則中斷
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
複製代碼

runWorker

最後來看下runWorker這個方法(ThreadPoolExecutor中的方法):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

下面是對註釋的蹩腳翻譯,歡迎吐槽,但注意尺度,O(∩_∩)O哈哈~

主要工做循環運行。重複地從隊列中獲取任務並執行它們,同時處理一些問題:

  • 咱們可能會從最初的任務開始,在這種狀況下,咱們不須要獲得第一個任務。不然,只要池正在運行,咱們就從getTask得到任務。 若是它返回null,則因爲更改池狀態或配置參數而致使worker退出。其餘退出的結果是在外部代碼中拋出的異常,在這種狀況下completeAbruptly成立,這一般會致使processWorkerExit來取代這個線程。
  • 在運行任何任務以前,獲取鎖以防止任務正在執行時發生其餘池中斷,調用clearInterruptsForTaskRun確保除非池正在中止,則此線程沒有設置其中斷。
  • 每一個任務運行以前都會調用beforeExecute,這可能會引起一個異常,在這種狀況下,咱們會致使線程死亡(斷開循環completeAbruptly爲true),而不處理任務。
  • 假設beforeExecute正常完成,咱們運行任務,收集任何拋出的異常發送到afterExecute。 咱們分別處理RuntimeException,Error(這兩個規範保證咱們陷阱)和任意的Throwables。 由於咱們不能在Runnable.run中從新拋出Throwable,因此咱們把它們封裝在Errors中(到線程的UncaughtExceptionHandler)。 任何拋出的異常也保守地致使線程死亡。
  • task.run完成後,咱們調用afterExecute,這也可能會拋出一個異常,這也會致使線程死亡。 根據JLS Sec 14.20,即便task.run拋出,這個異常也是有效的。

異常機制的最終效果是afterExecute和線程的UncaughtExceptionHandler擁有關於用戶代碼遇到的任何問題的準確信息。

總結

本文是JUC的第二篇,意在經過查看源碼來了解線程池的具體工做原理。文中若是存在不當的描述,但願小夥伴們可以及時提出。灰常感謝!

歡迎關注微信公衆號,乾貨滿滿哦~

相關文章
相關標籤/搜索