阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素
阻塞隊列提供了四種處理方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,如果沒有則返回null
一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
成員方法
隊列 | 有界性 | 鎖 | 數據結構 |
---|---|---|---|
ArrayBlockingQueue | bounded(有界) | 加鎖 | arrayList |
LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedList |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
DelayQueue | unbounded | 加鎖 | heap |
SynchronousQueue | bounded | 加鎖 | 無 |
LinkedTransferQueue | unbounded | 加鎖 | heap |
LinkedBlockingDeque | unbounded | 無鎖 | heap |
下面分別簡單介紹一下:
ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】
LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度爲Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閒線程則會重複使用,線程空閒了60秒後會被回收。
LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程併發時,可以將鎖的競爭最多降到一半。
ArrayBlockingQueue
package demo.queue; import java.util.concurrent.ArrayBlockingQueue; public class BlockingQueue1 { public static void main(String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//給定初始容量 // add/remove 拋出異常 arrayBlockingQueue.add("a"); arrayBlockingQueue.add("b"); arrayBlockingQueue.add("c"); //容量爲3 當我們加入第四個值時則會拋出異常 arrayBlockingQueue.add("d"); } }
輸出:
package demo.queue; import java.util.concurrent.ArrayBlockingQueue; public class BlockingQueue1 { public static void main(String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//給定初始容量 // add/remove 拋出異常 arrayBlockingQueue.add("a"); arrayBlockingQueue.add("b"); arrayBlockingQueue.add("c"); //容量爲3 當我們加入第四個值時則會拋出異常 //arrayBlockingQueue.add("d"); //remove System.out.println(arrayBlockingQueue.remove()); System.out.println(arrayBlockingQueue.remove()); System.out.println(arrayBlockingQueue.remove()); //當隊列沒有元素時拋出異常 System.out.println(arrayBlockingQueue.remove()); } }
輸出:
add方法和offer方法最終調用的是enqueue(E x)方法 看源碼可知
SynchronousQueue
SynchronousQueue , 只有一個容量!
每一個put操作,就需要有一個 take操作!
package demo.queue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class BlockingQueue2 { public static void main(String[] args) { SynchronousQueue<String> arrayBlockingQueue = new SynchronousQueue<>(); // A 存 new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + "put a"); arrayBlockingQueue.put("a"); System.out.println(Thread.currentThread().getName() + "put b"); arrayBlockingQueue.put("b"); System.out.println(Thread.currentThread().getName() + "put c"); arrayBlockingQueue.put("c"); } catch (InterruptedException e) { e.printStackTrace(); } },"A:").start(); // B 取 new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"B:").start(); } }
輸出: