JUC源碼分析-集合篇(六)LinkedBlockingQueue

JUC源碼分析-集合篇(六)LinkedBlockingQueue

1. 數據結構

LinkedBlockingQueue數據結構

LinkedBlockingQueue 和 ConcurrentLinkedQueue 同樣都是由 head 節點和 last 節點組成,每一個節點(Node)由節點元素(item)和指向下一個節點(next)的引用組成,節點與節點之間就是經過這個 next 關聯起來,從而組成一張鏈表結構的隊列。默認狀況下 head 節點存儲的元素爲空,last 節點等於 head 節點。和 ConcurrentLinkedQueue 不一樣的是 LinkedBlockingQueue 是基於 ReentrantLock 鎖實現的,所以 head、last 以及 Node.item、Node.next 都不用 volatile 修辭。java

// head.item == null
transient Node<E> head;
// last.next == null
private transient Node<E> last;

private static class Node<E> {
    E item;
    Node<E> next;
}

默認狀況下 head、last 都是空節點。node

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

2. 基於 ReentrantLock 的實現

private final ReentrantLock takeLock = new ReentrantLock();
// 集合已空則調用notEmpty.await,等集合添加元素後調用notEmpty.singal
private final Condition notEmpty = takeLock.newCondition();

private final ReentrantLock putLock = new ReentrantLock();
// 集合已滿則調用notFull.await,等集合取出元素後調用notFull.singal
private final Condition notFull = putLock.newCondition();

3. 入隊 offer

LinkedBlockingQueue入隊節點變化

和 ConcurrentLinkedQueue 不一樣,last 是實時指向尾節點的,也就是每次插入元素時都會更新尾節點。代碼以下數據結構

// offer 非阻塞
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    // 1. c表示插入前元素的個數
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 2. 元素入隊有2個操做:一是元素添加到last.next並更新last;
        //    二是喚醒阻塞的put操做繼續添加元素(只有put時會阻塞notFull.await)
        if (count.get() < capacity) {
            // 2.1 元素入隊
            enqueue(node);
            // 2.2 c表示插入前元素的個數
            c = count.getAndIncrement();
            // 2.3 集合未滿,喚醒put操做,繼續添加元素
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // 3. 插入前集合爲空,則喚醒take操做,能夠取元素了
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

元素入隊 enqueue 有兩個操做:一是 last.next 節點指向 node;二是 last 指向新的尾節點 node。也就是說 last 必定是指向尾節點的。ide

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

4. 出隊 poll

LinkedBlockingQueue出隊節點變化

// poll 非阻塞
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    // 1. poll操做前元素的個數
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 2. 元素出隊有2個操做:一是head.next出隊
        //    二是喚醒阻塞的take操做繼續取出元素(只有take時會阻塞notEmpty.await)
        if (count.get() > 0) {
            // 2.1 head.next出隊
            x = dequeue();
            // 2.2 c爲poll前元素的個數
            c = count.getAndDecrement();
            // 2.3 集合中元素不爲空,喚醒take操做,斷續取元素
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 3. 取元素前集合已滿,則喚醒put操做,能夠繼續添加元素
    if (c == capacity)
        signalNotFull();
    return x;
}

元素出隊 dequeue 有三個操做:一是 head.next 出隊;二是 head.next 指向本身,等待 GC 回收;三是修改 head 節點。源碼分析

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

5. 刪除元素 remove

// 刪除指定 value 的元素
public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                // 刪除指定節點 p,其中 trail 爲 p 的前驅節點
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

// 刪除指定節點 p,其中 trail 爲 p 的前驅節點
// 注意 p.next 沒變
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    p.item = null;
    trail.next = p.next;
    if (last == p)
        last = trail;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

5. 將集合中的元素取出 drainTo

// 將集合中的所有元素取出到集合 c 中
public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

// 將集合中的 maxElements 個元素取出到集合 c 中
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    boolean signalNotFull = false;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        int n = Math.min(maxElements, count.get());
        // count.get provides visibility to first n Nodes
        Node<E> h = head;
        int i = 0;
        try {
            while (i < n) {
                Node<E> p = h.next;
                c.add(p.item);
                p.item = null;
                h.next = h;
                h = p;
                ++i;
            }
            return n;
        } finally {
            if (i > 0) {
                // assert h.item == null;
                head = h;
                signalNotFull = (count.getAndAdd(-i) == capacity);
            }
        }
    } finally {
        takeLock.unlock();
        if (signalNotFull)
            signalNotFull();
    }
}

天天用心記錄一點點。內容也許不重要,但習慣很重要!this

相關文章
相關標籤/搜索