1.阻塞隊列的概念java
阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列,下圖展現瞭如何經過阻塞隊列來合做:編程
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素併發
從剛纔的描述能夠看出,發生阻塞起碼得知足下面至少一個條件: (前提:隊列是有界的)socket
1.從隊列裏取元素時,若是隊列爲空,則代碼一直等在這裏(即阻塞),直到隊列裏有東西了,拿到元素了,後面的代碼才能繼續ide
2.向隊列裏放元素時,若是隊列滿了(即放不下更多元素),則代碼也會卡住,直到隊列裏的東西被取走了(即:有空位能夠放新元素了),後面的代碼才能繼續this
2.生產者消費者模型用阻塞隊列實現和原來的區別線程
下面先使用Object.wait()和Object.notify()、非阻塞隊列實現生產者-消費者模式:code
public class Test { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("隊列空,等待數據"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走隊首元素 queue.notify(); System.out.println("從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素"); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("隊列滿,等待有空餘空間"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一個元素 queue.notify(); System.out.println("向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size())); } } } } }
這個是經典的生產者-消費者模式,經過阻塞隊列和Object.wait()和Object.notify()實現,wait()和notify()主要用來實現線程間通訊。接口
具體的線程間通訊方式(wait和notify的使用)在後續問章中會講述到。隊列
下面是使用阻塞隊列實現的生產者-消費者模式:
public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread{ @Override public void run() { consume(); } private void consume() { while(true){ try { queue.take(); System.out.println("從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread{ @Override public void run() { produce(); } private void produce() { while(true){ try { queue.put(1); System.out.println("向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
有沒有發現,使用阻塞隊列代碼要簡單得多,不須要再單獨考慮同步和線程間通訊的問題。
在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,只要符合生產者-消費者模型的均可以使用阻塞隊列。
3.實現原理:
這裏只貼幾段主要的代碼,體會一下思想:
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
這3個變量很重要,ReentrantLock重入鎖,notEmpty檢查不爲空的Condition 以及 notFull用來檢查隊列未滿的Condition
Condition是一個接口,裏面有二個重要的方法:
await() : Causes the current thread to wait until it is signalled or interrupted. 即阻塞當前線程,直到被通知(喚醒)或中斷
singal(): Wakes up one waiting thread. 喚醒阻塞的線程
再來看put方法:(jdk 1.8)
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
1.先獲取鎖
2.而後用while循環檢測元素個數是否等於items長度,若是相等,表示隊列滿了,調用notFull的await()方法阻塞線程
3.不然調用enqueue()方法添加元素
4.最後解鎖
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
這是添加元素的代碼(jdk 1.8),注意最後一行notEmpty.signal()方法,表示添加完元素後,調用singal()通知等待(從隊列中取元素)的線程,隊列不空(有值)啦,能夠來取東西了。
相似的take()與dequeue()方法則至關於逆過程(注:一樣都是jdk 1.8)
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
相似的:
1. 先加鎖
2. 若是元素個數爲空,表示隊列已空,調用notEmpty的await()阻塞線程,直接隊列裏又有新元素加入爲止
3. 而後調用dequeue 從隊列裏刪除元素
4. 解鎖
dequeue方法:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
倒數第2行,元素移除後,調用notFull.singnal喚醒等待(向隊列添加元素的)線程,隊列有空位了,能夠向裏面添加元素了。