1、前言java
分析完了ArrayBlockingQueue後,接着分析LinkedBlockingQueue,與ArrayBlockingQueue不相同,LinkedBlockingQueue底層採用的是鏈表結構,其源碼也相對比較簡單,下面進行正式的分析。node
2、LinkedBlockingQueue數據結構數據結構
從LinkedBlockingQueue的命名就大體知道其數據結構採用的是鏈表結構,經過源碼也能夠驗證咱們的猜想,其數據結構以下。多線程
說明:能夠看到LinkedBlockingQueue採用的是單鏈表結構,包含了頭結點和尾節點。併發
3、LinkedBlockingQueue源碼分析less
3.1 類的繼承關係 ide
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
說明:LinkedBlockingQueue繼承了AbstractQueue抽象類,AbstractQueue定義了對隊列的基本操做;同時實現了BlockingQueue接口,BlockingQueue表示阻塞型的隊列,其對隊列的操做可能會拋出異常;同時也實現了Searializable接口,表示能夠被序列化。函數
3.2 類的內部類高併發
LinkedBlockingQueue內部有一個Node類,表示結點,用於存放元素,其源碼以下。 源碼分析
static class Node<E> { // 元素 E item; // next域 Node<E> next; // 構造函數 Node(E x) { item = x; } }
說明:Node類很是簡單,包含了兩個域,分別用於存放元素和指示下一個結點。
3.3 類的屬性
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 版本序列號 private static final long serialVersionUID = -6903933977591709194L; // 容量 private final int capacity; // 元素的個數 private final AtomicInteger count = new AtomicInteger(); // 頭結點 transient Node<E> head; // 尾結點 private transient Node<E> last; // 取元素鎖 private final ReentrantLock takeLock = new ReentrantLock(); // 非空條件 private final Condition notEmpty = takeLock.newCondition(); // 存元素鎖 private final ReentrantLock putLock = new ReentrantLock(); // 非滿條件 private final Condition notFull = putLock.newCondition(); }
說明:能夠看到LinkedBlockingQueue包含了讀、寫重入鎖(與ArrayBlockingQueue不一樣,ArrayBlockingQueue只包含了一把重入鎖),讀寫操做進行了分離,而且不一樣的鎖有不一樣的Condition條件(與ArrayBlockingQueue不一樣,ArrayBlockingQueue是一把重入鎖的兩個條件)。
3.4 類的構造函數
1. LinkedBlockingQueue()型構造函數
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
說明:該構造函數用於建立一個容量爲 Integer.MAX_VALUE 的 LinkedBlockingQueue。
2. LinkedBlockingQueue(int)型構造函數
public LinkedBlockingQueue(int capacity) { // 初始化容量必須大於0 if (capacity <= 0) throw new IllegalArgumentException(); // 初始化容量 this.capacity = capacity; // 初始化頭結點和尾結點 last = head = new Node<E>(null); }
說明:該構造函數用於建立一個具備給定(固定)容量的 LinkedBlockingQueue。
3. LinkedBlockingQueue(Collection<? extends E>)型構造函數
public LinkedBlockingQueue(Collection<? extends E> c) { // 調用重載構造函數 this(Integer.MAX_VALUE); // 存鎖 final ReentrantLock putLock = this.putLock; // 獲取鎖 putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { // 遍歷c集合 if (e == null) // 元素爲null,拋出異常 throw new NullPointerException(); if (n == capacity) // throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
說明:該構造函數用於建立一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加。
3.5 核心函數分析
1. put函數
public void put(E e) throws InterruptedException { // 值不爲空 if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. // int c = -1; // 新生結點 Node<E> node = new Node<E>(e); // 存元素鎖 final ReentrantLock putLock = this.putLock; // 元素個數 final AtomicInteger count = this.count; // 若是當前線程未被中斷,則獲取鎖 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { // 元素個數到達指定容量 // 在notFull條件上進行等待 notFull.await(); } // 入隊列 enqueue(node); // 更新元素個數,返回的是之前的元素個數 c = count.getAndIncrement(); if (c + 1 < capacity) // 元素個數是否小於容量 // 喚醒在notFull條件上等待的某個線程 notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } if (c == 0) // 元素個數爲0,表示已有take線程在notEmpty條件上進入了等待,則須要喚醒在notEmpty條件上等待的線程 signalNotEmpty(); }
說明:put函數用於存放元素,其流程以下。
① 判斷元素是否爲null,如果,則拋出異常,不然,進入步驟②
② 獲取存元素鎖,並上鎖,若是當前線程被中斷,則拋出異常,不然,進入步驟③
③ 判斷當前隊列中的元素個數是否已經達到指定容量,如果,則在notFull條件上進行等待,不然,進入步驟④
④ 將新生結點入隊列,更新隊列元素個數,若元素個數小於指定容量,則喚醒在notFull條件上等待的線程,表示能夠繼續存放元素。進入步驟⑤
⑤ 釋放鎖,判斷結點入隊列以前的元素個數是否爲0,如果,則喚醒在notEmpty條件上等待的線程(表示隊列中沒有元素,取元素線程被阻塞了)。
put函數中會調用到enqueue函數和signalNotEmpty函數,enqueue函數源碼以下
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; // 更新尾結點域 last = last.next = node; }
說明:能夠看到,enqueue函數只是更新了尾節點。signalNotEmpty函數源碼以下
private void signalNotEmpty() { // 取元素鎖 final ReentrantLock takeLock = this.takeLock; // 獲取鎖 takeLock.lock(); try { // 喚醒在notEmpty條件上等待的某個線程 notEmpty.signal(); } finally { // 釋放鎖 takeLock.unlock(); } }
說明:signalNotEmpty函數用於喚醒在notEmpty條件上等待的線程,其首先獲取取元素鎖,而後上鎖,而後喚醒在notEmpty條件上等待的線程,最後釋放取元素鎖。
2. offer函數
public boolean offer(E e) { // 確保元素不爲null if (e == null) throw new NullPointerException(); // 獲取計數器 final AtomicInteger count = this.count; if (count.get() == capacity) // 元素個數到達指定容量 // 返回 return false; // int c = -1; // 新生結點 Node<E> node = new Node<E>(e); // 存元素鎖 final ReentrantLock putLock = this.putLock; // 獲取鎖 putLock.lock(); try { if (count.get() < capacity) { // 元素個數小於指定容量 // 入隊列 enqueue(node); // 更新元素個數,返回的是之前的元素個數 c = count.getAndIncrement(); if (c + 1 < capacity) // 元素個數是否小於容量 // 喚醒在notFull條件上等待的某個線程 notFull.signal(); } } finally { // 釋放鎖 putLock.unlock(); } if (c == 0) // 元素個數爲0,則喚醒在notEmpty條件上等待的某個線程 signalNotEmpty(); return c >= 0; }
說明:offer函數也用於存放元素,offer函數添加元素不會拋出異常(其餘的域put函數相似)。
3. take函數
public E take() throws InterruptedException { E x; int c = -1; // 獲取計數器 final AtomicInteger count = this.count; // 獲取取元素鎖 final ReentrantLock takeLock = this.takeLock; // 若是當前線程未被中斷,則獲取鎖 takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 元素個數爲0 // 在notEmpty條件上等待 notEmpty.await(); } // 出隊列 x = dequeue(); // 更新元素個數,返回的是之前的元素個數 c = count.getAndDecrement(); if (c > 1) // 元素個數大於1,則喚醒在notEmpty上等待的某個線程 notEmpty.signal(); } finally { // 釋放鎖 takeLock.unlock(); } if (c == capacity) // 元素個數到達指定容量 // 喚醒在notFull條件上等待的某個線程 signalNotFull(); // 返回 return x; }
說明:take函數用於獲取一個元素,其與put函數相對應,其流程以下。
① 獲取取元素鎖,並上鎖,若是當前線程被中斷,則拋出異常,不然,進入步驟②
② 判斷當前隊列中的元素個數是否爲0,如果,則在notEmpty條件上進行等待,不然,進入步驟③
③ 出隊列,更新隊列元素個數,若元素個數大於1,則喚醒在notEmpty條件上等待的線程,表示能夠繼續取元素。進入步驟④
④ 釋放鎖,判斷結點出隊列以前的元素個數是否爲指定容量,如果,則喚醒在notFull條件上等待的線程(表示隊列已滿,存元素線程被阻塞了)。
take函數調用到了dequeue函數和signalNotFull函數,dequeue函數源碼以下
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; // 頭結點 Node<E> h = head; // 第一個結點 Node<E> first = h.next; // 頭結點的next域爲自身 h.next = h; // help GC // 更新頭結點 head = first; // 返回頭結點的元素 E x = first.item; // 頭結點的item域賦值爲null first.item = null; // 返回結點元素 return x; }
說明:dequeue函數的做用是將頭結點更新爲以前頭結點的下一個結點,而且將更新後的頭結點的item域設置爲null。signalNotFull函數的源碼以下
private void signalNotFull() { // 存元素鎖 final ReentrantLock putLock = this.putLock; // 獲取鎖 putLock.lock(); try { // 喚醒在notFull條件上等待的某個線程 notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } }
說明:signalNotFull函數用於喚醒在notFull條件上等待的某個線程,其首先獲取存元素鎖,而後上鎖,而後喚醒在notFull條件上等待的線程,最後釋放存元素鎖。
4. poll函數
public E poll() { // 獲取計數器 final AtomicInteger count = this.count; if (count.get() == 0) // 元素個數爲0 return null; // E x = null; int c = -1; // 取元素鎖 final ReentrantLock takeLock = this.takeLock; // 獲取鎖 takeLock.lock(); try { if (count.get() > 0) { // 元素個數大於0 // 出隊列 x = dequeue(); // 更新元素個數,返回的是之前的元素個數 c = count.getAndDecrement(); if (c > 1) // 元素個數大於1 // 喚醒在notEmpty條件上等待的某個線程 notEmpty.signal(); } } finally { // 釋放鎖 takeLock.unlock(); } if (c == capacity) // 元素大小達到指定容量 // 喚醒在notFull條件上等待的某個線程 signalNotFull(); // 返回元素 return x; }
說明:poll函數也用於存放元素,poll函數添加元素不會拋出異常(其餘的與take函數相似)。
5. remove函數
public boolean remove(Object o) { // 元素爲null,返回false if (o == null) return false; // 獲取存元素鎖和取元素鎖(不容許存或取元素) fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 遍歷整個鏈表 if (o.equals(p.item)) { // 結點的值與指定值相等 // 斷開結點 unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
說明:remove函數的流程以下
① 獲取讀、寫鎖(防止此時繼續出、入隊列)。進入步驟②
② 遍歷鏈表,尋找指定元素,若找到,則將該結點從鏈表中斷開,有利於被GC,進入步驟③
③ 釋放讀、寫鎖(能夠繼續出、入隊列)。步驟②中找到指定元素則返回true,不然,返回false。
其中,remove函數會調用unlink函數,其源碼以下
void unlink(Node<E> p, Node<E> trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. // 結點的item域賦值爲null p.item = null; // 斷開p結點 trail.next = p.next; if (last == p) // 尾節點爲p結點 // 從新賦值尾節點 last = trail; if (count.getAndDecrement() == capacity) // 更新元素個數,返回的是之前的元素個數,若結點個數到達指定容量 // 喚醒在notFull條件上等待的某個線程 notFull.signal(); }
說明:unlink函數用於將指定結點從鏈表中斷開,而且更新隊列元素個數,而且判斷若以前隊列元素的個數達到了指定容量,則會喚醒在notFull條件上等待的某個線程。
4、示例
下面經過一個示例來了解LinkedBlockingQueue的使用。
package com.hust.grid.leesf.collections; import java.util.concurrent.LinkedBlockingQueue; class PutThread extends Thread { private LinkedBlockingQueue<Integer> lbq; public PutThread(LinkedBlockingQueue<Integer> lbq) { this.lbq = lbq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("put " + i); lbq.put(i); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } class GetThread extends Thread { private LinkedBlockingQueue<Integer> lbq; public GetThread(LinkedBlockingQueue<Integer> lbq) { this.lbq = lbq; } public void run() { for (int i = 0; i < 10; i++) { try { System.out.println("take " + lbq.take()); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class LinkedBlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>(); PutThread p1 = new PutThread(lbq); GetThread g1 = new GetThread(lbq); p1.start(); g1.start(); } }
運行結果:
put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9
說明:示例中使用了兩個線程,一個用於存元素,一個用於讀元素,存和讀各10次,每一個線程存一個元素或者讀一個元素後都會休眠100ms,能夠看到結果是交替打 印,而且首先打印的確定是put線程語句(由於若取線程先取元素,此時隊列並無元素,其會阻塞,等待存線程存入元素),而且最終程序能夠正常結束。
① 若修改取元素線程,將存的元素的次數修改成15次(for循環的結束條件改成15便可),運行結果以下:
put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9
說明:運行結果與上面的運行結果相同,可是,此時程序沒法正常結束,由於take方法被阻塞了,等待被喚醒。
5、總結
LinkedBlockingQueue的源碼相對比較簡單,其也是經過ReentrantLock和Condition條件來保證多線程的正確訪問的,而且取元素(出隊列)和存元素(入隊列)是採用不一樣的鎖,進行了讀寫分離,有利於提升併發度。LinkedBockingQueue的分析就到這裏,歡迎交流,謝謝各位園友的觀看~