1.ArrayDeque, (數組雙端隊列)
2.PriorityQueue, (優先級隊列)
3.ConcurrentLinkedQueue, (基於鏈表的併發隊列)算法
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口)
5.ArrayBlockingQueue, (基於數組的併發阻塞隊列)
6.LinkedBlockingQueue, (基於鏈表的FIFO阻塞隊列)
7.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列)
8.PriorityBlockingQueue, (帶優先級的無界阻塞隊列)
9.SynchronousQueue (併發同步阻塞隊列)
—————————————————–設計模式
阻塞隊列和生產者-消費者模式數組
阻塞隊列(Blocking queue)提供了可阻塞的put和take方法,它們與可定時的offer和poll是等價的。若是Queue已經滿了,put方法會被阻塞直到有空間可用;若是Queue是空的,那麼take方法會被阻塞,直到有元素可用。Queue的長度能夠有限,也能夠無限;無限的Queue永遠不會充滿,因此它的put方法永遠不會阻塞。併發
阻塞隊列支持生產者-消費者設計模式。一個生產者-消費者設計分離了「生產產品」和「消費產品」。該模式不會發現一個工做便當即處理,而是把工做置於一個任務(「to do」)清單中,以備後期處理。生產者-消費者模式簡化了開發,由於它解除了生產者和消費者之間相互依賴的代碼。生產者和消費者以不一樣的或者變化的速度生產和消費數據,生產者-消費者模式將這些活動解耦,於是簡化了工做負荷的管理。工具
生產者-消費者設計是圍繞阻塞隊列展開的,生產者把數據放入隊列,並使數據可用,當消費者爲適當的行爲作準備時會從隊列中獲取數據。生產者不須要知道消費者的省份或者數量,甚至根本沒有消費者—它們只負責把數據放入隊列。相似地,消費者也不須要知道生產者是誰,以及是誰給它們安排的工做。BlockingQueue可使用任意數量的生產者和消費者,從而簡化了生產者-消費者設計的實現。最多見的生產者-消費者設計是將線程池與工做隊列相結合。性能
阻塞隊列簡化了消費者的編碼,由於take會保持阻塞直到可用數據出現。若是生產者不能足夠快地產生工做,讓消費者忙碌起來,那麼消費者只能一直等待,直到有工做可作。同時,put方法的阻塞特性也大大地簡化了生產者的編碼;若是使用一個有界隊列,那麼當隊列充滿的時候,生產者就會阻塞,暫不能生成更多的工做,從而給消費者時間來趕進進度。this
有界隊列是強大的資源管理工具,用來創建可靠的應用程序:它們遏制那些能夠產生過多工做量、具備威脅的活動,從而讓你的程序在面對超負荷工做時更加健壯。編碼
雖然生產者-消費者模式能夠把生產者和消費者的代碼相互解耦合,可是它們的行爲仍是間接地經過共享隊列耦合在一塊兒了。spa
類庫中包含一些BlockingQueue的實現,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊列,與 LinkedList和ArrayList類似,可是卻擁有比同步List更好的併發性能。PriorityBlockingQueue是一個按優先級順序排序的隊列,當你不但願按照FIFO的屬性處理元素時,這個PriorityBolckingQueue是很是有用的。正如其餘排序的容器同樣,PriorityBlockingQueue能夠比較元素自己的天然順序(若是它們實現了Comparable),也可使用一個 Comparator進行排序。線程
最後一個BlockingQueue的實現是SynchronousQueue,它根本上不是一個真正的隊列,由於它不會爲隊列元素維護任何存儲空間。不過,它維護一個排隊的線程清單,這些線程等待把元素加入(enqueue)隊列或者移出(dequeue)隊列。由於SynchronousQueue沒有存儲能力,因此除非另外一個線程已經準備好參與移交工做,不然put和take會一直阻止。SynchronousQueue這類隊列只有在消費者充足的時候比較合適,它們總能爲下一個任務做好準備。
生產者-消費者模式一樣帶來了一些性能方面的提升。生產者和消費者能夠併發地執行,若是一個受限於I/O,另外一個受限於CPU,那麼併發執行的所有產出會高於順序執行的產出。
————
Deque是一個雙端隊列,容許高效地在頭和尾部分別進行插入和移除。實現有ArrayDeque和LinkedBlockingDeque。
正如阻塞隊列適用於生產者-消費者模式同樣,雙端隊列使它們自身與一種叫作竊取工做(work stealing)的模式相關聯。一個消費者生產者設計中,全部的消費者只共享一個工做隊列;在竊取工做的設計中,每個消費者都有一個本身的雙端隊列。若是一個消費者完成了本身雙端隊列中的所有工做,它能夠竊取其餘消費者的雙端隊列中的末尾任務。由於工做者線程並不會競爭一個共享的任務隊列,因此竊取工做模式比傳統的生產者-消費者設計有更佳的可伸縮性;大多數時候它們訪問本身的雙端隊列,減小競爭。當一個工做者必需要訪問另外一個隊列時,它會從尾部截取,而不是從頭部,從而進一步下降對雙端隊列的爭奪。
竊取工做剛好適合用於解決消費者與生產者同體的問題—-當運行到一個任務的某單元時,可能會識別出更多的任務。好比垃圾回收時對堆作了記號,能夠並行使用竊取工做。當一個線程發現了一個新的任務單元時,它會把它放在本身隊列的末尾;當雙端隊列爲空時,它會去其餘隊列的隊尾尋找新的任務,這樣能確保每個線程都保持忙碌狀態。
————————
非阻塞算法
基於鎖的算法會帶來一些活躍度失敗的風險。若是線程在持有鎖的時候由於阻塞I/O,頁面錯誤,或其餘緣由發生延遲,極可能全部的線程都不能前進了。
一個線程的失敗或掛起不該該影響其餘線程的失敗或掛起,這樣的算法成爲非阻塞(nonblocking)算法;若是算法的每個步驟中都有一些線程可以繼續執行,那麼這樣的算法稱爲鎖自由(lock-free)算法。在線程間使用CAS進行協調,這樣的算法若是能構建正確的話,它既是非阻塞的,又是鎖自由的。非競爭的CAS老是可以成功,若是多個線程以一個CAS競爭,總會有一個勝出並前進。非阻塞算法堆死鎖和優先級倒置有「免疫性」(但它們可能會出現飢餓和活鎖,由於它們容許重進入)。
非阻塞算法經過使用低層次的併發原語,好比比較交換,取代了鎖。原子變量類向用戶提供了這些底層級原語,也可以當作「更佳的volatile變量」使用,同時提供了整數類和對象引用的原子化更新操做。
——————————-
/**
*DelayQueue(延時隊列)是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,
*內部經過一個優先隊列(PriorityQueue)的引用實現相關數據操做。
*其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即對頭對象的延遲到期的時間最長。
*若是沒有任何到期,那麼就不會有任何頭元素,而且poll將返回null(正由於這樣,不能將null放入該隊列中)
*Delayed接口有一個名爲getDelay()的方法,它用來告知延時到期有多長時間,或延遲在多長時間以前已經到期。
*爲了排序,Delayed接口還繼承了Comparable接口,所以必須實現comparaTo(),使其產生合理比較。
*/
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private transient final ReentrantLock lock = new ReentrantLock();//鎖
/**
*接口Condition :鎖條件變量,其實例和特定的鎖綁定。提供了:
* await()、awaitUninterruptibly()、awaitNanos(long)、await(long, TimeUnit)、awaitUntil(Date)
* signal()、signalAll() 方法,實現了對線程的「等待」和「喚醒」操做
*/
private transient final Condition available = lock.newCondition();//鎖的條件變量,提供「等待」「喚醒」線程的操做
private final PriorityQueue<E> q = new PriorityQueue<E>();//內部有一個優先隊列(無界的隊列)的引用
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
/**
* 插入元素到延時隊列 (調用offer實現)
*/
public boolean add(E e) {
return offer(e);
}
/**
* 插入元素(加鎖)
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e); //調用優先隊列的實現
if (first == null || e.compareTo(first) < 0)
available.signalAll();//喚醒其餘的線程
return true;
} finally {
lock.unlock();
}
}
/**
* 插入元素
*/
public void put(E e) {
offer(e);
}
//因爲是無界的,不會對offer插入元素阻塞,參數unit無效
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
/**
* 頭元素出對列(加鎖)
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
} finally {
lock.unlock();
}
}
/**
* 得到並移除頭元素,在延時到期的狀況下。
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
/**
* 得到並移除頭元素,延遲到期或指定時間到期後。
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
if (nanos <= 0)
return null;
if (delay > nanos)
delay = nanos;
long timeLeft = available.awaitNanos(delay);
nanos -= delay – timeLeft;
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
}
}
} finally {
lock.unlock();
}
}
/**
* 獲取但不移除元素(調用優先隊列的peek方法)
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
//把隊列在的元素「剪切到」集合c中
public int drainTo(Collection<? super E> c) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (;;) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
}
}
//把maxElements個元素「剪切」到集合c中
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
while (n < maxElements) {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
break;
c.add(q.poll());
++n;
}
if (n > 0)
available.signalAll();
return n;
} finally {
lock.unlock();
}
}
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
}
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
……
}