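1. Master-Worker is one of the most commonly used parallel patterns. Its core idea is that the system consists of two kinds of cooperating processes: a Master process and Worker processes. The Master receives and distributes tasks, and the Workers process the subtasks. When a Worker finishes a subtask, it returns the result to the Master, which aggregates and summarizes the results. The process is shown in the figure: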
Figure 1-1: How the Master-Worker pattern works
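The flow described above (split, process in parallel, aggregate) can be sketched in a few lines. This is only an illustration, not the implementation this article builds: it swaps in the JDK's `ExecutorService` and `Future` in place of hand-written Master and Worker classes, and the class name `MasterWorkerFlowSketch` and the method `sumOfSquares` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class, used only for this illustration.
class MasterWorkerFlowSketch {

    // Plays the "master" role: splits the job into subtasks, hands them
    // to a pool of worker threads, then collects and sums the results.
    static int sumOfSquares(int n, int workerCount) {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        try {
            List<Future<Integer>> partials = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int value = i;
                // Each subtask is picked up by one of the worker threads.
                partials.add(workers.submit(() -> value * value));
            }
            int total = 0;
            for (Future<Integer> partial : partials) {
                total += partial.get(); // blocks until that subtask is done
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4, 2)); // 0 + 1 + 4 + 9 = 14
    }
}
```

The rest of this article builds the same roles by hand, which makes the task queue and the result collection explicit.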
2. The main participants in the Master-Worker pattern are listed in Table 2-1.
Role | Purpose |
Worker | Processes an individual task |
Master | Distributes tasks and combines the final results |
Main | Starts the system and launches the Master |
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
    // Task queue
    protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
    // Worker threads, keyed by name
    protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
    // Subtask results; written concurrently by the workers, so the map must be thread-safe
    protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    // Have all subtasks finished?
    public boolean isComplete() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
            if (entry.getValue().getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    // Create countWorker threads that all run the same Worker
    public Master(Worker worker, int countWorker) {
        worker.setWorkQueue(workQueue);
        worker.setResultMap(resultMap);
        for (int i = 0; i < countWorker; i++) {
            threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
        }
    }

    // Submit a task
    public void submit(Object job) {
        workQueue.add(job);
    }

    // Return the subtask results
    public Map<String, Object> getResultMap() {
        return resultMap;
    }

    // Start all Worker threads
    public void execute() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
            entry.getValue().start();
        }
    }
}
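The Master's `isComplete()` reports completion by checking whether each thread's state is `TERMINATED`, which the caller has to poll. As a point of comparison, the sketch below waits with `Thread.join()` instead, which blocks until a thread has finished. It is a standalone illustration under its own assumptions: the class `JoinWaitSketch` and the method `drainWithWorkers` are hypothetical and are not part of the Master class above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, used only for this illustration.
class JoinWaitSketch {

    // Starts workerCount threads that drain a shared queue, waits for
    // all of them with join(), and returns how many jobs were processed.
    static int drainWithWorkers(int jobCount, int workerCount) {
        Queue<Integer> workQueue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < jobCount; i++) {
            workQueue.add(i);
        }
        AtomicInteger processed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(() -> {
                // Same loop shape as a worker: poll until the queue is empty.
                while (workQueue.poll() != null) {
                    processed.incrementAndGet();
                }
            }, Integer.toString(i));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // returns only after t has terminated
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(drainWithWorkers(10, 5)); // 10
    }
}
```

Polling, as the Master does, lets the caller consume partial results while the workers are still running. Joining is simpler when the caller only needs the final result.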
The corresponding Worker implementation is as follows:
import java.util.Map;
import java.util.Queue;

public class Worker implements Runnable {
    // Task queue, shared with the Master
    protected Queue<Object> workQueue = null;
    // Result map, shared with the Master
    protected Map<String, Object> resultMap = null;

    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    // Subtask logic; subclasses override this
    public Object handle(Object input) {
        return input;
    }

    @Override
    public void run() {
        while (true) {
            // Take the next subtask; stop once the queue is empty
            Object input = workQueue.poll();
            if (input == null) break;
            Object re = handle(input);
            // Keyed by the input's hash code, so inputs with equal hash codes overwrite each other
            resultMap.put(Integer.toString(input.hashCode()), re);
        }
    }
}
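The Worker stores each result under `Integer.toString(input.hashCode())`. That works for distinct `Integer` inputs, whose hash code is the value itself, but two different inputs with the same hash code would share one key and one result would be lost. The sketch below demonstrates this with the strings "Aa" and "BB", which have the same hash code in Java. The counter-based key is only one possible alternative, and the class `ResultKeySketch` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class, used only for this illustration.
class ResultKeySketch {
    // Assumed alternative key source: a counter that never repeats.
    private static final AtomicLong nextId = new AtomicLong();

    // Stores inputs keyed by hashCode(), as the Worker does, and
    // returns how many entries survive.
    static int storeByHashCode(Object... inputs) {
        Map<String, Object> resultMap = new ConcurrentHashMap<>();
        for (Object input : inputs) {
            resultMap.put(Integer.toString(input.hashCode()), input);
        }
        return resultMap.size();
    }

    // Stores inputs keyed by a unique counter value instead.
    static int storeByCounter(Object... inputs) {
        Map<String, Object> resultMap = new ConcurrentHashMap<>();
        for (Object input : inputs) {
            resultMap.put(Long.toString(nextId.getAndIncrement()), input);
        }
        return resultMap.size();
    }

    public static void main(String[] args) {
        // "Aa" and "BB" both hash to 2112, so the second put overwrites the first.
        System.out.println(storeByHashCode("Aa", "BB")); // 1
        System.out.println(storeByCounter("Aa", "BB"));  // 2
    }
}
```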
Now let's write a simple Worker implementation.
public class PlusWorker extends Worker {
    // Returns the cube of the input integer
    @Override
    public Object handle(Object input) {
        Integer i = (Integer) input;
        return i * i * i;
    }
}
import java.util.Map;
import java.util.Set;

public class testmain {
    public static void main(String[] args) {
        // Five Worker threads, all running PlusWorker
        Master m = new Master(new PlusWorker(), 5);
        for (int i = 0; i < 10; i++) {
            m.submit(i);
        }
        m.execute();

        int ret = 0;
        Map<String, Object> resultMap = m.getResultMap();
        // Check isComplete() first: once it is true no more results can arrive,
        // so an empty map then really means everything has been collected
        while (!m.isComplete() || resultMap.size() > 0) {
            Set<String> keys = resultMap.keySet();
            String key = null;
            // Pick any one available key
            for (String k : keys) {
                key = k;
                break;
            }
            Integer i = null;
            if (key != null) {
                i = (Integer) resultMap.get(key);
                if (i != null) ret += i;
                // Remove the result once it has been added to the sum
                resultMap.remove(key);
            }
        }
        System.out.println("ret = " + ret);
    }
}
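As a sanity check on the program above: it submits the integers 0 through 9 and PlusWorker cubes each one, so if every result is collected the printed sum should be 0³ + 1³ + ... + 9³ = 2025. The sequential sketch below computes the same value without any threads, and compares it with the standard closed form for a sum of cubes. The class `ExpectedResultSketch` and its method names are hypothetical.

```java
// Hypothetical class, used only for this illustration.
class ExpectedResultSketch {

    // Sequentially sums i^3 for i in [0, n), mirroring what the
    // workers compute in parallel.
    static int sumOfCubes(int n) {
        int total = 0;
        for (int i = 0; i < n; i++) {
            total += i * i * i;
        }
        return total;
    }

    // Closed form: 0^3 + ... + (n-1)^3 = (n(n-1)/2)^2.
    static int closedForm(int n) {
        int triangular = n * (n - 1) / 2;
        return triangular * triangular;
    }

    public static void main(String[] args) {
        System.out.println("ret = " + sumOfCubes(10)); // ret = 2025
    }
}
```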