本文的主要詳細分析ArrayBlockingQueue的實現原理,因爲該併發集合其底層是使用了java.util.ReentrantLock和java.util.Condition來完成併發控制的,咱們能夠經過JDK的源代碼更好的學習這些併發控制類的使用,同時該類也是全部併發集合中最簡單的一個,分析該類的源碼也是爲以後分析其餘併發集合作好基礎。java
在Queue接口中,除了繼承Collection接口中定義的方法外,它還分別額外地定義插入、刪除、查詢這3個操做,其中每個操做都以兩種不一樣的形式存在,每一種形式都對應着一個方法。數組
方法說明:安全
操做 | 拋出異常 | 返回特殊值 |
---|---|---|
Insert | add(e) | offer(e) |
Remove | remove() | poll() |
Examine | element() | peek() |
後面咱們在分析ArrayBlockingQueue的方法時,主要也是圍繞着這幾個方法來進行分析。併發
BlockingQueue是JDK1.5出現的接口,它在原來的Queue接口基礎上提供了更多的額外功能:當獲取隊列中的頭部元素時,若是隊列爲空,那麼它將會使執行線程處於等待狀態;當添加一個元素到隊列的尾部時,若是隊列已經滿了,那麼它一樣會使執行的線程處於等待狀態。工具
前面咱們在說Queue接口時提到過,它針對於相同的操做提供了2種不一樣的形式,而BlockingQueue更誇張,針對於相同的操做提供了4種不一樣的形式。性能
該四種形式分別爲:學習
對應的方法說明:this
操做 | 拋出異常 | 返回特殊值 | 阻塞 | 超時 |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | 無 | 無 |
BlockingQueue雖然比起Queue在操做上提供了更多的支持,可是它在使用的使用也應該以下的幾點:spa
- BlockingQueue中是不容許添加null的,該接受在聲明的時候就要求全部的實現類在接收到一個null的時候,都應該拋出NullPointerException。
有了上面的鋪墊,下面咱們就能夠真正開始分析ArrayBlockingQueue了。在分析以前,首先讓咱們看看API對其的描述。
注意:這裏使用的JDK版本爲1.7,不一樣的JDK版本在實現上存在不一樣線程
ArrayBlockingQueueAPI說明.png
首先讓咱們看下ArrayBlockingQueue的核心組成:
/** 底層維護隊列元素的數組 */ final Object[] items; /** 當讀取元素時數組的下標(這裏稱爲讀下標) */ int takeIndex; /** 添加元素時數組的下標 (這裏稱爲寫小標)*/ int putIndex; /** 隊列中的元素個數 */ int count; /**用於併發控制的工具類**/ final ReentrantLock lock; /** 控制take操做時是否讓線程等待 */ private final Condition notEmpty; /** 控制put操做時是否讓線程等待 */ private final Condition notFull;
take方法分析(369-379行):
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; /* 嘗試獲取鎖,若是此時鎖被其餘線程鎖佔用,那麼當前線程就處於Waiting的狀態。 注意:當方法是支持線程中斷響應的若是其餘線程此時中斷當前線程, 那麼當前線程就會拋出InterruptedException */ lock.lockInterruptibly(); try { /* 若是此時隊列中的元素個數爲0,那麼就讓當前線程wait,而且釋放鎖。 注意:這裏使用了while進行重複檢查,是爲了防止當前線程可能因爲 其餘未知的緣由被喚醒。 (一般這種狀況被稱爲"spurious wakeup") */ while (count == 0) notEmpty.await(); //若是隊列不爲空,則從隊列的頭部取元素 return extract(); } finally { //完成鎖的釋放 lock.unlock(); } }
extract方法分析(163-171):
/* 根據takeIndex來獲取當前的元素,而後通知其餘等待的線程。 Call only when holding lock.(只有當前線程已經持有了鎖以後,它才能調用該方法) */ private E extract() { final Object[] items = this.items; //根據takeIndex獲取元素,由於元素是一個Object類型的數組,所以它經過cast方法將其轉換成泛型。 E x = this.<E>cast(items[takeIndex]); //將當前位置的元素設置爲null items[takeIndex] = null; //而且將takeIndex++,注意:這裏由於已經使用了鎖,所以inc方法中沒有使用到原子操做 takeIndex = inc(takeIndex); //將隊列中的總的元素減1 --count; //喚醒其餘等待的線程 notFull.signal(); return x; }
put方法分析(318-239)
public void put(E e) throws InterruptedException { //首先檢查元素是否爲空,不然拋出NullPointerException checkNotNull(e); final ReentrantLock lock = this.lock; //進行鎖的搶佔 lock.lockInterruptibly(); try { /*當隊列的長度等於數組的長度,此時說明隊列已經滿了,這裏一樣 使用了while來方式當前線程被"僞喚醒"。*/ while (count == items.length) //則讓當前線程處於等待狀態 notFull.await(); //一旦獲取到鎖而且隊列還未滿時,則執行insert操做。 insert(e); } finally { //完成鎖的釋放 lock.unlock(); } } //檢查元素是否爲空 private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } //該方法的邏輯很是簡單 private void insert(E x) { //將當前元素設置到putIndex位置 items[putIndex] = x; //讓putIndex++ putIndex = inc(putIndex); //將隊列的大小加1 ++count; //喚醒其餘正在處於等待狀態的線程 notEmpty.signal(); }
注:ArrayBlockingQueue實際上是一個循環隊列
咱們使用一個圖來簡單說明一下:
黃色表示數組中有元素
1-1.png
當再一次執行put的時候,其結果爲:
1-2.png
此時放入的元素會從頭開始置,咱們經過其incr方法更加清晰的看出其底層的操做:
/** * Circularly increment i. */ final int inc(int i) { //當takeIndex的值等於數組的長度時,就會從新置爲0,這個一個循環遞增的過程 return (++i == items.length) ? 0 : i; }
至此,ArrayBlockingQueue的核心部分就分析完了,其他的隊列操做基本上都是換湯不換藥的,此處再也不一一列舉。
做者:碼農一枚 連接:https://www.jianshu.com/p/9a652250e0d1 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。