//從這可知,LinkedBlockingQueue是個單鏈表 static class Node<E> { E item; //數據 Node<E> next; //後繼結點 Node(E x) { item = x; } }
2.LinkedBlockingQueue的繼承體系java
LinkedBlockingQueue的繼承關係以下圖所示,由繼承關係可知LinkedBlockingQueue與ArrayBlockingQueue實現的功能是相同的,只在存儲數據結構上不一樣。其父類及實現的接口在以前的學習中都已分析過,這裏不在多說。node
3.重要的屬性以及構造方法數組
在LinkedBlockingQueue中保證線程併發安全所使用的的方式與ArrayBlockingQueue類似,都是經過使用重入鎖ReentrantLock來實現的,不一樣的是ArrayBlockingQueue不論出隊仍是入隊使用的都是同一把鎖,所以ArrayBlockingQueue在實際使用使是不能出入隊併發執行的,而LinkedBlockingQueue在這邊方面則不一樣,LinkedBlockingQueue的出入隊是各一把鎖,分別控制出入隊操做。安全
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //阻塞隊列的容量,不能超過int類型的最大值 private final int capacity; //隊列中元素的數量,是個原子計數類型 private final AtomicInteger count = new AtomicInteger(); //底層鏈表的頭結點,即隊首 transient Node<E> head; //底層鏈表的尾結點,即隊尾 private transient Node<E> last; //重入鎖,用於出隊操做 private final ReentrantLock takeLock = new ReentrantLock(); //隊列容許出隊條件 private final Condition notEmpty = takeLock.newCondition(); //重入鎖,用於入地操做 private final ReentrantLock putLock = new ReentrantLock(); //隊列容許入隊條件 private final Condition notFull = putLock.newCondition(); //默認構造方法,隊列容量爲最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //指定容量的構造方法 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); //初始化鏈表 } //擁有初始數據的阻塞隊列 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } }
4.入隊過程數據結構
與ArrayBlockingQueue相同,LinkedBlockingQueue中的入隊方法也是put、add和offer三種,不過相比ArrayBlockingQueue中方法,它們更簡單一些。併發
//父類AbstractQueue中的添加方法,新增失敗就拋異常 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //入隊方法,容量不足就放棄入隊 public boolean offer(E e) { //從這能夠知道,LinkedBlockingQueue中也是不容許null元素的 if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //獲取元素計數器 //判斷隊列是否已滿,已滿不容許在添加元素到隊列中 if (count.get() == capacity) return false; int c = -1; //用於記錄隊列中原有元素的數量 Node<E> node = new Node<E>(e); //新建e元素接地 final ReentrantLock putLock = this.putLock; putLock.lock(); //入隊鎖 try { //獲取鎖後再次判斷隊列容量是否已滿,已滿放棄入隊 if (count.get() < capacity) { enqueue(node); //真正執行入隊的方法 c = count.getAndIncrement(); //元素計數+1,而且將原來元素的個數返回 //判斷入隊後,隊列是否還有剩餘容量,如有就喚醒其餘某個入隊操做線程 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); //釋放鎖 } //判斷隊列原來是否是空隊列,即未新增元素以前是否是爲空 //若爲空,那麼出隊操做此時應該都在等待狀態,須要喚醒某個出隊操做的線程 if (c == 0) signalNotEmpty(); //喚醒一個出隊操做線程 return c >= 0; } //向隊列中添加隊尾元素 private void enqueue(Node<E> node) { last = last.next = node; //直接添加元素到鏈表尾結點 } //喚醒出隊操做的線程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //獲取出隊鎖 takeLock.lock(); try { notEmpty.signal(); //隨機喚醒某個出隊操做線程 } finally { takeLock.unlock(); } } //在必定時間內執行入隊操做,若超過指定時間仍沒法入隊,那就放棄入隊,可被中斷 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //從這能夠知道,LinkedBlockingQueue中也是不容許null元素的 if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); //換算超時時間單位 int c = -1; final ReentrantLock putLock = this.putLock; //獲取入隊鎖 final AtomicInteger count = this.count; //獲取計數器 putLock.lockInterruptibly(); //可被中斷的加鎖 try { //判斷隊列容量是否已滿,使用循環是爲了防止虛假喚醒 //隊列如果已滿,則等待必定時間在嘗試入隊 while (count.get() == capacity) { if (nanos <= 0) //判斷等待時間是否還有剩餘 return false; //超時,返回入隊失敗 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } //向隊列中添加元素,若隊列容量不足,則等待直到隊列有空間後繼續添加 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //獲取入隊鎖 final AtomicInteger count = this.count; //計數器 putLock.lockInterruptibly(); try { //判斷隊列是否已滿,隊列已滿則,入隊操做線程進入等待狀態 //喚醒後要再次判斷隊列是否有空間,防止虛假喚醒 while (count.get() == capacity) { //隊列已滿,線程進入條件隊列等待 notFull.await(); } enqueue(node); //入隊 c = count.getAndIncrement(); //獲取原數量,並增長隊列中的元素數量 if (c + 1 < capacity) //到此,說明隊列中至少有一個元素,那麼就能進行出隊操做 //所以能夠喚醒一個等待出隊的線程執行出隊操做 notFull.signal(); } finally { putLock.unlock(); } /判斷隊列原來是否是空隊列,即未新增元素以前是否是爲空 //若爲空,那麼出隊操做此時應該都在等待狀態,須要喚醒某個出隊操做的線程 if (c == 0) signalNotEmpty(); }
5.出隊過程學習
在LinkedBlockingQueue中的出隊方法也是隻有兩種poll和take,一個不等待,一個等待。this
//移除並返回隊首元素,若隊列爲空,則返回null public E poll() { final AtomicInteger count = this.count; //獲取隊列元素個數 if (count.get() == 0) //判斷是否是空隊列,空隊列直接返回null return null; E x = null; //用於記錄移除出隊的結點 int c = -1; //用於記錄隊列原來結點數量 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //判斷隊列是否爲空,不爲空才能移除並獲取隊首結點 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); //記錄舊計數,並結點更新計數 //出隊後,隊列仍不爲空,那麼久能夠繼續喚醒一個出隊操做的線程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //在必定時間內嘗試出隊操做,若超時仍未成功,則返回null public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) //判斷是否超時 return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //移除並返回隊首元素,若隊列爲空,則等待 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //判斷隊列是不是空隊列,如果空隊列那麼當前出隊操做進入等待狀態 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); //判斷隊列是否爲空,隊列不空,則能夠繼續喚醒其餘的出隊操做線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //判斷未進行本次出隊操做前隊列是否已滿,隊列如果已滿,說明全部的 //入隊操做要麼處於等待狀態,要麼不能成功,而如今至少隊列執行過一次 //出隊操做,此時隊列必然還有容量能夠執行入隊操做,所以能夠喚醒任意一個 //執行入隊操做的線程 if (c == capacity) signalNotFull(); return x; }
6.peek方法spa
//獲取但不移除隊首元素 public E peek() { //隊列中沒有元素的話,就返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; //head.next結點就是隊首元素對應的結點 //判斷隊首是否爲null,爲null說明隊列是空隊列 if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
7.remove方法線程
//遍歷隊列查找o元素對應的結點並將其從隊列中移除 public boolean remove(Object o) { if (o == null) return false; fullyLock(); //獲取出入隊鎖,防止併發問題 try { //遍歷隊列對應的鏈表,查找要移除的元素 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //判斷結點是不是要刪除的結點,如果,那就要將該結點從 //鏈表中移除 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); //將出入隊鎖都釋放 } } //刪除隊列中元素時,要對其餘的出入隊操做進行同步 //所以刪除操做要將出隊鎖和入隊鎖都獲取到 void fullyLock() { putLock.lock(); takeLock.lock(); } //將結點p從鏈表中移除 void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; //trail的後繼結點變爲p的後繼結點 //如果p是尾結點,那麼將last變爲trail(trail是p的前驅結點) if (last == p) last = trail; //移除一個結點,那麼結點計數要-1,而且能夠喚醒入隊操做的線程了 if (count.getAndDecrement() == capacity) notFull.signal(); }
8.size的統計
public int size() { return count.get(); //隊列中的元素個數直接返回計數器值 }
2、LinkedBlockingDueue併發容器
1.LinkedBlockingDueue的底層實現
LinkedBlockingDueue能夠看作是LinkedBlockingQueue的升級版,LinkedBlockingQueue能作的LinkedBlockingDueue也能作,不能作的LinkedBlockingDueue還能作,其底層數據結構也是鏈表,不過與LinkedBlockingQueue的單鏈表不一樣,LinkedBlockingDueue是雙向鏈表,而且還能夠作堆棧使用。
結點的定義以下:
static final class Node<E> { //存儲數據 E item; //前驅結點 Node<E> prev; //後繼結點 Node<E> next; Node(E x) { item = x; } }
2.LinkedBlockingDueue的繼承關係
LinkedBlockingDueue的繼承關係以下所示,相比LinkedBlockingQueue多實現了一個雙端隊列的接口。
接下來看看BlockingDeque中定義了哪些方法:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { //雙端隊列中若還有容量,將元素添加到隊首,不然拋出異常 void addFirst(E e); //雙端隊列中若還有容量,將元素添加到隊尾,不然拋出異常 void addLast(E e); //雙端隊列中若還有容量,將元素添加到隊首,不然返回false boolean offerFirst(E e); //雙端隊列中若還有容量,將元素添加到隊尾,不然返回false boolean offerLast(E e); //雙端隊列中若還有容量,當即將元素添加到隊首,不然等待容量有空閒在添加 void putFirst(E e) throws InterruptedException; //雙端隊列中若還有容量,當即將元素添加到隊尾,不然等待容量有空閒在添加 void putLast(E e) throws InterruptedException; //在必定時間內嘗試將元素添加到隊首,若到指定時間還沒添加成功,則返回false boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException; //在必定時間內嘗試將元素添加到隊尾,若到指定時間還沒添加成功,則返回false boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException; //若隊列不空,當即獲取並移除隊首元素,若隊列已空則等待到隊列中有元素在執行 E takeFirst() throws InterruptedException; //若隊列不空,當即獲取並移除隊尾元素,若隊列已空則等待到隊列中有元素在執行 E takeLast() throws InterruptedException; //在必定時間內嘗試獲取並移除隊首元素,若到達指定時間隊列中仍沒有元素,直接放棄嘗試,返回null E pollFirst(long timeout, TimeUnit unit) throws InterruptedException; //在必定時間內嘗試獲取並移除隊尾元素,若到達指定時間隊列中仍沒有元素,直接放棄嘗試,返回null E pollLast(long timeout, TimeUnit unit) throws InterruptedException; //移除隊列中第一次出現的o元素 boolean removeFirstOccurrence(Object o); //移除隊列中最後一個出現的o元素 boolean removeLastOccurrence(Object o); //雙端隊列中若還有容量,將元素添加到隊尾,不然拋出異常 //該方法等同於addLast boolean add(E e); //雙端隊列中若還有容量,將元素添加到隊尾,不然返回false //該方法等同於offerLast boolean offer(E e); //若隊列不滿,則當即向隊尾添加元素,不然等待隊列有空間後在添加 void put(E e) throws InterruptedException; //在必定時間內嘗試將元素添加到隊尾,若到指定時間還沒添加成功,則返回false boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //獲取並移除隊首,若隊列爲空,則拋異常 E remove(); //獲取並移除隊首,若隊列爲空,則返回null E poll(); //獲取並移除隊首,若隊列爲空,則等待隊列不爲空在執行 E take() throws InterruptedException; //在必定時間內嘗試獲取並移除隊尾元素,若到達指定時間隊列中仍沒有元素,直接放棄嘗試,返回null E poll(long timeout, TimeUnit unit) throws InterruptedException; //獲取但不移除隊首元素,若隊列爲空,那麼拋異常 E element(); //獲取但不移除隊首元素,若隊列爲空,那麼返回null E peek(); //刪除隊裏中第一次出現的o元素 boolean remove(Object o); //判斷隊列中是否含有o元素 public boolean contains(Object o); //隊列中元素的數量 public int size(); //獲取隊列的迭代器 Iterator<E> iterator(); //向隊列中壓入一個元素,即向隊首添加一個元素,若隊列沒有 //容量,則拋出異常,等同於addFirst void push(E e); }
3.重要屬性及構造方法
LinkedBlockingDueue也是個容量可選(最大爲Integer.MAX_VALUE)的阻塞隊列,且線程安全。與LinkedBlockingQueue類似,其線程安全也是經過ReentrantLock來實現的,不過略微不一樣的似,LinkedBlockingDueue的底層只有一個重入鎖,而LinkedBlockingQueue則有兩個。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> //隊列的頭結點 transient Node<E> first; //隊尾結點 transient Node<E> last; //隊列中的結點計數 private transient int count; //隊列容量 private final int capacity; //重入鎖 final ReentrantLock lock = new ReentrantLock(); //隊列容許出隊條件 private final Condition notEmpty = lock.newCondition(); //隊列容許入隊條件 private final Condition notFull = lock.newCondition(); //使用默認容量的隊列 public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } //指定容量的隊列 public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } //帶有初始元素的隊列 public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } } }
4.入隊過程
由對BlockingDeque的分析可知,LinkedBlockingDueue中存在着大量的入隊方法,這裏就不一一分析了,由於實現基本都差很少,只挑選個別來看看。
//addFirst方法的本質其實仍是調用offerFirst //向隊首新增元素,若隊列容量不足,則拋異常 public void addFirst(E e) { if (!offerFirst(e)) throw new IllegalStateException("Deque full"); } //向隊首新增元素,若隊列容量不足,則返回false public boolean offerFirst(E e) { //隊列中不容許null元素存在 if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); //新建對應結點 final ReentrantLock lock = this.lock; lock.lock(); //加鎖 try { return linkFirst(node); //真正執行添加隊首結點的方法 } finally { lock.unlock(); } } //向隊首新增結點 private boolean linkFirst(Node<E> node) { //判斷隊列是否已滿 if (count >= capacity) return false; //隊列已滿直接返回失敗 Node<E> f = first; //獲取隊首結點 node.next = f; //將原隊首結點設爲新增結點的後繼結點 first = node; //將新增結點設爲隊首結點 //判斷原隊列中是否爲空隊列 if (last == null) last = node; //原隊列若爲空隊列,那麼此時隊首隊尾都是同一個結點 else f.prev = node; //設置原來的隊首結點的前驅結點爲新增結點 ++count; //隊列中的結點數量+1 notEmpty.signal(); //喚醒執行出隊操做的線程 return true; } //向隊尾新增元素,若隊列容量不足,則拋異常 public void addLast(E e) { if (!offerLast(e)) throw new IllegalStateException("Deque full"); } //向隊尾新增元素,若隊列容量不足,則返回false public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); //真正實現添加隊尾結點的方法 } finally { lock.unlock(); } } //向隊尾新增結點 private boolean linkLast(Node<E> node) { //判斷隊列是否已滿 if (count >= capacity) return false; Node<E> l = last; //獲取當前隊尾結點 node.prev = l; //將新增結點的前驅設爲l last = node; //新增節點設爲隊尾 //判斷隊列本來是否爲空 //若爲空,則新增結點既是隊首也是隊尾 if (first == null) first = node; else l.next = node; //將l節點的後繼設爲新增結點 ++count; //計數+1 notEmpty.signal(); //喚醒執行出隊操做的線程 return true; } //向隊首新增結點,若隊列已滿,則等待 public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) //若新增隊首結點失敗,則線程進入等待狀態 notFull.await(); } finally { lock.unlock(); } } //向隊尾新增結點,若隊列已滿,則等待 public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) //若新增隊尾結點失敗,則線程進入等待狀態 notFull.await(); } finally { lock.unlock(); } }
5.出隊過程
同入隊同樣,出隊的方法也不少,這裏也只選個別來分析:
//獲取並移除隊首元素,若隊列爲空,則返回null public E pollFirst() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); //將隊首結點從隊列中移除 } finally { lock.unlock(); } } //移除隊首節點的方法 private E unlinkFirst() { //獲取隊首結點 Node<E> f = first; //判斷隊列是否爲空隊列,空隊列直接返回null if (f == null) return null; //獲取隊首結點的後繼結點,要做爲新的隊首結點 Node<E> n = f.next; E item = f.item; //獲取隊首節點的數據,用做返回值 f.item = null; //清空隊首結點,方便GC回收 f.next = f; //隊首出隊的後繼設爲本身 first = n; //設置新的隊首爲n //判斷隊列是否還有結點 if (n == null) last = null; //隊列如果空了,那麼隊尾也設爲null else n.prev = null; //新隊首的前驅設爲null --count; //隊列中的結點計數-1 //喚醒執行入隊操做的線程,隊列剛執行一次出隊操做,必然有剩餘空間 //所以能夠執行入隊操做 notFull.signal(); return item; } //獲取並移除隊尾元素,若隊列爲空,則返回null public E pollLast() { final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); //將隊尾結點從隊列中移除 } finally { lock.unlock(); } } //移除隊尾結點的方法 private E unlinkLast() { //獲取隊尾引用 Node<E> l = last; //判斷隊列是不是空隊列,空隊列直接返回null if (l == null) return null; Node<E> p = l.prev; //獲取隊尾的前驅結點,用做新的隊尾結點 E item = l.item; l.item = null; l.prev = l; //隊尾結點出隊後的前驅設爲自身,方便GC回收 last = p; //設置新隊尾 //判斷隊尾出隊後隊列中是否仍是有結點,即隊列是否成了空隊列 if (p == null) first = null; //空隊列的隊首也是null else p.next = null; //新隊尾的後繼設爲null --count; //結點計數-1 notFull.signal(); //喚醒入隊操做的線程 return item; } //將隊首移除並返回,若隊列已空則等待 public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //判斷移除隊首結點是否成功,失敗則等待 while ( (x = unlinkFirst()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } } //將隊尾移除並返回,若隊列已空則等待 public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; //判斷移除隊尾結點是否成功,失敗則等待 while ( (x = unlinkLast()) == null) notEmpty.await(); return x; } finally { lock.unlock(); } }
6.總結
LinkedBlockingDueue中其餘的方法就不一一分析了,都比較簡單。
與LinkedBlockingQueue對比,LinkedBlockingDueue的線程安全以及阻塞等待的實現基本沒有區別,兩個阻塞隊列基本能夠通用(LinkedBlockingDueue用做棧時除外)。兩個隊列基本上只有兩點不一樣:一個是底層數據結構的細微區別,LinkedBlockingQueue是單向鏈表,而LinkedBlockingDueue則是雙向鏈表;另外一個是重入鎖的使用有些區別,LinkedBlockingDueue不論出入隊都使用的是同一個鎖對象,而LinkedBlockingQueue的出入隊鎖是分開的。