Spring線程池ThreadPoolTaskExecutor學習總結

ThreadPoolTaskExecutor線程是Spring的線程池,其底層是依據JDK線程池ThreadPoolExecutor來實現的

image.png
1.ThreadPoolTaskExecutor的基本參數:由於它是基於ThreadPoolExecutor來實現的,咱們能夠參考ThreadPoolExecutor的構造函數
image.png

corePoolSize:線程池維護線程最小的數量,默認爲1
maxPoolSize:線程池維護線程最大數量,默認爲Integer.MAX_VALUE
keepAliveSeconds:(maxPoolSize-corePoolSize)部分線程空閒最大存活時間,默認存活時間是60s
queueCapacity:阻塞任務隊列的大小,默認爲Integer.MAX_VALUE,默認使用LinkedBlockingQueue
allowCoreThreadTimeOut:設置爲true的話,keepAliveSeconds參數設置的有效時間對corePoolSize線程也有效,默認是flase
threadFactory::用於設置建立線程的工廠,能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder能夠快速給線程池裏的線程設置有意義的名字
rejectedExecutionHandler:拒絕策略,當隊列workQueue和線程池maxPoolSize都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常,有如下四種策略,固然也能夠根據實際業務需求類實現RejectedExecutionHandler接口實現本身的處理策略
①AbortPolicy:丟棄任務,而且拋出RejectedExecutionException異常
②DiscardPolicy:丟棄任務,不處理,不拋出異常
③CallerRunsPolicy:只用調用者所在線程來運行任務
③DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務,而且重複該操做
複製代碼

2.線程池的執行流程:當提早任務到當前線程池時,先判斷當前線程池中線程數量是否小於corePoolSize,若是小於建立新的線程處理請求,無論當前有沒有線程閒置;若是大於等於,則將線程想放入阻塞隊列workQueue中,線程池中存在空閒的線程後會去處理workQueue中任務;若是workQueue也滿了,則會新建工做線程處理任務,當線程池中的線程大於最大線程數maxPoolSize時,則會用選定的拒絕策略來處理新的線程 安全

image.png
下面是源碼部分(使用execute()方法): 注意:ThreadPoolTaskExecutor中execute()有重載方法,但最後都是調用同一方法,處理邏輯相同

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl記錄着runState和workerCount
        int c = ctl.get();
        // 使用workerCountOf取出當前線程池中存在的線程數,與核心線程比較大小
        if (workerCountOf(c) < corePoolSize) {
            // 當前線程小於核心線程,添加線程執行任務,第二個參數表示限制添加線程的數量是和核心線程corePoolSize仍是和線程池維護最大線程數maxPoolSize比較
            if (addWorker(command, true))
                // 若是成功擇直接返回結果
                return;
            // 失敗則從新獲取runState和workerCount的值
            c = ctl.get();
        }
        // 若是當前線程池是運行狀態而且任務添加到隊列成功
        if (isRunning(c) && workQueue.offer(command)) {
             // 從新獲取runState和workerCount的值
            int recheck = ctl.get();
            // 若是線程池不是運行狀態而且任務已經被移除
            if (! isRunning(recheck) && remove(command))
                // 根據拒絕策略處理該任務
                reject(command);
                // 判斷當前線程池的線程數是否爲0
            else if (workerCountOf(recheck) == 0)
              //第一個參數爲null,表示在線程池中建立一個線程,但不去啓動,第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize
                addWorker(null, false);
        }
        //再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize
        else if (!addWorker(command, false))
            // 若是失敗拒絕該任務
            reject(command);
    }
複製代碼

從上看出其實線程池執行任務的主要方法是addWorker()方法,execute()方法只是將任務提交以及作一些判斷,咱們看一下addWorker()方法:bash

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取運行狀態
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
           // 若是re >= SHUTDOWN (線程池關閉不在接收任務,也不會處理隊列中已有的任務),而且rs == SHUTDOWN ,firstTask 爲空,阻塞隊列workQueue不爲空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                // 返回flase
                return false;

            for (;;) {
                // 獲取當前線程池中已有線程數
                int wc = workerCountOf(c);
                // wc >= 容器最大量或者 wc >= 核心線程數或最大維護線程數(根據core值判斷以哪一個爲依據)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 返回flase
                    return false;
                //  增長workerCount(原子操做)
                if (compareAndIncrementWorkerCount(c))
                    // 若是增長成功,則直接返回
                    break retry;
                // 失敗從新獲取runState
                c = ctl.get();  // Re-read ctl
               // 判斷當前runState與最開始rs是否相等,不相等說明狀態已經被修改,返回從新執行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 根據firstTask建立Worker開始執行任務
            w = new Worker(firstTask);
            // 根據worker建立一個線程
            final Thread t = w.thread;
            if (t != null) {
                // 使用重入鎖的目的是一文workers使用的是 HashSet線程不安全
                final ReentrantLock mainLock = this.mainLock;
                // 加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // rs  < SHUTDOWN 說明線程池正在處於運行狀態,rs =SHUTDOWN firstTask 爲空,其中firstTask 爲空表示只是新建線程但沒有任務須要執行
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            // 判斷建立的線程是否存活,存活則拋出異常
                            throw new IllegalThreadStateException();
                      // 加入worker中,workers中包含線程池中全部的工做線程,只有只有鎖時才能訪問
                        workers.add(w);
                      // 獲取當前工做線程數 
                        int s = workers.size();
                        if (s > largestPoolSize)
                           // 若是大於最大線程數,將最大線程數從新賦值
                            largestPoolSize = s;
                        // worker的添加工做狀態改成true   
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 若是添加worker工做完成,啓動線程,並修改線程啓動狀態
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若是線程啓動失敗
            if (! workerStarted)
                // 將線程從workers中刪除,該方法中也是用了重入鎖,保證線程安全
                addWorkerFailed(w);
        }
        // 返回線程啓動狀態
        return workerStarted;
    }
複製代碼

上面源碼中調用start()方法啓動線程,實際上是調用Worker中run()方法來啓動,由於Worker實現實現了Runnable接口,以下 微信

image.png
調用Worker的run()方法,從圖中能夠看出本質執行的方法是runWorker()方法,源碼以下:

final void runWorker(Worker w) {
        // 獲取當前線程
        Thread wt = Thread.currentThread();
        // 獲取當前任務
        Runnable task = w.firstTask;
         // 將當前任務與值爲空,防止其餘線程拿到後執行
        w.firstTask = null;
        //  釋放鎖 
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // task不爲空,task ==getTask不爲空,一直從線程池中獲取任務,任務不爲空則一直循環
            while (task != null || (task = getTask()) != null) {
              // 加鎖
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 若是線程池狀態 >= STOP,或者若是線程池狀態 >= STOP而且當前線程沒有中斷,終端當前線程
                    wt.interrupt();
                try {
                    // 空方法,能夠本身定義
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      // 執行run方法,其實就是Runable接口中的方法實現
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                  // 任務完成後,任務置空,任務完成數累加1,釋放鎖
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 刪除已完成的任務
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

源碼執行流程圖以下: 框架

微信圖片_20190529094340.jpg
相關文章
相關標籤/搜索