本文在我的技術博客同步發佈,詳情可用力戳
亦可掃描屏幕右側二維碼關注我的公衆號,公衆號內有我的聯繫方式,等你來撩...java
看過我上一篇文章的應該知道(家裏條件容許的能夠先看看上一篇文章),若是想實現一個生產者消費者模型,咱們能夠基於JVM自帶的synchronized+wait+notify實現,也能夠用JDK裏面的ReentrantLock+Condition實現!不過從上篇文章的demo看,實現起來也不是那麼容易!由於咱們既要關心何時須要阻塞線程,又要須要關心什麼時候喚醒線程。控制的細節太多,一個疏忽可能就致使了一個不易發現的bug,好比上篇文章中的虛假喚醒的例子!那有沒有一種咱們不用關心那麼多複雜細節就能實現生產者消費者模式的方法呢?本文要講的阻塞隊列就是一種很好的實現!node
在咱們剛開始學數據結構的時候,都接觸過一種先進先出(first in first out,簡稱「FIFO」)的數據結構,叫隊列。阻塞隊列從名字看也是隊列的一種,所以知足隊列的特性,而後這個隊列是可阻塞的!這個阻塞怎麼理解呢?就是當咱們一個線程往阻塞隊列裏面添加元素的時候,若是隊列滿了,那這個線程不會直接返回,而是會被阻塞,直到元素添加成功!當咱們一個線程從阻塞隊列裏面獲取元素的時候,若是隊列是空的,那這個線程不會直接返回,而是會被阻塞直到元素獲取成功。而阻塞以及喚醒的操做都由阻塞隊列來管理!數組
咱們先看在java中阻塞隊列基本的繼承關係圖:數據結構
完整的繼承關係要比這張圖複雜一些,但爲了清晰起見圖中我只畫了主要的類和關係。隊列的基接口Queue與咱們開發中常常用到的List、Set是兄弟關係,所以我這裏也列出來了方便對比記憶!阻塞隊列的基接口是繼承自Queue接口的BlockingQueue接口,其餘阻塞隊列具體實現都繼承BlockingQueue接口!併發
咱們先看隊列基接口Queue中的方法ide
這個接口一共6個方法,咱們能夠分爲兩組
一、「異常」組函數
一、add(e):將元素放到隊列末尾,成功返回true,失敗則拋異常。
二、remove():獲取並移除隊首元素,獲取失敗則拋異常。
三、element():獲取隊首元素,不移除,獲取失敗則拋異常。源碼分析
二、「特殊值」組this
一、offer(e):將元素放到隊列末尾,成功返回true,失敗返回false。
二、poll():獲取並返回隊首元素,獲取失敗則返回null。
三、peek():獲取隊首元素,不移除,獲取失敗則返回null。idea
「異常」組的3個方法在操做失敗的時候會拋異常,所以叫「異常」組!
「特殊值」組3個方法與「異常」組的3個方法是一一對應的,功能都同樣,只是在操做失敗的時候不會拋異常而是返回一個特殊值,所以叫「特殊值組」。
這兩組方法都是在Queue接口中定義的,所以跟阻塞就沒有什麼關係了。那咱們再看看BlockingQueue接口中的方法
這個接口咱們重點關注標記出來的4個方法,這幾個方法咱們也能夠分爲兩組
三、「阻塞」組
一、put(e):將元素放到隊列末尾,若是隊列滿了,則等待。
二、take():獲取並移除隊首元素,若是隊列爲空,則等待。
四、「超時」組
一、offer(e,time,unit):將元素放到隊列末尾,若是隊列滿了,則等待,當等待超過指定時間後仍添加元素失敗,則返回false,不然返回true。
二、poll(time,unit):獲取並返回隊首元素,若是隊列爲空,則等待,當等待超過指定時間後仍獲取失敗則返回null,不然返回獲取到的元素。
這兩組方法都是在BlockingQueue接口中定義的,所以都是跟阻塞相關的!
「阻塞」組2個方法在操做不成功的時候會一直阻塞線程,直到可以操做成功,所以叫「阻塞」組!用一個成語形容就是「不見不散」!
「超時」組2個方法與「超時」組的2個方法是一一對應的,功能都同樣,只是這2個方法不會一直阻塞,超過了指定的時間還沒成功就中止阻塞並返回,所以叫「超時」組!用一個成語形容就是「過期不候」!
這四組方法合在一塊兒就有了下面的一張表格:
方法功能 | 異常組 | 特殊值組 | 阻塞組 | 超時組 |
---|---|---|---|---|
元素入隊 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
元素出隊 | remove() | pool() | take() | poll(time,unit) |
檢查元素 | element() | peek() | 無 | 無 |
BlockingQueue的實現類有多個,可是若是每個源碼都進行分析那不只很影響篇幅且不必,所以我這裏拿三個經常使用的阻塞隊列源碼進行分析!在源碼中jdk的版本爲1.8!
咱們先看下ArrayBlockingQueue中的幾個屬性
/** The queued items 使用數組存儲元素 */ final Object[] items; /** items index for next take, poll, peek or remove 下一個出隊元素索引 */ int takeIndex; /** items index for next put, offer, or add 下一個入隊元素索引 */ int putIndex; /** Number of elements in the queue 隊列元素個數 */ int count; /* * ReentrantLock+Condition控制併發 * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
1.object類型數組,也意味着ArrayBlockingQueue底層數據結構是數組。
2.ReentrantLock+Condition,若是看過我上一篇文章的應該很熟悉,這是用作來線程同步和線程通訊的。
咱們再看下ArrayBlockingQueue的構造函數。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c){ this(capacity, fair); //初始化一個集合到隊列 .... }
這三個構造函數都必須傳入一個int類型的capacity參數,這個參數也意味着ArrayBlockingQueue是一個有界的阻塞隊列!
咱們前面說過隊列有經常使用的四組方法,而跟阻塞相關的是「阻塞」組和「超時」組的四個方法!咱們以「阻塞」組的put()和take()方法爲例,來窺探一下源碼裏面的奧祕:
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. */ public void put(E e) throws InterruptedException { checkNotNull(e); //加鎖操做 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //判斷隊列是否知足入隊條件,若是隊列已滿,則阻塞等待一個「不滿」的信號 while (count == items.length) notFull.await(); //知足條件,則進行入隊操做 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; // 下一個入隊元素索引超過了數組的長度,則又從0開始。 if (++putIndex == items.length) putIndex = 0; count++; //放入元素後,釋放一個「不空」的信號。喚醒等待中的出隊線程。 notEmpty.signal(); }
public E take() throws InterruptedException { //加鎖操做 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //判斷隊列是否知足出隊條件,若是隊列爲空,則阻塞等待一個「不空」的信號 while (count == 0) notEmpty.await(); //知足條件,則進行出隊操做 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null;//help GC // 下一個出隊元素索引超過了數組的長度,則又從0開始。 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued();//更新迭代器元素數據 //取出元素後,釋放一個「不滿」的信號。喚醒等待中的入隊線程。 notFull.signal(); return x; }
ArrayBlockingQueue的入隊出隊代碼仍是很簡單的,當咱們往一個阻塞隊列裏面添加數據的時候,阻塞隊列用一個固定長度的數據存儲數據,若是數組的長度達到了最大容量,則添加數據的線程會被阻塞。當咱們從阻塞隊列獲取數據的時候,若是隊列爲空,則獲取數據的線程會被阻塞!相信代碼上的註釋已經足夠理解這塊的代碼邏輯了!
咱們先看下LinkedBlockingQueue中的幾個屬性
/** The capacity bound, or Integer.MAX_VALUE if none 隊列容量 */ private final int capacity; /** Current number of elements 隊列元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊列頭 * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * 隊列尾 * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc 出隊操做用到的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc 入隊操做用到的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
1.Node類型的變量head和last,這是鏈表常見操做,也意味着LinkedBlockingQueue底層數據結構是鏈表。
2.與ArrayBlockingQueue不一樣的是,這裏有兩個ReentrantLock對象,put操做個take操做的鎖對象是分開的,這樣作也是爲了提升容器的併發能力。
再看下Node這個內部類
/** * Linked list node class */ static class Node<E> { E item; //指向下一個節點 Node<E> next; Node(E x) { item = x; } }
只有next屬性意味着這是一個單向鏈表!
再看下LinkedBlockingQueue的構造函數
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); ... }
1.當構造函數不傳capacity參數的時候,LinkedBlockingQueue就是一個無界阻塞隊列(其實也並不是無界,不傳默認值就是Integer.MAX_VALUE)。
2.當構造函數傳入capacity參數的時候,LinkedBlockingQueue就是一個有界阻塞隊列。
咱們依然看看在LinkedBlockingQueue中「阻塞」組的兩個方法put()和take()分別怎麼實現的
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); //存儲隊列元素數量 int c = -1; //建立新節點 Node<E> node = new Node<E>(e); //獲取putLock final ReentrantLock putLock = this.putLock; //隊列元素數量 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //判斷隊列是否知足入隊條件,若是隊列已滿,則阻塞等待一個「不滿」的信號 while (count.get() == capacity) { notFull.await(); } //入隊操做 enqueue(node); //隊列元素數量+1,執行完下面這句後,count是入隊後的元素數量,而c的值仍是入隊前的元素數量。 c = count.getAndIncrement(); //當前入隊操做成功後,若是元素數量還小於隊列容量,則釋放一個「不滿」的信號 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //這裏的c前面說了是元素入隊前的數量,若是入隊前元素數量爲0(隊列是空的),那可能會有出隊線程在等待一個「不空」的信號,因此這裏釋放一個「不空」的信號。 if (c == 0) signalNotEmpty(); } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } public E take() throws InterruptedException { //出隊元素 E x; //存儲隊列元素數量 int c = -1; //隊列元素數量 final AtomicInteger count = this.count; //獲取takeLock final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //判斷隊列是否知足出隊條件,若是隊列爲空,則阻塞等待一個「不空」的信號 while (count.get() == 0) { notEmpty.await(); } //出隊操做 x = dequeue(); //隊列元素數量-1,執行完下面這句後,count是出隊後的元素數量,而c的值仍是出隊前的元素數量。 c = count.getAndDecrement(); //當前出隊操做成功前隊列元素大於1,那當前出隊操做成功後隊列元素也就大於0,則釋放一個「不空」的信號 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //這裏的c前面說了是元素出隊前的數量,若是出隊前元素數量爲總容量(隊列是滿的),那可能會有入隊線程在等待一個「不滿」的信號,因此這裏釋放一個「不滿」的信號。 if (c == capacity) signalNotFull(); return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
這裏源碼的同步邏輯比ArrayBlockingQueue中要稍微複雜一點,在ArrayBlockingQueue中每次入隊都釋放一個「不空」的信號,每次出隊都釋放一個「不滿」的信號,而LinkedBlockingQueue則不一樣。
元素入隊的時候
1.入隊後還有空位,則釋放一個「不滿」的信號。
2.入隊時隊列爲空,則釋放一個「不空」的信號。
元素出隊的時候
1.出隊後隊列還有元素,則釋放一個「不空」的信號。
2.出隊前隊列是滿的,則釋放一個「不滿」的信號。
SynchronousQueue從名字看叫「同步隊列」,怎麼理解呢?雖然他也叫隊列,可是他不提供空間存儲元素!當一個線程往隊列添加元素,須要匹配到有另一個線程從隊列取元素,不然線程阻塞!當一個線程從隊列獲取元素,須要匹配到有另一個線程往隊列添加元素,不然線程阻塞!因此這裏的同步指的就是入隊線程和出隊線程須要同步!這裏有點相似你媽媽對你說:「今年你再找不到女友,過年你就別回來了!」,因而你第二年就真的沒回去過年!由於你是一個獲取數據(找女友)的線程,數據沒獲取到則一直阻塞!
瞭解了大體概念,咱們再來看看源碼!
/** * Creates a {@code SynchronousQueue} with nonfair access policy. */ public SynchronousQueue() { this(false); } /** * Creates a {@code SynchronousQueue} with the specified fairness policy. * * @param fair if true, waiting threads contend in FIFO order for * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
兩個構造函數,fair參數指定公平策略,默認爲false,所以是非公平模式!先看看put和take方法的實現:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
put和take方法很相似,都是調用transferer.transfer(...)方法,區別在於第一個參數!put方法在調用時候會參入入隊的值,而take方法傳入null。
上面說過有公平和非公平策略,今天將重點分析公平模式TransferQueue的源碼!從名字能看出來這也是一個隊列,咱們先看TransferQueue的重點屬性和構造方法:
// 指向隊列頭部 transient volatile QNode head; // 指向隊列尾部 transient volatile QNode tail; TransferQueue() { //初始化一個空 //#1 QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; }
一頭一尾,鏈表的一向操做!構造方法中,建立了一個QNode結點,而且將head和tail都指向這個結點!咱們再看看QNode類的重要屬性和構造方法:
volatile QNode next; // 指向隊列的下一個節點 volatile Object item; // 節點存儲的元素 volatile Thread waiter; // 被阻塞的線程 final boolean isData; // 是不是「數據」結點(入隊線程爲true,出隊線程爲false) QNode(Object item, boolean isData) { this.item = item; this.isData = isData; }
咱們再回到上面提到的transferer.transfer(...)方法,也就是TransferQueue中的transfer(...)方法,核心邏輯都在這個方法中體現:
/** * 「存」或者「取」一個元素 */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed //當前操做類型,傳非null的值則爲生產線程,傳null則爲消費線程。 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; //上面咱們說過在構造方法中就建立了一個QNode結點,而且將head和tail都指向這個結點 //所以這裏t、h通常狀況下不會爲null if (t == null || h == null) // saw uninitialized value continue; // spin //根據SynchronousQueue的特性,不一樣類型的操做會配對成功。 //所以在阻塞隊列中只會存在一種類型的阻塞節點,要麼全是消費線程要麼全是生產線程! //因此分三種狀況: //1.h == t,這種狀況下隊列爲空,須要將當前節點入隊。 //2.t.isData == isData尾部節點的操做類型與當前操做類型 // 一致(尾部節點的操做類型表明着隊列中全部節點的操做類型),須要將當前節點入隊。 //3.隊列不爲空且尾部節點的操做類型與當前操做類型不一致, // 須要從隊列頭部匹配一個節點並返回。 //所以再看下面的代碼,會根據上面3種狀況走不一樣的分支。 if (h == t || t.isData == isData) { // empty or same-mode //進入這個分支就是上面一、2的狀況 //獲取尾部節點的next指向,正常狀況下tn等於null QNode tn = t.next; //下面是判斷是否出現併發致使尾節點被更改 if (t != tail) // inconsistent read continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } //超時判斷 if (timed && nanos <= 0) // can't wait return null; //將當前操做建立爲新節點,傳入數據值和操做類型。 //#2 if (s == null) s = new QNode(e, isData); //一、將阻塞隊列中尾部節點的next指向新節點 //二、將tail屬性的指向設置爲新節點 //#3 if (!t.casNext(null, s)) // failed to link in continue; advanceTail(t, s); // swing tail and wait //在這個方法內部會進行自旋或者阻塞,直到配對成功。 //建議這裏先跳到下面這個方法內部看完邏輯再回來。 Object x = awaitFulfill(s, e, timed, nanos); //只有在線程被中斷的狀況下會進入這個分支 if (x == s) { // wait was cancelled clean(t, s); return null; } if (!s.isOffList()) { // not already unlinked advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } //若是爲生產線程,則返回入隊的值;若是爲消費線程,則返回匹配到的生產線程的值。 return (x != null) ? (E)x : e; } else { // complementary-mode //進入這個分支就是上面3的狀況 //找到頭部節點的next指向 //#4 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) continue; // inconsistent read Object x = m.item; //m.casItem(x, e)方法很重要,會將匹配到的節點的item修改成當前操做的值。 //這樣awaitFulfill方法的x != e條件才能成立,被匹配的阻塞線程才能返回。 //#5 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) { // lost CAS advanceHead(h, m); // dequeue and retry continue; } //調整head屬性的指向,這裏建議這裏先跳到下面這個方法內部看完邏輯再回來。 advanceHead(h, m); // successfully fulfilled //喚醒匹配到的阻塞線程 LockSupport.unpark(m.waiter); //若是爲生產線程,則返回入隊的值;若是爲消費線程,則返回匹配到的生產線程的值。 return (x != null) ? (E)x : e; } } } Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { /* Same idea as TransferStack.awaitFulfill */ final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //若是頭節點的next指向當前的數據節點,也就是當前數據節點是下一個待匹配的節點,那就自旋等待一下子。 //若是設置了超時時間就少自旋一下子,沒有設置超時時間就多自旋一下子。 //能夠看看maxTimedSpins和maxUntimedSpins兩個屬性的值設置,是與cpu數量相關的。 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; // 第一次進來這裏確定是相等的,因此不會進入這個分支。 // 當有其餘的線程匹配到當前節點,這裏的s.item的值會被更改(前面說到過的m.casItem(x, e)方法),因此方法返回。 if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(e); continue; } } if (spins > 0) --spins; else if (s.waiter == null) s.waiter = w; else if (!timed) //這裏線程會阻塞,若是有線程與當前線程匹配,則被喚醒進行下一次循環。 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } void advanceHead(QNode h, QNode nh) { //這個方法作了兩個操做 //一、將head屬性的指向調整爲頭節點的下一個結點 //二、將原頭節點的next指向原頭節點自己 //#6 if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next }
不知道看完上面的SynchronousQueue基於公平模式TransferQueue的源碼有沒有對SynchronousQueue有一個很好的瞭解!下面我模擬了一個場景,先有一個生產線程進入隊列,而後一個消費線程進入隊列。結合上面源碼我畫了幾張節點變化的圖例以便更好的理解上面整個過程,能夠結合上面的源碼一塊兒看
//建立SynchronousQueue對象 SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(true); //生產線程 new Thread(new Runnable() { @Override public void run() { try { synchronousQueue.put("VALUE"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); Thread.sleep(1000); //消費線程 new Thread(new Runnable() { @Override public void run() { try { synchronousQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
咱們在建立SynchronousQueue對象時候會執行構造函數,也就是在源碼#1處執行完後,會建立一個新的節點node,以下圖所示,一頭一尾都指向構造函數中建立出來的新節點node!
而後會執行synchronousQueue.put()的邏輯,也就是TransferQueue中的transfer(...)方法邏輯。按照咱們以前的分析,會執行到源碼#2處,執行完後新的節點node1會被建立,以下圖所示。
接着在代碼#3處執行完後,節點圖示以下,注意紅色箭頭指向的調整。
到這裏,生產線程會進入awaitFulfill方法自旋後阻塞!等待消費線程的喚醒!
而後執行synchronousQueue.take()的邏輯,也就是TransferQueue中的transfer(...)方法邏輯。按照咱們以前的分析,會執行到源碼#4處,執行完後就找到了咱們須要匹配的節點node1,注意紅色箭頭指向。
執行到#5處的方法會改變匹配到節點的item屬性值,注意node1節點item屬性的變化,以下圖所示。
而後在代碼#6處執行完後,節點圖示以下,注意紅色箭頭指向的調整。
最後就是消費線程喚醒生產線程,消費線程返回,生產線程也返回,過程結束!
好了,源碼分析就到這裏結束了,你看懂了嗎?