系列傳送門:java
LinkedBlockingQueue是由單鏈表構成的界限可選的阻塞隊列,如不指定邊界,則爲Integer.MAX_VALUE
,所以如不指定邊界,通常來講,插入的時候都會成功。node
LinkedBlockingQueue支持FIFO先進先出的次序對元素進行排序。編程
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; // 單鏈表節點 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 容量,若是不指定就是Integer.MAX_VALUE */ private final int capacity; /** 原子變量,記錄元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 哨兵頭節點,head.next纔是隊列的第一個元素 */ transient Node<E> head; /** * 指向最後一個元素 */ private transient Node<E> last; /** 用來控制同時只有一個線程能夠從隊頭獲取元素 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 條件隊列,隊列爲空時,執行出隊take操做的線程將會被置入該條件隊列 */ private final Condition notEmpty = takeLock.newCondition(); /** 用來控制同時只有一個線程能夠從隊尾插入元素 */ private final ReentrantLock putLock = new ReentrantLock(); /** 條件隊列,隊列滿時,執行入隊操做put的線程將會被置入該條件隊列 */ private final Condition notFull = putLock.newCondition(); }
若是但願獲取一個元素,須要先獲取takeLock鎖,且notEmpty條件成立。併發
若是但願插入一個元素,須要先獲取putLock鎖,且notFull條件成立。工具
使用LinkedBlockingQueue的時候,能夠指定容量,也能夠使用默認的Integer.MAX_VALUE,幾乎就是無界的了,固然,也能夠傳入集合對象,直接構造。學習
// 若是不指定容量,默認容量爲Integer.MAX_VALUE (1 << 30) - 1 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 傳入指定的容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化last 和 head指針 last = head = new Node<E>(null); } // 傳入指定集合對象,容量視爲Integer.MAX_VALUE,直接構造queue public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); // 寫線程獲取putLock final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
隊列的操做最核心的部分莫過於入隊和出隊了,後面分析的方法基本上都基於這兩個工具方法。this
LinkedBlockingQueue的出隊和入隊相對ArrayBlockingQueue來講就簡單不少啦:.net
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; // head向後移一位 E x = first.item; first.item = null; return x; }
隊列中的元素其實是從head.first開始的,那麼移除隊頭,其實就是將head指向head.next便可。線程
take操做將會獲取當前隊列頭部元素並移除,若是隊列爲空則阻塞當前線程直到隊列不爲空,退出阻塞時返回獲取的元素。3d
若是線程在阻塞時被其餘線程設置了中斷標誌,則拋出InterruptedException異常並返回。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; // 首先要獲取takeLock final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 若是隊列爲空, notEmpty不知足,就等着 while (count.get() == 0) { notEmpty.await(); } // 出隊 x = dequeue(); // c先賦值爲count的值, count 減 1 c = count.getAndDecrement(); // 此次出隊後至少還有一個元素,喚醒notEmpty中的讀線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // c == capacity 表示在該元素出隊以前,隊列是滿的 if (c == capacity) // 由於在這以前隊列是滿的,可能會有寫線程在等着,這裏作個喚醒 signalNotFull(); return x; } // 用於喚醒寫線程 private void signalNotFull() { final ReentrantLock putLock = this.putLock; // 獲取putLock putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
put操做將向隊尾插入元素,若是隊列未滿則插入,若是隊列已滿,則阻塞當前線程直到隊列不滿。
若是線程在阻塞時被其餘線程設置了中斷標誌,則拋出InterruptedException異常並返回。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 全部的插入操做 都約定 本地變量c 做爲是否失敗的標識 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 插入操做獲取 putLock putLock.lockInterruptibly(); try { // 隊列滿,這時notFull條件不知足,await while (count.get() == capacity) { notFull.await(); } enqueue(node); // c先返回count的值 , 原子變量 + 1 , c = count.getAndIncrement(); // 至少還有一個空位能夠插入,notFull條件是知足的,喚醒它 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c == 0 表示在該元素入隊以前,隊列是空的 if (c == 0) // 由於在這以前隊列是空的,可能會有讀線程在等着,這裏作個喚醒 signalNotEmpty(); } // 用於喚醒讀線程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; // 獲取takeLock takeLock.lock(); try { // 喚醒 notEmpty.signal(); } finally { takeLock.unlock(); } }
在take阻塞式獲取方法的基礎上額外增長超時功能,傳入一個timeout,獲取不到而阻塞的時候,若是時間到了,即便還獲取不到,也只能當即返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 這裏就是超時機制的邏輯所在 while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
在put阻塞式插入方法的基礎上額外增長超時功能,傳入一個timeout,獲取不到而阻塞的時候,若是時間到了,即便還獲取不到,也只能當即返回null。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
offer(E e)是非阻塞的方法,向隊尾插入一個元素,若是隊列未滿,則插入成功並返回true;若是隊列已滿則丟棄當前元素,並返回false。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; // 此時隊列已滿,直接返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); // 插入操做 獲取putLock final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 加鎖後再校驗一次 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; // 只要不是-1,就表明成功~ }
從隊列頭部獲取並移除第一個元素,若是隊列爲空則返回null。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 若是隊列不爲空,則出隊, 並遞減計數 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
瞅一瞅隊頭的元素是啥,若是隊列爲空,則返回null。
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 實際上第一個元素是head.next Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
移除隊列中與元素o相等【指的是equals方法斷定相同】的元素,移除成功返回true,若是隊列爲空或沒有匹配元素,則返回false。
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { // trail 和 p 同時向後遍歷, 若是p匹配了,就讓trail.next = p.next表明移除p for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } // trail爲p的前驅, 但願移除p節點 void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next;// 移除p // 若是p已是最後一個節點了,就更新一下last if (last == p) last = trail; // 移除一個節點以後,隊列從滿到未滿, 喚醒notFull if (count.getAndDecrement() == capacity) notFull.signal(); } //----- 多個鎖 獲取和釋放的順序是 相反的 // 同時上鎖 void fullyLock() { putLock.lock(); takeLock.lock(); } // 同時解鎖 void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
Integer.MAX_VALUE
,所以如不指定邊界,通常來講,插入的時候都會成功。若是但願獲取一個元素,須要先獲取takeLock鎖,且notEmpty條件成立。
若是但願插入一個元素,須要先獲取putLock鎖,且notFull條件成立。