本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html
![]()
本節,咱們來探討Java併發包中的各類隊列。Java併發包提供了豐富的隊列類,能夠簡單分爲:java
無鎖非阻塞是這些隊列不使用鎖,全部操做老是能夠當即執行,主要經過循環CAS實現併發安全,阻塞隊列是指這些隊列使用鎖和條件,不少操做都須要先獲取鎖或知足特定條件,獲取不到鎖或等待條件時,會等待(即阻塞),獲取到鎖或條件知足再返回。git
這些隊列迭代都不會拋出ConcurrentModificationException,都是弱一致的,後面就不單獨強調了。下面,咱們來簡要探討每類隊列的用途、用法和基本實現原理。github
有兩個無鎖非阻塞隊列:ConcurrentLinkedQueue和ConcurrentLinkedDeque,它們適用於多個線程併發使用一個隊列的場合,都是基於鏈表實現的,都沒有限制大小,是無界的,與ConcurrentSkipListMap相似,它們的size方法不是一個常量運算,不過這個方法在併發應用中用處也不大。算法
ConcurrentLinkedQueue實現了Queue接口,表示一個先進先出的隊列,從尾部入隊,從頭部出隊,內部是一個單向鏈表。ConcurrentLinkedDeque實現了Deque接口,表示一個雙端隊列,在兩端均可以入隊和出隊,內部是一個雙向鏈表。它們的用法相似於LinkedList,咱們就不贅述了。編程
這兩個類最基礎的原理是循環CAS,ConcurrentLinkedQueue的算法基於一篇論文:"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms",ConcurrentLinkedDeque擴展了ConcurrentLinkedQueue的技術,但它們的具體實現都很是複雜,咱們就不探討了。swift
除了剛介紹的兩個隊列,其餘隊列都是阻塞隊列,都實現了接口BlockingQueue,在入隊/出隊時可能等待,主要方法有:數組
//入隊,若是隊列滿,等待直到隊列有空間
void put(E e) throws InterruptedException;
//出隊,若是隊列空,等待直到隊列不爲空,返回頭部元素
E take() throws InterruptedException;
//入隊,若是隊列滿,最多等待指定的時間,若是超時仍是滿,返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出隊,若是隊列空,最多等待指定的時間,若是超時仍是空,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
複製代碼
普通阻塞隊列是經常使用的隊列,經常使用於生產者/消費者模式。安全
ArrayBlockingQueue和LinkedBlockingQueue都是實現了Queue接口,表示先進先出的隊列,尾部進,頭部出,而LinkedBlockingDeque實現了Deque接口,是一個雙端隊列。微信
ArrayBlockingQueue是基於循環數組實現的,有界,建立時須要指定大小,且在運行過程當中不會改變,這與咱們在容器類中介紹的ArrayDeque是不一樣的,ArrayDeque也是基於循環數組實現的,可是是無界的,會自動擴展。
LinkedBlockingQueue是基於單向鏈表實現的,在建立時能夠指定最大長度,也能夠不指定,默認是無限的,節點都是動態建立的。LinkedBlockingDeque與LinkedBlockingQueue同樣,最大長度也是在建立時可選的,默認無限,不過,它是基於雙向鏈表實現的。
內部,它們都是使用顯式鎖ReentrantLock和顯式條件Condition實現的。
ArrayBlockingQueue的實現很直接,有一個數組存儲元素,有兩個索引表示頭和尾,有一個變量表示當前元素個數,有一個鎖保護全部訪問,有兩個條件,"不滿"和"不空"用於協做,成員聲明以下:
final Object[] items;
int takeIndex; // 頭
int putIndex; //尾
int count; //元素個數
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
複製代碼
實現思路與咱們在72節實現的相似,就不贅述了。
與ArrayBlockingQueue相似,LinkedBlockingDeque也是使用一個鎖和兩個條件,使用鎖保護全部操做,使用"不滿"和"不空"兩個條件,LinkedBlockingQueue稍微不一樣,由於它使用鏈表,且只從頭部出隊、從尾部入隊,它作了一些優化,使用了兩個鎖,一個保護頭部,一個保護尾部,每一個鎖關聯一個條件。
普通阻塞隊列是先進先出的,而優先級隊列是按優先級出隊的,優先級高的先出,咱們在容器類中介紹過優先級隊列PriorityQueue及其背後的數據結構堆。
PriorityBlockingQueue是PriorityQueue的併發版本,與PriorityQueue同樣,它沒有大小限制,是無界的,內部的數組大小會動態擴展,要求元素要麼實現Comparable接口,要麼建立PriorityBlockingQueue時提供一個Comparator對象。
與PriorityQueue的區別是,PriorityBlockingQueue實現了BlockingQueue接口,在隊列爲空時,take方法會阻塞等待。
另外,PriorityBlockingQueue是線程安全的,它的基本實現原理與PriorityQueue是同樣的,也是基於堆,但它使用了一個鎖ReentrantLock保護全部訪問,使用了一個條件協調阻塞等待。
延時阻塞隊列DelayQueue是一種特殊的優先級隊列,它也是無界的,它要求每一個元素都實現Delayed接口,該接口的聲明爲:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
複製代碼
Delayed擴展了Comparable接口,也就是說,DelayQueue的每一個元素都是可比較的,它有一個額外方法getDelay返回一個給定時間單位unit的整數,表示再延遲多長時間,若是小於等於0,表示再也不延遲。
DelayQueue也是優先級隊列,它按元素的延時時間出隊,它的特殊之處在於,只有當元素的延時過時以後才能被從隊列中拿走,也就是說,take方法老是返回第一個過時的元素,若是沒有,則阻塞等待。
DelayQueue能夠用於實現定時任務,咱們看段簡單的示例代碼:
public class DelayedQueueDemo {
private static final AtomicLong taskSequencer = new AtomicLong(0);
static class DelayedTask implements Delayed {
private long runTime;
private long sequence;
private Runnable task;
public DelayedTask(int delayedSeconds, Runnable task) {
this.runTime = System.currentTimeMillis() + delayedSeconds * 1000;
this.sequence = taskSequencer.getAndIncrement();
this.task = task;
}
@Override
public int compareTo(Delayed o) {
if (o == this) {
return 0;
}
if (o instanceof DelayedTask) {
DelayedTask other = (DelayedTask) o;
if (runTime < other.runTime) {
return -1;
} else if (runTime > other.runTime) {
return 1;
} else if (sequence < other.sequence) {
return -1;
} else {
return 1;
}
}
throw new IllegalArgumentException();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runTime - System.currentTimeMillis(),
TimeUnit.MICROSECONDS);
}
public Runnable getTask() {
return task;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> tasks = new DelayQueue<>();
tasks.put(new DelayedTask(2, new Runnable() {
@Override
public void run() {
System.out.println("execute delayed task");
}
}));
DelayedTask task = tasks.take();
task.getTask().run();
}
}
複製代碼
DelayedTask表示延時任務,只有延時過時後任務纔會執行,任務按延時時間排序,延時同樣的按照入隊順序排序。
內部,DelayQueue是基於PriorityQueue實現的,它使用一個鎖ReentrantLock保護全部訪問,使用一個條件available表示頭部是否有元素,當頭部元素的延時未到時,take操做會根據延時計算需睡眠的時間,而後睡眠,若是在此過程當中有新的元素入隊,且成爲頭部元素,則阻塞睡眠的線程會被提早喚醒而後從新檢查。以上是基本思路,DelayQueue的實現有一些優化,以減小沒必要要的喚醒,具體咱們就不探討了。
Java併發包中還有兩個特殊的阻塞隊列,SynchronousQueue和LinkedTransferQueue。
SynchronousQueue與通常的隊列不一樣,它不算一種真正的隊列,它沒有存儲元素的空間,存儲一個元素的空間都沒有。它的入隊操做要等待另外一個線程的出隊操做,反之亦然。若是沒有其餘線程在等待從隊列中接收元素,put操做就會等待。take操做須要等待其餘線程往隊列中放元素,若是沒有,也會等待。SynchronousQueue適用於兩個線程之間直接傳遞信息、事件或任務。
LinkedTransferQueue實現了TransferQueue接口,TransferQueue是BlockingQueue的子接口,但增長了一些額外功能,生產者在往隊列中放元素時,能夠等待消費者接收後再返回,適用於一些消息傳遞類型的應用中。TransferQueue的接口定義爲:
public interface TransferQueue<E> extends BlockingQueue<E> {
//若是有消費者在等待(執行take或限時的poll),直接轉給消費者,
//返回true,不然返回false,不入隊
boolean tryTransfer(E e);
//若是有消費者在等待,直接轉給消費者,
//不然入隊,阻塞等待直到被消費者接收後再返回
void transfer(E e) throws InterruptedException;
//若是有消費者在等待,直接轉給消費者,返回true
//不然入隊,阻塞等待限定的時間,若是最後被消費者接收,返回true
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//是否有消費者在等待
boolean hasWaitingConsumer();
//等待的消費者個數
int getWaitingConsumerCount();
}
複製代碼
LinkedTransferQueue是基於鏈表實現的、無界的TransferQueue,具體實現比較複雜,咱們就不探討了。
本節簡要介紹了Java併發包中的各類隊列,包括其基本概念和基本原理。
從73節到本節,咱們介紹了Java併發包的各類容器,至此,就介紹完了,在實際開發中,應該儘可能使用這些現成的容器,而非從新發明輪子。
Java併發包中還提供了一種方便的任務執行服務,使用它,能夠將要執行的併發任務與線程的管理相分離,大大簡化併發任務和線程的管理,讓咱們下一節來探討。
(與其餘章節同樣,本節全部代碼位於 github.com/swiftma/pro…)
未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),從入門到高級,深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。