併發編程-Master-Worker模式

1.Master-Worker模式是經常使用的並行模式之一。它的核心思想是,系統由兩類進程協做,Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完成後,將結果返回給Master進程,由Master進程作概括和彙總。其處理過程如圖所示: java


                                                          圖1-1Master-Worker模式工做示意圖
框架

2.Master-Worker模式的主要參與者如表 2-1 ide

角色     做用
Worker 用於實際處理一個任務
Master 用於任務的分配和最終結果的合成
Main 啓動系統,調度開啓Master

3. Master-Worker的代碼實現

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

	// 任務隊列
	protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
	
	// worker進程隊列
	protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
	
	// 
	protected Map<String, Object> resultMap = new HashMap<String, Object>();
	
	// 是否全部的子任務都結束了
	public boolean isComplete(){
		for(Map.Entry<String, Thread> entry : threadMap.entrySet()){
			if(entry.getValue().getState() != Thread.State.TERMINATED){
				return false;
			}
		}
		return true;
	}
	
	public Master(Worker worker, int countWorker){
		worker.setWorkQueue(workQueue);
		worker.setResultMap(resultMap);
		
		for(int i = 0; i < countWorker; i++){
			threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
		}
	}
	
	
	public void submit(Object job){
		workQueue.add(job);
	}

	public Map<String, Object> getResultMap() {
		return resultMap;
	}
	
	
	public void execute(){
		for(Map.Entry<String, Thread> entry : threadMap.entrySet()){
			entry.getValue().start();
		}
	}
	
}



對應的Worker進程實現以下: 函數

import java.util.Map;
import java.util.Queue;

public class Worker implements Runnable {
	
	protected Queue<Object> workQueue = null;
	
	protected Map<String, Object> resultMap = null;
	

	public void setWorkQueue(Queue<Object> workQueue) {
		this.workQueue = workQueue;
	}


	public void setResultMap(Map<String, Object> resultMap) {
		this.resultMap = resultMap;
	}

	
	public Object handle(Object input){
		return input;
	}


	@Override
	public void run() {
		// TODO Auto-generated method stub
		while(true){
			Object  input = workQueue.poll();
			if(input == null) break;
			
			Object re = handle(input);
			
			resultMap.put(Integer.toString(input.hashCode()), re);
		}
	}

}



以上兩段代碼展現了Master-Worker框架的全貌。應用程序經過重載Worker.handle()方法實現應用邏輯。

如今咱們隨便寫一個Worker的實現。 this

public class PlusWorker extends Worker {

spa

public class PlusWorker extends Worker {

	public Object handle(Object input){
		Integer i = (Integer)input;
		return i * i * i;
	}
	
}




使用Master-Worker框架計算的Main()函數實現以下:

import java.util.Map;
import java.util.Set;

public class testmain {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		
		Master m = new Master(new PlusWorker(), 5);
		for(int i = 0; i < 10; i++){
			m.submit(i);
		}
		
		m.execute();
		
		int ret = 0;
		
		Map<String, Object> resultMap =m.getResultMap();
		
		while(resultMap.size() > 0 || !m.isComplete()){
			
			Set<String> keys = resultMap.keySet();
			
			String key = null;
			for(String k : keys){
				key = k;
				break;
			}
			
			Integer i = null;
			
			if(key != null){
				i = (Integer)resultMap.get(key);
				
				if(i != null) ret += i;
				
				resultMap.remove(key);
			}
			
		}
		
		System.out.println("ret = " + ret);
		
	}

}
相關文章
相關標籤/搜索