生產者-消費者模式是一個經典的多線程設計模式,它爲多線程的協做提供了良好的解決方案。在生產者-消費者模式中,一般有兩類線程,即若干個生產者線程和若干個消費者線程。生產者線程負責提交用戶請求,消費者線程負責處理用戶請求。生產者和消費者之間經過共享內存緩衝區進行通訊。設計模式
生產者-消費者模式中的內存緩衝區的主要功能是數據在多線程間的共享。緩存
1.建立一個被消費的對象多線程
public final class Data{ private String id; private String name; //getter/setter(),toString()省略,構造方法省略 }
2.建立一個生產者dom
public class Provider implements Runnable{ //共享緩存區 private BlockingQueue<Data> queue; //多線程間是否啓動變量,有強制從主內存中刷新的功能。即時返回線程的狀態 private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //隨機對象 private static Random r = new Random(); public Provider(BlockingQueue queue){ this.queue = queue; } @Override public void run() { while(isRunning){ try { //隨機休眠0 - 1000 毫秒 表示獲取數據(產生數據的耗時) Thread.sleep(r.nextInt(1000)); //獲取的數據進行累計... int id = count.incrementAndGet(); //好比經過一個getData方法獲取了 Data data = new Data(Integer.toString(id), "數據" + id); System.out.println("當前線程:" + Thread.currentThread().getName() + ", 獲取了數據,id爲:" + id + ", 進行裝載到公共緩衝區中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ System.out.println("提交緩衝區數據失敗...."); //do something... 好比從新提交 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop(){ this.isRunning = false; } }
3.添加一個消費者ide
public class Consumer implements Runnable{ private BlockingQueue<Data> queue; public Consumer(BlockingQueue queue){ this.queue = queue; } //隨機對象 private static Random r = new Random(); @Override public void run() { while(true){ try { //獲取數據 Data data = this.queue.take(); //進行數據處理。休眠0 - 1000毫秒模擬耗時 Thread.sleep(r.nextInt(1000)); System.out.println("當前消費線程:" + Thread.currentThread().getName() + ", 消費成功,消費數據爲id: " + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
4.定義一個測試類測試
public class Main{ public static void main(String[] args) throws Exception { //內存緩衝區 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); //生產者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); //消費者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //建立線程池運行,這是一個緩存的線程池,能夠建立無窮大的線程, //沒有任務的時候不建立線程。空閒線程存活時間爲60s(默認值) ExecutorService cachePool = Executors.newCachedThreadPool(); cachePool.execute(p1); cachePool.execute(p2); cachePool.execute(p3); cachePool.execute(c1); cachePool.execute(c2); cachePool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // cachePool.shutdown(); // cachePool.shutdownNow(); } }
運行結果以下所示this