Java ExecutorService四種線程池及自定義ThreadPoolExecutor機制

1、Java 線程池spring

Java經過Executors提供四種線程池,分別爲:
一、newCachedThreadPool:建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。(線程最大併發數不可控制);線程池爲無限大,當執行第二個任務時若第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程。
二、newFixedThreadPool:建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
三、newScheduledThreadPool:建立一個定長線程池,支持定時及週期性任務執行、延遲執行。
四、newSingleThreadExecutor:建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。緩存

線程池比較單線程的優點在於:服務器

a. 重用存在的線程,減小對象建立、消亡的開銷,性能佳。
b. 可有效控制最大併發線程數,提升系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、按期執行、單線程、併發數控制等功能。併發

2、ThreadPoolExecutor機制函數

一、newCachedThreadPool性能

 在newCachedThreadPool中若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。 
初看該構造函數時我有這樣的疑惑:核心線程池爲0,那按照前面所講的線程池策略新任務來臨時沒法進入核心線程池,只能進入 SynchronousQueue中進行等待,而SynchronousQueue的大小爲1,那豈不是第一個任務到達時只能等待在隊列中,直到第二個任務到達發現沒法進入隊列才能建立第一個線程? 
這個問題的答案在上面講SynchronousQueue時其實已經給出了,要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接收這個元素。所以即使SynchronousQueue一開始爲空且大小爲1,第一個任務也沒法放入其中,由於沒有線程在等待從SynchronousQueue中取走元素。所以第一個任務到達時便會建立一個新線程執行該任務。ui

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

二、newFixedThreadPoolthis

看代碼一目瞭然了,線程數量固定,使用無限大的隊列。再次強調,樓主就是踩的這個無限大隊列的坑(固定了N個線程,而提交給線程池的任務隊列是不限制大小的,若是發消息被阻塞或者變慢,那麼顯然隊列裏面的內容會愈來愈多,很快就內存耗盡)。spa

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

三、newScheduledThreadPool線程

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

 

四、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

這幾種線程池最終都是返回了ThreadPoolExecutor對象。

ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize,//核心線程池大小
                              int maximumPoolSize,//最大線程池大小
                              long keepAliveTime,//線程池中超過corePoolSize數目的空閒線程最大存活時間;能夠allowCoreThreadTimeOut(true)成爲核心線程的有效時間
                              TimeUnit unit,//keepAliveTime的時間單位
                              BlockingQueue<Runnable> workQueue,//阻塞任務隊列
                              ThreadFactory threadFactory,//線程工廠
                              RejectedExecutionHandler handler) {//當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

重點講解: 
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係:
1.當線程池小於corePoolSize時,新提交任務將建立一個新線程執行任務,即便此時線程池中存在空閒線程。 
2.當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行 
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新線程執行任務 
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理 
5.當線程池中超過corePoolSize線程,空閒時間達到keepAliveTime時,關閉空閒線程 
6.當設置allowCoreThreadTimeOut(true)時,線程池中corePoolSize線程空閒時間達到keepAliveTime也將關閉 

學會使用ThreadPoolExecutor的參數後,咱們就能夠不用侷限於最上面那四種線程池,能夠按照須要來構建本身的線程池;還有一點,經過ThreadFactory能夠實現對線程的命名;

自定義線程工廠管理線程池:使用spring初始化實例類,使用同步鎖將線程池封裝到線程集合中;

/**
*
* @Date: 2019/1/7 15:54
* @Author: zhenliang.song
* @Description: 使用ThreadPoolExecutor自定義線程池
*/
public class ExecutorPoolFactoryWrap {

/**
* 線程池集合:key-自定義的枚舉類型,value-線程池的接口類型,初始化集合長度爲枚舉類的values長度
*/
private ConcurrentHashMap<ThreadPoolEnum, ExecutorService> PoolFactoryMap = new ConcurrentHashMap<ThreadPoolEnum, ExecutorService>(ThreadPoolEnum.values().length);

/**
* 從集合中獲取線程池對象:根據枚舉類型映射map集合中的自定義線程對象
* @param poolEnum 枚舉類
* @return
*/
public ExecutorService get(ThreadPoolEnum poolEnum) {
ExecutorService executorService = PoolFactoryMap.get(poolEnum);

if (executorService != null) {
return executorService;
}

synchronized (ExecutorPoolFactoryWrap.class) {
if (PoolFactoryMap.get(poolEnum) == null) {
int poolSize = poolEnum.getPoolSize() > 0 ? poolEnum.getPoolSize() : 1;
int capacity = poolEnum.getCapacity() > 0 ? poolEnum.getCapacity() : 256;
RejectedExecutionHandler rejectedHandler = poolEnum.getRejectedHandler() != null ? poolEnum.getRejectedHandler() : getRejectedExecutionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(poolEnum.getPoolName() + "-%d").build();
PoolFactoryMap.put(poolEnum, new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(capacity),
threadFactory,
rejectedHandler
));

}
}

return PoolFactoryMap.get(poolEnum);
}

/**
* 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理,
* 當沒有更多的線程或隊列插槽時,自定義如何處理超出能力範圍以外的任務
* @return
*/
private RejectedExecutionHandler getRejectedExecutionHandler() {
return new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
r.run();
}
}
};
}
/**
* 銷燬線程池:銷燬集合中的線程池
*/
public void destroy() {
if (MapUtils.isEmpty(PoolFactoryMap)) {
return;
}

for (Map.Entry<ThreadPoolEnum, ExecutorService> entry : PoolFactoryMap.entrySet()) {
ExecutorService executorService = entry.getValue();
try {
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
 

無界隊列

隊列大小無限制,經常使用的爲無界的LinkedBlockingQueue,使用該隊列作爲阻塞隊列時要尤爲小心,當任務耗時較長時可能會致使大量新任務在隊列中堆積最終致使OOM。閱讀代碼發現,Executors.newFixedThreadPool 採用就是 LinkedBlockingQueue,而樓主踩到的就是這個坑,當QPS很高,發送數據很大,大量的任務被添加到這個無界LinkedBlockingQueue 中,致使cpu和內存飆升服務器掛掉。

有界隊列

經常使用的有兩類,一類是遵循FIFO原則的隊列如ArrayBlockingQueue與有界的LinkedBlockingQueue,另外一類是優先級隊列如PriorityBlockingQueue。PriorityBlockingQueue中的優先級由任務的Comparator決定。 
使用有界隊列時隊列大小需和線程池大小互相配合,線程池較小有界隊列較大時可減小內存消耗,下降cpu使用率和上下文切換,可是可能會限制系統吞吐量。

在咱們的修復方案中,選擇的就是這個類型的隊列,雖然會有部分任務被丟失,可是咱們線上是排序日誌蒐集任務,因此對部分對丟失是能夠容忍的。

同步移交隊列

若是不但願任務在隊列中等待而是但願將任務直接移交給工做線程,可以使用SynchronousQueue做爲等待隊列。SynchronousQueue不是一個真正的隊列,而是一種線程之間移交的機制。要將一個元素放入SynchronousQueue中,必須有另外一個線程正在等待接收這個元素。只有在使用無界線程池或者有飽和策略時才建議使用該隊列。

 可選擇的飽和策略RejectedExecutionHandler詳解

JDK主要提供了4種飽和策略供選擇。4種策略都作爲靜態內部類在ThreadPoolExcutor中進行實現。

1 AbortPolicy停止策略

該策略是默認飽和策略。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
 }

使用該策略時在飽和時會拋出RejectedExecutionException(繼承自RuntimeException),調用者可捕獲該異常自行處理。

2 DiscardPolicy拋棄策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

如代碼所示,不作任何處理直接拋棄任務

3 DiscardOldestPolicy拋棄舊任務策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
}

如代碼,先將阻塞隊列中的頭元素出隊拋棄,再嘗試提交任務。若是此時阻塞隊列使用PriorityBlockingQueue優先級隊列,將會致使優先級最高的任務被拋棄,所以不建議將該種策略配合優先級隊列使用。

4 CallerRunsPolicy調用者運行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
}

既不拋棄任務也不拋出異常,直接運行任務的run方法,換言之將任務回退給調用者來直接運行。使用該策略時線程池飽和後將由調用線程池的主線程本身來執行任務,所以在執行任務的這段時間裏主線程沒法再提交新任務,從而使線程池中工做線程有時間將正在處理的任務處理完成。

相關文章
相關標籤/搜索