一,阻塞隊列?java
當阻塞隊列爲空時,獲取(take)操做是阻塞的;當阻塞隊列爲滿時,添加(put)操做是阻塞的。git
二,爲何用,有什麼好處?程序員
阻塞隊列不用手動控制何時該被阻塞,何時該被喚醒,簡化了操做。github
三,架構梳理與種類分析api
(1)種類分析數組
- ArrayBlockingQueue:由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:由鏈表結構組成的有界(大小默認爲Integer.MAX_VALUE)阻塞隊列。
- PriorityBlockingQueue:支持優先級排序的無界阻塞隊列。
- DelayBlockingQueue:使用優先級隊列實現的延遲無界阻塞隊列。
- SynchronousQueue:不存儲元素的阻塞隊列,也即單個元素的隊列。
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列。
注意:安全
LinkedBlockingQueue
雖然是有界的,但有個巨坑,其默認大小是Integer.MAX_VALUE
,高達21億,通常狀況下內存早爆了(在線程池的ThreadPoolExecutor
有體現)。
(2)BlockingQueue的核心方法多線程
- 拋出異常
- 當阻塞隊列滿時,再往隊列裏add插入元素會拋出 java.lang.IllegalStateException: Queue full;
- 當阻塞隊列空時,再從隊列裏remove移除元素會拋出 java.util.NoSuchElementException架構
- 特殊值
- 插入方法,成功true失敗false
- 移除方法,成功返回出隊列的元素,隊列裏面沒有就返回nullthis
- 一直阻塞
- 當阻塞隊列滿時,生產者線程繼續往隊列裏put元素,隊列會一直阻塞生產線程知道put數據或者響應中斷退出
- 當阻塞隊列空時,消費者線程試圖從隊列裏take元素,隊列會一直阻塞消費者線程知道隊列可用
- 超時退出
- 當阻塞隊列滿時,隊列會阻塞生產者現場必定時間,超過限時後生產者線程會退出
因爲Java中的阻塞隊列接口BlockingQueue繼承自Queue接口,所以先來看看阻塞隊列接口爲咱們提供的主要方法
public interface BlockingQueue<E> extends Queue<E> { //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量) //在成功時返回 true,若是此隊列已滿,則拋IllegalStateException。 boolean add(E e); //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量) // 將指定的元素插入此隊列的尾部,若是該隊列已滿, //則在到達指定的等待時間以前等待可用的空間,該方法可中斷 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //將指定的元素插入此隊列的尾部,若是該隊列已滿,則一直等到(阻塞)。 void put(E e) throws InterruptedException; //獲取並移除此隊列的頭部,若是沒有元素則等待(阻塞), //直到有元素將喚醒等待線程執行該操做 E take() throws InterruptedException; //獲取並移除此隊列的頭部,在指定的等待時間前一直等到獲取元素, //超過期間方法將結束 E poll(long timeout, TimeUnit unit) throws InterruptedException; //今後隊列中移除指定元素的單個實例(若是存在)。 boolean remove(Object o); } //除了上述方法還有繼承自Queue接口的方法 //獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException E element(); //獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。 E peek(); //獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。 E poll();
這裏咱們把上述操做進行分類
插入方法:
add(E e) : 添加成功返回true,失敗拋IllegalStateException異常
offer(E e) : 成功返回 true,若是此隊列已滿,則返回 false。
put(E e) :將元素插入此隊列的尾部,若是該隊列已滿,則一直阻塞
刪除方法:remove(Object o) :移除指定元素,成功返回true,失敗返回false
poll() : 獲取並移除此隊列的頭元素,若隊列爲空,則返回 null
take():獲取並移除此隊列頭元素,若沒有元素則一直阻塞。
檢查方法element() :獲取但不移除此隊列的頭元素,沒有元素則拋異常
peek() :獲取但不移除此隊列的頭;若隊列爲空,則返回 null。
四,應用場景:
- 線程通訊之生產消費者(傳統版生產消費,阻塞隊列版)
- 線程池
- 消息中間件
過分期間:
題目:synchronized 和 Lock 有什麼區別?用新的Lock有什麼好處?
- 原始構成
synchronized 是關鍵字屬於 JVM 層面
monitorenter(底層是經過 monitor 對象來完成,其實 wait/notify等方法也依賴於monitor對象只有在同步塊或方法中才能wait/notify等方法)
monitorexit
Lock 是具體類(java.util.concurrent.locks.Lock) 是api層面的鎖2,使用方法
synchronized 不須要用戶去手動釋放鎖,當 synchronized 代碼執行完成後系統會自動讓線程釋放對鎖的佔用
ReentrantLock 則須要用戶去手動釋放鎖,若沒有主動釋放鎖,就有可能致使出現死鎖現象。須要 lock()和unLock()方法配置tru/finally語句塊來完成
3,等待是否可斷synchronized 不可中斷,除非拋出異常或者正常運行完成
ReentrantLock 可中斷,a.設置超時方法 tryLock(long timeout,TimeUnit unit)
b.lockInterruptibly() 放代碼塊,調用 interrupt() 方法可中斷
4,加鎖是否公平synchronized 非公平鎖
ReentrantLock 二者均可以,默認非公平鎖,構造方法能夠傳入boolean值,true爲公平鎖,false爲非公平鎖
5,鎖綁定多個條件Conditionsynchronized 沒有
ReentrantLock 用來實現分組喚醒須要喚醒的線程們,能夠精確喚醒,而不是像synchronized要麼隨機喚醒一個線程要麼喚醒所有線程。
舉例:鎖綁定多個條件Condition
public class SyncAndReetrantLockDemo { public static void main(String[] args) { /* 多線程之間按順序調用,實現A->B->C三個線程啓動,要求以下: AA打印5次,BB打印10次,CC打印15次...共10輪 */ ShareResource shareResource = new ShareResource(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(5,shareResource.getC1(),shareResource.getC2(),2); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(10,shareResource.getC2(),shareResource.getC3(),3); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print(15,shareResource.getC3(),shareResource.getC1(),1); } }, "C").start(); } } class ShareResource{ private int number = 1;//A:1 B:2 C:3 private Lock lock = new ReentrantLock(); private Condition c1 = lock.newCondition(); private Condition c2 = lock.newCondition(); private Condition c3 = lock.newCondition(); public Condition getC1() { return c1; } public Condition getC2() { return c2; } public Condition getC3() { return c3; } public void print(int times, Condition condition1, Condition condition2, int num){ lock.lock(); try { int temp = num == 1 ? 3 : (num-1); while (number != temp){ condition1.await(); } for (int i = 0; i < times; i++) { System.out.println(Thread.currentThread().getName() + " " + i); } //通知標誌 this.number = num; condition2.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } }
public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) throws Exception { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生產線程啓動"); try { myResource.myProd(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } }, "Prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消費線程啓動"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "Consumer").start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5秒鐘時間到,main線程叫停,活動結束"); myResource.stop(); } } class MyResource{ private volatile boolean flag = true;//默認開啓,進行生產消費 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd() throws Exception { String data = null; boolean retValue; while (flag) { data = atomicInteger.incrementAndGet() + ""; retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if (retValue){ System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入隊列" + data + "失敗"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 被叫停了!表示flag=false,生產動做結束"); } public void myConsumer() throws Exception { String result; while (flag) { result = blockingQueue.poll(2L, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")){ flag = false; System.out.println(Thread.currentThread().getName() + " 超過2秒鐘沒有取到,消費隊列退出"); return; } System.out.println(Thread.currentThread().getName() + " 消費隊列" + result + "成功"); } } public void stop() throws Exception { this.flag = false; } }
參考博主:https://blog.csdn.net/javazejian/article/details/77410889