Some Thoughts on Designing an Asynchronous Parallel Batch-Processing Framework


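  With the rapid development of Internet and information technology, the era of exploding data volumes has arrived. Processing and analyzing these massive data resources effectively has become a key weapon for technology companies that want to stand out in a fiercely competitive market. It is fair to say that a company that cannot process and analyze this data quickly and well will soon be ruthlessly eliminated by the market. Fortunately, there are already many solutions to draw on. First, for distributed computing there is Hadoop's MapReduce parallel computing framework, which mainly targets offline data mining and analysis. Second, for real-time online stream processing there is Storm, also a distributed computing framework, which meets the requirements of real-time analysis and processing well. Finally there is Spring Batch, a framework built entirely for batch processing, which can be applied at scale to enterprise-grade processing of massive data.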

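  I will not go into detailed tutorials on how to deploy these frameworks or develop with them. Instead I want to go one step further: can we draw on the technical ideas behind these open-source frameworks to build a batch-processing framework tailored to the data-processing requirements of the company we serve?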

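  Let me first describe the subscriber-data storage and processing situation at the company I currently serve. In a single province, the mobile operator has tens of millions of active subscribers, and each province's subscriber data is already partitioned by city. We store this data in a traditional relational database (Oracle, partitioned by city). Based on each subscriber's balance, the operator's billing and settlement system notifies the business-processing system in real time to suspend or restore a subscriber's service. On receiving such a request, the business-processing system dispatches switch commands for the affected subscribers to the specific switch network elements; we will simply call these Hlr suspend/restore commands (abbreviated as Hlr commands from here on). On ordinary days, the legacy multi-process C++ back-end programs can barely handle these requests in "near real time". In the first few days of each month, however, the data volume surges, and the C++ back end is not efficient enough to keep up. That is when the complaints start: "I have paid my bill, so why is my service still suspended?", while other subscribers who are clearly in arrears have not yet been suspended. This directly lowers customer satisfaction with the operator's support systems, and the operator may lose these customers as well.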

  After careful evaluation, I identified the following bottlenecks behind these problems.

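  1. All subscriber data for a province is stored in a single table in one database. Even a top-of-the-line minicomputer may not meet the performance demands of the month-start surge: frequent reads and writes on a single server incur a huge I/O cost, and the overall throughput of the server is low.
  2. While processing this data, Hlr commands are sent synchronously to the physical switch equipment. Until the switch has successfully processed a command, the sender can only block and wait, which in turn causes a backlog of data still waiting to be processed.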

  To address these problems, I came up with several optimizations.

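  1. Can the table be split by the subscriber's home city? That is, scale horizontally so that the load of one or a few servers is spread out, with each database server mainly handling the requests of one or a few cities, which reduces I/O overhead.
  2. Because the switch blocks while processing Hlr commands, can we switch to asynchronous completion? The processing module first dispatches the tasks in its task queue to the switch, and the switch then reports each task's outcome back to the processing module through an asynchronous callback. The processing module no longer actively polls and waits; instead it waits for asynchronous notifications of the switch's results. It can therefore concentrate on dispatching data, and a few slow tasks no longer hold up the processing of the rest of the batch.
  3. Once the table has been split horizontally, can the shards be loaded in parallel? That would save much of the time spent loading them one after another.
  4. The data loaded in parallel should ideally go into a batch-processing framework whose parameters can be tuned to the volume of data to be processed, so that real-time requirements are met. For example, at the start of the month the processing parameters can be raised to increase throughput; at other times they can be lowered to reduce CPU/I/O overhead.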
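  As a minimal sketch of proposal 1, assuming each shard is identified by a name, routing a subscriber to the database that holds its home city could look like the following. The class CityShardRouter and the shard names "db-a"/"db-b" are hypothetical; only the city codes 591 (Fuzhou) and 592 (Xiamen) come from this article.

```java
import java.util.HashMap;
import java.util.Map;

public class CityShardRouter {
    // home_city code -> name of the shard (data source) holding that city's rows
    private final Map<Integer, String> shardByCity = new HashMap<>();

    // Registers which shard stores the subscribers of a given home city.
    public void assign(int homeCity, String shard) {
        shardByCity.put(homeCity, shard);
    }

    // Returns the shard for a subscriber's home city; unknown codes fail fast
    // instead of silently falling back to some default database.
    public String shardFor(int homeCity) {
        String shard = shardByCity.get(homeCity);
        if (shard == null) {
            throw new IllegalArgumentException("no shard configured for home_city " + homeCity);
        }
        return shard;
    }

    public static void main(String[] args) {
        CityShardRouter router = new CityShardRouter();
        router.assign(591, "db-a"); // 591 = Fuzhou (hypothetical shard name)
        router.assign(592, "db-b"); // 592 = Xiamen (hypothetical shard name)
        System.out.println(router.shardFor(592)); // prints db-b
    }
}
```

  Failing fast on an unknown city code is a deliberate choice in this sketch: a subscriber routed to the wrong database would silently never be suspended or restored.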

  Based on these considerations, I arrived at the design shown in the following component diagram:

  

  Below I explain how the key modules work together.

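  1. Asynchronous parallel query loading module, BatchQueryLoader: it accepts multiple data-source objects and uses ListenableFuture, google-guava's extension of the Future interface, to load batch query results in parallel. The Future interface represents the result of an asynchronous computation; once the computation has completed, the result can only be retrieved with get(), and one overload of get() accepts a timeout. The parallel loading module loads the data in the tables of several data sources in parallel and returns the collection of results. The time taken by parallel versus serial loading is easy to compare: the total time of serial loading is the sum of the load times of all data sources, whereas the total time of parallel loading is determined by the slowest data source. (Note: every day, a collection program gathers the subscribers to be suspended or restored, city by city, into the notify_users (subscribers to notify) table in each horizontal shard.)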
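  2. Parallel asynchronous batch-processing module, BatchTaskReactor: it is implemented internally with a thread pool. It receives the data loaded by BatchQueryLoader and puts it into the thread pool for asynchronous dispatch. The commands are ultimately sent by the asynchronous Hlr dispatch task HlrBusinessEventTask, while the pool keeps taking pending tasks from its blocking queue and dispatching them. At the same time, it obtains the results of the commands dispatched by HlrBusinessEventTask asynchronously through the Future interface.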
  3. Batch thread-pool parameter loader, BatchTaskConfigurationLoader: it loads the thread pool's runtime parameters and passes them to BatchTaskReactor. The configuration file batchtask-configuration.xml looks like this:
    <?xml version="1.0" encoding="GBK"?>
    <batchtask>
        <!-- Asynchronous batch thread-pool parameters -->
        <jobpool name="newlandframework_batchtask">
            <attribute name="corePoolSize" value="15" />
            <attribute name="maxPoolSize" value="30" />
            <attribute name="keepAliveTime" value="1000" />
            <attribute name="workQueueSize" value="200" />
        </jobpool>
    </batchtask>

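    Here corePoolSize is the number of threads the pool keeps, workQueueSize is the capacity of the blocking queue, maxPoolSize is the maximum number of threads in the pool, and keepAliveTime is how long an idle thread waits before it terminates. The ThreadPoolExecutor constructor also takes a unit parameter, a TimeUnit enum value, which gives the unit of keepAliveTime (BatchTaskReactor below uses TimeUnit.SECONDS). So how do these parameters relate to each other? An example: when tasks arrive, ThreadPoolExecutor starts threads, up to corePoolSize of them, to run them. Once all core threads are busy, new tasks are put into the blocking queue, whose capacity is workQueueSize. If the queue fills up too, the pool calls in "temporary worker threads" to help, at most maxPoolSize - corePoolSize of them. If even that is not enough, ThreadPoolExecutor applies one of four rejection policies.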

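  4. Here are the four policies. ThreadPoolExecutor.AbortPolicy (the default): a rejected task causes a runtime RejectedExecutionException to be thrown. ThreadPoolExecutor.CallerRunsPolicy: the thread that called execute runs the task itself. This provides a simple feedback-control mechanism that slows down the rate at which new tasks are submitted. ThreadPoolExecutor.DiscardPolicy: a task that cannot be executed is simply dropped. ThreadPoolExecutor.DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the work queue is dropped and execution is retried (repeating this if it fails again). When the load drops again, the pool reclaims the surplus "temporary worker threads" once they have been idle for keepAliveTime. By default, you can think of keepAliveTime as applying only to those maxPoolSize - corePoolSize temporary threads.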

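  5. Setting the thread-pool parameters. How should they normally be set? I. workQueueSize should be at least corePoolSize. II. maxPoolSize must be at least corePoolSize (the ThreadPoolExecutor constructor throws IllegalArgumentException otherwise). III. corePoolSize is the number of threads you expect to use by default, and in my view it should be greater than 1; otherwise the pool degenerates into single-threaded processing and loses its reason for existing.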

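  6. JMX (Java Management Extensions) batch-task monitoring module, BatchTaskMonitor: it monitors in real time how tasks are executing in the BatchTaskReactor thread pool (specifically, task successes and failures).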
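  To make the interplay of corePoolSize, workQueueSize and maxPoolSize concrete, here is a small JDK-only sketch (PoolSaturationDemo is an illustrative name, not part of the framework). Every submitted task blocks until released, so while all of them are blocked the pool can hold at most maxPoolSize + workQueueSize tasks; anything beyond that is rejected by AbortPolicy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSaturationDemo {
    // Submits `tasks` tasks that all block until released to a pool built from the
    // three sizing parameters, and counts how many AbortPolicy rejects.
    public static int countRejected(int corePoolSize, int maxPoolSize, int workQueueSize, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(workQueueSize),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // core threads first, then the queue, then "temporary" threads up to max
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // capacity while everything is blocked = maxPoolSize + workQueueSize = 2 + 1
        System.out.println(countRejected(1, 2, 1, 4)); // prints 1
    }
}
```

  With the article's configuration (15 / 30 / 200), the same reasoning gives room for 230 in-flight tasks before the rejection policy applies.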

  Having introduced the main functions of the core modules, let us now walk through the detailed design of each one in turn.

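  1. Every day, a collection program gathers the subscribers to be suspended or restored into the notify_users table. First we define the structure of the collected subscriber-notification data, i.e. the JavaBean that corresponds to the horizontally sharded notify_users table. For demonstration purposes, notify_users is kept simple (Oracle):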

    create table notify_users
    (
      home_city number(3),  /* subscriber's home city code */
      msisdn    number(15), /* mobile number */
      user_id   number(15)  /* subscriber identifier */
    );

    The corresponding JavaBean entity class NotifyUsers is defined as follows:
    /**
     * @filename:NotifyUsers.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: subscriber to be notified by the batch job
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.model;

    import org.apache.commons.lang.builder.ToStringBuilder;
    import org.apache.commons.lang.builder.ToStringStyle;

    public class NotifyUsers {
        public NotifyUsers() {
        }

        // Subscriber's home city code (here: 591 = Fuzhou, 592 = Xiamen)
        private Integer homeCity;
        // Subscriber's mobile number. Caution: an 11-digit number exceeds Integer.MAX_VALUE,
        // so a Long (or String) is safer for the number(15) column.
        private Integer msisdn;
        // Subscriber identifier (the same Integer-range caution applies)
        private Integer userId;

        public Integer getHomeCity() {
            return homeCity;
        }

        public void setHomeCity(Integer homeCity) {
            this.homeCity = homeCity;
        }

        public Integer getMsisdn() {
            return msisdn;
        }

        public void setMsisdn(Integer msisdn) {
            this.msisdn = msisdn;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                    .append("homeCity", homeCity).append("userId", userId)
                    .append("msisdn", msisdn).toString();
        }
    }
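    The msisdn column is number(15), while NotifyUsers stores it in an Integer. A quick check shows why that matters: a typical 11-digit mobile number does not fit in a Java int, so an Integer field (and ResultSet.getInt) cannot represent it. The number below is made up for illustration, and MsisdnRangeCheck is a hypothetical helper.

```java
public class MsisdnRangeCheck {
    // True if the value can be stored in a Java int / Integer without overflow.
    public static boolean fitsInInt(long value) {
        return value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long msisdn = 13912345678L; // made-up 11-digit mobile number
        System.out.println(Integer.MAX_VALUE); // prints 2147483647
        System.out.println(fitsInInt(msisdn)); // prints false
        System.out.println(fitsInInt(591L));   // prints true (a home_city code fits)
    }
}
```

    In practice this suggests mapping msisdn (and, depending on its range, user_id) to Long with ResultSet.getLong, or to String.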
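  2. Class structure of the asynchronous parallel query loading module BatchQueryLoader: BatchQueryLoader calls the asynchronous parallel query executor BatchQueryExecutor to load the result sets of the different data sources in parallel. StatementWrapper wraps a JDBC Statement together with its SQL and Connection.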
  3. Implementation of the loading module, starting with StatementWrapper:

    /**
     * @filename:StatementWrapper.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: wraps a Statement together with its SQL and Connection
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.sql.Connection;
    import java.sql.Statement;

    public class StatementWrapper {
        private final String sql;
        private final Statement statement;
        private final Connection con;

        public StatementWrapper(String sql, Statement statement, Connection con) {
            this.sql = sql;
            this.statement = statement;
            this.con = con;
        }

        public String getSql() {
            return sql;
        }

        public Statement getStatement() {
            return statement;
        }

        public Connection getCon() {
            return con;
        }
    }

    Next, two exception classes for parallel loading, BatchQueryInterruptedException and BatchQueryExecutionException:

    /**
     * @filename:BatchQueryInterruptedException.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: wraps an InterruptedException raised during parallel query loading
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    public class BatchQueryInterruptedException extends RuntimeException {
        public BatchQueryInterruptedException(final String errorMessage, final Object... args) {
            super(String.format(errorMessage, args));
        }

        public BatchQueryInterruptedException(final Exception cause) {
            super(cause);
        }
    }

    /**
     * @filename:BatchQueryExecutionException.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: wraps an ExecutionException raised during parallel query loading
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    public class BatchQueryExecutionException extends RuntimeException {
        public BatchQueryExecutionException(final String errorMessage, final Object... args) {
            super(String.format(errorMessage, args));
        }

        public BatchQueryExecutionException(final Exception cause) {
            super(cause);
        }
    }

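    Then we abstract a batch query interface, mainly so that batch loading can later be extended to different kinds of databases. The BatchQuery interface is defined as follows: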

    /**
     * @filename:BatchQuery.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: asynchronous query interface
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    public interface BatchQuery<IN, OUT> {
        // Runs one query for the given input (e.g. one data source)
        OUT query(IN input) throws Exception;
    }

    Now we wrap this in an asynchronous parallel query executor, BatchQueryExecutor:

    /**
     * @filename:BatchQueryExecutor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: asynchronous parallel query executor
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import org.apache.commons.collections.Closure;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.collections.functors.ForClosure;

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;

    public class BatchQueryExecutor {
        // ForClosure runs the wrapped closure this many times per input element
        private final static int FUTUREQUERYNUMBER = 1;

        public BatchQueryExecutor() {
        }

        public <IN, OUT> List<OUT> executeQuery(final Collection<IN> inputs,
                final BatchQuery<IN, OUT> executeUnit) {
            ListenableFuture<List<OUT>> futures = submitBatchTaskFutures(inputs, executeUnit);
            delegateAsynTask(futures);
            return getAsynResults(futures);
        }

        // Submits one query per input to a pool with one thread per input
        private <IN, OUT> ListenableFuture<List<OUT>> submitBatchTaskFutures(
                final Collection<IN> inputs, final BatchQuery<IN, OUT> executeUnit) {
            final Set<ListenableFuture<OUT>> result =
                    new HashSet<ListenableFuture<OUT>>(inputs.size());
            final ListeningExecutorService service = MoreExecutors
                    .listeningDecorator(Executors.newFixedThreadPool(inputs.size()));
            Closure futureQuery = new Closure() {
                @SuppressWarnings("unchecked")
                public void execute(Object input) {
                    final IN p = (IN) input;
                    result.add(service.submit(new Callable<OUT>() {
                        @Override
                        public OUT call() throws Exception {
                            return executeUnit.query(p);
                        }
                    }));
                }
            };
            Closure parallelTask = new ForClosure(FUTUREQUERYNUMBER, futureQuery);
            CollectionUtils.forAllDo(inputs, parallelTask);
            // Accept no new tasks; queries already submitted still run to completion
            service.shutdown();
            // Succeeds when all queries succeed, fails as soon as one fails
            // (the result order is not guaranteed, since the futures are kept in a HashSet)
            return Futures.allAsList(result);
        }

        private <OUT> OUT getAsynResults(final ListenableFuture<OUT> futures) {
            try {
                return futures.get();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new BatchQueryInterruptedException(ex);
            } catch (ExecutionException ex) {
                throw new BatchQueryExecutionException(ex);
            }
        }

        // Logs the overall outcome; recent Guava versions require the executor argument
        private <TYPE> void delegateAsynTask(final ListenableFuture<TYPE> allFutures) {
            Futures.addCallback(allFutures, new FutureCallback<TYPE>() {
                @Override
                public void onSuccess(final TYPE result) {
                    System.out.println("Parallel query loading succeeded");
                }

                @Override
                public void onFailure(final Throwable thrown) {
                    System.out.println("Parallel query loading failed");
                }
            }, MoreExecutors.directExecutor());
        }
    }

    Finally, BatchQueryLoader simply calls BatchQueryExecutor above to load the data of the different data sources in parallel and asynchronously:

    /**
     * @filename:BatchQueryLoader.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: parallel query loading module
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    public class BatchQueryLoader {
        private final Collection<StatementWrapper> statements = new ArrayList<StatementWrapper>();

        // Registers one data source: the SQL to run, its Statement and its Connection
        public void attachLoadEnv(final String sql, final Statement statement, final Connection con) {
            statements.add(new StatementWrapper(sql, statement, con));
        }

        public Collection<StatementWrapper> getStatements() {
            return statements;
        }

        // Closes each Statement (which also closes its ResultSet) and then its Connection
        public void close() throws SQLException {
            for (StatementWrapper wrapper : statements) {
                wrapper.getStatement().close();
                wrapper.getCon().close();
            }
        }

        public List<ResultSet> executeQuery() throws SQLException {
            if (statements.isEmpty()) {
                // Nothing attached; this also avoids creating a thread pool of size 0
                return Collections.emptyList();
            }
            if (1 == statements.size()) {
                // A single data source needs no thread pool
                StatementWrapper entity = statements.iterator().next();
                return Collections.singletonList(
                        entity.getStatement().executeQuery(entity.getSql()));
            }
            BatchQueryExecutor query = new BatchQueryExecutor();
            return query.executeQuery(statements,
                    new BatchQuery<StatementWrapper, ResultSet>() {
                        @Override
                        public ResultSet query(final StatementWrapper input) throws Exception {
                            return input.getStatement().executeQuery(input.getSql());
                        }
                    });
        }
    }
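    The timing argument made in the module overview (serial loading costs the sum of the per-source times, parallel loading roughly the time of the slowest source) can be checked with a JDK-only sketch. It swaps Guava's ListenableFuture for ExecutorService.invokeAll, and the two simulated "data sources" are hypothetical stand-ins for JDBC queries; ParallelLoadSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoadSketch {
    // Runs every loader on its own thread; invokeAll blocks until all have finished
    // and returns the futures in the same order as the input list.
    public static <T> List<T> loadAll(List<Callable<T>> loaders) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, loaders.size()));
        try {
            List<T> results = new ArrayList<>();
            for (Future<T> future : pool.invokeAll(loaders)) {
                results.add(future.get()); // already complete; rethrows a loader's failure
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a loader failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two simulated data sources taking about 300 ms and 100 ms.
        List<Callable<String>> loaders = Arrays.asList(
                () -> { Thread.sleep(300); return "cms rows"; },
                () -> { Thread.sleep(100); return "ccs rows"; });
        long start = System.nanoTime();
        List<String> rows = loadAll(loaders);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Expect roughly 300 ms (the slowest source) rather than 400 ms (the sum).
        System.out.println(rows + " loaded in about " + elapsedMs + " ms");
    }
}
```

    Unlike the HashSet used in BatchQueryExecutor, invokeAll keeps results in input order, which can be handy when each result must be matched back to its data source.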
  4. The batch thread-pool parameter loader BatchTaskConfigurationLoader is responsible for loading the thread pool's runtime parameters from batchtask-configuration.xml. BatchTaskConfiguration is the JavaBean that holds these parameters:
    /**
     * @filename:BatchTaskConfiguration.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: batch thread-pool parameter configuration
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.builder.HashCodeBuilder;
    import org.apache.commons.lang.builder.ToStringBuilder;
    import org.apache.commons.lang.builder.ToStringStyle;

    public class BatchTaskConfiguration {
        private String name;
        private int corePoolSize;
        private int maxPoolSize;
        private int keepAliveTime;
        private int workQueueSize;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public int getCorePoolSize() {
            return corePoolSize;
        }

        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        public int getMaxPoolSize() {
            return maxPoolSize;
        }

        public void setMaxPoolSize(int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }

        public int getKeepAliveTime() {
            return keepAliveTime;
        }

        public void setKeepAliveTime(int keepAliveTime) {
            this.keepAliveTime = keepAliveTime;
        }

        public int getWorkQueueSize() {
            return workQueueSize;
        }

        public void setWorkQueueSize(int workQueueSize) {
            this.workQueueSize = workQueueSize;
        }

        // equals and hashCode both depend only on the pool name
        @Override
        public int hashCode() {
            return new HashCodeBuilder(1, 31).append(name).toHashCode();
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                    .append("name", name).append("corePoolSize", corePoolSize)
                    .append("maxPoolSize", maxPoolSize)
                    .append("keepAliveTime", keepAliveTime)
                    .append("workQueueSize", workQueueSize).toString();
        }

        @Override
        public boolean equals(Object o) {
            boolean res = false;
            if (o != null && BatchTaskConfiguration.class.isAssignableFrom(o.getClass())) {
                BatchTaskConfiguration s = (BatchTaskConfiguration) o;
                res = new EqualsBuilder().append(name, s.getName()).isEquals();
            }
            return res;
        }
    }

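    Of course, the configuration may define several thread pools, so we also design a factory configuration class, BatchTaskThreadFactoryConfiguration, which stores the parameters of each pool in turn: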

    /**
     * @filename:BatchTaskThreadFactoryConfiguration.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: thread-pool parameter configuration factory
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.util.HashMap;
    import java.util.Map;

    public class BatchTaskThreadFactoryConfiguration {
        // Batch thread-pool parameters, keyed by pool name
        private Map<String, BatchTaskConfiguration> batchTaskMap =
                new HashMap<String, BatchTaskConfiguration>();

        public BatchTaskThreadFactoryConfiguration() {
        }

        // Called once per <jobpool>; the first definition of a given name wins
        public void joinBatchTaskConfiguration(BatchTaskConfiguration batchTaskConfiguration) {
            if (!batchTaskMap.containsKey(batchTaskConfiguration.getName())) {
                batchTaskMap.put(batchTaskConfiguration.getName(), batchTaskConfiguration);
            }
        }

        public Map<String, BatchTaskConfiguration> getBatchTaskMap() {
            return batchTaskMap;
        }
    }

    What remains is the runtime parameter loader, BatchTaskConfigurationLoader:

    /**
     * @filename:BatchTaskConfigurationLoader.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: loads the thread-pool parameter configuration
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.io.InputStream;

    import org.apache.commons.digester.Digester;

    public final class BatchTaskConfigurationLoader {
        private static final String BATCHTASK_THREADPOOL_CONFIG =
                "./newlandframework/batchtask/parallel/batchtask-configuration.xml";
        // volatile is required for double-checked locking to be safe
        private static volatile BatchTaskThreadFactoryConfiguration config = null;

        private BatchTaskConfigurationLoader() {
        }

        // Lazily loaded singleton; synchronized so concurrent callers parse the file only once
        public static BatchTaskThreadFactoryConfiguration getConfig() {
            if (config == null) {
                synchronized (BatchTaskConfigurationLoader.class) {
                    if (config == null) {
                        // Open the stream once and always close it
                        try (InputStream is = getInputStream()) {
                            config = (BatchTaskThreadFactoryConfiguration) getDigester().parse(is);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            return config;
        }

        private static InputStream getInputStream() {
            return BatchTaskConfigurationLoader.class.getClassLoader()
                    .getResourceAsStream(BATCHTASK_THREADPOOL_CONFIG);
        }

        private static Digester getDigester() {
            Digester digester = new Digester();
            digester.setValidating(false);
            digester.addObjectCreate("batchtask", BatchTaskThreadFactoryConfiguration.class);
            // Load the asynchronous batch thread-pool parameters
            digester.addObjectCreate("*/jobpool", BatchTaskConfiguration.class);
            digester.addSetProperties("*/jobpool");
            // <attribute name="x" value="y"/> sets property x to y
            digester.addSetProperty("*/jobpool/attribute", "name", "value");
            digester.addSetNext("*/jobpool", "joinBatchTaskConfiguration");
            return digester;
        }
    }

    The modules above exist mainly so that the thread pool's runtime parameters can be adjusted.
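    Besides editing the XML and recreating the pool, a running ThreadPoolExecutor can be resized in place with setCorePoolSize and setMaximumPoolSize, for example at the start of the month. A minimal sketch, assuming you hold the ThreadPoolExecutor itself (BatchTaskReactor exposes it only as an ExecutorService, so a cast would be needed); PoolResizer and the peak sizes 40/60 are hypothetical. The order of the two calls matters because newer JDKs reject a core size larger than the current maximum, and vice versa. Note that workQueueSize cannot be changed this way, since an ArrayBlockingQueue's capacity is fixed at construction.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizer {
    // Applies new core/max sizes in an order that never leaves core > max,
    // which setCorePoolSize / setMaximumPoolSize would reject.
    public static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core < 0 || max <= 0 || core > max) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        if (max >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max); // growing: raise the ceiling first
            pool.setCorePoolSize(core);
        } else {
            pool.setCorePoolSize(core);   // shrinking: lower the floor first
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        // Same sizes as batchtask-configuration.xml: 15 / 30 / 1000 s / 200
        ThreadPoolExecutor pool = new ThreadPoolExecutor(15, 30, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(200));
        resize(pool, 40, 60); // hypothetical month-start peak
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 40/60
        resize(pool, 15, 30); // back to normal
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 15/30
        pool.shutdown();
    }
}
```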

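  5. Main class structure of the parallel asynchronous batch-processing module BatchTaskReactor. The BatchTaskRunner interface defines the batch framework's initialization and resource-release actions: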
    /**
     * @filename:BatchTaskRunner.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: batch resource-management interface
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.io.Closeable;

    public interface BatchTaskRunner extends Closeable {
        // Create the resources (thread pools) the framework needs
        public void initialize();

        // Release those resources; narrows Closeable.close() so it throws no IOException
        public void close();
    }

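    We also implement our own thread factory, BatchTaskThreadFactory, to manage the threads in our pool. The pool's threads can be put into a thread group for unified management; for example, to monitor their running state you can start a separate monitoring thread that tracks the threads in the group.
    You can also wrap this in a JMX (Java Management Extensions) MBean and monitor the pool through JMX. Later in this article there is a JMX-based implementation that monitors task completion in the batch thread pool, which you can use as a reference for monitoring thread state; the JMX code for thread-state monitoring itself is not given here. Back to the point: BatchTaskThreadFactory is implemented as follows: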

    /**
     * @filename:BatchTaskThreadFactory.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: thread factory for the batch thread pools
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class BatchTaskThreadFactory implements ThreadFactory {
        final private static String BATCHTASKFACTORYNAME = "batchtask-pool";
        final private String name;
        final private ThreadGroup threadGroup;
        final private AtomicInteger threadNumber = new AtomicInteger(0);

        public BatchTaskThreadFactory() {
            this(BATCHTASKFACTORYNAME);
        }

        public BatchTaskThreadFactory(String name) {
            this.name = name;
            SecurityManager security = System.getSecurityManager();
            threadGroup = (security != null) ? security.getThreadGroup()
                    : Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(threadGroup, runnable);
            // Increment the counter once per thread so the name and the log line agree
            String threadName = String.format("BatchTask[%s-%d]", name, threadNumber.incrementAndGet());
            thread.setName(threadName);
            System.out.println(threadName);
            // Pool threads are non-daemon threads with normal priority
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }

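    Next comes the key module, the parallel asynchronous batch-processing module BatchTaskReactor. It is mainly a wrapper around ThreadPoolExecutor. It uses the bounded, array-backed blocking queue ArrayBlockingQueue as a safeguard: a producer that submits requests without limit cannot exhaust memory, so memory use stays under control.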

    /**
     * @filename:BatchTaskReactor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: parallel asynchronous batch thread-pool module
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.parallel;

    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public final class BatchTaskReactor implements BatchTaskRunner {
        private Map<String, ExecutorService> threadPools = new ConcurrentHashMap<String, ExecutorService>();
        // volatile is required for the double-checked locking in getReactor()
        private static volatile BatchTaskReactor context;
        private static final Lock REACTORLOCK = new ReentrantLock();
        public static final String BATCHTASK_THREADPOOL_NAME = "newlandframework_batchtask";

        private BatchTaskReactor() {
            initialize();
        }

        // Prevents concurrent callers from creating more than one reactor
        public static BatchTaskReactor getReactor() {
            if (context == null) {
                REACTORLOCK.lock();
                try {
                    if (context == null) {
                        context = new BatchTaskReactor();
                    }
                } finally {
                    REACTORLOCK.unlock();
                }
            }
            return context;
        }

        // Returns the default pool, i.e. the one named BATCHTASK_THREADPOOL_NAME
        public ExecutorService getBatchTaskThreadPoolName() {
            return getBatchTaskThreadPool(BATCHTASK_THREADPOOL_NAME);
        }

        public ExecutorService getBatchTaskThreadPool(String poolName) {
            ExecutorService pool = threadPools.get(poolName);
            if (pool == null) {
                throw new IllegalArgumentException(String.format(
                        "No configuration found for batch thread pool [%s]", poolName));
            }
            return pool;
        }

        public Set<String> getBatchTaskThreadPoolNames() {
            return threadPools.keySet();
        }

        // Shuts down every pool: no new tasks are accepted and already submitted tasks
        // still run, but this method does not wait for them to finish
        @Override
        public void close() {
            for (Entry<String, ExecutorService> entry : threadPools.entrySet()) {
                entry.getValue().shutdown();
                System.out.println(String.format("Batch thread pool [%s] shut down", entry.getKey()));
            }
            threadPools.clear();
        }

        // Creates the batch thread pools from the loaded configuration
        @Override
        public void initialize() {
            BatchTaskThreadFactoryConfiguration poolFactoryConfig = BatchTaskConfigurationLoader.getConfig();
            if (poolFactoryConfig != null) {
                initThreadPool(poolFactoryConfig);
            }
        }

        private void initThreadPool(BatchTaskThreadFactoryConfiguration poolFactoryConfig) {
            for (Entry<String, BatchTaskConfiguration> entry : poolFactoryConfig.getBatchTaskMap().entrySet()) {
                BatchTaskConfiguration config = entry.getValue();
                // Bounded queue: keeps a producer that submits without limit from exhausting memory
                BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(config.getWorkQueueSize());
                // keepAliveTime is in seconds; CallerRunsPolicy throttles the submitter when saturated
                ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                        config.getCorePoolSize(), config.getMaxPoolSize(),
                        config.getKeepAliveTime(), TimeUnit.SECONDS, queue,
                        new BatchTaskThreadFactory(entry.getKey()),
                        new ThreadPoolExecutor.CallerRunsPolicy());
                threadPools.put(entry.getKey(), threadPool);
                System.out.println(String.format("Batch thread pool %s created", config.toString()));
            }
        }
    }
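    BatchTaskReactor installs ThreadPoolExecutor.CallerRunsPolicy, so once both the threads and the bounded queue are saturated, the submitting thread runs the task itself, which naturally slows the producer down. A JDK-only sketch of that behavior follows; CallerRunsDemo is an illustrative name, and the tiny sizes (one thread, one queue slot) are chosen only to force saturation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {
    // Saturates a 1-thread pool with a 1-slot queue, then submits one more task.
    // Returns the name of the thread that actually ran that extra task.
    public static String threadForOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the single queue slot
        AtomicReference<String> ranOn = new AtomicReference<>();
        // Rejected, so CallerRunsPolicy runs it synchronously inside execute()
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        String ranOn = threadForOverflowTask();
        System.out.println(ranOn.equals(Thread.currentThread().getName())); // prints true
    }
}
```

    The trade-off is that the caller is busy running a task instead of submitting new ones; with AbortPolicy the caller would instead have to catch RejectedExecutionException and decide what to do with the task.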
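  6. Next we design the switch Hlr command task module. As the business evolves, tasks for other types of commands may appear, so following the open/closed principle we abstract an interface, BusinessEvent: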
    /**
     * @filename:BusinessEvent.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: business event task interface
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.model;

    public interface BusinessEvent {
        // Executes the concrete batch task for one subscriber and returns a result code
        public int execute(Integer userId);
    }

    The concrete Hlr command task HlrBusinessEvent implements this interface to dispatch the subscriber's suspend/restore Hlr commands:

    /**
     * @filename:HlrBusinessEvent.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: Hlr command dispatch task
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.model;

    import org.apache.commons.lang.math.RandomUtils;

    public class HlrBusinessEvent implements BusinessEvent {
        // Result of a command on the switch: 0 = success, 1 = failure
        public final static int TASKSUCC = 0;
        public final static int TASKFAIL = 1;
        private final static int ELAPSETIME = 1000;

        @Override
        public int execute(Integer userId) {
            // For illustration, a random duration below 1000 ms
            int millis = RandomUtils.nextInt(ELAPSETIME);
            // Simulate sending a suspend/restore command to the switch
            try {
                Thread.sleep(millis);
                String strContent = String.format(
                        "Thread [%s] user id [%d]: switch command took [%d] ms",
                        Thread.currentThread().getName(), userId, millis);
                System.out.println(strContent);
                // For the demo, an even duration counts as success and an odd one as failure
                return (millis % 2 == 0) ? TASKSUCC : TASKFAIL;
            } catch (InterruptedException e) {
                // Restore the interrupt flag (e.g. after future.cancel(true)) and report failure
                Thread.currentThread().interrupt();
                return TASKFAIL;
            }
        }
    }

    In production we may want to monitor how long each command takes, so we also design an aspect-style proxy for HlrBusinessEvent that measures the command duration, HlrBusinessEventAdvisor:

    /**
     * @filename:HlrBusinessEventAdvisor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: proxy advice that measures Hlr command dispatch time
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.model;

    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.apache.commons.lang.time.StopWatch;

    public class HlrBusinessEventAdvisor implements MethodInterceptor {
        public HlrBusinessEventAdvisor() {
        }

        // Times the intercepted call (the command dispatch) and logs the duration
        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            StopWatch sw = new StopWatch();
            sw.start();
            Object obj = invocation.proceed();
            sw.stop();
            System.out.println("Switch command took: [" + sw.getTime() + "] ms");
            return obj;
        }
    }

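    Finally, because we want to obtain results through asynchronous, parallel computation, we design the batch Hlr task HlrBusinessEventTask. It implements the call method of java.util.concurrent.Callable, which returns the result of an asynchronous task.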

    /**
     * @filename:HlrBusinessEventTask.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: Hlr command task executed by the thread pool
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask.model;

    import java.util.concurrent.Callable;

    import org.springframework.aop.framework.ProxyFactory;
    import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;

    public class HlrBusinessEventTask implements Callable<Integer> {
        private final NotifyUsers user;
        private final static String MAPPERMETHODNAME = "execute";

        public HlrBusinessEventTask(NotifyUsers user) {
            this.user = user;
        }

        @Override
        public Integer call() throws Exception {
            // Each call builds its own proxy, so no synchronization is needed
            ProxyFactory weaver = new ProxyFactory(new HlrBusinessEvent());
            // Apply the timing advice only to methods named "execute"
            NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor();
            advisor.setMappedName(MAPPERMETHODNAME);
            advisor.setAdvice(new HlrBusinessEventAdvisor());
            weaver.addAdvisor(advisor);
            BusinessEvent proxyObject = (BusinessEvent) weaver.getProxy();
            // Return the command's result code
            return Integer.valueOf(proxyObject.execute(user.getUserId()));
        }
    }
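    For readers without Spring AOP on the classpath, the same timing idea can be sketched with a JDK dynamic proxy (java.lang.reflect.Proxy). This is a swapped-in technique, not the article's ProxyFactory-based code; the nested BusinessEvent merely mirrors the article's interface, and TimingProxySketch is a hypothetical name.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class TimingProxySketch {
    // Mirrors the article's BusinessEvent interface.
    public interface BusinessEvent {
        int execute(Integer userId);
    }

    // Wraps a BusinessEvent in a JDK dynamic proxy that logs how long execute() takes.
    public static BusinessEvent timed(final BusinessEvent target) {
        InvocationHandler handler = (proxy, method, args) -> {
            long start = System.nanoTime();
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception, not the reflective wrapper
            } finally {
                if ("execute".equals(method.getName())) {
                    long ms = (System.nanoTime() - start) / 1_000_000;
                    System.out.println("execute(" + args[0] + ") took " + ms + " ms");
                }
            }
        };
        return (BusinessEvent) Proxy.newProxyInstance(
                BusinessEvent.class.getClassLoader(),
                new Class<?>[] {BusinessEvent.class},
                handler);
    }

    public static void main(String[] args) {
        // Stand-in target: even user ids succeed (0), odd ones fail (1).
        BusinessEvent event = timed(userId -> userId % 2 == 0 ? 0 : 1);
        System.out.println(event.execute(42)); // prints the timing line, then 0
    }
}
```

    JDK proxies only work through interfaces, which is one more reason the BusinessEvent abstraction is useful; Spring's ProxyFactory also uses JDK proxies when the target implements an interface.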
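  7. Next, we combine the parallel asynchronous loading of query results with the parallel asynchronous task execution module by wrapping them in a subscriber-notification batch task manager, NotifyUsersBatchTask. Its main job is to load the subscribers to be suspended or restored in parallel and asynchronously, and then put them into the thread pool for asynchronous processing. Afterwards it prints how many tasks the batch contained and how many succeeded and failed (this article also provides a JMX-based monitor). The NotifyTaskSuccCounter class counts the dispatched tasks that succeeded, and the corresponding NotifyTaskFailCounter class counts those that failed. The code is as follows: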
    /**
     * @filename:NotifyUsersBatchTask.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: subscriber-notification batch task manager
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;

    import javax.sql.DataSource;

    import org.apache.commons.collections.Closure;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.collections.functors.IfClosure;
    import org.apache.commons.lang.StringUtils;

    import newlandframework.batchtask.jmx.BatchTaskMonitor;
    import newlandframework.batchtask.model.NotifyUsers;
    import newlandframework.batchtask.parallel.BatchQueryLoader;
    import newlandframework.batchtask.parallel.BatchTaskReactor;

    public class NotifyUsersBatchTask {
        public NotifyUsersBatchTask() {
        }

        private ArrayList<DataSource> dataSource;
        // JMX-based counters for monitoring task completion
        private BatchTaskMonitor monitor = new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);

        // Supports loading from several data sources at once
        public NotifyUsersBatchTask(ArrayList<DataSource> dataSource) {
            this.dataSource = dataSource;
        }

        // Counts successful batch tasks
        class NotifyTaskSuccCounter implements Closure {
            public static final String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER";
            private int numberSucc = 0;

            public void execute(Object input) {
                monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER);
                numberSucc++;
            }

            public int getSuccNumber() {
                return numberSucc;
            }
        }

        // Counts failed batch tasks
        class NotifyTaskFailCounter implements Closure {
            public static final String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER";
            private int numberFail = 0;

            public void execute(Object input) {
                monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER);
                numberFail++;
            }

            public int getFailNumber() {
                return numberFail;
            }
        }

        // Loads notify_users from every horizontal shard in parallel
        public List<NotifyUsers> query() throws SQLException {
            BatchQueryLoader loader = new BatchQueryLoader();
            String strSQL = "select home_city, msisdn, user_id from notify_users";
            try {
                for (int i = 0; i < dataSource.size(); i++) {
                    Connection con = dataSource.get(i).getConnection();
                    Statement st = con.createStatement();
                    loader.attachLoadEnv(strSQL, st, con);
                }
                // One ResultSet per data source
                List<ResultSet> list = loader.executeQuery();
                final List<NotifyUsers> listNotifyUsers = new ArrayList<NotifyUsers>();
                for (ResultSet rs : list) {
                    while (rs.next()) {
                        NotifyUsers users = new NotifyUsers();
                        users.setHomeCity(rs.getInt(1));
                        // msisdn and user_id are number(15); getInt only works while values fit in an int
                        users.setMsisdn(rs.getInt(2));
                        users.setUserId(rs.getInt(3));
                        listNotifyUsers.add(users);
                    }
                }
                System.out.println("Loaded " + listNotifyUsers.size() + " records from "
                        + list.size() + " data sources");
                return listNotifyUsers;
            } finally {
                // Release connection resources even if a query fails
                loader.close();
            }
        }

        // Dispatches every subscriber in the list to the pool and tallies the results
        public void batchNotify(List<NotifyUsers> list, final ExecutorService excutor) {
            System.out.println("Total records to process: " + list.size());
            System.out.println(StringUtils.center("Record details", 40, "-"));
            NotifyTaskSuccCounter cntSucc = new NotifyTaskSuccCounter();
            NotifyTaskFailCounter cntFail = new NotifyTaskFailCounter();
            BatchTaskPredicate predicate = new BatchTaskPredicate(excutor);
            // Runs cntSucc when the predicate (the task result) is true, cntFail otherwise
            Closure batchAction = new IfClosure(predicate, cntSucc, cntFail);
            CollectionUtils.forAllDo(list, batchAction);
            System.out.println("Batch processed " + list.size() + " records: "
                    + cntSucc.getSuccNumber() + " succeeded, "
                    + cntFail.getFailNumber() + " failed");
        }
    }

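    The asynchronous task submission module BatchTaskPredicate submits each task to be processed to the thread pool and then, based on the asynchronous result, reports whether the task succeeded or failed. The code is as follows: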

    /**
     * @filename:BatchTaskPredicate.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: submits batch tasks asynchronously and evaluates their results
     * @author tangjie
     * @version 1.0
     */
    package newlandframework.batchtask;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    import org.apache.commons.collections.Predicate;

    import newlandframework.batchtask.model.HlrBusinessEvent;
    import newlandframework.batchtask.model.HlrBusinessEventTask;
    import newlandframework.batchtask.model.NotifyUsers;

    public class BatchTaskPredicate implements Predicate {
        private ExecutorService excutor = null;

        public BatchTaskPredicate(ExecutorService excutor) {
            this.excutor = excutor;
        }

        // Submits an Hlr task for the subscriber; true means the command succeeded
        public boolean evaluate(Object object) {
            if (!(object instanceof NotifyUsers)) {
                return false;
            }
            NotifyUsers users = (NotifyUsers) object;
            Future<Integer> future = excutor.submit(new HlrBusinessEventTask(users));
            try {
                // Wait at most 5 seconds for the result
                Integer result = future.get(5, TimeUnit.SECONDS);
                return result.intValue() == HlrBusinessEvent.TASKSUCC;
            } catch (Exception e) {
                // On failure or timeout, try to cancel the task
                future.cancel(true);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                e.printStackTrace();
                return false;
            }
        }
    }

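    Finally, NotifyUsersBatchTask can be constructed with several database connection pools, so that it loads data from multiple data sources in one batch. Here we assume the notify_users data is loaded in parallel from two data sources, cms and ccs; the Spring configuration batchtask-multidb.xml is as follows: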

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <list>
                    <value>newlandframework/batchtask/jdbc-cms.properties</value>
                    <value>newlandframework/batchtask/jdbc-ccs.properties</value>
                </list>
            </property>
        </bean>

        <bean id="dtSource-cms" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
            <property name="driverClassName" value="${jdbc.cms.driverClassName}"/>
            <property name="url" value="${jdbc.cms.url}"/>
            <property name="username" value="${jdbc.cms.username}"/>
            <property name="password" value="${jdbc.cms.password}"/>
        </bean>

        <bean id="dtSource-ccs" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
            <property name="driverClassName" value="${jdbc.ccs.driverClassName}"/>
            <property name="url" value="${jdbc.ccs.url}"/>
            <property name="username" value="${jdbc.ccs.username}"/>
            <property name="password" value="${jdbc.ccs.password}"/>
        </bean>

        <bean id="notifyUsers" class="newlandframework.batchtask.NotifyUsersBatchTask">
            <constructor-arg name="dataSource">
                <list>
                    <ref bean="dtSource-ccs"/>
                    <ref bean="dtSource-cms"/>
                </list>
            </constructor-arg>
        </bean>
    </beans>
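
    NotifyUsersBatchTask's own query code is not shown in this section. The following JDK-only sketch illustrates the idea only and is not the author's implementation. Each data source (cms, ccs) is modelled as a Callable, and all of them are started at once with ExecutorService.invokeAll, so the total wait tracks the slowest source rather than the sum of all of them. ParallelLoadDemo, source() and loadAll() are hypothetical names, and the sleep merely simulates a notify_users query.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLoadDemo {
        // Hypothetical stand-in for a notify_users query on one data source:
        // sleeps to simulate I/O, then returns rows tagged with the source name
        static Callable<List<String>> source(String name, long delayMillis, int rowCount) {
            return () -> {
                Thread.sleep(delayMillis);
                List<String> rows = new ArrayList<>();
                for (int i = 0; i < rowCount; i++) {
                    rows.add(name + "-" + i);
                }
                return rows;
            };
        }

        // Start every source query at once, then merge the results in source order
        static List<String> loadAll(List<Callable<List<String>>> sources) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
            try {
                List<String> merged = new ArrayList<>();
                // invokeAll blocks until every query has completed (normally or not)
                for (Future<List<String>> f : pool.invokeAll(sources)) {
                    merged.addAll(f.get()); // a failed query surfaces as ExecutionException
                }
                return merged;
            } finally {
                pool.shutdown();
            }
        }

        public static void main(String[] args) throws Exception {
            long start = System.nanoTime();
            List<String> rows = loadAll(Arrays.asList(source("cms", 300, 2), source("ccs", 200, 3)));
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // Expect roughly max(300, 200) ms, not the ~500 ms a serial load would need
            System.out.println(rows.size() + " rows loaded in " + elapsedMs + " ms");
        }
    }
    ```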
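  8. Next, we implement a module that uses JMX to monitor how the thread pool's batch tasks are completing. First, define an MBean interface that returns the count for a given counter name.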
    /**
     * @filename:BatchTaskMonitorMBean.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description:JMX batch-task monitoring interface
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.jmx;

    public interface BatchTaskMonitorMBean {
        // Return the current count of the counter with the given name
        public int getBatchTaskCounter(String taskName);
    }

    Then we implement this interface, which gives us the BatchTaskMonitor module:

    /**
     * @filename:BatchTaskMonitor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description:JMX batch-task monitoring module
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.jmx;

    import javax.management.AttributeChangeNotification;
    import javax.management.InstanceAlreadyExistsException;
    import javax.management.MBeanRegistrationException;
    import javax.management.MalformedObjectNameException;
    import javax.management.NotCompliantMBeanException;
    import javax.management.NotificationBroadcasterSupport;
    import javax.management.ObjectName;
    import java.lang.management.ManagementFactory;
    import java.text.MessageFormat;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    public class BatchTaskMonitor extends NotificationBroadcasterSupport implements BatchTaskMonitorMBean {
        private static final String TASKMONITOR_NAME = "newlandframework.batchtask.jmx.{0}:type=BatchTaskMonitor";
        // Counters are updated from many worker threads, so use a concurrent map
        private final ConcurrentMap<String, AtomicInteger> batchTaskCounter = new ConcurrentHashMap<String, AtomicInteger>();
        private final AtomicLong sequenceTaskNumber = new AtomicLong(0);

        // Register the MBean; the built-in counters track batch-task successes/failures in real time
        public BatchTaskMonitor(String taskName) {
            try {
                registerMBean(taskName);
            } catch (InstanceAlreadyExistsException e) {
                System.out.println("InstanceAlreadyExistsException BatchTaskMonitor Register Fail");
            } catch (MBeanRegistrationException e) {
                System.out.println("MBeanRegistrationException BatchTaskMonitor Register Fail");
            } catch (NotCompliantMBeanException e) {
                System.out.println("NotCompliantMBeanException BatchTaskMonitor Register Fail");
            } catch (MalformedObjectNameException e) {
                System.out.println("MalformedObjectNameException BatchTaskMonitor Register Fail");
            }
        }

        private void registerMBean(String taskName) throws InstanceAlreadyExistsException,
                MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException {
            String strObjectName = MessageFormat.format(TASKMONITOR_NAME, taskName);
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(strObjectName));
        }

        // Increment the named batch-task counter atomically and send a JMX notification
        public void increaseBatchTaskCounter(String taskName) {
            AtomicInteger counter = batchTaskCounter.get(taskName);
            if (counter == null) {
                AtomicInteger created = new AtomicInteger(0);
                counter = batchTaskCounter.putIfAbsent(taskName, created);
                if (counter == null) {
                    counter = created;
                }
            }
            notifyMessage(taskName, counter.incrementAndGet());
        }

        private void notifyMessage(String taskName, int batchNewTaskCounter) {
            sendNotification(new AttributeChangeNotification(this,
                    sequenceTaskNumber.getAndIncrement(), System.currentTimeMillis(),
                    "batchTaskCounter \"" + taskName + "\" incremented",
                    "batchTaskCounter", "int",
                    batchNewTaskCounter - 1, batchNewTaskCounter));
        }

        // Return the current value of the named counter (0 if it does not exist yet)
        public int getBatchTaskCounter(String taskName) {
            AtomicInteger counter = batchTaskCounter.get(taskName);
            return counter == null ? 0 : counter.get();
        }
    }
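
    BatchTaskMonitor's counters are incremented from worker threads. A containsKey-then-put sequence on a plain HashMap is not atomic: two threads can both see the key as missing and each install a fresh counter, losing an increment, and HashMap itself is not safe for concurrent writes. The sketch below shows one Java 8+ alternative, ConcurrentHashMap.computeIfAbsent combined with AtomicInteger. CounterDemo and hammer() are hypothetical names used only for this demonstration.

    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CounterDemo {
        private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

        // computeIfAbsent installs the counter exactly once, even under contention
        void increase(String name) {
            counters.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
        }

        // Current value of the named counter, or 0 if it was never incremented
        int get(String name) {
            AtomicInteger c = counters.get(name);
            return c == null ? 0 : c.get();
        }

        // Fire `times` increments of `name` from a pool of `threads` workers
        static int hammer(String name, int threads, int times) throws InterruptedException {
            CounterDemo demo = new CounterDemo();
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            for (int i = 0; i < times; i++) {
                pool.execute(() -> demo.increase(name));
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
            return demo.get(name);
        }

        public static void main(String[] args) throws InterruptedException {
            System.out.println(hammer("TASKSUCCCOUNTER", 8, 10_000)); // prints 10000
        }
    }
    ```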

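    The counter names are already defined in the NotifyUsersBatchTask module. The success counter is String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER", and the failure counter is String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER". With these in place, we can use JConsole to monitor how the thread pool's tasks are being processed.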
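
    What JConsole does when you invoke getBatchTaskCounter can also be done in code through the platform MBeanServer. The sketch below registers a tiny stand-in standard MBean and calls the operation with MBeanServer.invoke, passing the counter name as its single String argument. The names JmxInvokeDemo and TaskCounter, the ObjectName, and the hard-coded counter values are all hypothetical.

    ```java
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class JmxInvokeDemo {
        // Standard MBean rule: the interface must be named <ImplementationClass>MBean
        public interface TaskCounterMBean {
            int getBatchTaskCounter(String taskName);
        }

        public static class TaskCounter implements TaskCounterMBean {
            public int getBatchTaskCounter(String taskName) {
                // Hypothetical fixed values standing in for real counters
                return "TASKFAILCOUNTER".equals(taskName) ? 3 : 0;
            }
        }

        // Invoke the operation by name, as JConsole's Operations tab does
        static int query(String counterName) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("newlandframework.batchtask.jmx.demo:type=BatchTaskMonitor");
            if (!server.isRegistered(name)) {
                server.registerMBean(new TaskCounter(), name);
            }
            return (Integer) server.invoke(name, "getBatchTaskCounter",
                    new Object[] {counterName}, new String[] {String.class.getName()});
        }

        public static void main(String[] args) throws Exception {
            System.out.println(query("TASKFAILCOUNTER")); // prints 3
            System.out.println(query("TASKSUCCCOUNTER")); // prints 0
        }
    }
    ```

    Because getBatchTaskCounter takes a parameter, JMX exposes it as an operation rather than an attribute, which is why it appears under Operations in JConsole.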

  9. Finally, we "assemble" all of the modules above. Sample client code that calls them looks like this:
    try {
        // Initialize the parallel asynchronous task execution reactor
        BatchTaskReactor reactor = BatchTaskReactor.getReactor();
        final ExecutorService excutor = reactor.getBatchTaskThreadPool(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);

        List<NotifyUsers> listNotifyUsers = null;
        // context is assumed to be a Spring ApplicationContext loaded from batchtask-multidb.xml
        NotifyUsersBatchTask notifyTask = (NotifyUsersBatchTask) context.getBean("notifyUsers");

        // Query the horizontally sharded databases in parallel
        listNotifyUsers = notifyTask.query();

        StopWatch sw = new StopWatch();
        sw.start();
        // Process the query result set as a parallel, asynchronous batch
        notifyTask.batchNotify(listNotifyUsers, excutor);
        sw.stop();
        reactor.close();

        String strContent = String.format("========= Parallel batch tasks finished in [%d] ms =========", sw.getTime());
        System.out.println(strContent);
    } catch (SQLException e) {
        e.printStackTrace();
    }
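
    For readers without the rest of the framework at hand, here is a JDK-only sketch of the same assemble, run, time, and shut down flow. It is an illustration under stated assumptions, not the author's batchNotify: BatchRunDemo and runBatch() are hypothetical, the tasks are plain Callable<Boolean> stand-ins, and the returned map only borrows the counter names TASKSUCCCOUNTER and TASKFAILCOUNTER. One design choice is worth noting. The sketch submits every task before waiting on any of them, so the waits overlap. If a predicate that submits a task and then immediately blocks on future.get() is evaluated element by element, the tasks would instead run roughly one at a time.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BatchRunDemo {
        // Submit every task first, then collect each result with a bounded wait.
        // Returns success/failure counts keyed by the article's counter names.
        static Map<String, Integer> runBatch(List<Callable<Boolean>> tasks, int threads, long waitMs)
                throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            int succ = 0;
            int fail = 0;
            try {
                List<Future<Boolean>> futures = new ArrayList<>();
                for (Callable<Boolean> task : tasks) {
                    futures.add(pool.submit(task)); // all tasks queued before any wait
                }
                for (Future<Boolean> f : futures) {
                    try {
                        if (Boolean.TRUE.equals(f.get(waitMs, TimeUnit.MILLISECONDS))) {
                            succ++;
                        } else {
                            fail++;
                        }
                    } catch (TimeoutException | ExecutionException e) {
                        f.cancel(true); // give up on this task and count it as failed
                        fail++;
                    }
                }
            } finally {
                pool.shutdownNow(); // release worker threads even if we exit early
            }
            Map<String, Integer> counters = new TreeMap<>();
            counters.put("TASKSUCCCOUNTER", succ);
            counters.put("TASKFAILCOUNTER", fail);
            return counters;
        }

        public static void main(String[] args) throws InterruptedException {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int i = 0; i < 80; i++) {
                final int id = i;
                // Simulated work: every tenth task reports failure
                tasks.add(() -> {
                    Thread.sleep(20);
                    return id % 10 != 0;
                });
            }
            long start = System.nanoTime();
            Map<String, Integer> counters = runBatch(tasks, 8, 5000);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(counters + " in " + elapsedMs + " ms"); // 8 failed, 72 succeeded
        }
    }
    ```

    Note that waitMs bounds each individual get() call, measured from when the caller starts waiting on that future, not from when the task was submitted.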

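  Now let's run it and see the result. First, insert a total of 80 pending records into the databases for Fuzhou (591) and Xiamen (592). In practice you can insert many more; the more data there is, the more clearly the value of this asynchronous parallel batch framework shows. A screenshot of the run is shown below: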

     

  Everything turned out just as we expected.

  Next, let's use JMX to check how the tasks in the parallel, asynchronous batch thread pool are completing. First, connect to our MBean object, BatchTaskMonitor.

     

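  It exposes an operation, getBatchTaskCounter, which returns the count for a given counter name. In the input box circled in red above, enter TASKFAILCOUNTER, the counter that tracks failed tasks, and click to confirm. The result is shown below: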

     

     We can see that 196 batch tasks have failed so far. Just as we hoped, real-time visual monitoring works very nicely.

  Closing Thoughts

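  In the end, we used parallel asynchronous loading and a thread pool to build a lean batch-processing framework. The code above is not large, but it has its own distinct use case: small as the sparrow is, it has all its vital organs. I believe it is still worth borrowing from for fellow developers. Moreover, today's servers are multi-core, multi-CPU machines, and we should make good use of that hardware. For IO-intensive applications, adapting and refining the ideas above should produce good results.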
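
  A key question when tuning an IO-intensive batch like this is how many threads to use. One widely cited rule of thumb, from Goetz et al., Java Concurrency in Practice, is threads ≈ CPUs × target CPU utilization × (1 + wait time / compute time). The small sketch below applies it. PoolSizing and poolSize() are hypothetical names, and the wait and compute figures must be measured for your own workload; the 45 ms / 5 ms values below are made up for illustration.

```java
public class PoolSizing {
    // Heuristic from Java Concurrency in Practice:
    // threads = cpus * targetUtilization * (1 + waitTime / computeTime)
    static int poolSize(int cpus, double targetUtilization, double waitMs, double computeMs) {
        double n = cpus * targetUtilization * (1 + waitMs / computeMs);
        return Math.max(1, (int) Math.round(n)); // never return fewer than one thread
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical profile: each task waits ~45 ms on I/O and computes ~5 ms
        System.out.println(poolSize(cpus, 1.0, 45, 5));
        System.out.println(poolSize(8, 1.0, 45, 5)); // prints 80
    }
}
```

  The result is a starting point for load testing, not a final answer. Downstream limits such as database connection pool size or how many concurrent commands a switch accepts may cap the useful thread count well below it.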

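  Well, before I knew it I had written a great deal of text and code. Preparing, coding, debugging, and writing up this article took a lot of my time and energy. Still, I enjoyed it: recording my ideas in a blog post may be useful to others, and for me it is a way of learning and consolidating what I know. As the old line goes, the road ahead is long, and I will keep searching high and low. I offer this as a modest starting point for discussion; if anything I have said is wrong, I sincerely welcome corrections from fellow readers!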
