Java併發編程利用 Condition 實現阻塞隊列

什麼是阻塞隊列 BlockingQueuejava

隊列是一種數據結構,它的特色是先進先出(First In First Out),它有兩個基本操做:在隊列尾部加入一個元素,從隊列頭部移除一個元素。隊列在多線程應用中,經常使用於生產-消費場景。編程

BlockingQueue 是 Java util.concurrent 包下重要的數據結構,BlockingQueue 提供了線程安全的隊列訪問方式:當阻塞隊列進行插入數據時,若是隊列已滿,線程將會阻塞等待直到隊列非滿;從阻塞隊列取數據時,若是隊列已空,線程將會阻塞等待直到隊列非空。併發包下不少高級同步類的實現都是基於 BlockingQueue 實現的。數組

BlockingQueue 具備 4 組不一樣的方法用於插入、移除以及對隊列中的元素進行檢查。若是請求的操做不能獲得當即執行的話,每一個方法的表現也不一樣。這些方法以下: 安全

BlockingQueue 是個接口,你須要使用它的實現之一來使用 BlockingQueue,Java.util.concurrent 包下具備如下 BlockingQueue 接口的實現類:數據結構

  • ArrayBlockingQueue:ArrayBlockingQueue 是一個有界的阻塞隊列,其內部實現是將對象放到一個數組裏。有界也就意味着,它不可以存儲無限多數量的元素。它有一個同一時間可以存儲元素數量的上限。你能夠在對其初始化的時候設定這個上限,但以後就沒法對這個上限進行修改了。多線程

  • DelayQueue:DelayQueue 對元素進行持有直到一個特定的延遲到期。注入其中的元素必須實現 java.util.concurrent.Delayed 接口。併發

  • LinkedBlockingQueue:LinkedBlockingQueue 內部以一個鏈式結構對其元素進行存儲。若是須要的話,這一鏈式結構能夠選擇一個上限。若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限。this

  • PriorityBlockingQueue:PriorityBlockingQueue 是一個無界的併發隊列。它使用了和類 java.util.PriorityQueue 同樣的排序規則。你沒法向這個隊列中插入 null 值。全部插入到 PriorityBlockingQueue 的元素必須實現 java.lang.Comparable 接口。所以該隊列中元素的排序就取決於你本身的 Comparable 實現。線程

  • SynchronousQueue:SynchronousQueue 是一個特殊的隊列,它的內部同時只可以容納單個元素。若是該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另外一個線程將該元素從隊列中抽走。一樣,若是該隊列爲空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另外一個線程向隊列中插入了一條新的元素。據此,把這個類稱做一個隊列顯然是誇大其詞了。它更多像是一個匯合點。3d

下面用 BlockQueue 技術來實現一下:點擊連接加入羣【Java併發編程交流組】:https://jq.qq.com/?_wv=1027&k=5mOvK7L

/** 定義一個盤子類,能夠放雞蛋和取雞蛋 */  
public class BigPlate {  
   
    /** 裝雞蛋的盤子,大小爲5 */  
    private BlockingQueue<Object> eggs = new ArrayBlockingQueue<Object>(5);  
       
    /** 放雞蛋 */  
    public void putEgg(Object egg) {  
        try {  
            eggs.put(egg);// 向盤子末尾放一個雞蛋,若是盤子滿了,當前線程阻塞  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
   
        // 下面輸出有時不許確,由於與put操做不是一個原子操做  
        System.out.println("放入雞蛋");  
    }  
       
    /** 取雞蛋 */  
    public Object getEgg() {  
        Object egg = null;  
        try {  
            egg = eggs.take();// 從盤子開始取一個雞蛋,若是盤子空了,當前線程阻塞  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
   
        // 下面輸出有時不許確,由於與take操做不是一個原子操做  
        System.out.println("拿到雞蛋");  
        return egg;  
    }  
       
    /** 放雞蛋線程 */  
    static class AddThread extends Thread {  
        private BigPlate plate;  
        private Object egg = new Object();  
   
        public AddThread(BigPlate plate) {  
            this.plate = plate;  
        }  
   
        public void run() {  
            plate.putEgg(egg);  
        }  
    }  
   
    /** 取雞蛋線程 */  
    static class GetThread extends Thread {  
        private BigPlate plate;  
   
        public GetThread(BigPlate plate) {  
            this.plate = plate;  
        }  
   
        public void run() {  
            plate.getEgg();  
        }  
    }  
       
    public static void main(String[] args) {  
        BigPlate plate = new BigPlate();  
        // 先啓動10個放雞蛋線程  
        for(int i = 0; i < 10; i++) {  
            new Thread(new AddThread(plate)).start();  
        }  
        // 再啓動10個取雞蛋線程  
        for(int i = 0; i < 10; i++) {  
            new Thread(new GetThread(plate)).start();  
        }  
    }  
}

 

利用 Condition 來實現阻塞隊列

Java 1.5 以後新增了顯式鎖的接口 java.util.concurrent.locks.Lock 接口,一樣提供了顯式的條件接口 Condition,並對條件隊列進行了加強。

Condition 對象能夠提供和 Object 的 wait 和 notify 同樣的行爲,可是後者必須使用 synchronized 這個內置的monitor鎖,而 Condition 使用的是 RenentranceLock 。這兩種方式在阻塞等待時都會將相應的鎖釋放掉,可是 Condition 的等待能夠中斷,這是兩者惟一的區別。

下面就用 Condition 技術來實現一下:點擊連接加入羣【Java併發編程交流組】:https://jq.qq.com/?_wv=1027&k=5mOvK7L

class Buffer {
    final Lock lock = new ReentrantLock(); //定義一個鎖
    final Condition notFull = lock.newCondition(); //定義阻塞隊列滿了的Condition
    final Condition notEmpty = lock.newCondition();//定義阻塞隊列空了的Condition
 
    final Object[] items = new Object[10]; //爲了下面模擬,設置阻塞隊列的大小爲10,不要設太大
 
    int putptr, takeptr, count; //數組下標,用來標定位置的
 
    //往隊列中存數據
    public void put(Object x) throws InterruptedException {
        lock.lock(); //上鎖
        try {
            while (count == items.length) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法存數據!");
                notFull.await();    //若是隊列滿了,那麼阻塞存數據這個線程,等待被喚醒
            }
            //若是沒滿,按順序往數組中存
            items[putptr] = x;
            if (++putptr == items.length) //這是到達數組末端的判斷,若是到了,再回到始端
                putptr = 0;
            ++count;    //消息數量
            System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
            notEmpty.signal(); //好了,如今隊列中有數據了,喚醒隊列空的那個線程,能夠取數據啦
        } finally {
            lock.unlock(); //放鎖
        }
    }
 
    //從隊列中取數據
    public Object take() throws InterruptedException {
        lock.lock(); //上鎖
        try {
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法取數據!");
                notEmpty.await();  //若是隊列是空,那麼阻塞取數據這個線程,等待被喚醒
            }
            //若是沒空,按順序從數組中取
            Object x = items[takeptr];
            if (++takeptr == items.length) //判斷是否到達末端,若是到了,再回到始端
                takeptr = 0;
            --count; //消息數量
            System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
            notFull.signal(); //好了,如今隊列中有位置了,喚醒隊列滿的那個線程,能夠存數據啦
            return x;
        } finally {
            lock.unlock(); //放鎖
        }
    }
}
相關文章
相關標籤/搜索