[TOC]java
在各行各業中,批量業務處理都是常規需求,很是常見。它的特色是,離線處理、運行時間長、計算密集。傳統的解決方式是,或者使用多線程技術,或者使用數據庫計算,好比調用數據庫的存儲過程技術實現等等。 在以Hadoop爲首的分佈式計算技術出現後,狀況有了很大的變化,MapReduce範式爲大規模離線數據處理提供了新的思路,性能獲得了很大的提高,也提供了很好的線性擴展解決方案。數據庫
多線程或者相似於存儲過程這樣的技術,共有的缺陷是,擴展性差,性能依賴於單一硬件性能,大幅提高性能困難,沒法實現分佈式計算。 而以Hadoop爲首的大數據處理解決方案,近段時間發展迅速,性能指標也在不斷地提高,可是設計的目標,或者說適用的場景,主要仍是在互聯網的大規模非結構化數據的分析業務上,雖然也能夠用於傳統的批量業務處理,可是一方面批量業務處理並不須要那麼多的功能,殺雞用了牛刀;另外一方面,這些新一代的計算平臺屬於異構系統,須要在具體應用以外單獨部署,若是要實現高可用,總體的架構也會變得很是複雜,總體的運維成本也會上升,增長了對應的服務器以後,若是計算很少,資源利用率也會降低,採用這樣技術的投入產出比,是須要考慮的。apache
Ignite計算網格實現了分佈式的閉包和ExecutorService,同時它還提供了一個輕量級的MapReduce(或ForkJoin)實現。 本文重點講一下輕量級MapReduce,其它的能夠參照相關的手冊。數組
ComputeTask
接口是Ignite的簡化版內存MapReduce的抽象,它也很是接近於ForkJoin範式。這個接口能夠對做業到節點的映射作細粒度的控制以及定製故障轉移的策略,若是不須要這些,可使用更簡單的分佈式閉包實現,代碼將會更加精煉。緩存
ComputeTask
定義了要在集羣內執行的做業以及這些做業到節點的映射,它還定義瞭如何處理做業的返回值(Reduce)。全部的IgniteCompute.execute(...)
方法都會在集羣上執行給定的任務,應用只須要實現ComputeTask
接口的map(...)
和reduce(...)
方法便可,其中:服務器
map(...)
方法負責將做業實例化而後將它們映射到工做節點,這個過程經過ComputeTaskSplitAdapter
,還能夠進一步簡化;result(...)
方法在每次做業在集羣節點上執行時都會被調用,它接收計算做業返回的結果,以及迄今爲止收到的做業結果的列表,該方法會返回一個ComputeJobResultPolicy
的實例,說明下一步要作什麼;reduce(...)
方法在Reduce階段被調用。該方法接收到全部計算結果的一個列表而後返回一個最終的計算結果。定義計算時每次都實現ComputeTask
的全部三個方法並非必須的,經過Ignite提供的適配器,能夠進一步簡化開發,我着重介紹下ComputeTaskSplitAdapter
,它增長了將做業自動分配給節點的功能。它隱藏了map(...)
方法而後增長了一個新的split(...)
方法,使得開發者只須要提供一個待執行的做業集合便可,這很是適用於批量業務處理。這個適配器對於全部節點都適於執行做業的同質化環境是很是有用的,這樣的話映射階段就能夠隱式地完成。多線程
任務觸發的全部做業都要實現ComputeJob
接口,這個接口的execute()
方法定義了做業的邏輯而後返回一個做業的結果。閉包
下面這段代碼,做爲一個簡單示例,顯示瞭如何計算一段話中的字母的總數量:架構
IgniteCompute compute = ignite.compute(); // 在集羣上執行任務。 int cnt = grid.compute().execute(CharacterCountTask.class, "Hello Grid Enabled World!"); private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> { // 1. 將收到的字符串拆分爲字符串數組 // 2. 爲每一個單詞建立一個做業 // 3. 將每一個做業發送給工做節點進行處理 @Override public List<ClusterNode> split(List<ClusterNode> subgrid, String arg) { String[] words = arg.split(" "); List<ComputeJob> jobs = new ArrayList<>(words.length); for (final String word : arg.split(" ")) { jobs.add(new ComputeJobAdapter() { @Override public Object execute() { return word.length(); } }); } return jobs; } @Override public Integer reduce(List<ComputeJobResult> results) { int sum = 0; for (ComputeJobResult res : results) sum += res.<Integer>getData(); return sum; } }
是否是很是簡單?運維
Ignite支持做業的自動故障轉移,當一個節點故障時,做業會被轉移到其它可用節點再次執行。故障轉移是經過FailoverSpi
實現的,FailoverSpi
負責選擇一個新的節點來執行失敗的做業。它會檢查發生故障的做業以及該做業能夠嘗試執行的全部可用的網格節點的列表。它會確保該做業不會再次映射到出現故障的同一個節點。故障轉移是在ComputeTask.result(...)
方法返回ComputeJobResultPolicy.FAILOVER
策略時觸發的。Ignite內置了一些故障轉移SPI的實現,開發者也能夠進行定製。另外,Ignite保證,只要有一個節點是有效的,做業就不會丟失。
Ignite中的負載平衡是經過LoadBalancingSpi
實現的。它控制全部節點的負載以及確保集羣中的每一個節點負載水平均衡。對於同質化環境中的同質化的任務,負載平衡採用的是隨機或者循環的策略。然而在不少其它場景中,特別是在一些不均勻的負載下,就須要更復雜的自適應負載平衡策略。Ignite內置了若干中負載平衡實現,好比循環式負載平衡RoundRobinLoadBalancingSpi
以及隨機或者加權負載平衡WeightedRandomLoadBalancingSpi
,這部分開發者也能夠定製開發,知足個性化需求。
Ignite中,做業是在客戶端側的任務拆分初始化或者閉包執行階段被映射到集羣節點上的,可是一旦做業到達被分配的節點,就會有序地執行。默認狀況下,做業會被提交到一個線程池而後隨機地執行,若是要對做業執行順序進行細粒度控制的話,須要啓用CollisionSpi
,好比,能夠按照FIFO排序或者按照優先級排序。
在企業級批量業務處理中,一般要對數據庫進行頻繁的更新操做,在分佈式計算環境下,將整個任務配置爲一個事務顯然是不合適的。最佳實踐是將每一個做業配置成一個事務,這樣若是某個做業失敗,只是該做業回滾,其它成功的做業仍是正常提交的,而後故障轉移機制會使該失敗的做業再次執行,直到成功提交。
Ignite的內存MapReduce實現還支持會話,這個機制能夠在任務和做業之間共享一些數據,還支持節點局部狀態共享,這個實際上是節點的局部變量,它能夠用於任務在不一樣的執行過程當中共享狀態。還有,經過計算和緩存數據的並置,能夠極大地提升性能,它還支持檢查點,能夠在一個長時間執行的做業中保存一些中間狀態,這個機制在重啓一個故障節點後,做業能夠從保存的檢查點載入而後從故障處繼續執行。等等,在這裏就不一一介紹了。
在以前的關於Ignite的集羣部署的文章中我對Ignite的集羣特性作了簡要的介紹,該文中推薦了一種混合式的集羣部署方案,以下圖:
在這個架構中,若是可以在應用集羣組中進行分佈式計算來實現批量業務處理,那麼這會是一個很優雅的解決方案,幸運的是,Ignite真的實現了,這個解決方案總體上來說,具備以下的優點: