【JUC】JDK1.8源碼分析之LinkedBlockingQueue(四)

1、前言java

  分析完了ArrayBlockingQueue後,接着分析LinkedBlockingQueue,與ArrayBlockingQueue不相同,LinkedBlockingQueue底層採用的是鏈表結構,其源碼也相對比較簡單,下面進行正式的分析。node

2、LinkedBlockingQueue數據結構數據結構

  從LinkedBlockingQueue的命名就大體知道其數據結構採用的是鏈表結構,經過源碼也能夠驗證咱們的猜想,其數據結構以下。多線程

  說明:能夠看到LinkedBlockingQueue採用的是單鏈表結構,包含了頭結點和尾節點。併發

3、LinkedBlockingQueue源碼分析less

  3.1 類的繼承關係  ide

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {}

  說明:LinkedBlockingQueue繼承了AbstractQueue抽象類,AbstractQueue定義了對隊列的基本操做;同時實現了BlockingQueue接口,BlockingQueue表示阻塞型的隊列,其對隊列的操做可能會拋出異常;同時也實現了Searializable接口,表示能夠被序列化。函數

  3.2 類的內部類高併發

  LinkedBlockingQueue內部有一個Node類,表示結點,用於存放元素,其源碼以下。  源碼分析

    static class Node<E> {
    // 元素
        E item;
    // next域
    Node<E> next;
    // 構造函數
        Node(E x) { item = x; }
    }

  說明:Node類很是簡單,包含了兩個域,分別用於存放元素和指示下一個結點。

  3.3 類的屬性  

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 版本序列號
    private static final long serialVersionUID = -6903933977591709194L;
    // 容量
    private final int capacity;
    // 元素的個數
    private final AtomicInteger count = new AtomicInteger();
    // 頭結點
    transient Node<E> head;
    // 尾結點
    private transient Node<E> last;
    // 取元素鎖
    private final ReentrantLock takeLock = new ReentrantLock();
    // 非空條件
    private final Condition notEmpty = takeLock.newCondition();
    // 存元素鎖
    private final ReentrantLock putLock = new ReentrantLock();
    // 非滿條件
    private final Condition notFull = putLock.newCondition();
}
View Code

  說明:能夠看到LinkedBlockingQueue包含了讀、寫重入鎖(與ArrayBlockingQueue不一樣,ArrayBlockingQueue只包含了一把重入鎖),讀寫操做進行了分離,而且不一樣的鎖有不一樣的Condition條件(與ArrayBlockingQueue不一樣,ArrayBlockingQueue是一把重入鎖的兩個條件)。

  3.4 類的構造函數

  1. LinkedBlockingQueue()型構造函數  

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
View Code

  說明:該構造函數用於建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue。

  2. LinkedBlockingQueue(int)型構造函數  

    public LinkedBlockingQueue(int capacity) {
        // 初始化容量必須大於0
        if (capacity <= 0) throw new IllegalArgumentException();
        // 初始化容量
        this.capacity = capacity;
        // 初始化頭結點和尾結點
        last = head = new Node<E>(null);
    }
View Code

  說明:該構造函數用於建立一個具備給定(固定)容量的 LinkedBlockingQueue。

  3. LinkedBlockingQueue(Collection<? extends E>)型構造函數 

    public LinkedBlockingQueue(Collection<? extends E> c) {
        // 調用重載構造函數
        this(Integer.MAX_VALUE);
        // 存鎖
        final ReentrantLock putLock = this.putLock;
        // 獲取鎖
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) { // 遍歷c集合
                if (e == null) // 元素爲null,拋出異常
                    throw new NullPointerException();
                if (n == capacity) // 
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
View Code

  說明:該構造函數用於建立一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加。

  3.5 核心函數分析

  1. put函數  

    public void put(E e) throws InterruptedException {
        // 值不爲空
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        // 
        int c = -1;
        // 新生結點
        Node<E> node = new Node<E>(e);
        // 存元素鎖
        final ReentrantLock putLock = this.putLock;
        // 元素個數
        final AtomicInteger count = this.count;
        // 若是當前線程未被中斷,則獲取鎖
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) { // 元素個數到達指定容量
                // 在notFull條件上進行等待
                notFull.await();
            }
            // 入隊列
            enqueue(node);
            // 更新元素個數,返回的是之前的元素個數
            c = count.getAndIncrement();
            if (c + 1 < capacity) // 元素個數是否小於容量
                // 喚醒在notFull條件上等待的某個線程
                notFull.signal();
        } finally {
            // 釋放鎖
            putLock.unlock();
        }
        if (c == 0) // 元素個數爲0,表示已有take線程在notEmpty條件上進入了等待,則須要喚醒在notEmpty條件上等待的線程
            signalNotEmpty();
    }
View Code

  說明:put函數用於存放元素,其流程以下。

  ① 判斷元素是否爲null,如果,則拋出異常,不然,進入步驟②

  ② 獲取存元素鎖,並上鎖,若是當前線程被中斷,則拋出異常,不然,進入步驟③

  ③ 判斷當前隊列中的元素個數是否已經達到指定容量,如果,則在notFull條件上進行等待,不然,進入步驟④

  ④ 將新生結點入隊列,更新隊列元素個數,若元素個數小於指定容量,則喚醒在notFull條件上等待的線程,表示能夠繼續存放元素。進入步驟⑤

  ⑤ 釋放鎖,判斷結點入隊列以前的元素個數是否爲0,如果,則喚醒在notEmpty條件上等待的線程(表示隊列中沒有元素,取元素線程被阻塞了)。

  put函數中會調用到enqueue函數和signalNotEmpty函數,enqueue函數源碼以下  

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // 更新尾結點域
        last = last.next = node;
    }
View Code

  說明:能夠看到,enqueue函數只是更新了尾節點。signalNotEmpty函數源碼以下 

    private void signalNotEmpty() {
        // 取元素鎖
        final ReentrantLock takeLock = this.takeLock;
        // 獲取鎖
        takeLock.lock();
        try {
            // 喚醒在notEmpty條件上等待的某個線程
            notEmpty.signal();
        } finally {
            // 釋放鎖
            takeLock.unlock();
        }
    }
View Code

  說明:signalNotEmpty函數用於喚醒在notEmpty條件上等待的線程,其首先獲取取元素鎖,而後上鎖,而後喚醒在notEmpty條件上等待的線程,最後釋放取元素鎖。

  2. offer函數 

    public boolean offer(E e) {
        // 確保元素不爲null
        if (e == null) throw new NullPointerException();
        // 獲取計數器
        final AtomicInteger count = this.count;
        if (count.get() == capacity) // 元素個數到達指定容量
            // 返回
            return false;
        // 
        int c = -1;
        // 新生結點
        Node<E> node = new Node<E>(e);
        // 存元素鎖
        final ReentrantLock putLock = this.putLock;
        // 獲取鎖
        putLock.lock();
        try {
            if (count.get() < capacity) { // 元素個數小於指定容量
                // 入隊列
                enqueue(node);
                // 更新元素個數,返回的是之前的元素個數
                c = count.getAndIncrement();
                if (c + 1 < capacity) // 元素個數是否小於容量
                    // 喚醒在notFull條件上等待的某個線程
                    notFull.signal();
            }
        } finally {
            // 釋放鎖
            putLock.unlock();
        }
        if (c == 0) // 元素個數爲0,則喚醒在notEmpty條件上等待的某個線程
            signalNotEmpty();
        return c >= 0;
    }
View Code

  說明:offer函數也用於存放元素,offer函數添加元素不會拋出異常(其餘的域put函數相似)。

  3. take函數

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 獲取計數器
        final AtomicInteger count = this.count;
        // 獲取取元素鎖
        final ReentrantLock takeLock = this.takeLock;
        // 若是當前線程未被中斷,則獲取鎖
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) { // 元素個數爲0
                // 在notEmpty條件上等待
                notEmpty.await();
            }
            // 出隊列
            x = dequeue();
            // 更新元素個數,返回的是之前的元素個數
            c = count.getAndDecrement();
            if (c > 1) // 元素個數大於1,則喚醒在notEmpty上等待的某個線程
                notEmpty.signal();
        } finally {
            // 釋放鎖
            takeLock.unlock();
        }
        if (c == capacity) // 元素個數到達指定容量
            // 喚醒在notFull條件上等待的某個線程
            signalNotFull();
        // 返回
        return x;
    }
View Code

  說明:take函數用於獲取一個元素,其與put函數相對應,其流程以下。

  ① 獲取取元素鎖,並上鎖,若是當前線程被中斷,則拋出異常,不然,進入步驟②

  ② 判斷當前隊列中的元素個數是否爲0,如果,則在notEmpty條件上進行等待,不然,進入步驟③

  ③ 出隊列,更新隊列元素個數,若元素個數大於1,則喚醒在notEmpty條件上等待的線程,表示能夠繼續取元素。進入步驟④

  ④ 釋放鎖,判斷結點出隊列以前的元素個數是否爲指定容量,如果,則喚醒在notFull條件上等待的線程(表示隊列已滿,存元素線程被阻塞了)。

  take函數調用到了dequeue函數和signalNotFull函數,dequeue函數源碼以下  

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        // 頭結點
        Node<E> h = head;
        // 第一個結點
        Node<E> first = h.next;
        // 頭結點的next域爲自身
        h.next = h; // help GC
        // 更新頭結點
        head = first;
        // 返回頭結點的元素
        E x = first.item;
        // 頭結點的item域賦值爲null
        first.item = null;
        // 返回結點元素
        return x;
    }
View Code

  說明:dequeue函數的做用是將頭結點更新爲以前頭結點的下一個結點,而且將更新後的頭結點的item域設置爲null。signalNotFull函數的源碼以下

    private void signalNotFull() {
        // 存元素鎖
        final ReentrantLock putLock = this.putLock;
        // 獲取鎖
        putLock.lock();
        try {
            // 喚醒在notFull條件上等待的某個線程
            notFull.signal();
        } finally {
            // 釋放鎖
            putLock.unlock();
        }
    }
View Code

  說明:signalNotFull函數用於喚醒在notFull條件上等待的某個線程,其首先獲取存元素鎖,而後上鎖,而後喚醒在notFull條件上等待的線程,最後釋放存元素鎖。

  4. poll函數  

    public E poll() {
        // 獲取計數器
        final AtomicInteger count = this.count;
        if (count.get() == 0) // 元素個數爲0
            return null;
        // 
        E x = null;
        int c = -1;
        // 取元素鎖
        final ReentrantLock takeLock = this.takeLock;
        // 獲取鎖
        takeLock.lock();
        try {
            if (count.get() > 0) { // 元素個數大於0
                // 出隊列
                x = dequeue();
                // 更新元素個數,返回的是之前的元素個數
                c = count.getAndDecrement();
                if (c > 1) // 元素個數大於1
                    // 喚醒在notEmpty條件上等待的某個線程
                    notEmpty.signal();
            }
        } finally {
            // 釋放鎖
            takeLock.unlock();
        }
        if (c == capacity) // 元素大小達到指定容量
            // 喚醒在notFull條件上等待的某個線程
            signalNotFull();
        // 返回元素
        return x;
    }
View Code

  說明:poll函數也用於存放元素,poll函數添加元素不會拋出異常(其餘的與take函數相似)。

  5. remove函數  

    public boolean remove(Object o) {
        // 元素爲null,返回false
        if (o == null) return false;
        // 獲取存元素鎖和取元素鎖(不容許存或取元素)
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) { // 遍歷整個鏈表
                if (o.equals(p.item)) { // 結點的值與指定值相等
                    // 斷開結點
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }
View Code

  說明:remove函數的流程以下

  ① 獲取讀、寫鎖(防止此時繼續出、入隊列)。進入步驟②

  ② 遍歷鏈表,尋找指定元素,若找到,則將該結點從鏈表中斷開,有利於被GC,進入步驟③

  ③ 釋放讀、寫鎖(能夠繼續出、入隊列)。步驟②中找到指定元素則返回true,不然,返回false。

  其中,remove函數會調用unlink函數,其源碼以下  

    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        // 結點的item域賦值爲null
        p.item = null;
        // 斷開p結點
        trail.next = p.next;
        if (last == p) // 尾節點爲p結點
            // 從新賦值尾節點
            last = trail;
        if (count.getAndDecrement() == capacity) // 更新元素個數,返回的是之前的元素個數,若結點個數到達指定容量
            // 喚醒在notFull條件上等待的某個線程
            notFull.signal();
    }
View Code

  說明:unlink函數用於將指定結點從鏈表中斷開,而且更新隊列元素個數,而且判斷若以前隊列元素的個數達到了指定容量,則會喚醒在notFull條件上等待的某個線程。

4、示例

  下面經過一個示例來了解LinkedBlockingQueue的使用。  

package com.hust.grid.leesf.collections;

import java.util.concurrent.LinkedBlockingQueue;
class PutThread extends Thread {
    private LinkedBlockingQueue<Integer> lbq;
    public PutThread(LinkedBlockingQueue<Integer> lbq) {
        this.lbq = lbq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("put " + i);
                lbq.put(i);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class GetThread extends Thread {
    private LinkedBlockingQueue<Integer> lbq;
    public GetThread(LinkedBlockingQueue<Integer> lbq) {
        this.lbq = lbq;
    }
    
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println("take " + lbq.take());
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class LinkedBlockingQueueDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();
        
        PutThread p1 = new PutThread(lbq);
        GetThread g1 = new GetThread(lbq);
        
        p1.start();
        g1.start();
    }
}
View Code

  運行結果:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9
View Code

  說明:示例中使用了兩個線程,一個用於存元素,一個用於讀元素,存和讀各10次,每一個線程存一個元素或者讀一個元素後都會休眠100ms,能夠看到結果是交替打 印,而且首先打印的確定是put線程語句(由於若取線程先取元素,此時隊列並無元素,其會阻塞,等待存線程存入元素),而且最終程序能夠正常結束。

  ① 若修改取元素線程,將存的元素的次數修改成15次(for循環的結束條件改成15便可),運行結果以下:  

put 0
take 0
put 1
take 1
put 2
take 2
put 3
take 3
put 4
take 4
put 5
take 5
put 6
take 6
put 7
take 7
put 8
take 8
put 9
take 9
View Code

  說明:運行結果與上面的運行結果相同,可是,此時程序沒法正常結束,由於take方法被阻塞了,等待被喚醒。

5、總結

  LinkedBlockingQueue的源碼相對比較簡單,其也是經過ReentrantLock和Condition條件來保證多線程的正確訪問的,而且取元素(出隊列)和存元素(入隊列)是採用不一樣的鎖,進行了讀寫分離,有利於提升併發度。LinkedBockingQueue的分析就到這裏,歡迎交流,謝謝各位園友的觀看~

相關文章
相關標籤/搜索