JDK源碼那些事兒之LinkedBlockingDeque

阻塞隊列中目前還剩下一個比較特殊的隊列實現,相比較前面講解過的隊列,本文中要講的LinkedBlockingDeque比較容易理解了,可是與以前講解過的阻塞隊列又有些不一樣,從命名上你應該能看出一些端倪,接下來就一塊兒看看這個特殊的阻塞隊列java

前言

JDK版本號:1.8.0_171

LinkedBlockingDeque在結構上有別於以前講解過的阻塞隊列,它不是Queue而是Deque,中文翻譯成雙端隊列,雙端隊列指能夠從任意一端入隊或者出隊元素的隊列,實現了在隊列頭和隊列尾的高效插入和移除node

LinkedBlockingDeque是鏈表實現的線程安全的無界的同時支持FIFO、LIFO的雙端阻塞隊列,能夠回顧下以前的LinkedBlockingQueue阻塞隊列特色,本質上是相似的,可是又有些不一樣:設計模式

  • 內部是經過Node節點組成的鏈表來實現的,固然爲了支持雙端操做,結點結構不一樣
  • LinkedBlockingQueue經過兩個ReentrantLock鎖保護競爭資源,實現了多線程對競爭資源的互斥訪問,入隊和出隊互不影響,可同時操做,然而LinkedBlockingDeque只設置了一個全局ReentrantLock鎖,兩個條件對象實現互斥訪問,性能上要比LinkedBlockingQueue差一些
  • 無界,默認鏈表長度爲Integer.MAX_VALUE,本質上仍是有界
  • 阻塞隊列,是指多線程訪問競爭資源時,當競爭資源已被某線程獲取時,其它要獲取該資源的線程須要阻塞等待

Queue和Deque的關係有點相似於單鏈表和雙向鏈表,LinkedBlockingQueue和LinkedBlockingDeque的內部結點實現就是單鏈表和雙向鏈表的區別,具體可參考源碼安全

在第二點中可能有些人有些疑問,兩個互斥鎖和一個互斥鎖的區別在哪裏?咱們能夠考慮如下場景:多線程

A線程先進行入隊操做,B線程隨後進行出隊操做,若是是LinkedBlockingQueue,A線程入隊過程還未結束(已得到鎖還未釋放),B線程出隊操做不會被阻塞等待(鎖不一樣),若是是LinkedBlockingDeque則B線程會被阻塞等待(同一把鎖)A線程完成操做才繼續執行併發

LinkedBlockingQueue通常的操做是獲取一把鎖就能夠,但有些操做例如remove操做,則須要同時獲取兩把鎖,以前的LinkedBlockingQueue講解曾經說明過,這裏就不詳細講解了源碼分析

類定義

實現BlockingDeque接口,其中定義了雙端隊列應該實現的方法,具體方法不說明了,主要是每一個方法都分爲了First和Last兩種方式,從頭部或者尾部進行隊列操做性能

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable

LinkedBlockingDeque關係圖

常量/變量

/**
     * 頭結點
     */
    transient Node<E> first;

    /**
     * 尾結點
     */
    transient Node<E> last;

    /** 雙端隊列實際結點個數 */
    private transient int count;

    /** 雙端隊列容量 */
    private final int capacity;

    /** 互斥重入鎖 */
    final ReentrantLock lock = new ReentrantLock();

    /** 非空條件對象 */
    private final Condition notEmpty = lock.newCondition();

    /** 非滿條件對象 */
    private final Condition notFull = lock.newCondition();

內部類

爲了實現雙端隊列,內部使用了雙向鏈表,不像LinkedBlockingQueue使用的是單鏈表,前驅和後繼指針的特殊狀況須要注意this

/** 雙向鏈表Node */
    static final class Node<E> {
        /**
         * 節點數據值,移除則爲null
         */
        E item;

        /**
         * 下列狀況之一:
         * - 前驅節點
         * - 前驅是尾節點,則prev指向當前節點
         * - 無前驅節點則爲null
         */
        Node<E> prev;

        /**
         * 下列狀況之一:
         * - 後繼節點
         * - 後繼是頭節點,則next指向當前節點
         * - 無後繼節點則爲null
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

構造方法

構造方法比較簡單,可設置容量上限,不設置默認Integer.MAX_VALUE,相似以前的LinkedBlockingQueuespa

public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public LinkedBlockingDeque(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock lock = this.lock;
        // 確保可見性
        lock.lock(); // Never contended, but necessary for visibility
        try {
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (!linkLast(new Node<E>(e)))
                    throw new IllegalStateException("Deque full");
            }
        } finally {
            lock.unlock();
        }
    }

重要方法

offerFirst/offerLast

在入隊操做中,將每種方法都分紅了隊首入隊和隊尾入隊兩種,在獲取鎖以後最終調用方法是linkFirst/linkLast。在putFirst中若是隊列已滿則經過notFull.await()阻塞操做,比較容易理解

入隊方法以下:

類別 失敗返回特殊值 失敗拋異常 阻塞等待 超時阻塞等待
隊首 offerFirst push/addFirst putFirst offerFirst(E e, long timeout, TimeUnit unit)
隊尾 offer/offerLast add/addLast put/putLast offer/offerLast(E e, long timeout, TimeUnit unit)
public boolean offerFirst(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkFirst(node);
        } finally {
            lock.unlock();
        }
    }
    public boolean offerLast(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return linkLast(node);
        } finally {
            lock.unlock();
        }
    }

linkFirst/linkLast

將結點添加到頭部或者尾部,若是隊列已滿則返回false,在調用方法以前已經獲取互斥鎖才進行操做

/**
     * Links node as first element, or returns false if full.
     */
    private boolean linkFirst(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        // 隊列已滿
        if (count >= capacity)
            return false;
        // 更新first
        Node<E> f = first;
        node.next = f;
        first = node;
        // last爲空則表示原隊列爲空,當前隊列僅有node則last更新指向node便可
        if (last == null)
            last = node;
        else
            // 原隊列非空,更新原first的前驅指針
            f.prev = node;
        ++count;
        // 經過條件對象喚醒出隊操做阻塞線程
        notEmpty.signal();
        return true;
    }

    /**
     * Links node as last element, or returns false if full.
     */
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        // 更新last
        Node<E> l = last;
        node.prev = l;
        last = node;
        // first爲空則表示原隊列爲空,當前隊列僅有node則first更新指向node便可
        if (first == null)
            first = node;
        else
            // 原隊列非空,更新原last的後繼指針
            l.next = node;
        // 實際節點數量 + 1
        ++count;
        // 經過條件對象喚醒出隊操做阻塞線程
        notEmpty.signal();
        return true;
    }

pollFirst/pollLast

在出隊操做中,一樣將每種方法都分紅了隊首出隊和隊尾出隊兩種,在獲取鎖以後最終調用方法是unlinkFirst/unlinkLast。在takeFirst中若是隊列爲空則經過notEmpty.await()阻塞操做,比較容易理解

出隊方法以下:

類別 失敗返回特殊值 失敗拋異常 阻塞等待 超時阻塞等待
隊首 poll/pollFirst pop/remove/removeFirst take/takeFirst poll/pollFirst(long timeout, TimeUnit unit)
隊尾 pollLast removeLast takeLast pollLast(long timeout, TimeUnit unit)
public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }

    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }

unlinkFirst/unlinkLast

移除返回隊頭元素或隊尾元素,假如隊列爲空則返回null

/**
     * Removes and returns first element, or null if empty.
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        // 判空
        if (f == null)
            return null;
        // 更新first
        Node<E> n = f.next;
        E item = f.item;
        // item置空,next指向本身,方便gc回收
        f.item = null;
        f.next = f; // help GC
        first = n;
        // 當前隊列爲空,last指向置空
        if (n == null)
            last = null;
        else
            // 當前隊列非空,新的頭結點前驅置空
            n.prev = null;
        // 實際結點數量 - 1
        --count;
        // 經過條件對象喚醒入隊操做阻塞線程
        notFull.signal();
        return item;
    }

    /**
     * Removes and returns last element, or null if empty.
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        // 更新last
        Node<E> p = l.prev;
        // item置空,prev指向本身,方便gc回收
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        // 當前隊列爲空,last指向置空
        if (p == null)
            first = null;
        else
            // 當前隊列非空,新的尾結點後繼置空
            p.next = null;
        --count;
        // 經過條件對象喚醒入隊操做阻塞線程
        notFull.signal();
        return item;
    }

unlink

將隊列中匹配的結點刪除,鏈表中進行解除先後關聯便可,注意下若是x處於隊列中間(非頭和尾結點),則x自己的前驅和後繼指針不會被更新修改,爲的是防止迭代器循環到x找不到先後結點,避免迭代器異常

/**
     * Unlinks x.
     */
    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        // 前驅結點和後繼結點
        Node<E> p = x.prev;
        Node<E> n = x.next;
        // 前驅爲空,至關於刪除頭結點
        if (p == null) {
            unlinkFirst();
        } else if (n == null) {
            // 後繼爲空,至關於刪除尾結點
            unlinkLast();
        } else {
            // 前驅後繼都不爲空,解除刪除結點與先後結點的關係,item置空
            // 注意,這裏x自己的前驅和後繼沒有被更新修改,爲的是防止迭代器循環到x找不到先後結點,避免迭代器異常
            p.next = n;
            n.prev = p;
            x.item = null;
            --count;
            // 經過條件對象喚醒入隊操做阻塞線程
            notFull.signal();
        }
    }

peekFirst/peekLast

peek操做直接經過首尾節點指向得到對應的item便可,不會刪除節點

public E peekFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (first == null) ? null : first.item;
        } finally {
            lock.unlock();
        }
    }

    public E peekLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (last == null) ? null : last.item;
        } finally {
            lock.unlock();
        }
    }

removeFirstOccurrence/removeLastOccurrence

刪除第一個/最後一個知足條件的隊列結點,removeFirstOccurrence從前向後進行匹配,removeLastOccurrence從後向前進行匹配,找到第一個知足條件的結點進行刪除操做

public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 從前向後循環判斷相等則經過unlink移除
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    public boolean removeLastOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 從後向前循環判斷相等則經過unlink移除
            for (Node<E> p = last; p != null; p = p.prev) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

drainTo

轉移隊列操做

public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count);
            for (int i = 0; i < n; i++) {
                // 先添加防止未添加到新隊列中時原隊列結點出隊
                c.add(first.item);   // In this order, in case add() throws.
                // 解除關聯
                unlinkFirst();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

clear

清空隊列操做,比較簡單,很好理解

public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Node<E> f = first; f != null; ) {
                f.item = null;
                Node<E> n = f.next;
                f.prev = null;
                f.next = null;
                f = n;
            }
            first = last = null;
            count = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

迭代器

因爲其雙向鏈表的實現,迭代器可分爲升序迭代器(Itr)和倒序迭代器(DescendingItr),經過AbstractItr封裝公共操做方法,Itr和DescendingItr分別實現對應不一樣的方法,模板方法設計模式寫法可借鑑,一個從頭節點開始向後進行遍歷,一個從尾節點向後進行遍歷

private abstract class AbstractItr implements Iterator<E> {
        /**
         * next方法返回的node
         */
        Node<E> next;

        /**
         * 保存next的item,防止hasNext爲true後節點被刪除再調用next獲取不到值的狀況
         */
        E nextItem;

        /**
         * 最近一次調用next返回的節點,若是經過調用remove刪除了此元素,則重置爲null
         */
        private Node<E> lastRet;

        // 兩個不一樣迭代器實現不一樣
        abstract Node<E> firstNode();
        abstract Node<E> nextNode(Node<E> n);

        // 構造方法初始化,設置next和nextItem
        AbstractItr() {
            // set to initial position
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                next = firstNode();
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        /**
         * 返回後繼節點
         */
        private Node<E> succ(Node<E> n) {
            for (;;) {
                Node<E> s = nextNode(n);
                if (s == null)
                    return null;
                else if (s.item != null)
                    return s;
                else if (s == n)
                    return firstNode();
                else
                    n = s;
            }
        }

        /**
         * 設置下一次執行next應該返回的值
         */
        void advance() {
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                // assert next != null;
                next = succ(next);
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        public boolean hasNext() {
            return next != null;
        }

        public E next() {
            if (next == null)
                throw new NoSuchElementException();
            lastRet = next;
            E x = nextItem;
            advance();
            return x;
        }
        
        // 移除操做,注意,這裏直接在原隊列中移除了lastRet對應的節點
        public void remove() {
            Node<E> n = lastRet;
            if (n == null)
                throw new IllegalStateException();
            lastRet = null;
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                if (n.item != null)
                    unlink(n);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Forward iterator */
    // 從頭節點向後遍歷迭代器
    private class Itr extends AbstractItr {
        Node<E> firstNode() { return first; }
        Node<E> nextNode(Node<E> n) { return n.next; }
    }

    /** Descending iterator */
    // 從尾節點向後遍歷迭代器
    private class DescendingItr extends AbstractItr {
        Node<E> firstNode() { return last; }
        Node<E> nextNode(Node<E> n) { return n.prev; }
    }

總結

源碼分析完畢,整理LinkedBlockingDeque有以下特色:

  • 鏈表實現的線程安全的無界的同時支持FIFO、LIFO的雙端阻塞隊列
  • 經過結點同時持有前驅結點和後繼結點的引用支持隊列的頭和尾的雙端操做
  • 迭代器支持正向和反向兩種迭代方式
  • 大部分方法都爭搶惟一一把可重入互斥鎖,在競爭激烈的多線程併發環境下吞吐量比較低
  • 刪除中間結點(非頭尾)時,保留結點先後前驅指針,防止迭代器異常

以上內容若有問題歡迎指出,筆者驗證後將及時修正,謝謝

相關文章
相關標籤/搜索