線程池ThreadPoolExecutor 瞭解

本文章出處 線程池ThreadPoolExecutor 瞭解 轉載請說明git

經常使用線程池類型

Java經過Executors靜態方法建立4種不一樣類型線程池。github

  • newSingleThreadExecutor 建立單例的線程池,保證執行任務順序,超出線程任務將會在任務中等待,全部的任務都按照FIFO隊列順序執行。
  • newFixedThreadPool 建立一個固定大小的線程組,指定工做線程數量,當任務超過指定工做數量時,在隊列中排隊等待執行。
  • newCachedThreadPool 建立一個能夠緩存線程池,這個線程池活動線程是0,最大線程Integer.MAX,當不斷有新的任務添加到線程池中,池內線程數量不夠時,能夠馬上建立新的線程執行任務。當空閒的線程超過60s就被系統回收掉。
  • newScheduleThreadPool 建立一個定長的線程池,並且支持定時的以及週期性的任務執行,相似於Timer。
  • newWorkStealingPool 會建立一個含有足夠多線程的線程池,來維持相應的並行級別,它會經過工做竊取的方式,使得多核的 CPU 不會閒置,總會有活着的線程讓 CPU 去運行。

像newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool都時內部封裝ThreadPoolExecutor生成線程池的,下面具體分析ThreadPoolExecutor這個類。緩存

ThreadPoolExecutor 構造函數

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

複製代碼
  • corePoolSize 線程核心線程數,不會被回收的線程。
  • maximumPoolSize 線程池可以申請最大線程數量
  • workQueue 同步性隊列轉載執行的任務
  • keepAliveTime 當線程數大於核心時,這是多餘的空閒線程在終止以前等待新任務的最大時間。
  • threadFactory 線程工廠
  • handler 當任務數量超過隊列容量時,須要處理這種狀況,飽和策略,主要有4種處理策略
    • AbortPolicy:直接拋出異常,這是默認策略;
    • CallerRunsPolicy:使用調用者所在的線程來執行任務;
    • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
    • DiscardPolicy:直接丟棄任務;

線程池疑問

建立線程池基本核心構造參數咱們已經知道了,可是咱們還有不少問題沒有搞明白的。怎麼知道線程池內每一個線程運行狀態,是在工做中仍是空閒呢?是否是有一個專門線程去標記空閒線程活動時間?線程是如何實現共用線程。 帶着這些問題去閱讀代碼。安全

線程池內線程狀態

如下內容都是來自ThreadPoolExecutor代碼註釋。 線程池內的線程狀態都是有一個AtomicInteger ctl保持的,是一個原子整數,包裝了兩個領域含義。 多線程

ctl內部結構.png

  • workerCount 有效的線程數 ,線程總數2 ^ 29 -1 ,線程啓動數量不包括線程中止的數量,而該值多是 與活動線程的實際數量暫時不一樣。例如當ThreadFactory建立線程失敗時,線程正在執行退出,統計線程數量依然包括退出的線程。函數

  • runState線程狀態oop

    • RUNNING正在接受新的任務而且處理隊列中的任務
    • SHUTDOWN 不接受新的任務,可是能處理任務
    • STOP 不能接受新的任務,不能處理隊列中的任務,可是能夠中斷正在執行的任務。
    • TIDYING 全部的任務終止,workerCount爲0 ,線程所有過渡到TIDYING狀態,即將運行terminated() 鉤子方法
    • TERMINATEDterminated() 鉤子方法執行完成

這些狀態都有一個轉換順序this

  • RUNNING -> SHUTDOWN 執行shutdown()
  • (RUNNING or SHUTDOWN) -> STOP 執行shutdownNow()
  • SHUTDOWN -> TIDYING 當任務隊列和線程池都是空
  • STOP -> TIDYING 線程池都是空
  • TIDYING -> TERMINATED 當 terminated()鉤子方法執行完 這些狀態具體代碼實現
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

複製代碼

execute 方法解析

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /* * 處理3個步驟 * 1. 若是正在運行的線程數量小於核心線程數,直接建立一個新的線程去執行任務 * 調用addWorker 方法自動檢查 線程狀態和數量,避免在不能添加線程時添加線程出現錯誤警報 * * 2. 若是任務能夠成功進入隊列,咱們仍然須要雙重檢查是否添加一個線程 * 由於存在上次檢查時有線程死亡或者當咱們進入方法時線程池正在關閉 * 所以,咱們從新檢查狀態,若是中止,則回滾排隊,若是沒有,則啓動新線程。 * * 3. 添加任務失敗,則嘗試建立一個線程,若是失敗了,使用拒絕策略 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //當前線程數量小於核心線程數
            if (addWorker(command, true)) // 建立線程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //線程池狀態RUNNING 而且 任務添加成功
            int recheck = ctl.get(); // 第二重檢查
            if (! isRunning(recheck) && remove(command)) //判斷線程池狀態 刪除任務修改狀態
                reject(command);
            else if (workerCountOf(recheck) == 0)  //線程池數量爲0
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //線程池狀態不爲RUNNING 或者 隊列已滿再或者線程大於最大線程數而且任務隊列滿了
            reject(command);
    }
 
複製代碼

下一步咱們進入addWorker建立線程的核心方法spa

private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //retry標記,第一次看到 😓
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN) //至少SHUTDOWN
                && (runStateAtLeast(c, STOP) // 至少STOP 都是不合法
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) { //狀態合法
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) //大於核心線程或者最大線程都不須要建立線程,和掩碼相與防止最大線程數超過2 ^ 29 - 1 細節啊
                    return false;
                if (compareAndIncrementWorkerCount(c))  // ctl 自增成功,跳出整個循環
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN)) //狀態至少SHUTDOWN 從新進入循環 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //建立線程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //在加鎖期間從新檢查線程池狀態
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // 剛建立線程已經開始執行任務,這是有問題
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { 
                    t.start(); //啓動任務
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

addWorker() 主要流程檢查線程池狀態是否合法,建立新的線程,加入workers中,調用start()執行任務。咱們去了解下Worker 類線程

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        // TODO: switch to AbstractQueuedLongSynchronizer and move
        // completedTasks into the lock word.

        /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }
}
複製代碼

Worker其實就是Runnable包裝類,可是增長了任務中斷功能,他的主要任務就是維護中斷狀態,繼承AQS能夠簡化獲取和釋放圍繞每一個任務執行的鎖定,防止旨在喚醒等待任務的工做線程的中斷。 瞭解Worker怎麼執行任務的進入runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; //取出任務
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { //若是當前worker沒有任務,從隊列中獲取任務,直到隊列爲空
                w.lock();
                //處理線程中斷機制 
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); //前置處理,相似攔截器機制,須要子類去實現
                    try {
                        task.run(); //調用任務方法
                        afterExecute(task, null); //後置處理
                    } catch (Throwable ex) {
                        afterExecute(task, ex); //異常處理
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  //執行任務數量+ 1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); //線程生命週期走完,執行回收工做
        }
    }

複製代碼

結合Worker構造函數,Worker在初始化就本身給本身上鎖了,避免線程在任務尚未開始的狀況下就被中斷了 。啓動線程執行runWorker方法,取出任務,釋放鎖,若是Worker中的任務爲空,從隊列中拉取任務。處理線程中斷,主要依據第一線程狀態已經至少STOP狀態,而後清除中斷狀態,在判斷線程沒有中斷信號了,再發送中斷信號。按照做者註釋的意思就是當線程池已經在中止過程當中,線程應該中斷,可是必須雙重檢查防止關閉過程當中競爭發送中繼信號。調用run方法執行任務。爲何要上鎖執行任務,主要是執行任務過程,必需要獲取鎖才能中斷線程的,可是Worker自己不支持重入鎖的,只有在任務開始關閉過程才能中斷。 在這裏咱們終於看到線程共用方式了,經過線程不斷從隊列中獲取任務,而後再進行調用run方法執行任務,當線程退出獲取隊列循環,線程生命週期就結束了。

####geTask()

private Runnable getTask() {
        boolean timedOut = false; //上一次拉取是否超時

        for (;;) {
            int c = ctl.get();

            //檢查線程池狀態是SHUTDOWN 不接受新的任務
            // 任務隊列爲空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount(); //核心線程數workerCount -1
                return null;
            }

            int wc = workerCountOf(c); 

            // allowCoreThreadTimeOut 空閒狀況下是否回收核心線程數 默認是false
           // 當前線程數大於 核心線程數
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // wc 大於最大線程數 ,先處理線程數量
           // 線程在存活的時間內沒有獲取到任務,則須要回收掉,上一個循環的,線程數-1
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { //wc 不要爲0,任務隊列爲空的狀況
                if (compareAndDecrementWorkerCount(c)) //線程-1成功沒有其餘線程競爭,沒有新增任務
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? 
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //超時會返回空
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; 
            } catch (InterruptedException retry) { //中斷等待獲取任務,放棄執行任務
                timedOut = false;
            }
        }
    }

複製代碼

這裏咱們知道空閒時間是怎麼回收線程的,經過同步性隊列poll() + 超時時間知道一個線程在這個時間內沒有任務執行,線程池處於空閒狀態的,返回null給調用方法,跳出while循環,結束整個線程的生命週期。 ####進入processWorkerExit()

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) //若是沒有執行到任務,核心線程-1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); //移除當前worker ,線程會被回收掉
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); //判斷線程池內狀態,是否對線程池發出關閉信號

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { //線程池在RUNNABLE或者SHUTDOWN狀態,線程池任然能夠執行任務或者接受任務
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) //線程池內線程已經被回收完了而且任務尚未執行完
                    min = 1;
                if (workerCountOf(c) >= min) //線程池內線程數量大於核心線程池,不須要新建線程去處理
                    return; // replacement not needed
            }
            addWorker(null, false); //建立新的線程處理任務
        }
    }

複製代碼

####進入 tryTerminate() 在線程池SHUTDOWN狀態線程爲0和任務隊列爲空的狀況,或者STOP狀態核心隊列爲空狀況,線程池狀向TIDYING轉移,傳播關閉池信號。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) || //RUNNING 狀態不須要處理
                runStateAtLeast(c, TIDYING) || //已經進入TIDYING,也不作處理 
                (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) //任務隊列不爲空,不知足條件
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE); 嘗試去中斷一個worker 
                return;
            }
        
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); //加鎖修改線程池狀態
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 進入TIDYING狀態
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));  //執行完terminated() 進入TERMINATED狀態 
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
複製代碼

####shutdown() 再去了解下線程池終止方法

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); //修改線程池狀態爲SHUTDOWN
            interruptIdleWorkers(); //中斷線程
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
複製代碼

進入interruptIdleWorkers() 怎麼中斷線程

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //加鎖主要是workers 是一個不安全集合
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { //沒有中斷和 可以獲取到鎖,說明此線程池沒有在執行任務,Worker 是不支持重入的
                    try {
                        t.interrupt(); 
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

複製代碼

處理方法挺簡單的,修改線程池狀態不要接收新的任務,將works中空閒線程取出發出中斷信號。

shutdownNow

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();  //刪除隊列中的任務,返回給tasks
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

複製代碼

shutdownNow 會將隊列中尚未來得及處理任務所有刪除掉,直接調用tryTerminate()終止線程池生命週期。

總結

如今咱們知道線程池內部機制是如何建立線程,共用線程,空閒回收,線程池的生命週期。調用execute()提交任務,若是當前線程池數量小於核心線程數,調用addWorker()建立一個新的線程池去執行任務,不然直接加入到隊列中。在addWorker()啓動一個線程去不斷從隊列拉取任務,直到一個隊列存活時間沒有任務執行或者隊列爲空,線程纔會被回收掉。

相關文章
相關標籤/搜索