JDK中除了上文提到的各類併發容器,還提供了豐富的阻塞隊列。阻塞隊列統一實現了BlockingQueue接口,BlockingQueue接口在java.util包Queue接口的基礎上提供了put(e)以及take()兩個阻塞方法。他的主要使用場景就是多線程下的生產者消費者模式,生產者線程經過put(e)方法將生產元素,消費者線程經過take()消費元素。除了阻塞功能,BlockingQueue接口還定義了定時的offer以及poll,以及一次性移除方法drainTo。java
//插入元素,隊列滿後會拋出異常
boolean add(E e);
//移除元素,隊列爲空時會拋出異常
E remove();
//插入元素,成功反會true
boolean offer(E e);
//移除元素
E poll();
//插入元素,隊列滿後會阻塞
void put(E e) throws InterruptedException;
//移除元素,隊列空後會阻塞
E take() throws InterruptedException;
//限時插入
boolean offer(E e, long timeout, TimeUnit unit)
//限時移除
E poll(long timeout, TimeUnit unit);
//獲取全部元素到Collection中
int drainTo(Collection<? super E> c);
複製代碼
JDK1.8中的阻塞隊列實現共有7個,分別是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque,下面就來一一對他們進行一個簡單的分析。node
ArrayBlockingQueue是一個底層用數組實現的有界阻塞隊列,有界是指他的容量大小是固定的,不能擴充容量,在初始化時就必須肯定隊列大小。它經過可重入的獨佔鎖ReentrantLock來控制併發,Condition來實現阻塞。編程
//經過數組來存儲隊列中的元素
final Object[] items;
//初始化一個固定的數組大小,默認使用非公平鎖來控制併發
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//初始化固定的items數組大小,初始化notEmpty以及notFull兩個Condition來控制生產消費
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);//經過ReentrantLock來控制併發
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
複製代碼
能夠看到ArrayBlockingQueue初始化了一個ReentrantLock以及兩個Condition,用來控制併發下隊列的生產消費。這裏重點看下阻塞的put以及take方法:數組
//插入元素到隊列中
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //獲取獨佔鎖
try {
while (count == items.length) //若是隊列已滿則經過await阻塞put方法
notFull.await();
enqueue(e); //插入元素
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) //插入元素後將putIndex+1,當隊列使用完後重置爲0
putIndex = 0;
count++;
notEmpty.signal(); //隊列添加元素後喚醒因notEmpty等待的消費線程
}
//移除隊列中的元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //獲取獨佔鎖
try {
while (count == 0) //若是隊列已空則經過await阻塞take方法
notEmpty.await();
return dequeue(); //移除元素
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) //移除元素後將takeIndex+1,當隊列使用完後重置爲0
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //隊列消費元素後喚醒因notFull等待的消費線程
return x;
}
複製代碼
在隊列添加和移除元素的過程當中使用putIndex、takeIndex以及count三個變量來控制生產消費元素的過程,putIndex負責記錄下一個可添加元素的下標,takeIndex負責記錄下一個可移除元素的下標,count記錄了隊列中的元素總量。隊列滿後經過notFull.await()來阻塞生產者線程,消費元素後經過notFull.signal()來喚醒阻塞的生產者線程。隊列爲空後經過notEmpty.await()來阻塞消費者線程,生產元素後經過notEmpty.signal()喚醒阻塞的消費者線程。緩存
限時插入以及移除方法在ArrayBlockingQueue中經過awaitNanos來實現,在給定的時間事後若是線程未被喚醒則直接返回。bash
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout); //獲取定時時長
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0) //指定時長事後,線程仍然未被喚醒則返回false
return false;
nanos = notFull.awaitNanos(nanos); //指定時長內阻塞線程
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
複製代碼
還有一個比較重要的方法:drainTo,drainTo方法能夠一次性獲取隊列中全部的元素,它減小了鎖定隊列的次數,使用得當在某些場景下對性能有不錯的提高。數據結構
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock; //僅獲取一次鎖
lock.lock();
try {
int n = Math.min(maxElements, count); //獲取隊列中全部元素
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x); //循環插入元素
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal(); //喚醒等待的生產者線程
}
}
} finally {
lock.unlock();
}
}
複製代碼
LinkedBlockingQueue是一個底層用單向鏈表實現的有界阻塞隊列,和ArrayBlockingQueue同樣,採用ReentrantLock來控制併發,不一樣的是它使用了兩個獨佔鎖來控制消費和生產。put以及take方法源碼以下:多線程
public void put(E e) throws InterruptedException {
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//由於使用了雙鎖,須要使用AtomicInteger計算元素總量,避免併發計算不許確
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); //隊列已滿,阻塞生產線程
}
enqueue(node); //插入元素到隊列尾部
c = count.getAndIncrement(); //count + 1
if (c + 1 < capacity) //若是+1後隊列還未滿,經過其餘生產線程繼續生產
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) //只有當以前是空時,消費隊列纔會阻塞,不然是不須要通知的
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//將新元素添加到鏈表末尾,而後將last指向尾部元素
last = last.next = node;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await(); //隊列爲空,阻塞消費線程
}
x = dequeue(); //消費一個元素
c = count.getAndDecrement(); //count - 1
if (c > 1) // 通知其餘等待的消費線程繼續消費
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) //只有當以前是滿的,生產隊列纔會阻塞,不然是不須要通知的
signalNotFull();
return x;
}
//消費隊列頭部的下一個元素,同時將新頭部置空
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
複製代碼
能夠看到LinkedBlockingQueue經過takeLock和putLock兩個鎖來控制生產和消費,互不干擾,只要隊列未滿,生產線程能夠一直生產,只要隊列不爲空,消費線程能夠一直消費,不會相互由於獨佔鎖而阻塞。併發
看過了LinkedBlockingQueue以及ArrayBlockingQueue的底層實現,會發現一個問題,正常來講消費者和生產者能夠併發執行對隊列的吞吐量會有比較大的提高,那麼爲何ArrayBlockingQueue中不使用雙鎖來實現隊列的生產和消費呢?個人理解是ArrayBlockingQueue也能使用雙鎖來實現功能,但因爲它底層使用了數組這種簡單結構,至關於一個共享變量,若是經過兩個鎖,須要更加精確的鎖控制,這也是爲何JDK1.7中的ConcurrentHashMap使用了分段鎖來實現,將一個數組分爲多個數組來提升併發量。LinkedBlockingQueue不存在這個問題,鏈表這種數據結構頭尾節點都相對獨立,存儲上也不連續,雙鎖控制不存在複雜性。這是個人理解,若是你有更好的結論,請留言探討。app
PriorityBlockingQueue是一個底層由數組實現的無界隊列,並帶有排序功能,一樣採用ReentrantLock來控制併發。因爲是無界的,因此插入元素時不會阻塞,沒有隊列滿的狀態,只有隊列爲空的狀態。經過這兩點特徵其實能夠猜想它應該是有一個獨佔鎖(底層數組)和一個Condition(只通知消費)來實現的。put以及take方法源碼分析以下:
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//無界隊列,隊列長度不夠時會擴容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
//經過comparator來實現優先級排序
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal(); //和ArrayBlockingQueue同樣,每次添加元素後通知消費線程
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await(); //隊列爲空,阻塞消費線程
} finally {
lock.unlock();
}
return result;
}
複製代碼
DelayQueue也是一個無界隊列,它是在PriorityQueue基礎上實現的,先按延遲優先級排序,延遲時間短的排在前面。和PriorityBlockingQueue類似,底層也是數組,採用一個ReentrantLock來控制併發。因爲是無界的,因此插入元素時不會阻塞,沒有隊列滿的狀態。能想到的最簡單的使用場景通常有兩個:一個是緩存過時,一個是定時執行的任務。但因爲是無界的,緩存過時上通常使用的並很少。簡單來看下put以及take方法:
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();//優先級隊列
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); //插入元素到優先級隊列
if (q.peek() == e) { //若是插入的元素在隊列頭部
leader = null;
available.signal(); //通知消費線程
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //獲取頭部元素
if (first == null)
available.await(); //空隊列阻塞
else {
long delay = first.getDelay(NANOSECONDS); //檢查元素是否延遲到期
if (delay <= 0)
return q.poll(); //到期則彈出元素
first = null;
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); //阻塞未到期的時間
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
複製代碼
SynchronousQueue相比較以前的4個隊列就比較特殊了,它是一個沒有容量的隊列,也就是說它內部時不會對數據進行存儲,每進行一次put以後必需要進行一次take,不然相同線程繼續put會阻塞。這種特性很適合作一些傳遞性的工做,一個線程生產,一個線程消費。內部分爲公平和非公平訪問兩種模式,默認使用非公平,未使用鎖,所有經過CAS操做來實現併發,吞吐量很是高。這裏只對它的非公平實現下的take和put方法作下簡單分析:
//非公平狀況下調用內部類TransferStack的transfer方法put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
//非公平狀況下調用內部類TransferStack的transfer方法take
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
//具體的put以及take方法,只有E的區別,經過E來區別REQUEST仍是DATA模式
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
//棧無元素或者元素和插入的元素模式相匹配,也就是說都是插入元素
if (h == null || h.mode == mode) {
//有時間限制而且超時
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next); // 從新設置頭節點
else
return null;
}
//未超時cas操做嘗試設置頭節點
else if (casHead(h, s = snode(s, e, h, mode))) {
//自旋一段時間後未消費元素則掛起put線程
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next);
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
//棧不爲空而且和頭節點模式不匹配,存在元素則消費元素並從新設置head節點
else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
}
//節點正在匹配階段
else { // help a fulfiller
SNode m = h.next; // m is h match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
//先自旋後掛起的核心方法
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//計算自旋的次數
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
//匹配成功過返回節點
if (m != null)
return m;
//超時控制
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋檢查,是否進行下一次自旋
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this); //在這裏掛起線程
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
複製代碼
代碼很是複雜,這裏說下我所理解的核心邏輯。代碼中能夠看到put以及take方法都是經過調用transfer方法來實現的,而後經過參數mode來區別,在生產元素時若是是同一個線程屢次put則會採起自旋的方式屢次嘗試put元素,可能自旋過程當中元素會被消費,這樣能夠及時put,下降線程掛起的性能損耗,高吞吐量的核心也在這裏,消費線程同樣,空棧時也會先自旋,自旋失敗而後經過線程的LockSupport.park方法掛起。
LinkedTransferQueue是一個無界的阻塞隊列,底層由鏈表實現。雖然和LinkedBlockingQueue同樣也是鏈表實現的,但併發控制的實現上卻很不同,和SynchronousQueue相似,採用了大量的CAS操做,沒有使用鎖,因爲是無界的,因此不會put生產線程不會阻塞,只會在take時阻塞消費線程,消費線程掛起時一樣使用LockSupport.park方法。
LinkedTransferQueue相比於以上的隊列還提供了一些額外的功能,它實現了TransferQueue接口,有兩個關鍵方法transfer(E e)和tryTransfer(E e)方法,transfer在沒有消費時會阻塞,tryTransfer在沒有消費時不會插入到隊列中,也不會等待,直接返回false。
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
//經過SYNC狀態來實現生產阻塞
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
//經過NOW狀態跳過添加元素以及阻塞
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}
//經過ASYNC狀態跳過阻塞
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
//經過SYNC狀態來實現消費阻塞
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
//生產消費調用同一個方法,經過e是否爲空,haveData,how等參數來區分具體邏輯
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//找出第一個可用節點
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//隊列爲空時直接跳過
if (item != p && (item != null) == isData) { // unmatched
//節點類型相同,跳過
if (isData == haveData) // can not match
break;
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
//插入節點或移除節點具體邏輯
//tryTransfer方法會直接跳過並返回結果
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData); //加入節點
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)
//自旋以及阻塞消費線程邏輯,和SynchronousQueue相似,先嚐試自選,失敗後掛起線程
//transfer方法在沒有消費線程時也會阻塞在這裏
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
複製代碼
LinkedBlockingDeque是一個有界的雙端隊列,底層採用一個雙向的鏈表來實現,在LinkedBlockingQeque的Node實現多了指向前一個節點的變量prev。併發控制上和ArrayBlockingQueue相似,採用單個ReentrantLock來控制併發,這裏是由於雙端隊列頭尾均可以消費和生產,因此使用了一個共享鎖。它實現了BlockingDeque接口,繼承自BlockingQueue接口,多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,用來頭尾生產和消費。LinkedBlockingDeque的實現代碼比較簡單,基本就是綜合了LinkedBlockingQeque和ArrayBlockingQueue的代碼邏輯,這裏就不作分析了。
##總結 文章對JDK1.8中的7種阻塞隊列都作了簡單分析,幫助你們大體梳理的這7個隊列的基本原理。總的來講每種阻塞隊列都有它本身的應用場景,使用時能夠先根據有界仍是無界,而後在根據各自的特性來進行選擇。
有界阻塞隊列包括:ArrayBlockingQueue、LinkedBlockingQueue以及LinkedBlockingDeque三種,LinkedBlockingDeque應用場景不多,通常用在「工做竊取」模式下。ArrayBlockingQueue和LinkedBlockingQueue基本就是數組和鏈表的區別。無界隊列包括PriorityBlockingQueue、DelayQueue和LinkedTransferQueue。PriorityBlockingQueue用在須要排序的隊列中。DelayQueue能夠用來作一些定時任務或者緩存過時的場景。LinkedTransferQueue則相比較其餘隊列多了transfer功能。最後剩下一個不存儲元素的隊列SynchronousQueue,用來處理一些高效的傳遞性場景。
參考資料: