多線程的設計模式:Future、Master-Worker

一 簡介

並行設計模式屬於設計優化的一部分,它是對一些經常使用的多線程結構的總結和抽象。與串行程序相比,並行程序的結構一般更爲複雜,所以合理的使用並行模式在多線程開發中更具備意義,在這裏主要介紹==Future==、==Master-Worker==和==生產者-消費者==模型數據庫

二 Future模式

Future模式有點相似於商品訂單。好比在網購時,當看中某一件商品時,就能夠提交訂單,當訂單處理完成後,在家等待商品送貨上門便可。或者說更形象的,咱們發送Ajax請求的時候,頁面是異步的進行後臺處理,用戶無需一直等待請求的結果,能夠繼續瀏覽或操做其餘內容。
設計模式

public class Main {

    public static void main(String[] args) {
        FutureClient futureClient = new FutureClient();
        Date date = futureClient.request("date");
        System.out.println("請求已經被處理...");
        System.out.println("去作其餘操做...");
        
        System.out.println("結果爲:" + date.getRequest());
    }

}

public class FutureClient {

    public Date request(final String queryStr) {
        //1.想要一個代理對象(Date接口的實現類)先返回給發送請求的客戶端,告訴她請求已經被接收到,能夠作其餘事情
        final FutureDate futureDate = new FutureDate();
        //2.啓動一個新的線程,去加載真實數據,傳遞給這個代理對象
        new Thread(new Runnable() {
            @Override
            public void run() {
                //3.這個新的線程能夠去加載真實對象,而後傳遞給代理對象
                RealDate realDate = new RealDate(queryStr);
                futureDate.setRealDate(realDate);
            }
        }).start();;
        return futureDate;
    }
}

public interface Date {

    String getRequest();
}

public class FutureDate implements Date{
    private RealDate realDate;
    private Boolean isReady = false;
    @Override
    public synchronized String getRequest() {
        while (!isReady) {
            try {
                //若是沒有裝載完畢,程序一直處於阻塞狀態
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //裝載好直接獲取數據
        return this.realDate.getRequest();
    }
    
    public synchronized void setRealDate(RealDate realDate) {
        while (isReady) {
            //若是已經加載完畢,就直接返回
            return;
        }
        //若是沒有,就進行裝載真實對象
        this.realDate = realDate;
        this.isReady = true;
        //通知
        notify();
    }
}

public class RealDate implements Date{
    private String realDate;
    public RealDate(String realDate) {
        System.out.println("根據" + realDate + "進行查詢,這是一個很耗時的操做...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("操做完畢,獲取結果...");
        this.realDate = "查詢結果";
    }
    @Override
    public String getRequest() {
        // TODO Auto-generated method stub
        return this.realDate;
    }
}

運行結果:
請求已經被處理...
去作其餘操做...
根據date進行查詢,這是一個很耗時的操做...
操做完畢,獲取結果...
結果爲:查詢結果緩存

三 Master-Worker模式

Master-Worker模式是經常使用的並行設計模式。它的核心思想是系統由兩類進程協做工做:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程處理完成後,會將結果返回給Master,由Master作概括和總結。其好處是能將一個大任務分解成若干個小任務,並行執行,從而提升系統的吞吐量。
多線程

public class Task {

    private int id;
    private String name;
    private int price;
    
    public Task(int id, String name, int price) {
        super();
        this.id = id;
        this.name = name;
        this.price = price;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }
}

public class Master {
    //1.有一個承裝任務的集合ConcurrentLinkedQueue
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>();
    //2.使用普通的HashMap承裝全部的Worker對象
    private HashMap<String, Thread> workers = new HashMap<>();
    //3.使用一個容器承裝每個Worker併發執行任務的結果集
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>();
    //4.構造方法
    public Master(Worker worker, int workerCount) {
        //每個Worker對象都須要有Master的引用workQueue用於任務的領取,resultMap用於任務的提交
        worker.setWorkerQueue(this.workQueue);
        worker.setResultMap(this.resultMap);
        for (int i = 1; i <= workerCount; i++) {
            //key表示每一個Worker的名字,value表示線程執行對象
            this.workers.put("子節點" + Integer.toString(i), new Thread(worker));
        }
    }
    //5.提交方法
    public void submit(Task task) {
        this.workQueue.add(task);
    }
    //6.須要執行方法,讓全部Worker工做  
    public void execute() {
        for(Map.Entry<String,Thread> entry : workers.entrySet()) {
            System.out.println("Worker:" + entry.getKey() + "開始執行...");
            entry.getValue().start();
        }
    }
    //7.判斷線程是否已經執行完畢
    public boolean isComplete() {
        for(Map.Entry<String,Thread> entry : workers.entrySet()) {
            if(entry.getValue().getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }
    //8.返回結果集數據
    public int getResult() {
        int ret = 0;
        for(Map.Entry<String,Object> entry : resultMap.entrySet()) {
            ret += (Integer)entry.getValue();
        }
        return ret;
    }
    
}

public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> workQueue;
    private ConcurrentHashMap<String, Object> resultMap;
    public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
    @Override
    public void run() {
        while(true) {
            Task input = this.workQueue.poll();
            if(input == null) break;
            //handle真正處理業務的方法
            Object ouput = hanle(input);
            this.resultMap.put(Integer.toString(input.getId()), ouput);
        }
    }

    private Object hanle(Task input) {
        Object output = null;
        try {
            //表示處理業務的耗時,多是數據的加工也多是操做數據庫
            Thread.sleep(500);
            output = input.getPrice();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return output;
    }
}

public class Main {

    public static void main(String[] args) {
        //Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors());
        //當前及其可用線程數
        Master master = new Master(new Worker(), 20);
        Random price = new Random();
        for (int i = 0; i < 100 ; i++) {
            Task t = new Task(i, "任務" + i, price.nextInt(1000));
            master.submit(t);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while(true) {
            if(master.isComplete()) {
                long end = System.currentTimeMillis();
                int result = master.getResult();
                System.out.println("最終結果:" + result + ",耗時:" + (end - start));
                break;
            }
        }
    }
}

運行結果:
Worker:子節點8開始執行...
Worker:子節點7開始執行...
Worker:子節點9開始執行...
Worker:子節點16開始執行...
Worker:子節點17開始執行...
Worker:子節點2開始執行...
Worker:子節點18開始執行...
Worker:子節點1開始執行...
Worker:子節點19開始執行...
Worker:子節點4開始執行...
Worker:子節點12開始執行...
Worker:子節點3開始執行...
Worker:子節點13開始執行...
Worker:子節點6開始執行...
Worker:子節點14開始執行...
Worker:子節點5開始執行...
Worker:子節點15開始執行...
Worker:子節點20開始執行...
Worker:子節點10開始執行...
Worker:子節點11開始執行...
最終結果:50179,耗時:2505併發

三 生產者-消費者模式

生產者和消費者也是一個很是經典的多線程模式,咱們在實際開發中應用很是普遍的思想理念。在生產-消費者模式中:一般由兩類線程,即若干個生產者的線程和若干個消費者的線程。生產者線程負責提交用戶請求,消費者線程則負責具體處理生產者提交的任務,在生產者和消費者之間經過共享內存緩存區進行通訊。
MQ(Message Queue)消息隊列中間件使用了生產者-消費者模式dom

public class Data {

    private String id;
    private String data;
    public Data(String id, String data) {
        super();
        this.id = id;
        this.data = data;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getData() {
        return data;
    }
    public void setData(String data) {
        this.data = data;
    }
    
}

public class Provider implements Runnable{

    private LinkedBlockingQueue<Data> queue;
    private AtomicInteger count = new AtomicInteger(0);
    private volatile boolean isRunning = true;
    private Random random = new Random();
    
    public Provider(LinkedBlockingQueue<Data> queue) {
        super();
        this.queue = queue;
    }
    @Override
    public void run() {
        while(this.isRunning) {
            try {
                Thread.sleep(random.nextInt(1000));
                int id = count.incrementAndGet();
                Data data = new Data(Integer.toString(id),  "數據" + id);
                System.out.println("當前生產線程:" + Thread.currentThread().getName() + ",獲取了數據,id爲:" + id + ",進行裝載到公共緩衝區...");
                if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("提交緩衝區失敗...");
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void stop() {
        this.isRunning = false;
    }
}

public class Consumer implements Runnable{

    private LinkedBlockingQueue<Data> queue;

    public Consumer(LinkedBlockingQueue<Data> queue) {
        super();
        this.queue = queue;
    }
    @Override
    public void run() {
        while(true) {
            try {
                Data data = this.queue.take();
                Thread.sleep(1000);
                System.out.println("當前消費線程爲:" + Thread.currentThread().getName() + ",消費成功,消費數據爲id:" + data.getId());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

public class Main {

    public static void main(String[] args) {
        //內存緩衝區
        LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>();
        //生產者
        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);
        //消費者
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        
        //建立線程池運行,這是一個緩存的線程池,能夠建立無窮大的線程,沒有任務的時候不建立線程,空閒線程存活時間爲60s(默認)
        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(p1);
        cachePool.execute(p2);
        cachePool.execute(p3);
        cachePool.execute(c1);
        cachePool.execute(c2);
        cachePool.execute(c3);
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        //cachePool.shutdown();
    }

}

運行結果:
當前生產線程:pool-1-thread-3,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:1
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:1
當前生產線程:pool-1-thread-1,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:2
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:1
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:2
當前生產線程:pool-1-thread-1,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:5,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:5,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:3
當前生產線程:pool-1-thread-3,獲取了數據,id爲:6,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:2
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:3
當前生產線程:pool-1-thread-1,獲取了數據,id爲:6,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:3
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:5
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:5
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:6
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:6異步

相關文章
相關標籤/搜索