併發編程中級篇三----並行設計模式----生產者-消費者模式

生產者-消費者模式是一個經典的多線程設計模式,它爲多線程的協做提供了良好的解決方案。在生產者-消費者模式中,一般有兩類線程,即若干個生產者線程和若干個消費者線程。生產者線程負責提交用戶請求,消費者線程負責處理用戶請求。生產者和消費者之間經過共享內存緩衝區進行通訊。設計模式

生產者-消費者模式中的內存緩衝區的主要功能是數據在多線程間的共享。緩存

1.建立一個被消費的對象多線程

public final class Data{
        
        private String id;
        
        private String name;
        
        //getter/setter(),toString()省略,構造方法省略
    }

2.建立一個生產者dom

public class Provider implements Runnable{
        
        //共享緩存區
        private BlockingQueue<Data> queue;
        //多線程間是否啓動變量,有強制從主內存中刷新的功能。即時返回線程的狀態
        private volatile boolean isRunning = true;
        //id生成器
        private static AtomicInteger count = new AtomicInteger();
        //隨機對象
        private static Random r = new Random(); 
        
        public Provider(BlockingQueue queue){
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while(isRunning){
                try {
                    //隨機休眠0 - 1000 毫秒 表示獲取數據(產生數據的耗時) 
                    Thread.sleep(r.nextInt(1000));
                    //獲取的數據進行累計...
                    int id = count.incrementAndGet();
                    //好比經過一個getData方法獲取了
                    Data data = new Data(Integer.toString(id), "數據" + id);
                    System.out.println("當前線程:" +
                     Thread.currentThread().getName() +
                      ", 獲取了數據,id爲:" + id +
                       ", 進行裝載到公共緩衝區中...");
                    if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                        System.out.println("提交緩衝區數據失敗....");
                        //do something... 好比從新提交
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public void stop(){
            this.isRunning = false;
        }
    }

3.添加一個消費者ide

public class Consumer implements Runnable{
        
        private BlockingQueue<Data> queue;
    
        public Consumer(BlockingQueue queue){
            this.queue = queue;
        }
        
        //隨機對象
        private static Random r = new Random(); 
    
        @Override
        public void run() {
            while(true){
                try {
                    //獲取數據
                    Data data = this.queue.take();
                    //進行數據處理。休眠0 - 1000毫秒模擬耗時
                    Thread.sleep(r.nextInt(1000));
                    System.out.println("當前消費線程:" + 
                    Thread.currentThread().getName() +
                     ", 消費成功,消費數據爲id: " + data.getId());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

4.定義一個測試類測試

public class Main{
        
        public static void main(String[] args) throws Exception {
            //內存緩衝區
            BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
            //生產者
            Provider p1 = new Provider(queue);
            
            Provider p2 = new Provider(queue);
            Provider p3 = new Provider(queue);
            //消費者
            Consumer c1 = new Consumer(queue);
            Consumer c2 = new Consumer(queue);
            Consumer c3 = new Consumer(queue);
            //建立線程池運行,這是一個緩存的線程池,能夠建立無窮大的線程,
            //沒有任務的時候不建立線程。空閒線程存活時間爲60s(默認值)
    
            ExecutorService cachePool = Executors.newCachedThreadPool();
            cachePool.execute(p1);
            cachePool.execute(p2);
            cachePool.execute(p3);
            cachePool.execute(c1);
            cachePool.execute(c2);
            cachePool.execute(c3);
    
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            p1.stop();
            p2.stop();
            p3.stop();
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
    //        cachePool.shutdown(); 
    //        cachePool.shutdownNow();
            
    
        }
    }

運行結果以下所示this

圖片描述

相關文章
相關標籤/搜索