如下是官方已經實現的所有7個TaskExecuter。Spring宣稱對於任何場景,這些TaskExecuter徹底夠用了:html
名字 | 特色 |
---|---|
SimpleAsyncTaskExecutor | 每次請求新開線程,沒有最大線程數設置.不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程。 --【1】 |
SyncTaskExecutor | 不是異步的線程.同步能夠用SyncTaskExecutor,但這個能夠說不算一個線程池,由於還在原線程執行。這個類沒有實現異步調用,只是一個同步操做。 |
ConcurrentTaskExecutor | Executor的適配類,不推薦使用。若是ThreadPoolTaskExecutor不知足要求時,才用考慮使用這個類。 |
SimpleThreadPoolTaskExecutor | 監聽Spring’s lifecycle callbacks,而且能夠和Quartz的Component兼容.是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才須要使用此類。 |
ThreadPoolTaskExecutor | 最經常使用。要求jdk版本大於等於5。能夠在程序而不是xml裏修改線程池的配置.其實質是對java.util.concurrent.ThreadPoolExecutor的包裝。 |
TimerTaskExecutor | |
WorkManagerTaskExecutor |
1. SyncTaskExecutor:同步能夠用SyncTaskExecutor,但這個能夠說不算一個線程池,由於還在原線程執行。這個類沒有實現異步調用,只是一個同步操做。java
2.也能夠用ThreadPoolTaskExecutor結合FutureTask作到同步。spring
前者是同步執行器,執行任務同步,後者是線程池,執行任務異步。併發
異步執行用戶任務的SimpleAsyncTaskExecutor。每次執行客戶提交給它的任務時,它會啓動新的線程,並容許開發者控制併發線程的上限(concurrencyLimit),從而起到必定的資源節流做用。默認時,concurrencyLimit取值爲-1,即不啓用資源節流。app
<bean id="simpleAsyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"> <property name="daemon" value="true"/> <property name="concurrencyLimit" value="2"/> <property name="threadNamePrefix" value="simpleAsyncTaskExecutor"/> </bean>
主要實現:1.支持限流處理 2.異步註冊線程返回結果框架
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable { //限流主要實現 private final SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter concurrencyThrottle = new SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter(); private ThreadFactory threadFactory; //設置最大的線程數量 public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); } //是否開啓了限流 限流數量大於0? public final boolean isThrottleActive() { return this.concurrencyThrottle.isThrottleActive(); } //1.是否開啓限流 不然不開啓限流處理 //2.執行開始以前檢測是否能夠知足要求 當前數量++ //3.開啓限流將執行的Runable進行封裝,執行完成調用final方法 當前數量-- public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); if(this.isThrottleActive() && startTimeout > 0L) { this.concurrencyThrottle.beforeAccess(); this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task)); } else { this.doExecute(task); } } //異步提交有返回值 public Future<?> submit(Runnable task) { FutureTask future = new FutureTask(task, (Object)null); this.execute(future, 9223372036854775807L); return future; } public <T> Future<T> submit(Callable<T> task) { FutureTask future = new FutureTask(task); this.execute(future, 9223372036854775807L); return future; } public ListenableFuture<?> submitListenable(Runnable task) { ListenableFutureTask future = new ListenableFutureTask(task, (Object)null); this.execute(future, 9223372036854775807L); return future; } public <T> ListenableFuture<T> submitListenable(Callable<T> task) { ListenableFutureTask future = new ListenableFutureTask(task); this.execute(future, 9223372036854775807L); return future; } //擁有工廠?沒有的話調用父類能夠設置各類參數的建立線程 protected void doExecute(Runnable task) { Thread thread = this.threadFactory != null?this.threadFactory.newThread(task):this.createThread(task); thread.start(); } //父類的方法,方便配置線程,方便xml設置線程參數CustomizableThreadCreator public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; } }
內部類的實現異步
//下面只是對於操做進行簡單的封裝,最真的實現仍是抽象的ConcurrencyThrottleSupport private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { private ConcurrencyThrottleAdapter() { } protected void beforeAccess() { super.beforeAccess(); } protected void afterAccess() { super.afterAccess(); } }
更多關於限流功能源碼見:《spring控制併發數的工具類ConcurrencyThrottleSupport和ConcurrencyThrottleInterceptor》async
//這裏是對於Runable對象執行在次封裝,在執行完畢後處理限流操做 private class ConcurrencyThrottlingRunnable implements Runnable { private final Runnable target; public ConcurrencyThrottlingRunnable(Runnable target) { this.target = target; } public void run() { try { this.target.run(); } finally { SimpleAsyncTaskExecutor.this.concurrencyThrottle.afterAccess(); } } }
簡單的經過synchronized和wati and notify達到控制線程數量的效果,從而實現限流的策略。工具
看SimpleAsyncTaskExecutor.java的關鍵源代碼:post
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
createThread()在父類中CustomizableThreadCreator.java中
public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; }
ListenableFutureTask 其實主要是依靠FutureTask這個JDK的封裝,覆蓋了原始的run方法,在run中封裝能夠獲取到線程的返回值。
ListenableFutureTask 在次封裝,因爲FutureTask執行完成以後會調用done()空方法,ListenableFutureTask覆蓋done方法能夠獲取到執行的結果,而後在調用前期註冊的錯誤處理或者成功處理的方法,便可到達異步處理的效果。
相似於回調的效果
public interface SuccessCallback<T> { /** * Called when the {@link ListenableFuture} successfully completes. * @param result the result */ void onSuccess(T result); } public interface FailureCallback { /** * Called when the {@link ListenableFuture} fails to complete. * @param ex the exception that triggered the failure */ void onFailure(Throwable ex); } public interface ListenableFuture<T> extends Future<T> { //成功和失敗的集合 void addCallback(ListenableFutureCallback<? super T> var1); void addCallback(SuccessCallback<? super T> var1, FailureCallback var2); }
實現類(ListenableFutureTask)可有返回值,可被監聽的,註冊監聽,這裏能夠註冊監聽者放在一個單獨的類中去處理,很好的分配工做ListenableFutureCallbackRegistry
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> { private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry(); public ListenableFutureTask(Callable<T> callable) { super(callable); } public ListenableFutureTask(Runnable runnable, T result) { super(runnable, result); } public void addCallback(ListenableFutureCallback<? super T> callback) { this.callbacks.addCallback(callback); } public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { this.callbacks.addSuccessCallback(successCallback); this.callbacks.addFailureCallback(failureCallback); } //FutureTask執行完成後的回調,調用監聽接口的實現類的方法 protected final void done() { Object cause; try { Object ex = this.get(); //回調實現類的方法 this.callbacks.success(ex); return; } catch (InterruptedException var3) { Thread.currentThread().interrupt(); return; } catch (ExecutionException var4) { cause = var4.getCause(); if(cause == null) { cause = var4; } } catch (Throwable var5) { cause = var5; } this.callbacks.failure((Throwable)cause); } }
註冊監聽,還維護了一個狀態量的信息,很標準的寫法,維護隊列的添加和成功消息和失敗消息的處理
public class ListenableFutureCallbackRegistry<T> { private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>(); private State state = State.NEW; private Object result = null; private final Object mutex = new Object(); /** * Add the given callback to this registry. * @param callback the callback to add */ public void addCallback(ListenableFutureCallback<? super T> callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.successCallbacks.add(callback); this.failureCallbacks.add(callback); break; case SUCCESS: callback.onSuccess((T) this.result); break; case FAILURE: callback.onFailure((Throwable) this.result); break; } } } /** * Add the given success callback to this registry. * @param callback the success callback to add * @since 4.1 */ public void addSuccessCallback(SuccessCallback<? super T> callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.successCallbacks.add(callback); break; case SUCCESS: callback.onSuccess((T) this.result); break; } } } /** * Add the given failure callback to this registry. * @param callback the failure callback to add * @since 4.1 */ public void addFailureCallback(FailureCallback callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.failureCallbacks.add(callback); break; case FAILURE: callback.onFailure((Throwable) this.result); break; } } } /** * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all * added callbacks with the given result. * @param result the result to trigger the callbacks with */ public void success(T result) { synchronized (this.mutex) { this.state = State.SUCCESS; this.result = result; while (!this.successCallbacks.isEmpty()) { this.successCallbacks.poll().onSuccess(result); } } } public void failure(Throwable ex) { synchronized (this.mutex) { this.state = State.FAILURE; this.result = ex; while (!this.failureCallbacks.isEmpty()) { this.failureCallbacks.poll().onFailure(ex); } } } private enum State {NEW, SUCCESS, FAILURE} }
比起從線程池取一個線程再執行, 你僅僅須要把你的Runnable類加入到隊列中,而後TaskExecutor用它內置的規則決定什麼時候開始取一個線程並執行該Runnable類
先在xml中添加bean的配置:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="25" /> </bean> <bean id="taskExecutorExample" class="TaskExecutorExample"> <constructor-arg ref="taskExecutor" /> </bean>
配置解釋
當一個任務經過execute(Runnable)方法欲添加到線程池時:
一、 若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。
二、 若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。
三、若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
四、 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。也就是:處理任務的優先級爲:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。
五、 當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。
具體調用:
import org.springframework.core.task.TaskExecutor; public class TaskExecutorExample { private class MessagePrinterTask implements Runnable { private String message; public MessagePrinterTask(String message) { this.message = message; } public void run() { System.out.println(message); } } private TaskExecutor taskExecutor; public TaskExecutorExample(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } public void printMessages() { for(int i = 0; i < 25; i++) { taskExecutor.execute(new MessagePrinterTask("Message" + i)); } } }
首先,爲了以註解方式使用異步功能,咱們須要在Spring的xml配置中定義相關的bean:
1.必須在*-servlet.xml而不是applicationContext.xml中定義@Async相關配置
引自http://stackoverflow.com/questions/6610563/spring-async-not-working
2 不使用線程池版本
引自http://www.springframework.org/schema/task/spring-task-3.2.xsd
因此,若是咱們僅僅添加<task:annotation-driven/>
,也可使用@Async標籤。然而,此時使用的是SimpleAsyncTaskExecutor。如「官方文檔27章:Task Execution」中所述,SimpleAsyncTaskExecutor不會使用線程池,僅僅是爲每個請求新開一個線程。這樣在大併發的業務場景下,發生OutOfMemory是不足爲奇的。
<?xml version="1.0" encoding="UTF-8"?> <!--Spring框架的xml標籤訂義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標籤文檔--> <beans xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd"> <!--掃描項目實例化@Component,@Service,@Controller修飾的類--> <context:component-scan base-package="com.your_app" /> <!--create a SimpleAsyncTaskExecutor instance--> <task:annotation-driven/> </beans>
3 推薦 - 使用線程池版本
<?xml version="1.0" encoding="UTF-8"?> <!--Spring框架的xml標籤訂義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標籤文檔--> <beans xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd"> <!--掃描項目實例化@Component,@Service,@Controller修飾的類--> <context:component-scan base-package="com.your_app" /> <!-- 在代碼中@Async不加參數就會使用task:annotation-driven標籤訂義的executor--> <task:annotation-driven executor="myExecutor"/> <!-- 在代碼中@Async("myExecutor")能夠顯式指定executor爲"myExecutor"--> <task:executor id="myExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/> </beans>
其中,注意到屬性pool-size的值」5-25」是一個範圍,這對應的是線程池的min和max容量,它們的意義請參考本文上一節的「配置說明」裏的第三、4點。若是隻有一個值,如pool-size=n
, 意味着minSize==maxSize==n
而關於rejection-policy,官方文檔裏說:
總結以下:
池滿時的拒絕策略 | 效果 |
---|---|
AbortPolicy(默認) | 拋異常 |
DiscardPolicy or DiscardOldestPolicy | 放棄該線程 |
CallerRunsPolicy | 通知該線程的建立者,讓其不要提交新的線程 |
轉自:https://blog.csdn.net/caib1109/article/details/51623089