ThreadPoolExecutor的配置(二)

配置ThreadPoolExecutor二java

線程的建立和銷燬

線程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活時間(keepAliveTime)等因素共同負責線程的建立和銷燬。基本大小也就是線程池的目標大小,即在沒有任務執行時線程池的大小,而且只有在工做隊列滿了的狀況下才會建立超出這個數量的線程。線程池的最大大小表示可同時活動的線程數量的上限。若是某個線程的空閒時間超過了存活時間,那麼將被標記爲可回收的,而且當線程池的當前大小超過了基本大小時,這個線程將被終止。併發

經過調節線程池的基本大小和存活時間,能夠幫助線程池回收空閒線程佔有的資源,從而使得這些資源能夠用於執行其餘工做。(顯然,這是一種折衷:回收空閒線程會產生額外的延遲,由於當需求增長時,必須建立新的線程來知足需求。)ide

newFixedThreadPool工廠方法將線程池的基本大小和最大大小設置爲參數中指定的值,函數

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
}

newCachedThreadPool工廠方法將線程池的最大大小設置爲Integer.MAX_VALUE,而將基本大小設置爲零,並將超時設置爲1分鐘,ui

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

這種方法建立出來的線程池能夠被無限擴展,而且當需求下降時會自動收縮。其餘形式的線程池能夠經過顯式的ThreadPoolExecutor 構造函數來構造。this

注:在建立ThreadPoolExecutor的初期,線程並不會當即啓動,而是等到有任務提交時纔會啓動,除非調用prestartAllCoreThreads

管理隊列任務

在有限的線程池中會限制可併發執行的任務數量。若是無限制的建立線程,那麼將致使不穩定性,並經過採用固定大小的線程池來解決這個問題。然而這個方案並不完整。在高負載狀況下,應用程序仍可能耗盡資源,只是出現問題的機率較小。若是新請求的到達速率超過了線程池的處理速率,那麼新到來的請求將累積起來。在線程池中,這些請求會在一個由Executor管理的Runnable隊列中等待,而不會像線程那樣去競爭CPU資源。線程

即便請求的平均到達速率很穩定,也仍然會出現請求突增的狀況。儘管隊列有助於緩解任務的突增問題,但若是任務持續高速的到來,那麼最終仍是會抑制請求的到達速率以免耗盡內存。rest

ThreadPoolExecutor容許提供一個BlockingQueue來保存等待執行的任務。基本的任務排隊方法有三種:無界隊列、有界隊列和同步移交(Synchronous Handleroff)。隊列的選擇和其餘的配置參數有關,例如線程池的大小。code

newFixedThreadPool 和 newSingleThreadExecutor 在默認狀況下將使用一個無界的LinkedBlockingQueue。若是全部工做者線程都處於忙碌狀態,那麼任務將在隊列中等候。若是任務持續快速的到達,而且超過了線程池的處理速度,那麼隊列將無限增長。隊列

一種更穩妥的資源管理策略是使用有界隊列,例如ArrayBlockingQueue、 有界的LinkedBlockingQueue、PriorityBlockingQueue(PriorityBlockingQueue 能夠指定初始的隊列大小,後面插入元素的時候,若是空間不夠的話會自動擴容)。有界隊列有助於避免資源耗盡的狀況發生,但他又帶來了新的問題:當隊列填滿後,新的任務該怎麼辦?

在使用有界的工做隊列時,隊列的大小與線程池的大小必須一塊兒調節。若是線程池較小而隊列較大,那麼有助於減小內存使用量,下降CPU的使用率,同時還能夠減小上下文切換,但付出的代價是可能會限制吞吐量。

**對於很是大的或者無界的線程池來講,能夠經過SynchronousQueue來避免任務排隊,以及直接將任務從生產者移交給工做者線程。**SynchronousQueue不是一個真正的隊列,而是一種在線程之間進行移交的機制。要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接受這個元素。若是沒有線程正在等待,而且線程池的當前大小小於最大值,那麼ThreadPoolExecutor將建立一個新的線程,不然根據飽和策略,這個任務將被拒絕。使用直接移交將更高效,由於任務會直接移交給執行它的線程,而不是放在隊列中,而後由工做者線程從隊列中提取該任務。只有當線程池是無界的或者能夠拒絕任務時,SynchronousQueue纔有實際價值。

當使用像LinkedBlockingQueue或ArrayBlockingQueue這樣的FIFO隊列時,任務的執行順序與他們的到達順序相同。若是像進一步控制任務的執行順序,還可使用PriorityBlockingQueue,這個隊列將根據優先級來安排任務。任務的優先級時經過天然順序或者Compator來定義的。

飽和策略

當有界隊列被填滿後,飽和策略開始發揮做用。ThreadPoolExecutor的飽和策略能夠經過調用setRejectedExecutionHandler 來修改。JDK提供了幾種不一樣的RejectedExecutionHandler 實現,每種實現包含了不一樣的飽和策略:

AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
停止策略:是默認的飽和策略,該策略將拋出未檢查的RejectedExecutionException異常,調用者能夠捕獲這個異常,而後根據需求編寫本身的處理代碼。
拋棄策略:當新提交的任務沒法保存到隊列中等待執行是,拋棄策略會悄悄拋棄該任務。
拋棄最舊策略:將會拋棄下一個將被執行的任務,而後嘗試從新提交新的任務(若是工做隊列是一個優先隊列,那麼拋棄最舊的策略將致使拋棄優先級最高的任務,所以最好不要將拋棄最舊的飽和策略和優先級隊列放在一塊兒使用)。
調用者運行(Caller-Runs)策略:實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而下降新任務的流量。他不會在線程池的某個線程中執行,而是在一個調用了execute的線程中執行該任務。

線程工廠

每當線程池須要建立一個線程時,都是經過線程工廠方法來完成的。默認的線程工廠將建立一個新的、非守護的線程,而且包含特殊的配置信息。經過指定一個線程工廠方法,能夠定製線程池的配置信息。在ThreadFactory中只定義一個方法newThread,每當線程池須要建立一個新線程時都會調用這個方法。

使用 Semaphore 來控制任務的提交速率

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public class BoundedExecutor {

    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Semaphore semaphore, Executor exec) {
        this.semaphore = semaphore;
        this.exec = exec;
    }

    public void submit(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });

        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

=========END=========

相關文章
相關標籤/搜索