突擊併發編程JUC系列演示代碼地址: https://github.com/mtcarpenter/JavaTutorial前端
什麼是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。java
- 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。git
插入和移除操做的4種處理方式
方法/處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 拋出異常:當隊列滿時,若是再往隊列裏插入元素,會拋出
IllegalStateException
("Queue full")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException
異常。 - 返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若是是移除方法,則是從隊列裏取出一個元素,若是沒有則返回 null 。
- 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏 put 元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,若是消費者線程從隊列裏 take 元素,隊列會阻塞住消費者線程,直到隊列不爲空。
- 超時退出:當阻塞隊列滿時,若是生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間,若是超過了指定的時間,生產者線程就會退出。
若是是無界阻塞隊列,隊列不可能會出現滿的狀況,因此使用 put 或 offer 方法永遠不會被阻塞,並且使用offer方法時,該方法永遠返回 true。github
ArrayBlockingQueue
ArrayBlockingQueue
是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。 默認狀況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。爲了保證公平性,一般會下降吞吐量。編程
阻塞式寫方法
在ArrayBlockingQueue
中提供了兩個阻塞式寫方法,分別以下(在該隊列中,不管是阻塞式寫方法仍是非阻塞式寫方法,都不容許寫入null)。後端
void put(E e)
:向隊列的尾部插入新的數據,當隊列已滿時調用該方法的線程會進入阻塞,直到有其餘線程對該線程執行了中斷操做,或者隊列中的元素被其餘線程消費。boolean offer(E e, long timeout, TimeUnit unit)
:向隊列尾部寫入新的數據,當隊列已滿時執行該方法的線程在指定的時間單位內將進入阻塞,直到到了指定的超時時間後,或者在此期間有其餘線程對隊列數據進行了消費。
put() 方法示例數組
public class ArrayBlockingQueueExample1 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); try { queue.put("class 1"); queue.put("class 2"); queue.put("class 3"); // 超過指定得容量當前線程阻塞 queue.put("class 4"); } catch (InterruptedException e) { e.printStackTrace(); } } }
非阻塞式寫方法
當隊列已滿時寫入數據,若是不想使得當前線程進入阻塞,那麼就可使用非阻塞式的寫操做方法。緩存
boolean add(E e)
:向隊列尾部寫入新的數據,當隊列已滿時不會進入阻塞,可是該方法會拋出隊列已滿的異常。boolean offer(E e)
:向隊列尾部寫入新的數據,當隊列已滿時不會進入阻塞,而且會當即返回 false。
add() 方法示例多線程
public class ArrayBlockingQueueExample2 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.add("class 1"); queue.add("class 2"); queue.add("class 3"); // 超過指定容量 拋出異常 queue.add("class 4"); } } // 拋出異常
阻塞式讀方法
E take()
:從隊列頭部獲取數據,而且該數據會從隊列頭部移除,當隊列爲空時執行take方法的線程將進入阻塞,直到有其餘線程寫入新的數據,或者當前線程被執行了中斷操做。E poll(long timeout, TimeUnit unit)
:從隊列頭部獲取數據而且該數據會從隊列頭部移除,若是隊列中沒有任何元素時則執行該方法,當前線程會阻塞指定的時間,直到在此期間有新的數據寫入,或者阻塞的當前線程被其餘線程中斷,當線程因爲超時退出阻塞時,返回值爲null。
take() 方法示例併發
public class ArrayBlockingQueueExample3 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); queue.add("class 1"); queue.add("class 2"); queue.add("class 3"); try { // 取出對頭元素 System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } // 隊列大小 System.out.println(queue.size()); } } //class 1 // 2
非阻塞式讀方法
E poll()
:從隊列頭部獲取數據而且該數據會從隊列頭部移除,當隊列爲空時,該方法不會使得當前線程進入阻塞,而是返回null值。E peek()
:當隊列爲空時,該方法不會使得當前線程進入阻塞,而是返回null值。
public class ArrayBlockingQueueExample4 { public static void main(String[] args) { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // 隊列無元素 直接返回 null System.out.println(queue.poll( )); System.out.println(queue.peek( )); } } // null // null
部分源碼
public void put(E e) throws InterruptedException { // 檢查元素 checkNotNull(e); final ReentrantLock lock = this.lock; // 獲取鎖 lock.lockInterruptibly(); try { // 元素滿 一直阻塞,隊列非滿時,被喚醒 while (count == items.length) notFull.await(); // 入隊 enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 獲取鎖 lock.lockInterruptibly(); try { // 隊列爲空 等待 while (count == 0) notEmpty.await(); // 出隊 return dequeue(); } finally { lock.unlock(); } }
LinkedBlockingQueue
LinkedBlockingQueue
是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE
。此隊列按照先進先出的原則對元素進行排序。
PriorityBlockingQueue
PriorityBlockingQueue
是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現compareTo()
方法來指定元素排序規則,或者初始化PriorityBlockingQueue
時,指定構造參數Comparator 來對元素進行排序。須要注意的是不能保證同優先級元素的順序。
public class PriorityBlockingQueueExample1 { public static void main(String[] args) { PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue(); queue.offer(1); queue.offer(12); queue.offer(21); queue.offer(6); // 內部排序 System.out.println(queue.poll()); // 1 System.out.println(queue.poll()); // 6 System.out.println(queue.poll()); // 12 System.out.println(queue.poll()); //21 } }
DelayQueue
DelayQueue
是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue
來實現。隊列中的元素必須實現Delayed
接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。
DelayQueue
很是有用,能夠將DelayQueue
運用在如下應用場景。
- 緩存系統的設計:能夠用
DelayQueue
保存緩存元素的有效期,使用一個線程循環查詢DelayQueue
,一旦能從DelayQueue
中獲取元素時,表示緩存有效期到了。 - 定時任務調度:使用
DelayQueue
保存當天將會執行的任務和執行時間,一旦從DelayQueue
中獲取到任務就開始執行,好比TimerQueue
就是使用DelayQueue
實現的。
DelayQueue
隊列的元素必須實現Delayed
接口。咱們能夠參考ScheduledThreadPoolExecutor
裏ScheduledFutureTask
類的實現。
public class DelayQueueExample1 { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedEntry> queue = new DelayQueue<>(); // 延期3秒 處理 queue.put(new DelayedEntry("A", 30000L)); // 延期10 秒處理 queue.add(new DelayedEntry("B", 10000L)); // 延期 20 秒處理 queue.add(new DelayedEntry("C", 20000L)); int size = queue.size(); System.out.println("當前時間是:" + LocalDateTime.now()); // 從延時隊列中獲取元素, 將輸出 A,B,C for (int i = 0; i < size; i++) { System.out.println(queue.take() + " ------ " + LocalDateTime.now()); } } } /** * 繼承 Delayed 接口 */ class DelayedEntry implements Delayed { /** * 元素數據內容 */ private final String value; /** * 用於計算失效時間 */ private final long exeTime; DelayedEntry(String value, long exeTime) { this.value = value; this.exeTime = exeTime + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { return exeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayedEntry t = (DelayedEntry) o; if (this.exeTime < t.exeTime) { return -1; } else if (this.exeTime > t.exeTime) { return 1; } else { return 0; } } @Override public String toString() { return "DelayedEntry{" + "value=" + value + ", exeTime=" + exeTime + '}'; } } //當前時間是:2020-10-15T16:26:37.167 //DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117 // DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105 //DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104
SynchronousQueue
SynchronousQueue
是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。 它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列。使用如下構造方法能夠建立公平性訪問的SynchronousQueue
,若是設置爲true,則等待的線程會採用先進先出的順序訪問隊列。
LinkedTransferQueue
LinkedTransferQueue
是一個由鏈表結構組成的無界阻塞TransferQueue
隊列。相對於其餘阻塞隊列,LinkedTransferQueue
多了tryTransfer
和transfer
方法。
-
transfer方法
若是當前有消費者正在等待接收元素(消費者使用
take()
方法或帶時間限制的poll()方法時),transfer
方法能夠把生產者傳入的元素馬上transfer
(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer 方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer
方法的關鍵代碼以下 -
tryTransfer方法
tryTransfer
方法是用來試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和transfer
方法的區別是tryTransfer
方法不管消費者是否接收,方法當即返回,而transfer
方法是必須等到消費者消費了才返回。 對於帶有時間限制的tryTransfer(E e,long timeout,TimeUnit unit)
方法,試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回 true。
LinkedBlockingDeque
LinkedBlockingDeque
是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是能夠從隊列的兩端插入和移出元素。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque
多了addFirst
、addLast
、offerFirst
、offerLast
、peekFirst
和peekLast
等方法,以First
單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列的第一個元素。以Last
單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。另外,插入方法add
等同於addLast
,移除方法remove
等效於removeFirst
。可是take
方法卻等同於takeFirst
,不知道是否是JDK
的 bug,使用時仍是用帶有First
和Last
後綴的方法更清楚。 在初始化LinkedBlockingDeque
時能夠設置容量防止其過分膨脹。另外,雙向阻塞隊列能夠運用在「工做竊取」模式中。
歡迎關注公衆號 山間木匠 , 我是小春哥,從事 Java 後端開發,會一點前端、經過持續輸出系列技術文章以文會友,若是本文能爲您提供幫助,歡迎你們關注、點贊、分享支持,咱們下期再見!<br />