並行設計模式屬於設計優化的一部分,它是對一些經常使用的多線程結構的總結和抽象。與串行程序相比,並行程序的結構一般更爲複雜,所以合理的使用並行模式在多線程開發中更具備意義,在這裏主要介紹==Future==、==Master-Worker==和==生產者-消費者==模型數據庫
Future模式有點相似於商品訂單。好比在網購時,當看中某一件商品時,就能夠提交訂單,當訂單處理完成後,在家等待商品送貨上門便可。或者說更形象的,咱們發送Ajax請求的時候,頁面是異步的進行後臺處理,用戶無需一直等待請求的結果,能夠繼續瀏覽或操做其餘內容。
設計模式
public class Main { public static void main(String[] args) { FutureClient futureClient = new FutureClient(); Date date = futureClient.request("date"); System.out.println("請求已經被處理..."); System.out.println("去作其餘操做..."); System.out.println("結果爲:" + date.getRequest()); } } public class FutureClient { public Date request(final String queryStr) { //1.想要一個代理對象(Date接口的實現類)先返回給發送請求的客戶端,告訴她請求已經被接收到,能夠作其餘事情 final FutureDate futureDate = new FutureDate(); //2.啓動一個新的線程,去加載真實數據,傳遞給這個代理對象 new Thread(new Runnable() { @Override public void run() { //3.這個新的線程能夠去加載真實對象,而後傳遞給代理對象 RealDate realDate = new RealDate(queryStr); futureDate.setRealDate(realDate); } }).start();; return futureDate; } } public interface Date { String getRequest(); } public class FutureDate implements Date{ private RealDate realDate; private Boolean isReady = false; @Override public synchronized String getRequest() { while (!isReady) { try { //若是沒有裝載完畢,程序一直處於阻塞狀態 wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //裝載好直接獲取數據 return this.realDate.getRequest(); } public synchronized void setRealDate(RealDate realDate) { while (isReady) { //若是已經加載完畢,就直接返回 return; } //若是沒有,就進行裝載真實對象 this.realDate = realDate; this.isReady = true; //通知 notify(); } } public class RealDate implements Date{ private String realDate; public RealDate(String realDate) { System.out.println("根據" + realDate + "進行查詢,這是一個很耗時的操做..."); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("操做完畢,獲取結果..."); this.realDate = "查詢結果"; } @Override public String getRequest() { // TODO Auto-generated method stub return this.realDate; } }
運行結果:
請求已經被處理...
去作其餘操做...
根據date進行查詢,這是一個很耗時的操做...
操做完畢,獲取結果...
結果爲:查詢結果緩存
Master-Worker模式是經常使用的並行設計模式。它的核心思想是系統由兩類進程協做工做:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程處理完成後,會將結果返回給Master,由Master作概括和總結。其好處是能將一個大任務分解成若干個小任務,並行執行,從而提升系統的吞吐量。
多線程
public class Task { private int id; private String name; private int price; public Task(int id, String name, int price) { super(); this.id = id; this.name = name; this.price = price; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } } public class Master { //1.有一個承裝任務的集合ConcurrentLinkedQueue private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>(); //2.使用普通的HashMap承裝全部的Worker對象 private HashMap<String, Thread> workers = new HashMap<>(); //3.使用一個容器承裝每個Worker併發執行任務的結果集 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>(); //4.構造方法 public Master(Worker worker, int workerCount) { //每個Worker對象都須要有Master的引用workQueue用於任務的領取,resultMap用於任務的提交 worker.setWorkerQueue(this.workQueue); worker.setResultMap(this.resultMap); for (int i = 1; i <= workerCount; i++) { //key表示每一個Worker的名字,value表示線程執行對象 this.workers.put("子節點" + Integer.toString(i), new Thread(worker)); } } //5.提交方法 public void submit(Task task) { this.workQueue.add(task); } //6.須要執行方法,讓全部Worker工做 public void execute() { for(Map.Entry<String,Thread> entry : workers.entrySet()) { System.out.println("Worker:" + entry.getKey() + "開始執行..."); entry.getValue().start(); } } //7.判斷線程是否已經執行完畢 public boolean isComplete() { for(Map.Entry<String,Thread> entry : workers.entrySet()) { if(entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //8.返回結果集數據 public int getResult() { int ret = 0; for(Map.Entry<String,Object> entry : resultMap.entrySet()) { ret += (Integer)entry.getValue(); } return ret; } } public class Worker implements Runnable{ private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<String, Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true) { Task input = this.workQueue.poll(); if(input == null) break; //handle真正處理業務的方法 Object ouput = hanle(input); this.resultMap.put(Integer.toString(input.getId()), ouput); } } private Object hanle(Task input) { Object output = null; try { //表示處理業務的耗時,多是數據的加工也多是操做數據庫 Thread.sleep(500); output = input.getPrice(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return output; } } public class Main { public static void main(String[] args) { //Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors()); //當前及其可用線程數 Master master = new Master(new Worker(), 20); Random price = new Random(); for (int i = 0; i < 100 ; i++) { Task t = new Task(i, "任務" + i, price.nextInt(1000)); master.submit(t); } master.execute(); long start = System.currentTimeMillis(); while(true) { if(master.isComplete()) { long end = System.currentTimeMillis(); int result = master.getResult(); System.out.println("最終結果:" + result + ",耗時:" + (end - start)); break; } } } }
運行結果:
Worker:子節點8開始執行...
Worker:子節點7開始執行...
Worker:子節點9開始執行...
Worker:子節點16開始執行...
Worker:子節點17開始執行...
Worker:子節點2開始執行...
Worker:子節點18開始執行...
Worker:子節點1開始執行...
Worker:子節點19開始執行...
Worker:子節點4開始執行...
Worker:子節點12開始執行...
Worker:子節點3開始執行...
Worker:子節點13開始執行...
Worker:子節點6開始執行...
Worker:子節點14開始執行...
Worker:子節點5開始執行...
Worker:子節點15開始執行...
Worker:子節點20開始執行...
Worker:子節點10開始執行...
Worker:子節點11開始執行...
最終結果:50179,耗時:2505併發
生產者和消費者也是一個很是經典的多線程模式,咱們在實際開發中應用很是普遍的思想理念。在生產-消費者模式中:一般由兩類線程,即若干個生產者的線程和若干個消費者的線程。生產者線程負責提交用戶請求,消費者線程則負責具體處理生產者提交的任務,在生產者和消費者之間經過共享內存緩存區進行通訊。
MQ(Message Queue)消息隊列中間件使用了生產者-消費者模式dom
public class Data { private String id; private String data; public Data(String id, String data) { super(); this.id = id; this.data = data; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getData() { return data; } public void setData(String data) { this.data = data; } } public class Provider implements Runnable{ private LinkedBlockingQueue<Data> queue; private AtomicInteger count = new AtomicInteger(0); private volatile boolean isRunning = true; private Random random = new Random(); public Provider(LinkedBlockingQueue<Data> queue) { super(); this.queue = queue; } @Override public void run() { while(this.isRunning) { try { Thread.sleep(random.nextInt(1000)); int id = count.incrementAndGet(); Data data = new Data(Integer.toString(id), "數據" + id); System.out.println("當前生產線程:" + Thread.currentThread().getName() + ",獲取了數據,id爲:" + id + ",進行裝載到公共緩衝區..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("提交緩衝區失敗..."); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void stop() { this.isRunning = false; } } public class Consumer implements Runnable{ private LinkedBlockingQueue<Data> queue; public Consumer(LinkedBlockingQueue<Data> queue) { super(); this.queue = queue; } @Override public void run() { while(true) { try { Data data = this.queue.take(); Thread.sleep(1000); System.out.println("當前消費線程爲:" + Thread.currentThread().getName() + ",消費成功,消費數據爲id:" + data.getId()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public class Main { public static void main(String[] args) { //內存緩衝區 LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>(); //生產者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消費者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //建立線程池運行,這是一個緩存的線程池,能夠建立無窮大的線程,沒有任務的時候不建立線程,空閒線程存活時間爲60s(默認) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //cachePool.shutdown(); } }
運行結果:
當前生產線程:pool-1-thread-3,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:1,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:1
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:1
當前生產線程:pool-1-thread-1,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:2,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:2
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:1
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:2
當前生產線程:pool-1-thread-1,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:3,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-3,獲取了數據,id爲:5,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-1,獲取了數據,id爲:5,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:3
當前生產線程:pool-1-thread-3,獲取了數據,id爲:6,進行裝載到公共緩衝區...
當前生產線程:pool-1-thread-2,獲取了數據,id爲:4,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:2
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:3
當前生產線程:pool-1-thread-1,獲取了數據,id爲:6,進行裝載到公共緩衝區...
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:3
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:5
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:5
當前消費線程爲:pool-1-thread-6,消費成功,消費數據爲id:6
當前消費線程爲:pool-1-thread-5,消費成功,消費數據爲id:4
當前消費線程爲:pool-1-thread-4,消費成功,消費數據爲id:6異步