我在前段時間寫了一篇關於AQS源碼解析的文章AbstractQueuedSynchronizer超詳細原理解析 ,在文章裏邊我說JUC
包中的大部分多線程相關的類都和AQS
相關,今天咱們就學習一下依賴於AQS
來實現的阻塞隊列BlockingQueue
的實現原理。本文中的源碼未加說明即來自於以ArrayBlockingQueue
。java
相信大多數同窗在學習線程池時會了解阻塞隊列的概念,熟記各類類型的阻塞隊列對線程池初始化的影響。當從阻塞隊列獲取元素可是隊列爲空時,當前線程會阻塞直到另外一個線程向阻塞隊列中添加一個元素;相似的,當向一個阻塞隊列加入元素時,若是隊列已經滿了,當前線程也會阻塞直到另一個線程從隊列中讀取一個元素。阻塞隊列通常都是先進先出的,用來實現生產者和消費者模式。當發生上述兩種狀況時,阻塞隊列有四種不一樣的處理方式,這四種方式分別爲拋出異常,返回特殊值(null或在是false),阻塞當前線程直到執行結束,最後一種是隻阻塞固定時間,到時後還沒法執行成功就放棄操做。這些方法都總結在下邊這種表中了。node
咱們就只分析put
和take
方法。數組
咱們都知道,使用同步隊列能夠很輕鬆的實現生產者-消費者模式,其實,同步隊列就是按照生產者-消費者的模式來實現的,咱們能夠將put
函數看做生產者的操做,take
是消費者的操做。bash
咱們首先看一下ArrayListBlock
的構造函數。它初始化了put
和take
函數中使用到的關鍵成員變量,分別是ReentrantLock
和Condition
。多線程
public ArrayBlockingQueue(int capacity, boolean fair) {
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
複製代碼
ReentrantLock是AQS
的子類,其newCondition
函數返回的Condition
接口實例是定義在AQS類內部的ConditionObject
實現類。它能夠直接調用AQS
相關的函數。併發
put
函數會在隊列末尾添加元素,若是隊列已經滿了,沒法添加元素的話,就一直阻塞等待到能夠加入爲止。函數的源碼以下所示。函數
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //先得到鎖
try {
while (count == items.length)
//若是隊列滿了,就NotFull這個Condition對象上進行等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
//這裏能夠注意的是ArrayBlockingList實際上使用Array實現了一個環形數組,
//當putIndex達到最大時,就返回到起點,繼續插入,
//固然,若是此時0位置的元素尚未被取走,
//下次put時,就會由於cout == item.length未被阻塞。
if (++putIndex == items.length)
putIndex = 0;
count++;
//由於插入了元素,通知等待notEmpty事件的線程。
notEmpty.signal();
}
複製代碼
咱們會發現put函數使用了wait/notify的機制。與通常生產者-消費者的實現方式不一樣,同步隊列使用ReentrantLock
和Condition
相結合的先得到鎖,再等待的機制;而不是Synchronized
和Object.wait
的機制。這裏的區別咱們下一節再詳細講解。 看完了生產者相關的put
函數,咱們再來看一下消費者調用的take
函數。take
函數在隊列爲空時會被阻塞,一直到阻塞隊列加入了新的元素。學習
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//若是隊列爲空,那麼在notEmpty對象上等待,
//當put函數調用時,會調用notEmpty的notify進行通知。
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
E x = (E) items[takeIndex];
items[takeIndex] = null; //取出takeIndex位置的元素
if (++takeIndex == items.length)
//若是到了尾部,將指針從新調整到頭部
takeIndex = 0;
count--;
....
//通知notFull對象上等待的線程
notFull.signal();
return x;
}
複製代碼
咱們發現ArrayBlockingList
並無使用Object.wait
,而是使用的Condition.await
,這是爲何呢?其中又有哪些緣由呢? Condition
對象能夠提供和Object
的wait
和notify
同樣的行爲,可是後者必須先獲取synchronized
這個內置的monitor鎖,才能調用;而Condition
則必須先獲取ReentrantLock
。這兩種方式在阻塞等待時都會將相應的鎖釋放掉,可是Condition
的等待能夠中斷,這是兩者惟一的區別。ui
咱們先來看一下Condition
的wait
函數,wait
函數的流程大體以下圖所示。this
wait
函數主要有三個步驟。一是調用addConditionWaiter
函數,在condition wait queue隊列中添加一個節點,表明當前線程在等待一個消息。而後調用fullyRelease
函數,將持有的鎖釋放掉,調用的是AQS的函數,不清楚的同窗能夠查看本篇開頭的介紹的文章。最後一直調用isOnSyncQueue
函數判斷節點是否被轉移到sync queue
隊列上,也就是AQS中等待獲取鎖的隊列。若是沒有,則進入阻塞狀態,若是已經在隊列上,則調用acquireQueued
函數從新獲取鎖。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//在condition wait隊列上添加新的節點
Node node = addConditionWaiter();
//釋放當前持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//因爲node在以前是添加到condition wait queue上的,如今判斷這個node
//是否被添加到Sync的得到鎖的等待隊列上,Sync就是AQS的子類
//node在condition queue上說明還在等待事件的notify,
//notify函數會將condition queue 上的node轉化到Sync的隊列上。
while (!isOnSyncQueue(node)) {
//node尚未被添加到Sync Queue上,說明還在等待事件通知
//因此調用park函數來中止線程執行
LockSupport.park(this);
//判斷是否被中斷,線程從park函數返回有兩種狀況,一種是
//其餘線程調用了unpark,另一種是線程被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//代碼執行到這裏,已經有其餘線程調用notify函數,或則被中斷,該線程能夠繼續執行,可是必須先
//再次得到調用await函數時的鎖.acquireQueued函數在AQS文章中作了介紹.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
 ....
}
final int fullyRelease(Node node) {
//AQS的方法,當前已經在鎖中了,因此直接操做
boolean failed = true;
try {
int savedState = getState();
//獲取state當前的值,而後保存,以待之後恢復
// release函數是AQS的函數,不清楚的同窗請看開頭介紹的文章。
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
private int checkInterruptWhileWaiting(Node node) {
//中斷可能發生在兩個階段中,一是在等待signa時,另一個是在得到signal以後
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//這裏要和下邊的transferForSignal對應着看,這是線程中斷進入的邏輯.那邊是signal的邏輯
//兩邊可能有併發衝突,可是成功的一方必須調用enq來進入acquire lock queue中.
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//若是失敗了,說明transferForSignal那邊成功了,等待node 進入acquire lock queue
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
複製代碼
signal
函數將condition wait queue
隊列中隊首的線程節點轉移等待獲取鎖的sync queue
隊列中。這樣的話,wait
函數中調用isOnSyncQueue
函數就會返回true,致使wait
函數進入最後一步從新獲取鎖的狀態。
咱們這裏來詳細解析一下condition wait queue
和sync queue
兩個隊列的設計原理。condition wait queue
是等待消息的隊列,由於阻塞隊列爲空而進入阻塞狀態的take
函數操做就是在等待阻塞隊列不爲空的消息。而sync queue
隊列則是等待獲取鎖的隊列,take函數得到了消息,就能夠運行了,可是它還必須等待獲取鎖以後才能真正進行運行狀態。
signal
函數的示意圖以下所示。
signal
函數其實就作了一件事情,就是不斷嘗試調用transferForSignal
函數,將condition wait queue
隊首的一個節點轉移到sync queue
隊列中,直到轉移成功。由於一次轉移成功,就表明這個消息被成功通知到了等待消息的節點。
public final void signal() {
if (!isHeldExclusively())
//若是當前線程沒有得到鎖,拋出異常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//將Condition wait queue中的第一個node轉移到acquire lock queue中.
doSignal(first);
}
private void doSignal(Node first) {
do {
   //因爲生產者的signal在有消費者等待的狀況下,必需要通知
//一個消費者,因此這裏有一個循環,直到隊列爲空
//把first 這個node從condition queue中刪除掉
//condition queue的頭指針指向node的後繼節點,若是node後續節點爲null,那麼也將尾指針也置爲null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
//transferForSignal將node轉而添加到Sync的acquire lock 隊列
}
final boolean transferForSignal(Node node) {
//若是設置失敗,說明該node已經被取消了,因此返回false,讓doSignal繼續向下通知其餘未被取消的node
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將node添加到acquire lock queue中.
Node p = enq(node);
int ws = p.waitStatus;
//須要注意的是這裏的node進行了轉化
//ws>0表明canceled的含義因此直接unpark線程
//若是compareAndSetWaitStatus失敗,因此直接unpark,讓線程繼續執行await中的
//進行isOnSyncQueue判斷的while循環,而後進入acquireQueue函數.
//這裏失敗的緣由多是Lock其餘線程釋放掉了鎖,同步設置p的waitStatus
//若是compareAndSetWaitStatus成功了呢?那麼該node就一直在acquire lock queue中
//等待鎖被釋放掉再次搶奪鎖,而後再unpark
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
複製代碼
後邊一篇文章主要講解如何本身使用AQS
來建立符合本身業務需求的鎖,請你們繼續關注個人文章啦.一塊兒進步偶。