自學LinkedBlockingQueue源碼

自學LinkedBlockingQueue源碼

參考:http://www.jianshu.com/p/cc2281b1a6bcnode

本文須要關注的地方設計模式

  1. 生產者-消費者模式好處;
  2. 讀取和插入操做所使用的鎖是兩個不一樣的ReentrantLock(takeLock和putLock),它們之間的操做互相不受干擾,所以兩種操做能夠並行完成;
  3. 經過Condition的線程間通訊來實現線程的等待通知。特別注意:在Condition對象中,與wait、notify和notifyAll方法對應的分別是await、signal和signalAll。可是,Condition對Object進行了擴展,於是它也包含了wait和notify方法。必定要確保使用正確的版本——await和signal。

1、基礎知識

  阻塞隊列主要用於「生產者-消費者」 設計模式。
  BlockingQueue 方法以四種形式出現,對於不能當即知足但可能在未來某一時刻能夠知足的操做,這四種形式的處理方式不一樣:第一種是拋出一個異常,第二種是返回一個特殊值(null 或 false,具體取決於操做),第三種是在操做能夠成功前,無限期地阻塞當前線程,第四種是在放棄前只在給定的最大時間限制內阻塞。下表中總結了這些方法:
數組

1. 阻塞隊列和生產者 - 消費者模式

  LinkedBlockingQueue在BlockingQueue的實現類中使用最多(若是知道隊列的大小,能夠考慮使用ArrayBlockIngQueue,它使用循環數組實現。可是若是不知道隊列將來的大小,那麼使用ArrayBlockingQueue就必然會致使數組的來回複製,下降效率)。咱們主要關心可阻塞的put和take方法,以及支持定時的offer和poll方法。若是隊列已經滿了,那麼put方法將阻塞直到有空間可用;若是隊列爲空,那麼take方法將會阻塞直到有元素可用。隊列能夠是有界的也能夠是無界的,無界隊列永遠都不會充滿,所以無界隊列上 的put方法也永遠不會阻塞(若是沒有定義上限,將使用 Integer.MAX_VALUE 做爲上限)。安全

生產者-消費者模式好處

  • 解耦:假設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。若是未來消費者的代碼發生變化, 可能會影響到生產者。而若是二者都依賴於某個緩衝區(好比阻塞隊列),二者之間不直接依賴,耦合也就相應下降了,同時提升了代碼可讀性和可重用性
  • 提升併發性能:生產者直接調用消費者的某個方法,還有另外一個弊端。因爲函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回以前,生產者只好一直等在那邊。使用了生產者/消費者模式以後,因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區做爲橋樑鏈接,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區裏拿數據便可,減小了由於彼此的處理速度差別而引發的阻塞。
  • 在高併發場景下平滑短期內大量的服務請求:在訪問量劇增的狀況下,你的應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。而在生產者-消費者模式中,當數據生產快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中,等生產者的生產速度慢下來,消費者再慢慢處理掉。

2. 源碼分析

  LinkedBlockingQueue採用的是單鏈表結構,包含了頭結點和尾節點,last入隊,head出隊。
入隊:last.next=node;last = node;,即last中保存了最後一個有效元素;
出隊:Node<E> h = head; Node<E> first = h.next; head = first; E x = first.item; first.item = null; return x;,即head並無存放有效的值(爲null),將head指向的下一個節點的值返回,並將下一個節點的設爲新的head。併發

// 全部的元素都經過Node這個靜態內部類來進行存儲,這與LinkedList的處理方式徹底同樣
    static class Node<E> {
        //使用item來保存元素自己
        E item;
        //保存當前節點的後繼節點
        Node<E> next;
        Node(E x) { item = x; }
    }
    /**
        阻塞隊列所能存儲的最大容量
        用戶能夠在建立時手動指定最大容量,若是用戶沒有指定最大容量
        那麼最默認的最大容量爲Integer.MAX_VALUE.
    */
    private final int capacity;

    /** 
        當前阻塞隊列中的元素數量,因爲它的入隊列和出隊列使用的是兩個    
        不一樣的lock對象,所以不管是在入隊列仍是出隊列,都會涉及對元素數
        量的併發修改,所以這裏使用了一個原子操做類來解決對同一個變量進行併發修改的線程安全問題。
    */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * 鏈表的頭部
     * LinkedBlockingQueue的頭部具備一個不變性:
     * 頭部的元素老是爲null,head.item==null   
     */
    private transient Node<E> head;

    /**
     * 鏈表的尾部
     * LinkedBlockingQueue的尾部也具備一個不變性:
     * 即last.next==null
     */
    private transient Node<E> last;

    /**
     元素出隊列時線程所獲取的鎖
     當執行take、poll等操做時線程須要獲取的鎖
    */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
    當隊列爲空時,經過該Condition讓從隊列中獲取元素的線程處於等待狀態
    */
    private final Condition notEmpty = takeLock.newCondition();

    /** 
      元素入隊列時線程所獲取的鎖
      當執行add、put、offer等操做時線程須要獲取鎖
    */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 
     當隊列的元素已經達到capactiy,經過該Condition讓元素入隊列的線程處於等待狀態
    */
    private final Condition notFull = putLock.newCondition();

  經過上面的分析,咱們能夠發現LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味着它們之間的操做不會存在互斥操做。在多個CPU的狀況下,它們能夠作到真正的在同一時刻既消費、又生產,可以作到並行處理。函數

put方法

/**
* 其實下面的代碼等價於以下內容:
* last.next=node;
* last = node;
*/
private void enqueue(Node<E> node) {
        last = last.next = node;
    }
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();

        int c = -1;
        Node<E> node = new Node(e);

        /* 
         在這裏首先獲取到putLock,以及當前隊列的元素數量
        */
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;

        /*
            執行可中斷的鎖獲取操做
        */
        putLock.lockInterruptibly();
        try {
            /*
            當隊列的容量到達最大容量時,此時線程將處於等待狀態,直到隊列有空閒的位置才繼續執行。使用while判
            斷依舊是爲了防止線程被"僞喚醒」而出現的狀況,即當線程被喚醒時而隊列的大小依舊等於capacity時,線程應該繼續等待。
            */
            while (count.get() == capacity) {
                notFull.await();
            }
            //讓元素進行隊列的末尾
            enqueue(node);
            c = count.getAndIncrement();
            /*注:c+1獲得的結果是新元素入隊列以後隊列元素的總和。
            當前隊列中的總元素個數小於最大容量時,此時喚醒其餘執行入隊列的線程
            讓它們能夠放入元素,若是新加入元素以後,隊列的大小等於capacity,
            那麼就意味着此時隊列已經滿了,也就沒有必需要喚醒其餘正在等待入隊列的線程,由於喚醒它們以後,它們也仍是繼續等待。
            */
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        /*當c=0時,即意味着以前的隊列是空隊列,出隊列的線程都處於等待狀態,
        如今新添加了一個新的元素,即隊列再也不爲空,所以它會喚醒正在等待獲取元素的線程。
        */
        if (c == 0)
            signalNotEmpty();
    }

    /*
    喚醒正在等待獲取元素的線程,告訴它們如今隊列中有元素了
    */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //經過notEmpty喚醒獲取元素的線程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

offer方法

/**
    該方法會返回一個boolean值,當入隊列成功返回true,入隊列失敗返回false
    */
     public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        /*
            當隊列已經滿了,它不會繼續等待,而是直接返回。所以該方法是非阻塞的。
        */
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            /*
            當獲取到鎖時,須要進行二次的檢查,由於獲取鎖和判斷不是原子操做
            */
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

take方法

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //經過takeLock獲取鎖,而且支持線程中斷
        takeLock.lockInterruptibly();
        try {
            //當隊列爲空時,則讓當前線程處於等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            //完成元素的出隊列
            x = dequeue();
            /*            
               隊列元素個數完成原子化操做-1,能夠看到count元素會在插入元素的線程和獲取元素的線程進行併發修改操做。
            */
            c = count.getAndDecrement();
            /*
              當一個元素出隊列以後,隊列的大小依舊大於1時當前線程會喚醒其餘執行元素出隊列的線程,讓它們也能夠執行元素的獲取
            */
            if (c > 1)
                notEmpty.signal();
        } finally {
            //完成鎖的釋放
            takeLock.unlock();
        }
        /*
            當c==capaitcy時,即在獲取當前元素以前,隊列已經滿了,而此時獲取元素以後,隊列就會空出一個位置,故當前線程會喚醒執行插入操做的線程通知其餘中的一個能夠進行插入操做。
        */
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * 讓頭部元素出隊列的過程
     * 其最終的目的是讓原來的head被GC回收,讓其的next成爲head
     * 而且新的head的item爲null.
     * 由於LinkedBlockingQueue的頭部具備一致性:即元素爲null。
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

peek方法

peek類操做其實比較簡單。由於有一個head節點去維護當前的隊首元素。只有判斷先first(head的後繼)是否爲空就好。高併發

public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

remove方法

public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();//獲取存元素鎖和取元素鎖(不容許存或取元素),由於有可能同時涉及到頭尾結點的訪問問題
        try {
            for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {// 遍歷整個鏈表
                if (o.equals(p.item)) {// 結點的值與指定值相等
                    unlink(p, trail); // 斷開結點
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    void unlink(Node<E> p, Node<E> trail) {
        p.item = null; 
        trail.next = p.next;// 斷開p結點
        if (last == p) // 尾節點爲p結點
            last = trail; // 從新賦值尾節點
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

3.LinkedBlockingQueue與ArrayBlockingQueue的比較

  1. ArrayBlockIngQueue,它使用循環數組實現,在建立時指定存儲的大小,必定是有界的,若是不知道隊列將來的大小,那麼使用ArrayBlockingQueue就必然會致使數組的來回複製,下降效率;而LinkedBlockingQueue能夠由用戶指定最大存儲容量,也能夠無需指定,若是不指定則最大存儲容量將是Integer.MAX_VALUE。
  2. ArrayBlockingQueue中在入隊列和出隊列操做過程當中,使用的是同一個lock,因此即便在多核CPU的狀況下,其讀取和操做的都沒法作到並行,而LinkedBlockingQueue的讀取和插入操做所使用的鎖是兩個不一樣的lock,它們之間的操做互相不受干擾,所以兩種操做能夠並行完成,故LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。
相關文章
相關標籤/搜索