隊列就能夠想成是一個數組,從一頭進入,一頭出去,排隊買飯java
BlockingQueue 阻塞隊列,排隊擁堵,首先它是一個隊列,而一個阻塞隊列在數據結構中所起的做用大體以下圖所示:程序員
線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素數組
當阻塞隊列是空時,從隊列中獲取元素的操做將會被阻塞
安全
當阻塞隊列是滿時,從隊列中添加元素的操做將會被阻塞
數據結構
也就是說 試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其它線程往空的隊列插入新的元素多線程
同理,試圖往已經滿的阻塞隊列中添加新元素的線程,直到其它線程往滿的隊列中移除一個或多個元素,或者徹底清空隊列後,使隊列從新變得空閒起來,並後續新增架構
去海底撈吃飯,大廳滿了,須要進候廳等待,可是這些等待的客戶可以對商家帶來利潤,所以咱們很是歡迎他們阻塞測試
在多線程領域:所謂的阻塞,在某些清空下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動喚醒this
好處是咱們不須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都幫你一手包辦了atom
在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須本身取控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。
// 你用過List集合類 // ArrayList集合類熟悉麼? // 還用過 CopyOnWriteList 和 BlockingQueue
BlockingQueue阻塞隊列是屬於一個接口,底下有七個實現類
這裏須要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
拋出異常 | 當阻塞隊列滿時:在往隊列中add插入元素會拋出 IIIegalStateException:Queue full 當阻塞隊列空時:再往隊列中remove移除元素,會拋出NoSuchException |
---|---|
特殊性 | 插入方法,成功true,失敗false 移除方法:成功返回出隊列元素,隊列沒有就返回空 |
一直阻塞 | 當阻塞隊列滿時,生產者繼續往隊列裏put元素,隊列會一直阻塞生產線程直到put數據or響應中斷退出, 當阻塞隊列空時,消費者線程試圖從隊列裏take元素,隊列會一直阻塞消費者線程直到隊列可用。 |
超時退出 | 當阻塞隊列滿時,隊裏會阻塞生產者線程必定時間,超過限時後生產者線程會退出 |
但執行add方法,向已經滿的ArrayBlockingQueue中添加元素時候,會拋出異常
// 阻塞隊列,須要填入默認值 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.add("XXX"));
運行後:
true true true Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312) at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
同時若是咱們多取出元素的時候,也會拋出異常,咱們假設只存儲了3個值,可是取的時候,取了四次
// 阻塞隊列,須要填入默認值 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove());
那麼出現異常
true true true a b c Exception in thread "main" java.util.NoSuchElementException at java.util.AbstractQueue.remove(AbstractQueue.java:117) at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
咱們使用 offer的方法,添加元素時候,若是阻塞隊列滿了後,會返回false,否者返回true
同時在取的時候,若是隊列已空,那麼會返回null
BlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());
運行結果
true true true false a b c null
咱們使用 put的方法,添加元素時候,若是阻塞隊列滿了後,添加消息的線程,會一直阻塞,直到隊列元素減小,會被清空,纔會喚醒
通常在消息中間件,好比RabbitMQ中會使用到,由於須要保證消息百分百不丟失,所以只有讓它阻塞
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("================"); blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); blockingQueue.take();
同時使用take取消息的時候,若是內容不存在的時候,也會被阻塞
offer( ) , poll 加時間
使用offer插入的時候,須要指定時間,若是2秒尚未插入,那麼就放棄插入
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
同時取的時候也進行判斷
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
若是2秒內取不出來,那麼就返回null
SynchronousQueue沒有容量,與其餘BlockingQueue不一樣,SynchronousQueue是一個不存儲的BlockingQueue,每個put操做必須等待一個take操做,否者不能繼續添加元素
下面咱們測試SynchronousQueue添加元素的過程
首先咱們建立了兩個線程,一個線程用於生產,一個線程用於消費
生產的線程分別put了 A、B、C這三個字段
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put A "); blockingQueue.put("A"); System.out.println(Thread.currentThread().getName() + "\t put B "); blockingQueue.put("B"); System.out.println(Thread.currentThread().getName() + "\t put C "); blockingQueue.put("C"); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start();
消費線程使用take,消費阻塞隊列中的內容,而且每次消費前,都等待5秒
new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take A "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take B "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "\t take C "); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start();
最後結果輸出爲:
t1 put A t2 take A 5秒後... t1 put B t2 take B 5秒後... t1 put C t2 take C
咱們從最後的運行結果能夠看出,每次t1線程向隊列中添加阻塞隊列添加元素後,t1輸入線程就會等待 t2消費線程,t2消費後,t2處於掛起狀態,等待t1在存入,從而周而復始,造成 一存一取的狀態
一個初始值爲0的變量,兩個線程對其交替操做,一個加1,一個減1,來5輪
關於多線程的操做,咱們須要記住下面幾句
咱們下面實現一個簡單的生產者消費者模式,首先有資源類ShareData
/** * 資源類 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception{ // 同步代碼塊,加鎖 lock.lock(); try { // 判斷 while(number != 0) { // 等待不能生產 condition.await(); } // 幹活 number++; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception{ // 同步代碼塊,加鎖 lock.lock(); try { // 判斷 while(number == 0) { // 等待不能消費 condition.await(); } // 幹活 number--; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
裏面有一個number變量,同時提供了increment 和 decrement的方法,分別讓number 加1和減1
可是咱們在進行判斷的時候,爲了防止出現虛假喚醒機制,不能使用if來進行判斷,而應該使用while
// 判斷 while(number != 0) { // 等待不能生產 condition.await(); }
不能使用 if判斷
// 判斷 if(number != 0) { // 等待不能生產 condition.await(); }
完整代碼
/** * 生產者消費者 傳統版 * 題目:一個初始值爲0的變量,兩個線程對其交替操做,一個加1,一個減1,來5輪 */ /** * 線程 操做 資源類 * 判斷 幹活 通知 * 防止虛假喚醒機制 */ /** * 資源類 */ class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception{ // 同步代碼塊,加鎖 lock.lock(); try { // 判斷 while(number != 0) { // 等待不能生產 condition.await(); } // 幹活 number++; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement() throws Exception{ // 同步代碼塊,加鎖 lock.lock(); try { // 判斷 while(number == 0) { // 等待不能消費 condition.await(); } // 幹活 number--; System.out.println(Thread.currentThread().getName() + "\t " + number); // 通知 喚醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class ProdConsumerTraditionDemo { public static void main(String[] args) { // 高內聚,低耦合 內聚指的是,一個空調,自身帶有調節溫度高低的方法 ShareData shareData = new ShareData(); // t1線程,生產 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } }, "t1").start(); // t2線程,消費 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } }, "t2").start(); } }
最後運行成功後,咱們一個進行生產,一個進行消費
t1 1 t2 0 t1 1 t2 0 t1 1 t2 0 t1 1 t2 0 t1 1 t2 0
在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須本身去控制這些細節,尤爲還要兼顧效率和線程安全,則這會給咱們的程序帶來不小的時間複雜度
如今咱們使用新版的阻塞隊列版生產者和消費者,使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用
/** * 生產者消費者 阻塞隊列版 * 使用:volatile、CAS、atomicInteger、BlockQueue、線程交互、原子引用 * */ class MyResource { // 默認開啓,進行生產消費 // 這裏用到了volatile是爲了保持數據的可見性,也就是當TLAG修改時,要立刻通知其它線程進行修改 private volatile boolean FLAG = true; // 使用原子包裝類,而不用number++ private AtomicInteger atomicInteger = new AtomicInteger(); // 這裏不能爲了知足條件,而實例化一個具體的SynchronousBlockingQueue BlockingQueue<String> blockingQueue = null; // 而應該採用依賴注入裏面的,構造注入方法傳入 public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; // 查詢出傳入的class是什麼 System.out.println(blockingQueue.getClass().getName()); } /** * 生產 * @throws Exception */ public void myProd() throws Exception{ String data = null; boolean retValue; // 多線程環境的判斷,必定要使用while進行,防止出現虛假喚醒 // 當FLAG爲true的時候,開始生產 while(FLAG) { data = atomicInteger.incrementAndGet() + ""; // 2秒存入1個data retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if(retValue) { System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data + "成功" ); } else { System.out.println(Thread.currentThread().getName() + "\t 插入隊列:" + data + "失敗" ); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "\t 中止生產,表示FLAG=false,生產介紹"); } /** * 消費 * @throws Exception */ public void myConsumer() throws Exception{ String retValue; // 多線程環境的判斷,必定要使用while進行,防止出現虛假喚醒 // 當FLAG爲true的時候,開始生產 while(FLAG) { // 2秒存入1個data retValue = blockingQueue.poll(2L, TimeUnit.SECONDS); if(retValue != null && retValue != "") { System.out.println(Thread.currentThread().getName() + "\t 消費隊列:" + retValue + "成功" ); } else { FLAG = false; System.out.println(Thread.currentThread().getName() + "\t 消費失敗,隊列中已爲空,退出" ); // 退出消費隊列 return; } } } /** * 中止生產的判斷 */ public void stop() { this.FLAG = false; } } public class ProdConsumerBlockingQueueDemo { public static void main(String[] args) { // 傳入具體的實現類, ArrayBlockingQueue MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 生產線程啓動"); System.out.println(""); System.out.println(""); try { myResource.myProd(); System.out.println(""); System.out.println(""); } catch (Exception e) { e.printStackTrace(); } }, "prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 消費線程啓動"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer").start(); // 5秒後,中止生產和消費 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(""); System.out.println(""); System.out.println("5秒中後,生產和消費線程中止,線程結束"); myResource.stop(); } }
最後運行結果
java.util.concurrent.ArrayBlockingQueue prod 生產線程啓動 consumer 消費線程啓動 prod 插入隊列:1成功 consumer 消費隊列:1成功 prod 插入隊列:2成功 consumer 消費隊列:2成功 prod 插入隊列:3成功 consumer 消費隊列:3成功 prod 插入隊列:4成功 consumer 消費隊列:4成功 prod 插入隊列:5成功 consumer 消費隊列:5成功 5秒中後,生產和消費線程中止,線程結束 prod 中止生產,表示FLAG=false,生產介紹