阻塞隊列知道嗎

 

一,阻塞隊列?java

     當阻塞隊列爲空時,獲取(take)操做是阻塞的;當阻塞隊列爲滿時,添加(put)操做是阻塞的。git

      

 

 

 

  二,爲何用,有什麼好處?程序員

         阻塞隊列不用手動控制何時該被阻塞,何時該被喚醒,簡化了操做。github

  • 在多線程領域:所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒
  • 好處是咱們不須要關心何時須要阻塞線程,何時須要喚醒線程,由於這一切BlockingQueue都一手包辦了;在concurrent 包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,會給咱們程序帶來不小的複雜度

三,架構梳理與種類分析api

 

 

 (1)種類分析數組

  •  ArrayBlockingQueue:由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue:由鏈表結構組成的有界(大小默認爲Integer.MAX_VALUE)阻塞隊列。
  • PriorityBlockingQueue:支持優先級排序的無界阻塞隊列。
  • DelayBlockingQueue:使用優先級隊列實現的延遲無界阻塞隊列。
  • SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列。
  • LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列。

    注意:安全

  1.  粗體標記的三個用得比較多,許多消息中間件底層就是用它們實現的。
  2. 須要注意的是LinkedBlockingQueue雖然是有界的,但有個巨坑,其默認大小是Integer.MAX_VALUE,高達21億,通常狀況下內存早爆了(在線程池的ThreadPoolExecutor有體現)。
  3. API:拋出異常是指當隊列滿時,再次插入會拋出異常;返回布爾是指當隊列滿時,再次插入會返回false;阻塞是指當隊列滿時,再次插入會被阻塞,直到隊列取出一個元素,才能插入。超時是指當一個時限事後,纔會插入或者取出。API使用見BlockingQueueDemo
  4. SynchronousQueue 沒有容量。與其餘BlockingQueue不一樣,SynchronousQueue 是一個不存儲元素的 BlockingQueue。每個put操做必須等待一個take操做,不然不能繼續添加元素,反之亦然。

 

(2)BlockingQueue的核心方法多線程

       

  •  拋出異常

- 當阻塞隊列滿時,再往隊列裏add插入元素會拋出 java.lang.IllegalStateException: Queue full; 
- 當阻塞隊列空時,再從隊列裏remove移除元素會拋出 java.util.NoSuchElementException架構

  •  特殊值

- 插入方法,成功true失敗false
- 移除方法,成功返回出隊列的元素,隊列裏面沒有就返回nullthis

  •  一直阻塞

- 當阻塞隊列滿時,生產者線程繼續往隊列裏put元素,隊列會一直阻塞生產線程知道put數據或者響應中斷退出
- 當阻塞隊列空時,消費者線程試圖從隊列裏take元素,隊列會一直阻塞消費者線程知道隊列可用

  •  超時退出

- 當阻塞隊列滿時,隊列會阻塞生產者現場必定時間,超過限時後生產者線程會退出

因爲Java中的阻塞隊列接口BlockingQueue繼承自Queue接口,所以先來看看阻塞隊列接口爲咱們提供的主要方法

public interface BlockingQueue<E> extends Queue<E> { //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量) //在成功時返回 true,若是此隊列已滿,則拋IllegalStateException。 
    boolean add(E e); //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量) // 將指定的元素插入此隊列的尾部,若是該隊列已滿, //則在到達指定的等待時間以前等待可用的空間,該方法可中斷 
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //將指定的元素插入此隊列的尾部,若是該隊列已滿,則一直等到(阻塞)。 
    void put(E e) throws InterruptedException; //獲取並移除此隊列的頭部,若是沒有元素則等待(阻塞), //直到有元素將喚醒等待線程執行該操做 
    E take() throws InterruptedException; //獲取並移除此隊列的頭部,在指定的等待時間前一直等到獲取元素, //超過期間方法將結束
    E poll(long timeout, TimeUnit unit) throws InterruptedException; //今後隊列中移除指定元素的單個實例(若是存在)。 
    boolean remove(Object o); } //除了上述方法還有繼承自Queue接口的方法 //獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException 
 E element(); //獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。 
 E peek(); //獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。 
    E poll();

這裏咱們把上述操做進行分類

插入方法:

add(E e) : 添加成功返回true,失敗拋IllegalStateException異常
offer(E e) : 成功返回 true,若是此隊列已滿,則返回 false。
put(E e) :將元素插入此隊列的尾部,若是該隊列已滿,則一直阻塞
刪除方法:

remove(Object o) :移除指定元素,成功返回true,失敗返回false
poll() : 獲取並移除此隊列的頭元素,若隊列爲空,則返回 null
take():獲取並移除此隊列頭元素,若沒有元素則一直阻塞。
檢查方法

element() :獲取但不移除此隊列的頭元素,沒有元素則拋異常
peek() :獲取但不移除此隊列的頭;若隊列爲空,則返回 null。

 

四,應用場景:

  • 線程通訊之生產消費者(傳統版生產消費,阻塞隊列版)
  • 線程池
  • 消息中間件

過分期間:

    

題目:synchronized 和 Lock 有什麼區別?用新的Lock有什麼好處?

  1. 原始構成

synchronized 是關鍵字屬於 JVM 層面
monitorenter(底層是經過 monitor 對象來完成,其實 wait/notify等方法也依賴於monitor對象只有在同步塊或方法中才能wait/notify等方法)
monitorexit
Lock 是具體類(java.util.concurrent.locks.Lock) 是api層面的鎖

  2,使用方法

synchronized 不須要用戶去手動釋放鎖,當 synchronized 代碼執行完成後系統會自動讓線程釋放對鎖的佔用
ReentrantLock 則須要用戶去手動釋放鎖,若沒有主動釋放鎖,就有可能致使出現死鎖現象。須要 lock()和unLock()方法配置tru/finally語句塊來完成
  3,等待是否可斷

synchronized 不可中斷,除非拋出異常或者正常運行完成
ReentrantLock 可中斷,a.設置超時方法 tryLock(long timeout,TimeUnit unit)
b.lockInterruptibly() 放代碼塊,調用 interrupt() 方法可中斷
  4,加鎖是否公平

synchronized 非公平鎖
ReentrantLock 二者均可以,默認非公平鎖,構造方法能夠傳入boolean值,true爲公平鎖,false爲非公平鎖
  5,鎖綁定多個條件Condition

synchronized 沒有
ReentrantLock 用來實現分組喚醒須要喚醒的線程們,能夠精確喚醒,而不是像synchronized要麼隨機喚醒一個線程要麼喚醒所有線程。

 

 

     舉例:鎖綁定多個條件Condition

public class SyncAndReetrantLockDemo { public static void main(String[] args) { /* 多線程之間按順序調用,實現A->B->C三個線程啓動,要求以下: AA打印5次,BB打印10次,CC打印15次...共10輪 */ ShareResource shareResource = new ShareResource(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(5,shareResource.getC1(),shareResource.getC2(),2); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(10,shareResource.getC2(),shareResource.getC3(),3); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(15,shareResource.getC3(),shareResource.getC1(),1); } }, "C").start(); } } class ShareResource{ private int number = 1;//A:1 B:2 C:3
    private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); public Condition getC1() { return c1; } public Condition getC2() { return c2; } public Condition getC3() { return c3; } public void print(int times, Condition condition1, Condition condition2, int num){ lock.lock(); try { int temp = num == 1 ? 3 : (num-1); while (number != temp){ condition1.await(); } for (int i = 0; i < times; i++) { System.out.println(Thread.currentThread().getName() + " " + i); } //通知標誌
            this.number = num; condition2.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } }
  • 生產者消費者隊列版
public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) throws Exception { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生產線程啓動"); try { myResource.myProd(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } }, "Prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消費線程啓動"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "Consumer").start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5秒鐘時間到,main線程叫停,活動結束"); myResource.stop(); } } class MyResource{ private volatile boolean flag = true;//默認開啓,進行生產消費
    private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception { String data = null; boolean retValue; while (flag) { data = atomicInteger.incrementAndGet() + ""; retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if (retValue){ System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 被叫停了!表示flag=false,生產動做結束"); } public void myConsumer() throws Exception { String result; while (flag) { result = blockingQueue.poll(2L, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")){ flag = false; System.out.println(Thread.currentThread().getName() + " 超過2秒鐘沒有取到,消費隊列退出"); return; } System.out.println(Thread.currentThread().getName() + " 消費隊列" + result + "成功"); } } public void stop() throws Exception { this.flag = false; } }

 參考博主:https://blog.csdn.net/javazejian/article/details/77410889

相關文章
相關標籤/搜索