java後臺異步任務執行器TaskManagerjava
此方式基於MVC方式:程序員
一,使用任務:spring
1 @Resource 2 private TaskManager taskManager; 3 4 public string commit(TradeStatus status) { 5 if (status== TradeStatus.UNDERWAY) { 6 7 // 執行任務 8 taskManager.addTask(new Runnable() { 9 10 @Override 11 public void run() { 12 handleUnderway(status); //運行業程序 13 } 14 }); 15 16 } else { 17 18 } 19 20 }
/** * 返回業務處理狀態 * * * <p> * 此方法須要被異步調用 * </p> * */ public void handleUnderway(TradeSatus satus) { int waitTimeOut = 20*1000; try { if (logger.isDebugEnabled()) { logger.debug("提交查詢支付狀態"); } Assert.notNull(trade, "交易對象不能爲空"); int timeOut = 0; String result = null; while (true) { result = queryStatus(satus); //獲取狀態 if (result == TradeStatus.UNDERWAY) { try { if (timeOut > waitTimeOut) { logger.warn(String.format("查詢狀態結果超時.")); break; } timeOut += 5000; Thread.sleep(5000); // 5秒同步一次 } catch (InterruptedException e) { if (logger.isInfoEnabled()) { logger.info("等待支付結果被中斷"); } break; } } else { break;// 查到銀行結果 } } if (result != TradeStatus.UNDERWAY) { //處理結果 } if (logger.isDebugEnabled()) { logger.debug(String.format("查詢完成,查詢結果爲:%s", result)); } } catch (Exception e) { logger.error(String.format("查詢支付結果異常"), e); throw new SimpleException("查詢支付結果異常", e); } }
二:實現的serviceapache
public interface TaskManager { /** * @param task */ void addTask(Runnable task); /** * @return */ int getActiveCount(); /** * 中止任務管理器 */ void stop(); }
三:service的實現類服務器
package com.service.impl; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.TaskExecutor; import com.service.TaskManager; /** * 後臺異步任務執行器 * */ public class TaskManagerImpl implements TaskManager, InitializingBean { private static Log logger = LogFactory.getLog(TaskManagerImpl.class); private ThreadPoolExecutor executer = null; private int corePoolSize = 5; private int maximumPoolSize = 50; private long keepAliveTime = 10; private TimeUnit unit = TimeUnit.MINUTES;// 秒 private BlockingQueue<Runnable> workQueue; private Thread t; @Resource private TaskExecutor taskExecutor; @Override public void addTask(Runnable task) { taskExecutor.execute(task); // executer.execute(task); } @Override public int getActiveCount() { return 0; } /* * (non-Javadoc) * * @see * org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { if (executer == null) { workQueue = new LinkedBlockingQueue<Runnable>(); executer = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy()); t = new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.sleep(120 * 1000); if (logger.isDebugEnabled()) { String msg = String.format("隊列大小:%d, 激活任務數: %d, poolSize: %d, 任務數:%d, 已處理任務數:%d", workQueue.size(), getActiveCount(), executer.getPoolSize(), executer.getTaskCount(), executer.getCompletedTaskCount()); logger.debug(msg); } } catch (InterruptedException e) { return; } } } }); t.setDaemon(true); t.setName("異步任務健康檢查線程"); t.start(); } } @Override public void stop() { if (executer == null) return; try { logger.info("準備中止任務管理器"); executer.shutdown(); executer.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.info("任務被停止."); } finally { if (!executer.isTerminated()) { logger.info("取消未完成的任務."); } executer.shutdownNow(); logger.info("任務管理器中止完成."); } } /** * @return the corePoolSize */ public int getCorePoolSize() { return corePoolSize; } /** * @param corePoolSize * the corePoolSize to set */ public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } /** * @return the maximumPoolSize */ public int getMaximumPoolSize() { return maximumPoolSize; } /** * @param maximumPoolSize * the maximumPoolSize to set */ public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } /** * @return the keepAliveTime */ public long getKeepAliveTime() { return keepAliveTime; } /** * @param keepAliveTime * the keepAliveTime to set */ public void setKeepAliveTime(long keepAliveTime) { this.keepAliveTime = keepAliveTime; } /** * @return the unit */ public TimeUnit getUnit() { return unit; } /** * @param unit * the unit to set */ public void setUnit(TimeUnit unit) { this.unit = unit; } }
四:xml 配置多線程
<!-- 異步任務執行器配置 -->
<bean id="taskManager" class="com.service.impl.TaskManagerImpl">
<property name="corePoolSize" value="30"></property>
<property name="maximumPoolSize" value="100"></property>
<property name="keepAliveTime" value="10"></property>
</bean>
<!--
任務執行器
pool-size="5-20", 表示線程池活躍的線程數爲5,最大線程數爲20;
queue-capacity="100" 表示隊列大小爲100
-->
<task:executor id="taskExecutor" keep-alive="30" pool-size="5-20" queue-capacity="100" rejection-policy="ABORT"/>
以上異步處理就完成了。併發
下面是摘錄下來的一些解釋:異步
五。配置解釋
當一個任務經過execute(Runnable)方法欲添加到線程池時:
一、 若是此時線程池中的數量小於corePoolSize,即便線程池中的線程都處於空閒狀態,也要建立新的線程來處理被添加的任務。
二、 若是此時線程池中的數量等於 corePoolSize,可是緩衝隊列 workQueue未滿,那麼任務被放入緩衝隊列。
三、若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
四、 若是此時線程池中的數量大於corePoolSize,緩衝隊列workQueue滿,而且線程池中的數量等於maximumPoolSize,那麼經過 handler所指定的策略來處理此任務。也就是:處理任務的優先級爲:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,若是三者都滿了,使用handler處理被拒絕的任務。
五、 當線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止。這樣,線程池能夠動態的調整池中的線程數。
ide
六 其它線程池
JDK的ThreadPoolExecutor
一個 ExecutorService,它使用可能的幾個池線程之一執行每一個提交的任務,一般使用Executors 工廠方法配置。
線程池能夠解決兩個不一樣問題:因爲減小了每一個任務調用的開銷,它們一般能夠在執行大量異步任務時提供加強的性能,而且還能夠提供綁定和管理資源(包括執行 集合任務時使用的線程)的方法。每一個ThreadPoolExecutor 還維護着一些基本的統計數據,如完成的任務數。
爲了便於跨大量上下文使用,此類提供了不少可調整的參數和擴展掛鉤。可是,強烈建議程序員使用較爲方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,能夠進行自動線程回收)、 Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個後臺線程),它們均爲大多數使用場景預約義了設置。不然,在手動配置和調 整此類時,使用如下指導:
一、核心和最大池大小
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見getMaximumPoolSize())設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。若是將 maximumPoolSize 設置爲基本的無界值(如Integer.MAX_VALUE),則容許池適應任意數量的併發任務。在大多數狀況下,核心和最大池大小僅基於構造來設置,不 過也可使用setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
二、按需構造
默認狀況下,即便核心線程最初只是在新任務須要時才建立和啓動的,也可使用方法 prestartCoreThread() 或prestartAllCoreThreads() 對其進行動態重寫。
三、建立新線程
使用 ThreadFactory 建立新線程。若是沒有另外說明,則在同一個ThreadGroup 中一概使用Executors.defaultThreadFactory() 建立線程,而且這些線程具備相同的NORM_PRIORITY 優先級和非守護進程狀態。經過提供不一樣的 ThreadFactory,能夠改變線程的名稱、線程組、優先級、守護進程狀態,等等。若是從newThread 返回 null 時ThreadFactory 未能建立線程,則執行程序將繼續運行,但不能執行任何任務。
四、保持活動時間
若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減小資源消耗的方 法。若是池後來變得更爲活動,則能夠建立新的線程。也可使用方法setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。使用Long.MAX_VALUETimeUnit.NANOSECONDS 的值在關閉前有效地從之前的終止狀態禁用空閒線程。
五、排隊
全部 BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:
若是運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
若是運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
若是沒法將請求加入隊列,則建立新的線程,除非建立此線程超出 maximumPoolSize,在這種狀況下,任務將被拒絕。
排隊有三種通用策略:
a、直接提交。工做隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,若是不存在可用於當即運行任務的線程,則試圖把任務加入隊列將失敗,所以 會構造一個新的線程。此策略能夠避免在處理可能具備內部依賴性的請求集合時出現鎖定。直接提交一般要求無界 maximumPoolSizes 以免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
b、無界隊列。使用無界隊列(例如,不具備預約義容量的 LinkedBlockingQueue)將致使在全部 corePoolSize 線程都忙的狀況下將新任務加入隊列。這樣,建立的線程就不會超過 corePoolSize。(所以,maximumPoolSize 的值也就無效了。)當每一個任務徹底獨立於其餘任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略容許無界線程具備增加的可能性。
c、有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,可是可能較難調整和控制。隊列大小和最大池大小可能須要相互折衷:使用大型隊列和小型池能夠 最大限度地下降 CPU 使用率、操做系統資源和上下文切換開銷,可是可能致使人工下降吞吐量。若是任務頻繁阻塞(例如,若是它們是 I/O 邊界),則系統可能爲超過您許可的更多線程安排時間。使用小型隊列一般要求較大的池大小,CPU 使用率較高,可是可能遇到不可接受的調度開銷,這樣也會下降吞吐量。
六、被拒絕的任務
當 Executor 已經關閉,而且 Executor 將有限邊界用於最大線程和工做隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種狀況下,execute 方法都將調用其RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預約義的處理程序策略:
在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)。
定義和使用其餘種類的 RejectedExecutionHandler 類也是可能的,但這樣作須要很是當心,尤爲是當策略僅用於特定容量或排隊策略時。
七、掛鉤方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每一個任務以前和以後調用。它們可用於操縱執行環境;例如,從新初始化 ThreadLocal、蒐集統計信息或添加日誌條目。此外,還能夠重寫方法terminated() 來執行 Executor 徹底終止後須要完成的全部特殊處理。
若是掛鉤或回調方法拋出異常,則內部輔助線程將依次失敗並忽然終止。
八、隊列維護
方法 getQueue() 容許出於監控和調試目的而訪問工做隊列。強烈反對出於其餘任何目的而使用此方法。remove(java.lang.Runnable) 和purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。性能
偶遇晨光原創
2016-03-11