基於鏈表的無邊界阻塞隊列,經常使用與線程池建立中做爲任務緩衝隊列使用java
先看一下內部定義,與 ArrayBlockingQueue
作一下對比,順帶看下這二者的區別及不一樣的應用場景node
/** 隊列的容量, or Integer.MAX_VALUE if none */ private final int capacity; /** 隊列中實際的個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊列頭,但其中沒有有效數據,它的下一個才保存實際的數據 * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * 隊列尾,其內包含有效的數據 * Invariant: last.next == null */ private transient Node<E> last; /** 出隊的鎖, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 進隊的鎖, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); static class Node<E> { // 存放在隊列中的數據; 隊列頭的item爲null E item; // 隊列中,該節點的下一個節點,隊列尾的next爲null Node<E> next; Node(E x) { item = x; } }
說明數組
對比下ArrayBlockingQueue
,主要區別爲兩個地方數據結構
LinkedBlockingQueue
底層爲鏈表;ArrayBlockingQueue
底層爲數組(貌似有點多餘,命名上就能夠看出)LinkedBlockingQueue
出隊和入隊是兩個鎖,而ArrayBlockingQueue
是一個鎖進行控制;即前者出隊和入隊能夠併發執行;然後者會出現鎖的競爭分析阻塞原理以前,先經過註釋解釋下LinkedBlockingQueue
的使用場景併發
public void put(E e) throws InterruptedException { // 隊列中不能存在null if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 若隊列已滿,則等待`notFull(出隊後,隊列未滿時).signal()`喚醒 while (count.get() == capacity) { notFull.await(); } enqueue(node); // 進隊 c = count.getAndIncrement(); // 計數+1,並獲取隊列的實際元素個數 if (c + 1 < capacity) // 若進隊後,隊列依然沒有滿,則釋放一個信號 (why?) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 表示隊列從空到有一個數據,喚醒由於隊列爲空被阻塞的線程 signalNotEmpty(); } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } // 喚醒阻塞的出隊線程,注意使用姿式,Condition的使用必須放在對應的鎖中間,不然會報錯 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
進隊邏輯:this
notEmpty.singal()
,喚醒由於隊列空被阻塞的出隊線程public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 若是隊列爲空,則阻塞,等待入隊以後,被喚醒 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); // 進隊 c = count.getAndDecrement(); if (c > 1) // 若是隊列依然非空,則喚醒其餘由於隊列爲空被阻塞的線程 notEmpty.signal(); } finally { takeLock.unlock();} if (c == capacity) // 原來隊列爲滿的,此時出隊一個後,正好非滿,喚醒由於隊列滿被阻塞的線程 signalNotFull(); return x; } // 出隊邏輯,實現邏輯是把出隊Node節點設置爲新的head,釋放老的head節點 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } // 喚醒阻塞的出隊線程,注意使用姿式,Condition的使用必須放在對應的鎖中間,不然會報錯 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
出隊邏輯.net
查看上面的源碼時,還發現一個很是有意思的地方,出隊成功以後,會判斷若是以前的隊列中元素的個數大於1(即出隊以後,還有元素),會執行notEmpty.signal();
,喚醒被阻塞的出隊線程,爲何要這麼幹?線程
假設一種場景,一個空隊列,兩個線程(A,B)都執行出隊,被阻塞;code
此時線程C執行入隊,入隊完成,由於隊列由空到非空,會喚醒一個被阻塞的出隊線程(假設爲A); 由於出隊和入隊是能夠併發的,如今在線程A執行`c = count.getAndDecrement();`以前,若線程D又入隊成功一個,由於此時隊列非空,因此不會調用`signalNotEmpty` 如今若是線程A執行出隊以後,獲取到的c應該爲2,若是不執行`notEmpty.signal();`,就會致使線程B一直被阻塞,顯然不符合咱們的預期
除了出隊和入隊的方法以外,還有幾個有意思的方法,如隊列中元素以數組形式輸出,判斷隊列是否有元素,這兩個操做,都會競爭出隊和入隊鎖,確保在執行這個方法時,隊列不會被其餘線程修改對象
public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } } public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; return a; } finally { fullyUnlock(); } }
鏈表阻塞隊列的經典使用case,基本上用過線程池就會用到這個了,如jdk中自帶的
// java.util.concurrent.Executors public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ArrayBlockingQueue
: 底層存儲結構爲數組,直接將數據存入數組中
LinkedBlockingQueue
: 底層存儲結構爲單向鏈表,會將數據封裝到Node對象做爲鏈表的節點,且鏈表頭中不包含實際的元素信息
ArrayBlockingQueue
: 出隊入隊公用一把鎖,即二者沒法併發
LinkedBlockingQueue
: 出隊和入隊各一把鎖,所以出隊和入隊可併發執行
所以在線程池的建立中,通常是使用LinkedBlockingQueue,至少線程在進入等待隊列中時,出隊和進隊不會相互阻塞,可是二者之間有關聯
ArrayBlockingQueue
: 是直接將對象插入或移除
LinkedBlockingQueue
: 須要把枚舉對象轉換爲Node<E>進行插入或移除,其中會將出隊的Node節點做爲新的隊列頭,返回並置空Node的item元素
ArrayBlockingQueue
: 必須指定隊列的容量
LinkedBlockingQueue
: 能夠指定隊列的容量,不指定時,容量爲 Integer.MAX_VALUE