Spring線程池ThreadPoolTaskExecutor的底層及阻塞隊列

https://www.cnblogs.com/jmsjh/p/7762034.html
轉自 https://www.cnblogs.com/lic309/p/4186880.html

一:  ThreadPoolTaskExecutor是一個spring的線程池技術,查看代碼能夠看到這樣一個字段:spring

    private ThreadPoolExecutor threadPoolExecutor;

  能夠發現,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor進行實現,緩存

  直接看代碼:安全

複製代碼
    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
        ThreadPoolExecutor executor  = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);
        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }
複製代碼

  這是ThreadPoolTaskExecutor用來初始化threadPoolExecutor的方法,BlockingQueue是一個阻塞隊列,這個咱們先無論。因爲ThreadPoolTaskExecutor的實現方式徹底是使用threadPoolExecutor進行實現,咱們須要知道這個threadPoolExecutor的一些參數。多線程

  

複製代碼
   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

  這個是調用的構造函數:併發

  int corePoolSize:線程池維護線程的最小數量. 
  int maximumPoolSize:線程池維護線程的最大數量. 
  long keepAliveTime:空閒線程的存活時間. 
  TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值. 
  BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列. 
  RejectedExecutionHandler handler: 
  用來拒絕一個任務的執行,有兩種狀況會發生這種狀況。 
  一是在execute方法中若addIfUnderMaximumPoolSize(command)爲false,即線程池已經飽和; 
  二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。eclipse

ThreadPoolExecutor池子的處理流程以下:  異步

1)當池子大小小於corePoolSize就新建線程,並處理請求分佈式

2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裏的空閒線程就去從workQueue中取任務並處理

3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,若是池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來作拒絕處理

4)另外,當池子的線程數大於corePoolSize的時候,多餘的線程會等待keepAliveTime長的時間,若是無請求可處理就自行銷燬

其會優先建立  CorePoolSiz 線程, 當繼續增長線程時,先放入Queue中,當 CorePoolSiz  和 Queue 都滿的時候,就增長建立新線程,當線程達到MaxPoolSize的時候,就會拋出錯 誤 org.springframework.core.task.TaskRejectedException

另外MaxPoolSize的設定若是比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常。

 

  這個是ThreadPoolExecutor的運算流程,既然ThreadPoolTaskExecutor是直接使用ThreadPoolExecutor進行處理,因此運算規則確定同樣。

在spring中使用ThreadPoolTaskExecutor的配置:

複製代碼
 <!-- 異步線程池 -->
    <bean id="threadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心線程數 -->
        <property name="corePoolSize" value="3" />
        <!-- 最大線程數 -->
        <property name="maxPoolSize" value="10" />
        <!-- 隊列最大長度 >=mainExecutor.maxSize -->
        <property name="queueCapacity" value="25" />
        <!-- 線程池維護線程所容許的空閒時間 -->
        <property name="keepAliveSeconds" value="300" />
        <!-- 線程池對拒絕任務(無線程可用)的處理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,若是執行器已關閉,則丟棄.  -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>
複製代碼

Reject策略預約義有四種: 

Reject 策略詳解:https://blog.csdn.net/jgteng/article/details/54411423

(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。 
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,若是執行器已關閉,則丟棄. 
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄. 
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程).

向spring容器中加入ThreadPoolTaskExecutor後,使用時只須要調用其的execute方法,其參數爲一個Runnable。

複製代碼
threadPool.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("=======");

                }
            });
複製代碼

ThreadPoolTaskExecutor有兩個execute的重載,但翻看代碼能夠知道調用的是同一個方法,因此只調用execute就能夠了

複製代碼
    @Override
    public void execute(Runnable task) {
        Executor executor = getThreadPoolExecutor();
        try {
            executor.execute(task);
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        execute(task);
    }
複製代碼

在execute中調用的是ThreadPoolExecutor中的execute方法,執行了上面的處理流程後執行任務。

複製代碼
  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

 

二:阻塞隊列BlockingQueue

  在ThreadPoolTaskExecutor源碼中咱們看到了BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);這樣一句話用來獲得一個隊列,這個隊列是用來存聽任務的。當線程池中有空閒線程時就回去任務隊列中拿任務並處理。BlockingQueue是一個阻塞併線程安全的一個隊列

  多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干 生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數 據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,並 且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。

BlockingQueue的核心方法:
放入數據:
  offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,
    則返回true,不然返回false.(本方法不阻塞當前執行方法的線程)
  offer(E o, long timeout, TimeUnit unit),能夠設定等待的時間,若是在指定的時間內,還不能往隊列中
    加入BlockingQueue,則返回失敗。
  put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷
    直到BlockingQueue裏面有空間再繼續.
獲取數據:
  poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,
    取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內,
    隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗。
  take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到
    BlockingQueue有新的數據被加入; 
  drainTo():一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數), 
    經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。

查看 ThreadPoolTaskExecutor的代碼能夠發現,其主要是使用 BlockingQueue的一種實現LinkedBlockingQueue進行實現。

複製代碼
/**
     * Create the BlockingQueue to use for the ThreadPoolExecutor.
     * <p>A LinkedBlockingQueue instance will be created for a positive
     * capacity value; a SynchronousQueue else.
     * @param queueCapacity the specified queue capacity
     * @return the BlockingQueue instance
     * @see java.util.concurrent.LinkedBlockingQueue
     * @see java.util.concurrent.SynchronousQueue
     */
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        else {
            return new SynchronousQueue<Runnable>();
        }
    }
複製代碼

 

LinkedBlockingQueue是一個基於鏈表的阻塞隊列,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立 即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從 隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理 併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以 此來提升整個隊列的併發性能。

  生成LinkedBlockingQueue時有一個大小限制,其默認爲Integer.MAX_VALUE.

  另外LinkedBlockingQueue不接受null值,當添加null的時候,會直接拋出NullPointerException:

  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();

 

 3.另外最近的項目中使用的多線程和隊列比較多,多線程自不用說,我百度了一下隊列的優勢,感受說的很好,特此抄過來:

1. 解耦

在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。

2. 冗餘

有時在處理數據的時候處理過程會失敗。除非數據被持久化,不然將永遠丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。

3. 擴展性

由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。

4. 靈活性 & 峯值處理能力

在訪問量劇增的狀況下,你的應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲 以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住增加的訪問壓力,而不是由於超出負荷的請求而徹底崩潰。

5. 可恢復性

當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。

6. 送達保證

消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個"只送達一次"保證。不管有多少進 程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是"預約"了這個消息,暫時把它移出了隊列。除非客戶端明確的 表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。

7.排序保證

在許多狀況下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。

8.緩衝

在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行--寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。

 

9. 理解數據流

在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息系列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。

10. 異步通訊

不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。

ThreadPoolTaskExecutor

相關文章
相關標籤/搜索