併發編程之:並行程序設計模式

1.Future模式

實現方式:java

首先定義Data接口

/**
 * Common view of a result string; in this example it is implemented both by
 * the class that computes the result and by the placeholder that stands in
 * for it.
 */
public interface Data {
    /**
     * Returns the result string held by this implementation.
     *
     * @return the result
     */
    String getResult();
}

實現真正執行業務的實現類

/**
 * The "real" data of the Future pattern: building it is deliberately slow
 * (ten simulated one-second steps), which is why callers are handed a
 * placeholder instead of waiting for this constructor.
 */
public class RealData implements Data{

    private final String result;

    /**
     * Builds the result by appending {@code param} followed by the step index
     * for steps 0 through 9, pausing one second before each step.
     *
     * <p>If the constructing thread is interrupted, the remaining pauses are
     * skipped, the result is still built in full, and the thread's interrupt
     * status is restored so the caller can observe it.
     *
     * @param param text repeated in front of every step index
     */
    public RealData(String param){
        // The builder never leaves this constructor, so no synchronized buffer is needed.
        StringBuilder sb = new StringBuilder();
        boolean interrupted = false;
        for (int i = 0; i < 10; i++) {
            if (!interrupted) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Remember the interruption instead of swallowing it; sleeping
                    // again would only throw immediately once the flag is restored.
                    interrupted = true;
                }
            }
            sb.append(param).append(i);
        }
        result = sb.toString();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the string assembled by the constructor.
     *
     * @return the fully built result
     */
    @Override
    public String getResult() {
        return result;
    }
}

實現真實業務類的代理類

public class FutureData implements Data{

    protected RealData realData;
    protected boolean isReady = false;

    public synchronized void setRealData(RealData realData){
        if(isReady){
            return;
        }
        isReady = true;
        this.realData = realData;
        notifyAll();
    }

    public synchronized String getResult() {
        while (!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getResult();
    }

}

通過Client開啓線程,獲取FutureData

/**
 * Entry point of the Future pattern: starts the slow construction on a new
 * thread and returns a placeholder straight away.
 */
public class Client {

    /**
     * Starts building a {@code RealData} for {@code name} on a background
     * thread and returns the placeholder that will receive it.
     *
     * @param name value passed to the {@code RealData} constructor
     * @return a placeholder whose {@code getResult()} blocks until the data is set
     */
    public Data request(final String name){
        final FutureData placeholder = new FutureData();

        Runnable buildTask = new Runnable() {
            @Override
            public void run() {
                RealData built = new RealData(name);
                placeholder.setRealData(built);
            }
        };
        Thread builder = new Thread(buildTask);
        builder.start();

        return placeholder;
    }
}

測試

/**
 * Demo driver for the hand-written Future pattern: issues a request, does
 * six seconds of unrelated work, then prints the result of the request.
 *
 * @param str command-line arguments (unused)
 */
public static void main(String[] str){
    System.out.println("請求開始");
    // The value returned here is used only through the Data interface.
    final Data pending = new Client().request("jok");

    try {
        System.out.println("作其餘事情");
        Thread.sleep(6000);
        System.out.println("其餘事情完成");
    } catch (InterruptedException interruption) {
        interruption.printStackTrace();
    }

    final String outcome = pending.getResult();
    System.out.println(outcome);
}

JDK實現方式測試

首先實現業務類

/**
 * Business task for the JDK-based Future example: a {@link Callable} that
 * slowly assembles a string from a name.
 */
public class RealData implements Callable<String> {

    private final String name;

    /**
     * @param name text repeated in front of every step index
     */
    public RealData(String name) {
        this.name = name;
    }

    /**
     * Appends {@code name} followed by the step index for steps 0 through 9,
     * sleeping one second after each append.
     *
     * @return the assembled string
     * @throws Exception an {@link InterruptedException} if the thread is
     *         interrupted during one of the sleeps
     */
    @Override
    public String call() throws Exception {
        // Placeholder for real business logic. The builder is local to this
        // call, so the unsynchronized StringBuilder is sufficient.
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            text.append(name).append(i);
            Thread.sleep(1000);
        }
        return text.toString();
    }
}

執行方法

/**
 * Demo driver for the JDK Future API: submits the task, does six seconds of
 * unrelated work, then blocks on {@code future.get()} and prints the result.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the main thread is interrupted while
 *         sleeping or while waiting for the result
 * @throws ExecutionException if the task itself threw an exception
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    FutureTask<String> future = new FutureTask<String>(new RealData("jok"));
    ExecutorService service = Executors.newCachedThreadPool();
    try {
        // Start the task.
        System.out.println("開始執行");
        service.execute(future);
        // Do other work while the task runs.
        System.out.println("執行其餘事情");
        Thread.sleep(6000);
        System.out.println("執行其餘事情完成");
        System.out.println(future.get());
    } finally {
        // Without this the pool's non-daemon worker thread keeps the JVM
        // alive after main returns, until the thread's idle timeout expires.
        service.shutdown();
    }
}

2.Master-Worker模式

Master-Worker模式的核心思想是,系統由兩類進程協作工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。

由5個worker線程執行100個任務的實現例子

worker工人類

/**
 * Base class for workers: repeatedly takes a task from the shared queue,
 * processes it with {@link #handle(Object)} and stores the outcome in the
 * shared result map.
 */
public abstract class Worker implements Runnable{
    // Queue the sub-tasks are taken from.
    private Queue<Object>  workQueue;
    // Map the sub-task results are written to.
    private Map<String,Object> resultMap;

    /**
     * @param workQueue queue this worker polls for tasks
     */
    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    /**
     * @param resultMap map this worker writes results into
     */
    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    /**
     * Processes one sub-task; the concrete logic lives in subclasses.
     *
     * @param input the task taken from the queue
     * @return the result to store for that task
     */
    public abstract Object handle(Object input);

    /**
     * Drains the work queue: stops as soon as {@code poll()} returns
     * {@code null}. Each result is stored under the decimal string of the
     * task's {@code hashCode()}.
     */
    @Override
    public void run() {
        for (Object task = workQueue.poll(); task != null; task = workQueue.poll()) {
            Object outcome = handle(task);
            String key = String.valueOf(task.hashCode());
            resultMap.put(key, outcome);
        }
    }
}

Master類實現

/**
 * Coordinator of the Master-Worker pattern: owns the task queue, the worker
 * threads and the map the workers write their results into.
 */
public class Master {
    // Tasks waiting to be processed.
    protected Queue<Object> workerQueue = new ConcurrentLinkedDeque<Object>();
    // Worker threads, keyed by their index rendered as a string.
    protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
    // Results produced by the workers.
    protected Map<String,Object> resultMap = new ConcurrentHashMap<String,Object>();

    /**
     * Wires the given worker to this master's queue and result map and
     * creates {@code workerCount} threads that all run that same worker
     * instance. The threads are created but not started.
     *
     * @param worker      the worker logic shared by every thread
     * @param workerCount number of threads to create
     */
    public Master(Worker worker,int workerCount){
        worker.setWorkQueue(workerQueue);
        worker.setResultMap(resultMap);
        int index = 0;
        while (index < workerCount) {
            String id = Integer.toString(index);
            threadMap.put(id, new Thread(worker, id));
            index++;
        }
    }

    /**
     * Reports whether every worker thread has terminated.
     *
     * @return {@code true} only if all threads are in state {@code TERMINATED}
     */
    public boolean isComplete(){
        for (Thread workerThread : threadMap.values()) {
            boolean finished = workerThread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Adds a task to the queue.
     *
     * @param o the task to enqueue
     */
    public void submit(Object o){
        workerQueue.add(o);
    }

    /**
     * Returns the map the workers store their results in.
     *
     * @return the live result map (not a copy)
     */
    public Map<String,Object> getResultMap(){
        return resultMap;
    }

    /**
     * Starts every worker thread.
     */
    public void execute(){
        for (Map.Entry<String, Thread> slot : threadMap.entrySet()) {
            slot.getValue().start();
        }
    }

}

實現業務邏輯的worker類

/**
 * Worker that computes the cube of the {@code Integer} it is given.
 * Date: 2017/3/19
 *
 * @author Administrator
 */
public class PlusWorker extends Worker {

    /**
     * @param input the task; must be an {@code Integer}
     * @return {@code input} cubed, using {@code int} arithmetic
     */
    @Override
    public Object handle(Object input) {
        final int value = (Integer) input;
        final int cube = value * value * value;
        return cube;
    }
}

執行任務和結果處理代碼

/**
 * Demo driver for Master-Worker: submits the integers 0..99, lets five
 * worker threads cube them, and sums the results while they are produced.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args){
    // Five worker threads.
    Master master = new Master(new PlusWorker(),5);
    // Submit 100 tasks.
    for (int i = 0; i < 100; i++) {
        master.submit(i);
    }
    // Start processing: five threads work through the 100 tasks.
    master.execute();

    Map<String,Object> results = master.getResultMap();
    int result = 0;
    // Completion must be checked BEFORE emptiness. In the opposite order the
    // map can be seen empty, then the last worker stores its result and
    // terminates, then isComplete() returns true, and that last result would
    // never be added to the sum.
    while (!master.isComplete() || !results.isEmpty()){
        for (Map.Entry<String,Object> entry : results.entrySet()) {
            result += (Integer) entry.getValue();
            // Each result is consumed once: remove it as soon as it is added.
            results.remove(entry.getKey());
        }
    }
    // All tasks processed and all results consumed.
    System.out.println(result);
}

3.Guarded Suspension模式

該模式爲保護暫停,其核心思想就是當有多個請求過來時,會先將請求放入一個列表中,然後由單獨的線程從列表中取出請求進行處理。這樣的模式可以避免請求併發過大時給服務器造成過大壓力。

下面是有返回值的請求實例:

定義response接口

/**
 * Common view of a response; in this example it is implemented both by the
 * class holding the actual result and by the placeholder that waits for it.
 */
public interface Response {
    /**
     * Returns the result string of this response.
     *
     * @return the result
     */
    String getResult();
}

response實際實現類

/**
 * Concrete response: a plain holder for the result string.
 */
public class RealResponse implements Response {

    private String result;

    /**
     * @return the stored result, or {@code null} if none has been set
     */
    @Override
    public String getResult() {
        return this.result;
    }

    /**
     * @param result the result string to store
     */
    public void setResult(String result){
        this.result = result;
    }
}

代理實現類

/**
 * Placeholder for a response that has not been produced yet.
 * {@link #getResult()} blocks until {@link #setRealResponse(RealResponse)}
 * has been called once.
 */
public class FutureResponse implements Response {

    private RealResponse realResponse;

    private boolean isReady = false;

    /**
     * Publishes the real response and wakes every thread blocked in
     * {@link #getResult()}. Only the first call has any effect.
     *
     * @param realResponse the completed response to expose
     */
    public synchronized void setRealResponse(RealResponse realResponse){
        if(isReady){
            return;
        }
        this.realResponse = realResponse;
        isReady = true;
        notifyAll();
    }

    /**
     * Waits until the real response has been set, then returns its result.
     *
     * <p>An interruption does not abort the wait, but the interrupt status is
     * restored before returning so the caller can react to it.
     *
     * @return the result of the real response
     */
    @Override
    public synchronized String getResult() {
        boolean interrupted = false;
        // Must be a loop: with a single "if", a spurious wakeup or an
        // interrupt would fall through while realResponse is still null and
        // the return statement below would throw NullPointerException.
        while (!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                // Defer restoring the flag; restoring it now would make the
                // next wait() throw immediately and spin.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return realResponse.getResult();
    }
}

request請求對象

/**
 * A request travelling through the queue: a name plus the placeholder that
 * will receive the response.
 */
public class Request {

    private String name;

    private FutureResponse futureResponse;

    /**
     * @return the name of this request
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name of this request
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the placeholder attached to this request
     */
    public FutureResponse getFutureResponse() {
        return this.futureResponse;
    }

    /**
     * @param futureResponse the placeholder the response will be delivered to
     */
    public void setFutureResponse(FutureResponse futureResponse) {
        this.futureResponse = futureResponse;
    }
}

存放Request對象的列表

/**
 * Guarded queue of requests: {@link #getRequest()} suspends the caller while
 * the queue is empty and {@link #setRequest(Request)} wakes waiting callers.
 */
public class RequestQueue {
    private final Queue<Request> requestQueue = new ConcurrentLinkedQueue<Request>();

    /**
     * Removes and returns the oldest request, waiting while the queue is empty.
     *
     * @return the next request, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public synchronized Request getRequest(){
        // Re-check the guard after every wakeup: wait() may return without a
        // request having been added.
        while (requestQueue.isEmpty()){
            try {
                wait();
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of
                // swallowing it.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return requestQueue.poll();
    }

    /**
     * Appends a request and wakes every thread waiting in {@link #getRequest()}.
     *
     * @param request the request to enqueue
     */
    public synchronized void setRequest(Request request){
        requestQueue.add(request);
        notifyAll();
    }

}

客戶端程序

/**
 * Client side of the Guarded Suspension example: sends ten requests to the
 * queue and can later print the result of each one.
 */
public class ClientThread extends Thread {

    private RequestQueue requestQueue;

    // Requests sent by this client; filled by run(), read by showResult().
    private List<Request>  myRequest = new ArrayList<Request>();

    /**
     * @param requestQueue queue the requests are sent to
     */
    public ClientThread(RequestQueue requestQueue){
        this.requestQueue = requestQueue;
    }

    /**
     * Builds ten requests named {@code jok0} .. {@code jok9}, each with its
     * own response placeholder, enqueues them and remembers them locally.
     */
    @Override
    public void run() {
        for (int i=0;i<10;i++){
            Request request = new Request();
            FutureResponse futureResponse = new FutureResponse();
            // Build the request.
            request.setName("jok"+i);
            request.setFutureResponse(futureResponse);
            requestQueue.setRequest(request);
            myRequest.add(request);
        }
        System.out.println("請求執行構造發送完成");

    }

    /**
     * Prints the result of every request sent by this client, blocking on
     * each response until it is available.
     *
     * <p>When called from another thread this first waits for {@link #run()}
     * to finish, because the request list is a plain {@code ArrayList} and
     * must not be read while it is still being filled. If that wait is
     * interrupted, nothing is printed and the interrupt status is restored.
     */
    public void showResult(){
        if (!awaitRequestsSent()) {
            return;
        }
        for (Request request:myRequest) {
            System.out.println(request.getName()+":執行結果="+request.getFutureResponse().getResult());
        }
    }

    /** Waits for this thread to finish sending; returns false if interrupted. */
    private boolean awaitRequestsSent() {
        if (Thread.currentThread() == this) {
            // Joining oneself would block forever.
            return true;
        }
        try {
            join();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

服務端程序

/**
 * Server side of the Guarded Suspension example: takes requests from its
 * queue one at a time and delivers a response to each request's placeholder.
 */
public class ServerThread extends Thread {

    private RequestQueue requestQueue = new RequestQueue();

    /**
     * @return the queue clients must put their requests into
     */
    public RequestQueue getRequestQueue() {
        return requestQueue;
    }

    /**
     * Serves requests until the thread is interrupted. Each request takes one
     * simulated second and is answered with {@code <name>---do}.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()){
            final Request request = requestQueue.getRequest();
            if (request == null) {
                // getRequest() can return null when its wait ends without a
                // request being queued; go back to the loop condition instead
                // of dereferencing null.
                continue;
            }
            // Process the request.
            String result = request.getName()+"---do";
            System.out.println("請求處理開始"+request.getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Still answer the request in hand so its client is not left
                // waiting, then let the loop condition stop the server.
                Thread.currentThread().interrupt();
            }
            RealResponse realResponse = new RealResponse();
            realResponse.setResult(result);
            request.getFutureResponse().setRealResponse(realResponse);
            System.out.println("處理請求"+request.getName()+"結束"+result);
        }
    }
}

測試方法

/**
 * Demo driver for the Guarded Suspension example.
 */
public class Test {

    /**
     * Starts the server, lets a client send its requests, then prints the
     * result of every request.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args){
        // Start the server.
        ServerThread serverThread = new ServerThread();
        serverThread.start();
        System.out.println("服務器啓動完畢");

        // Start the client.
        System.out.println("客戶端開始執行");
        ClientThread clientThread = new ClientThread(serverThread.getRequestQueue());
        clientThread.start();
        try {
            // start() returns immediately; wait until the client has really
            // sent everything, otherwise showResult() would read the client's
            // request list while it is still being filled.
            clientThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("客戶端發送完畢");

        clientThread.showResult();
    }
}

4.生產者-消費者模式

首先定義任務數據

/**
 * Immutable unit of work exchanged between producers and consumers: wraps a
 * single {@code int}.
 */
public final class PCData {
    private final int data;

    /**
     * @param data the value to carry
     */
    public PCData(int data){
        this.data = data;
    }

    /**
     * @return the carried value
     */
    public int getData() {
        return this.data;
    }

    /**
     * @return the text {@code data=} followed by the carried value
     */
    @Override
    public String toString() {
        return "data=".concat(Integer.toString(this.data));
    }
}

生產者代碼

/**
 * Producer of the producer-consumer example: every two seconds it creates a
 * {@code PCData} numbered from a counter shared by all producers and offers
 * it to the queue, until {@link #stop()} is called.
 */
public class Producer implements Runnable {

    private volatile boolean isRunning = true;

    // Shared buffer between producers and consumers.
    private final BlockingQueue<PCData> blockingQueue;

    // Sequence shared by every Producer instance.
    private static final AtomicInteger count = new AtomicInteger();

    /**
     * @param blockingQueue queue the produced items are offered to
     */
    public Producer(BlockingQueue<PCData> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    /**
     * Produces items until {@link #stop()} is called or the thread is
     * interrupted. An item the queue does not accept within two seconds is
     * reported on {@code System.err} and dropped.
     */
    @Override
    public void run() {
        try {
            while (isRunning){
                Thread.sleep(2000);
                PCData data = new PCData(count.incrementAndGet());
                // Log the value actually produced; the shared counter may
                // already have been advanced by another producer.
                System.out.println("生產者線程"+Thread.currentThread().getId()+"生產數據:"+data.getData());
                // Add the item to the blocking queue.
                boolean accepted = blockingQueue.offer(data,2, TimeUnit.SECONDS);
                if (!accepted) {
                    System.err.println("Producer could not enqueue " + data + " within 2 seconds");
                }
            }
            System.out.println("生產者線程中止"+Thread.currentThread().getId());
        } catch (InterruptedException e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the producer to finish; takes effect the next time the loop
     * condition is evaluated.
     */
    public void stop(){
        isRunning = false;
    }
}

消費者代碼

/**
 * Consumer of the producer-consumer example: takes items from the queue,
 * prints the square of each value and then pauses four seconds.
 */
public class Consumer implements Runnable {

    private final BlockingQueue<PCData> blockingQueue;

    /**
     * @param blockingQueue queue the items are taken from
     */
    public Consumer(BlockingQueue<PCData> blockingQueue){
        this.blockingQueue = blockingQueue;
    }

    /**
     * Consumes items until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true){
                // take() blocks while the queue is empty and never returns
                // null, so no null check is needed.
                PCData data = blockingQueue.take();
                // Square in long arithmetic: int * int overflows for values
                // above 46340.
                long n = (long) data.getData() * data.getData();
                System.out.println("消費者線程"+Thread.currentThread().getId()+"消費數據"+ MessageFormat.format("{0}*{1}={2}",data.getData(),data.getData(),n));
                Thread.sleep(4000);
            }
        } catch (InterruptedException e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

測試程序代碼

// Blocking queue used as the shared buffer.
BlockingQueue<PCData> blockingQueue = new LinkedBlockingQueue<PCData>();
// Producers.
Producer p = new Producer(blockingQueue);
Producer p2 = new Producer(blockingQueue);
// Consumers.
Consumer c = new Consumer(blockingQueue);
Consumer c2 = new Consumer(blockingQueue);
// Run all four on a thread pool.
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p);
service.execute(p2);
service.execute(c);
service.execute(c2);
Thread.sleep(6000);
p.stop();
p2.stop();
Thread.sleep(2000);
// shutdown() only rejects new tasks; the consumers loop forever and would
// keep the program alive. shutdownNow() interrupts them, which ends their
// loop through the InterruptedException thrown by take() or sleep().
service.shutdownNow();
相關文章
相關標籤/搜索