併發模型之Master-Worker設計模式

1、Master-Worker設計模式

Master-Worker模式是經常使用的並行設計模式。它的核心思想是,系統有兩個進程協議工做:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完後,將結果返回給Master進程,由Master進行概括和彙總,從而獲得系統結果。java

Master-Worker模式的好處是,它能將大任務分解成若干個小任務,併發執行,從而提升系統性能。而對於系統請求者Client來講,任務一旦提交,Master進程就會馬上分配任務並當即返回,並不會等系統處理徹底部任務再返回,其處理過程是異步的。數據庫

 

2、Master-Worker設計模式代碼實現

一、建立Task任務對象設計模式

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 /**
 4  * Created by Root on 5/12/2017.
 5  */
 6 public class Task {
 7 
 8     private int id;
 9 
10     private String name;
11 
12     private int price;
13 
14     public int getId() {
15         return id;
16     }
17 
18     public void setId(int id) {
19         this.id = id;
20     }
21 
22     public String getName() {
23         return name;
24     }
25 
26     public void setName(String name) {
27         this.name = name;
28     }
29 
30     public int getPrice() {
31         return price;
32     }
33 
34     public void setPrice(int price) {
35         this.price = price;
36     }
37 }

二、實現Worker對象併發

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5 
 6 /**
 7  * Created by Root on 5/12/2017.
 8  */
 9 public class Worker implements Runnable {
10 
11     private ConcurrentLinkedQueue<Task> workQueue;
12     private ConcurrentHashMap<String, Object> resultMap;
13 
14     public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
15         this.workQueue = workQueue;
16     }
17 
18     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
19         this.resultMap = resultMap;
20     }
21 
22     @Override
23     public void run() {
24         while (true) {
25             Task input = this.workQueue.poll();
26             if (input == null) {
27                 break;
28             }
29             // 真正的去作業務處理
30             //Object output = handle(input);
31             // 改造
32             Object output = MyWorker.handle(input);
33             // 返回處理結果集
34             this.resultMap.put(Integer.toString(input.getId()), output);
35         }
36     }
37 
38 //    private Object handle(Task input) {
39 //        Object output = null;
40 //        try {
41 //            // 表示處理task任務的耗時,多是數據的加工,也多是操做數據庫......
42 //            Thread.sleep(500);
43 //            output = input.getPrice();
44 //        } catch (InterruptedException e) {
45 //            e.printStackTrace();
46 //        }
47 //        return output;
48 //    }
49 
50     // 優化,考慮讓繼承類去本身實現具體的業務處理
51     public static Object handle(Task input) {
52         return null;
53     }
54 
55 }

三、爲了使程序更靈活,將具體的業務執行邏輯抽離,在具體的Worker對象去實現,如這裏的MyWorker對象dom

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 /**
 4  * Created by Root on 5/13/2017.
 5  */
 6 public class MyWorker extends Worker {
 7 
 8     public static Object handle(Task input) {
 9         Object output = null;
10         try {
11             // 表示處理task任務的耗時,多是數據的加工,也多是操做數據庫......
12             Thread.sleep(500);
13             output = input.getPrice();
14         } catch (InterruptedException e) {
15             e.printStackTrace();
16         }
17         return output;
18     }
19 
20 }

四、Master類異步

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.ConcurrentHashMap;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 
 8 /**
 9  * Created by Root on 5/12/2017.
10  */
11 public class Master {
12 
13     // 一、使用一個ConcurrentLinkedQueue集合來裝載全部須要執行的任務
14     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
15 
16     // 二、使用HashMap來裝載全部的worker對象
17     private HashMap<String, Thread> workers = new HashMap<String, Thread>();
18 
19     // 三、使用一個容器承裝每個worker併發執行任務的結果集
20     private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
21 
22     // 四、構造方法
23     public Master(Worker worker, int workerCount) {
24         // 每個worker對象都須要有Master的引用,workQueue用於任務的領取,resultMap用於任務的提交
25         worker.setWorkerQueue(this.workQueue);
26         worker.setResultMap(this.resultMap);
27 
28         for (int i = 0; i < workerCount; i++) {
29             workers.put("子節點" + Integer.toString(i), new Thread(worker));
30         }
31     }
32 
33     // 五、提交方法
34     public void submit(Task task) {
35         this.workQueue.add(task);
36     }
37 
38     // 六、須要有一個執行的方法(啓動應用程序,讓全部的worker工做)
39     public void execute() {
40         for (Map.Entry<String, Thread> me : workers.entrySet()) {
41             me.getValue().start();
42         }
43     }
44 
45     // 七、判斷線程是否執行完畢
46     public boolean isComplete() {
47         for (Map.Entry<String, Thread> me : workers.entrySet()) {
48             // 判斷全部的線程狀態是否屬於已中止狀態
49             if (me.getValue().getState() != Thread.State.TERMINATED) {
50                 return false;
51             }
52         }
53         return true;
54     }
55 
56     // 八、返回結果集數據
57     public int getResult() {
58         int ret = 0;
59         for (Map.Entry<String, Object> me : resultMap.entrySet()) {
60             // 彙總邏輯
61             ret += (Integer) me.getValue();
62         }
63         return ret;
64     }
65 
66 }

五、測試,具體調用實現ide

 1 package com.ietree.basicskill.mutilthread.designpattern.masterworker;
 2 
 3 import java.util.Random;
 4 
 5 /**
 6  * Created by Root on 5/13/2017.
 7  */
 8 public class MasterWorkerTest {
 9 
10     public static void main(String[] args) {
11 
12 //        Master master = new Master(new Worker(), 10);
13         // 改造
14 //        Master master = new Master(new MyWorker(), 10);
15         // 改造(獲取當前機器可用線程數)
16         System.out.println("個人機器可用Processors數量:" + Runtime.getRuntime().availableProcessors());
17         Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
18 
19         Random r = new Random();
20         for (int i = 1; i <= 10; i++) {
21             Task t = new Task();
22             t.setId(i);
23             t.setName("任務" + i);
24             t.setPrice(r.nextInt(1000));
25             master.submit(t);
26         }
27         master.execute();
28 
29         long start = System.currentTimeMillis();
30 
31         while (true) {
32             if (master.isComplete()) {
33                 long end = System.currentTimeMillis() - start;
34                 int ret = master.getResult();
35                 System.out.println("最終的結果:" + ret + ",執行耗時:" + end);
36                 break;
37             }
38         }
39     }
40 
41 }

程序輸出:性能

個人機器可用Processors數量:20
最終的結果:4473,執行耗時:500

從上面的運行結果來看,程序最終執行時間幾乎就等於一個線程單獨運行的時間,在此注意的是,同時執行的線程數是根據你執行此程序的機器配置決定的。測試

相關文章
相關標籤/搜索