本文首發於一世流雲專欄: https://segmentfault.com/blog...
LinkedBlockingQueue
是在JDK1.5時,隨着J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基於單鏈表實現:java
LinkedBlockingQueue是一種近似有界阻塞隊列,爲何說近似?由於LinkedBlockingQueue既能夠在初始構造時就指定隊列的容量,也能夠不指定,若是不指定,那麼它的容量大小默認爲Integer.MAX_VALUE
。node
LinkedBlockingQueue除了底層數據結構(單鏈表)與ArrayBlockingQueue不一樣外,另一個特色就是:
它維護了兩把鎖——takeLock
和putLock
。
takeLock用於控制出隊的併發,putLock用於入隊的併發。這也就意味着,同一時刻,只能只有一個線程能執行入隊/出隊操做,其他入隊/出隊線程會被阻塞;可是,入隊和出隊之間能夠併發執行,即同一時刻,能夠同時有一個線程進行入隊,另外一個線程進行出隊,這樣就能夠提高吞吐量。segmentfault
在ArrayBlockingQueue章節中,咱們說過,ArrayBlockingQueue維護了一把全局鎖,不管是出隊仍是入隊,都共用這把鎖,這就致使任一時間點只有一個線程可以執行。那麼對於「生產者-消費者」模式來講,意味着生產者和消費者不能併發執行。
LinkedBlockingQueue提供了三種構造器:數組
/** * 默認構造器. * 隊列容量爲Integer.MAX_VALUE. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
/** * 顯示指定隊列容量的構造器 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
/** * 從已有集合構造隊列. * 隊列容量爲Integer.MAX_VALUE */ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 這裏加鎖僅僅是爲了保證可見性 try { int n = 0; for (E e : c) { if (e == null) // 隊列不能包含null元素 throw new NullPointerException(); if (n == capacity) // 隊列已滿 throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); // 隊尾插入元素 ++n; } count.set(n); // 設置元素個數 } finally { putLock.unlock(); } }
能夠看到,若是不指定容量,那麼它的容量大小默認爲Integer.MAX_VALUE
。另外,LinkedBlockingQueue使用了一個原子變量AtomicInteger
記錄隊列中元素的個數,以保證入隊/出隊併發修改元素時的數據一致性。數據結構
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 隊列容量. * 若是不指定, 則爲Integer.MAX_VALUE */ private final int capacity; /** * 隊列中的元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊首指針. * head.item == null */ transient Node<E> head; /** * 隊尾指針. * last.next == null */ private transient Node<E> last; /** * 出隊鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 隊列空時,出隊線程在該條件隊列等待 */ private final Condition notEmpty = takeLock.newCondition(); /** * 入隊鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 隊列滿時,入隊線程在該條件隊列等待 */ private final Condition notFull = putLock.newCondition(); /** * 鏈表結點定義 */ static class Node<E> { E item; Node<E> next; // 後驅指針 Node(E x) { item = x; } } //... }
構造完成後,LinkedBlockingQueue的初始結構以下:併發
插入部分元素後的LinkedBlockingQueue結構:性能
因爲接口和ArrayBlockingQueue徹底同樣,因此LinkedBlockingQueue會阻塞線程的方法也一共有4個:put(E e)
、offer(e, time, unit)
和take()
、poll(time, unit)
,咱們先來看插入元素的方法。this
插入元素——put(E e)spa
/** * 在隊尾插入指定的元素. * 若是隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 獲取「入隊鎖」 try { while (count.get() == capacity) { // 隊列已滿, 則線程在notFull上等待 notFull.await(); } enqueue(node); // 將新結點連接到「隊尾」 /** * c+1 表示的元素個數. * 若是,則喚醒一個「入隊線程」 */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數 if (c + 1 < capacity) // 入隊後隊列未滿, 則喚醒一個「入隊線程」 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 隊列初始爲空, 則喚醒一個「出隊線程」 signalNotEmpty(); }
插入元素時,首先須要得到「入隊鎖」,若是隊列滿了,則當前線程須要在notFull條件隊列等待;不然,將新元素連接到隊列尾部。線程
這裏須要注意的是兩個地方:
①每入隊一個元素後,若是隊列還沒滿,則須要喚醒其它可能正在等待的「入隊線程」:
/** * c+1 表示的元素個數. * 若是,則喚醒一個「入隊線程」 */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數 if (c + 1 < capacity) // 入隊後隊列未滿, 則喚醒一個「入隊線程」 notFull.signal();
② 每入隊一個元素,都要判斷下隊列是否空了,若是空了,說明可能存在正在等待的「出隊線程」,須要喚醒它:
if (c == 0) // 隊列爲空, 則喚醒一個「出隊線程」 signalNotEmpty();
這裏爲何不像ArrayBlockingQueue那樣,入隊完成後,直接喚醒一個在notEmpty上等待的出隊線程?
由於ArrayBlockingQueue中,入隊/出隊用的是同一把鎖,二者不會併發執行,因此每入隊一個元素(拿到鎖),就能夠通知可能正在等待的「出隊線程」。(同一個鎖的兩個條件隊列:notEmpty、notFull)
ArrayBlockingQueue中的enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引爲0 putIndex = 0; count++; // 元素個數+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(能夠來隊列取元素了) }
而LinkedBlockingQueue中,入隊/出隊用的是兩把鎖,入隊/出隊是會併發執行的。入隊鎖對應的是notFull條件隊列,出隊鎖對應的是notEmpty條件隊列,因此每入隊一個元素,應當當即去喚醒可能阻塞的其它入隊線程。當隊列爲空時,說明後面再來「出隊線程」,必定都會阻塞,因此此時能夠去喚醒一個出隊線程,以提高性能。
試想如下,若是去掉上面的①和②,當入隊線程拿到「入隊鎖」,入隊元素後,直接嘗試喚醒出隊線程,會要求去拿出隊鎖,這樣持有鎖A的同時,再去嘗試獲取鎖B,極可能引發死鎖,就算經過打破死鎖的條件避免死鎖,每次操做同時獲取兩把鎖也會下降性能。
刪除元素——table()
刪除元素的邏輯和插入元素相似。刪除元素時,首先須要得到「出隊鎖」,若是隊列爲空,則當前線程須要在notEmpty條件隊列等待;不然,從隊首出隊一個元素:
/** * 從隊首出隊一個元素 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 獲取「出隊鎖」 takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列爲空, 則阻塞線程 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // c表示出隊前的元素個數 if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) // 隊列初始爲滿,則喚醒一個入隊線程 signalNotFull(); return x; }
/** * 隊首出隊一個元素. */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
上面須要的注意點和插入元素同樣:
①每出隊一個元素前,若是隊列非空,則須要喚醒其它可能正在等待的「出隊線程」:
c = count.getAndDecrement(); // c表示出隊前的元素個數 if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal();
② 每入隊一個元素,都要判斷下隊列是否滿,若是是滿的,說明可能存在正在等待的「入隊線程」,須要喚醒它:
if (c == capacity) // 隊列初始爲滿,則喚醒一個入隊線程 signalNotFull();
概括一下,LinkedBlockingQueue和ArrayBlockingQueue比較主要有如下區別:
Integer.MAX_VALUE
,近似於無界);