LinkedBlockingQueue 和 ConcurrentLinkedQueue 同樣都是由 head 節點和 last 節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是經過這個 next 關聯起來,從而組成一張鏈表結構的隊列。默認狀況下 head 節點存儲的元素爲空,last 節點等於 head 節點。和 ConcurrentLinkedQueue 不一樣的是 LinkedBlockingQueue 是基於 ReentrantLock 鎖實現的,所以 head、last 以及 Node.item、Node.next 都不用 volatile 修辭。java
// head.item == null transient Node<E> head; // last.next == null private transient Node<E> last; private static class Node<E> { E item; Node<E> next; }
默認狀況下 head、last 都是空節點。node
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
private final ReentrantLock takeLock = new ReentrantLock(); // 集合已空則調用notEmpty.await,等集合添加元素後調用notEmpty.singal private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); // 集合已滿則調用notFull.await,等集合取出元素後調用notFull.singal private final Condition notFull = putLock.newCondition();
和 ConcurrentLinkedQueue 不一樣,last 是實時指向尾節點的,也就是每次插入元素時都會更新尾節點。代碼以下數據結構
// offer 非阻塞 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; // 1. c表示插入前元素的個數 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 2. 元素入隊有2個操做:一是元素添加到last.next並更新last; // 二是喚醒阻塞的put操做繼續添加元素(只有put時會阻塞notFull.await) if (count.get() < capacity) { // 2.1 元素入隊 enqueue(node); // 2.2 c表示插入前元素的個數 c = count.getAndIncrement(); // 2.3 集合未滿,喚醒put操做,繼續添加元素 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } // 3. 插入前集合爲空,則喚醒take操做,能夠取元素了 if (c == 0) signalNotEmpty(); return c >= 0; }
元素入隊 enqueue 有兩個操做:一是 last.next 節點指向 node;二是 last 指向新的尾節點 node。也就是說 last 必定是指向尾節點的。ide
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
// poll 非阻塞 public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; // 1. poll操做前元素的個數 int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 2. 元素出隊有2個操做:一是head.next出隊 // 二是喚醒阻塞的take操做繼續取出元素(只有take時會阻塞notEmpty.await) if (count.get() > 0) { // 2.1 head.next出隊 x = dequeue(); // 2.2 c爲poll前元素的個數 c = count.getAndDecrement(); // 2.3 集合中元素不爲空,喚醒take操做,斷續取元素 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } // 3. 取元素前集合已滿,則喚醒put操做,能夠繼續添加元素 if (c == capacity) signalNotFull(); return x; }
元素出隊 dequeue 有三個操做:一是 head.next 出隊;二是 head.next 指向本身,等待 GC 回收;三是修改 head 節點。源碼分析
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
// 刪除指定 value 的元素 public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { // 刪除指定節點 p,其中 trail 爲 p 的前驅節點 unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } // 刪除指定節點 p,其中 trail 爲 p 的前驅節點 // 注意 p.next 沒變 void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); }
// 將集合中的所有元素取出到集合 c 中 public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } // 將集合中的 maxElements 個元素取出到集合 c 中 public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; boolean signalNotFull = false; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node<E> h = head; int i = 0; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null; h.next = h; h = p; ++i; } return n; } finally { if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
天天用心記錄一點點。內容也許不重要,但習慣很重要!this