Master-Worker模式--併發編程

  • 簡介

  Master-Worker模式是經常使用的並行設計模式。它的核心思想是,系統有兩個進程協議工做:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完後,將結果返回給Master進程,由Master進行概括和彙總,從而獲得系統結果。處理過程以下圖:設計模式

Master-Worker模式的好處是,它能將大任務分解成若干個小任務,併發執行,從而提升系統性能。而對於系統請求者Client來講,任務一旦提交,Master進程就會馬上分配任務並當即返回,並不會等系統處理徹底部任務再返回,其處理過程是異步的。併發

  • Master-Worker模式結構

  Master-Worker模式的主要結構以下圖:異步

 

  如上圖所示,Master進程是主要進程,它維護着一個Worker進程隊列、子任務隊列和子結果集,Worker進程中的Worker進程不斷的從任務隊列中提取要處理的子任務,並將子任務的處理結果放入到子結果集中。ide

  在上圖中,Master:用於任務的分配和最終結果的合併;Worker:用於實際處理一個任務;客戶端進程:用於啓動系統,調度開啓Master。性能

  • Master-Worker模式代碼實現

  Master代碼實現:this

複製代碼

複製代碼

1 public class Master {
 2     //任務隊列
 3     protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
 4     //worker進程隊列
 5     protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
 6     //結果集
 7     protected Map<String, Object> resultMap = new HashMap<String,Object>();
 8     
 9     //是否全部的子任務都結束
10     
11     public boolean isComplete(){
12         for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
13             if(entry.getValue().getState()!=Thread.State.TERMINATED){
14                 return false;
15             }
16         }
17         return true;
18     }
19 
20     public Master(Worker worker,int countWorker) {
21         worker.setResultMap(resultMap);
22         worker.setWorkQueue(workQueue);
23         for (int i = 0; i < countWorker; i++) {
24             threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
25         }
26     }
27     
28     //提交任務
29     
30     public void submit(Object obj){
31         workQueue.add(obj);
32         //System.out.println(obj.toString());
33     }
34 
35     
36     
37     //返回子任務結果集
38     public Map<String, Object> getResultMap() {
39         return resultMap;
40     }
41     
42     //開始運行全部worker進程,並進行處理
43     
44     public void execute(){
45         for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
46             entry.getValue().start();
47         }
48     }
49     
50 }

複製代碼

複製代碼

  Worker代碼實現:線程

複製代碼

複製代碼

1 public class Worker implements Runnable {
 2     //任務隊列
 3     protected Queue<Object> workQueue;
 4     //子任務結果集
 5     protected Map<String,Object> resultMap = new HashMap<String, Object>();
 6     
 7     
 8     public void setWorkQueue(Queue<Object> workQueue) {
 9         this.workQueue = workQueue;
10     }
11     public void setResultMap(Map<String, Object> resultMap) {
12         this.resultMap = resultMap;
13     }
14 
15     public Object handle(Object input){
16         return input;
17     }
18     @Override
19     public void run() {
20         while(true){
21             Object input = workQueue.poll();
22             
23             if(null==input) break;
24             //處理子任務
25             Object re = handle(input);
26             resultMap.put(Integer.toString(input.hashCode()),re);
27             //System.out.println(re.toString());
28         }
29     }
30 
31 }

複製代碼

複製代碼

  Master-Worker模式是一種串行任務並行化的方法,被分解的子任務在系統中能夠並行處理。同時,若是有須要,Master進程不須要全部子任務都執行完成,就能夠根據已有的部分結果集計算最終的結果。設計

  如今以上面的Master-Worker實現爲基礎,來實現計算1-100的立方和。計算將被分解爲100個子任務,每一個子任務僅用於計算單獨的立方和。Master產生固定數目Worker,來處理這些子任務。Worker不斷的從任務集合中取出這些計算立方和的子任務,並將計算結果放入到Master的結果集中。Master負責將全部Worker的任務結果進行累加,從而產生最終的立方和。整個計算過程,Worker和Master的運算也是徹底異步的,Master進程沒必要等全部的Worker進程都執行完成,就能夠進行求和操做了。也就是所,Master在獲取部分子任務的結果集時,就能夠對最終結果進行計算了,從而提升了系統的併發性和吞吐量。隊列

  計算子任務的實現以下:進程

複製代碼

複製代碼

1 public class PlusWorker extends Worker {
2 
3     @Override
4     public Object handle(Object input) {
5         Integer i = (Integer) input;
6         return i*i*i;
7     }
8     
9 }

複製代碼

複製代碼

  客戶端代碼以下:

複製代碼

複製代碼

1 public class Client {
 2     public static void main(String[] args) {
 3         Master m = new Master(new PlusWorker(), 5);//啓動五個線程處理
 4         for (int i = 0; i < 100; i++) {
 5             m.submit(i);
 6         }
 7         m.execute();
 8         int re = 0;
 9         Map<String, Object> resultMap = m.getResultMap();
10         while(resultMap.size()>0||!m.isComplete()){
11             Set<String> keys = resultMap.keySet();
12             String key =  null;
13             for(String k:keys){
14                 key=k;
15                 break;
16             }
17             Integer i = null;
18             if(key != null){
19                 i = (Integer) resultMap.get(key);
20             }
21             if(i!=null){
22                 re+=i;//並行計算結果集
23             }
24             
25             if(key!=null){
26                 resultMap.remove(key);//將計算完成的結果移除
27             }
28         }
29         
30         System.out.println(re);
31     }
32 }

複製代碼

複製代碼

  經過Master建立5個Worker工做線程和PlusWorker工做實例。提交完100個任務後,就開始計算子任務。這些子任務,由生成的5個Worker線程共同完成。Master並不等全部的子任務都計算完成,就開始訪問子結果集進行最終結果的計算,直到子結果集中全部的數據都被處理,而且5個活躍的Worker線程所有終止,才能求出最終結果。

相關文章
相關標籤/搜索