初學者很容易看錯,若是沒有看到spring或者JUC源碼的人確定是不太瞭解的。javascript
ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是對ThreadPoolExecutor進行了封裝處理。java
本身在以前寫多線程代碼的時候都是這麼玩的executor=Executors.newCachedThreadPool();可是有一次在大量數據的時候因爲入庫速度遠大於出庫速度致使內存急劇膨脹最後悲劇了重寫代碼,原來spring 早就給咱們作好封裝了。spring
來看一下ThreadPoolExecutor結構,祖類都是調用Executor接口:多線程
再來看一下ThreadPoolTaskExecutor結構,祖類都是調用Executor接口:異步
再來看一下源碼:ide
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = 2147483647; private int keepAliveSeconds = 60; private boolean allowCoreThreadTimeOut = false; private int queueCapacity = 2147483647; private ThreadPoolExecutor threadPoolExecutor; //這裏就用到了ThreadPoolExecutor
這是ThreadPoolTaskExecutor用來初始化threadPoolExecutor的方法,BlockingQueue是一個阻塞隊列,這個咱們先無論。因爲ThreadPoolTaskExecutor的實現方式徹底是使用threadPoolExecutor進行實現,咱們須要知道這個threadPoolExecutor的一些參數。優化
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize:線程池維護線程的最小數量. this
int maximumPoolSize:線程池維護線程的最大數量. spa
long keepAliveTime:空閒線程的存活時間. 線程
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值.
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列.
RejectedExecutionHandler handler:
用來拒絕一個任務的執行,有兩種狀況會發生這種狀況。
一是在execute方法中若addIfUnderMaximumPoolSize(command)爲false,即線程池已經飽和;
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。
ThreadPoolExecutor池子的處理流程以下:
1)當池子大小小於corePoolSize就新建線程,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裏的空閒線程就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,若是池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來作拒絕處理
4)另外,當池子的線程數大於corePoolSize的時候,多餘的線程會等待keepAliveTime長的時間,若是無請求可處理就自行銷燬
其會優先建立 CorePoolSiz 線程, 當繼續增長線程時,先放入Queue中,當 CorePoolSiz 和 Queue 都滿的時候,就增長建立新線程,當線程達到MaxPoolSize的時候,就會拋出錯 誤 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的設定若是比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常。
<!-- 異步線程池 --> <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心線程數,默認爲1 --> <property name="corePoolSize" value="3" /> <!-- 最大線程數,默認爲Integer.Max_value --> <property name="maxPoolSize" value="10" /> <!-- 隊列最大長度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="25" /> <!-- 線程池維護線程所容許的空閒時間 --> <property name="keepAliveSeconds" value="300" /> <!-- 線程池對拒絕任務(無線程可用)的處理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,若是執行器已關閉,則丟棄. --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy:若已達到待處理隊列長度,將由主線程直接處理請求 --> <!-- DiscardOldestPolicy:拋棄舊的任務;會致使被丟棄的任務沒法再次被執行 --> <!-- DiscardPolicy:拋棄當前任務;會致使被丟棄的任務沒法再次被執行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
Reject策略預約義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,若是執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程).
關於callable回調方法(由於爲隊列阻塞,若是到取值某個執行的值會等待執行完成)
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setMaxPoolSize(50); threadPoolTaskExecutor.initialize(); List<String> paymentSeqNoList = new ArrayList<>(); for (int i = 0; i < 100; i++) { paymentSeqNoList.add(String.valueOf(i)); } Long startTime = System.currentTimeMillis(); Map<String, FutureTask<String>> futureMap = new HashMap<String, FutureTask<String>>(); //線程池提交返回 for (String paymentSeqNo : paymentSeqNoList) { FutureTask<String> futureTask = new FutureTask<String>(new MyTestCallable(paymentSeqNo)); futureMap.put(paymentSeqNo, futureTask); // submit提交執行 threadPoolTaskExecutor.submit(futureTask); } Long endTime = System.currentTimeMillis(); System.out.println("耗時1:" + (endTime - startTime));
關於callable回調值監聽是否成功,JDK1.8 也開始支持guava方法了,guava有ListenableFuture 返回優化以下:
Long startTime2 = System.currentTimeMillis(); ListenableFuture<String> listenableFuture = null; for (String paymentSeqNo : paymentSeqNoList) { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); listenableFuture = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "成功"; } }); } //監聽事件 Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("get listenable future's result with callback " + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }); Long endTime2 = System.currentTimeMillis(); System.out.println("耗時2:" + (endTime2 - startTime2));
源自:https://cloud.tencent.com/developer/article/1408125