java併發庫中的阻塞隊列--BlockingQueue

1.阻塞隊列的概念java

     阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列,下圖展現瞭如何經過阻塞隊列來合做:編程

 

 

線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素併發

 

從剛纔的描述能夠看出,發生阻塞起碼得知足下面至少一個條件: (前提:隊列是有界的)socket

1.從隊列裏取元素時,若是隊列爲空,則代碼一直等在這裏(即阻塞),直到隊列裏有東西了,拿到元素了,後面的代碼才能繼續ide

2.向隊列裏放元素時,若是隊列滿了(即放不下更多元素),則代碼也會卡住,直到隊列裏的東西被取走了(即:有空位能夠放新元素了),後面的代碼才能繼續this

 

2.生產者消費者模型用阻塞隊列實現和原來的區別線程

 

下面先使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式:code

public class Test {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == 0){
                        try {
                            System.out.println("隊列空,等待數據");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.poll();          //每次移走隊首元素
                    queue.notify();
                    System.out.println("從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素");
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                synchronized (queue) {
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("隊列滿,等待有空餘空間");
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            queue.notify();
                        }
                    }
                    queue.offer(1);        //每次插入一個元素
                    queue.notify();
                    System.out.println("向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size()));
                }
            }
        }
    }
}

這個是經典的生產者-消費者模式,經過阻塞隊列和Object.wait()和Object.notify()實現,wait()和notify()主要用來實現線程間通訊。接口

  具體的線程間通訊方式(wait和notify的使用)在後續問章中會講述到。隊列

  下面是使用阻塞隊列實現的生產者-消費者模式:

public class Test {
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
     
    public static void main(String[] args)  {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
         
        producer.start();
        consumer.start();
    }
     
    class Consumer extends Thread{
         
        @Override
        public void run() {
            consume();
        }
         
        private void consume() {
            while(true){
                try {
                    queue.take();
                    System.out.println("從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
     
    class Producer extends Thread{
         
        @Override
        public void run() {
            produce();
        }
         
        private void produce() {
            while(true){
                try {
                    queue.put(1);
                    System.out.println("向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

有沒有發現,使用阻塞隊列代碼要簡單得多,不須要再單獨考慮同步和線程間通訊的問題。

  在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。

  阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,只要符合生產者-消費者模型的均可以使用阻塞隊列。

3.實現原理:

這裏只貼幾段主要的代碼,體會一下思想:

/** Main lock guarding all access */
final ReentrantLock lock;
 
/** Condition for waiting takes */
private final Condition notEmpty;
 
/** Condition for waiting puts */
private final Condition notFull;

這3個變量很重要,ReentrantLock重入鎖,notEmpty檢查不爲空的Condition 以及  notFull用來檢查隊列未滿的Condition

Condition是一個接口,裏面有二個重要的方法:
await() : Causes the current thread to wait until it is signalled or interrupted. 即阻塞當前線程,直到被通知(喚醒)或中斷

singal(): Wakes up one waiting thread. 喚醒阻塞的線程

再來看put方法:(jdk 1.8)

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

1.先獲取鎖

2.而後用while循環檢測元素個數是否等於items長度,若是相等,表示隊列滿了,調用notFull的await()方法阻塞線程

3.不然調用enqueue()方法添加元素

4.最後解鎖

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

這是添加元素的代碼(jdk 1.8),注意最後一行notEmpty.signal()方法,表示添加完元素後,調用singal()通知等待(從隊列中取元素)的線程,隊列不空(有值)啦,能夠來取東西了。

相似的take()與dequeue()方法則至關於逆過程(注:一樣都是jdk 1.8)

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

相似的:

1. 先加鎖

2. 若是元素個數爲空,表示隊列已空,調用notEmpty的await()阻塞線程,直接隊列裏又有新元素加入爲止

3. 而後調用dequeue 從隊列裏刪除元素

4. 解鎖

dequeue方法:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

倒數第2行,元素移除後,調用notFull.singnal喚醒等待(向隊列添加元素的)線程,隊列有空位了,能夠向裏面添加元素了。

相關文章
相關標籤/搜索