在閱讀本文以前,建議先閱讀個人《初步瞭解AQS是什麼(一)》,畢竟有一些內容是和前文是相通的,若是對AQS熟悉的話,也能夠直接往下看,不過仍是先建議先看下java
ReentrantLock默認使用的是非公平鎖,除非在構造方法裏面傳入true就能夠變爲公平鎖啦!node
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼
先看看公平鎖的lock函數
final void lock() {
acquire(1);
}
//----------------------------acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//-----------------------------tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//由於是公平鎖,那麼到這裏若是有線程在等待,就公平而言,確定是不能搶前面的鎖啦!
//和非公平鎖的區別
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
非公平鎖的lock源碼分析
final void lock() {
if (compareAndSetState(0, 1)) // 非公平鎖會先試着去佔有這個鎖先,若是不行就再獲取!
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//-------------------------acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//-----------------------tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//-----------------------nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//若是資源能夠搶,非公平鎖可無論你前面是否有節點在等待,我先搶了再說!公平鎖就等待啦!
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
總結:post
非公平鎖在lock的時候,在tryAcquire函數裏面會CAS獲取state,若是剛好成功,那麼就直接取得鎖啦,不用進行下面的操做了性能
非公平鎖在 CAS 失敗後,和公平鎖同樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,若是發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,可是公平鎖會判斷等待隊列是否有線程處於等待狀態,若是有則不去搶鎖,乖乖排到後面。ui
非公平鎖若是兩次CAS都不成功,那麼接下來的操做和公平鎖同樣啦!都要進入到阻塞隊列等待喚醒。this
非公平鎖會有更好的性能,由於它的吞吐量比較大。固然,非公平鎖讓獲取鎖的時間變得更加不肯定,可能會致使在阻塞隊列中的線程長期處於飢餓狀態。spa
在讀下面的內容的時候,讓咱們來先了解一下生產者和消費者的模式,主要有個例子,更好地理解下面的內容!線程
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依賴於 lock 來產生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生產
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去
} finally {
lock.unlock();
}
}
// 消費
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去
return x;
} finally {
lock.unlock();
}
}
}
複製代碼
上面只是貼個代碼,先讓讀者知道怎麼用ReentrantLock和Condition,關於更多的細節,請讀者自行百度便可。
Condition接口的實現類。Condition的方法依賴於ReentrantLock,而ReentrantLock又依賴於AbstractQueueSynchronized類。
內部結構
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
複製代碼
在AQS中,其中線程等待的隊列咱們成爲阻塞隊列,這裏先引入條件隊列(condition queue),這裏先上圖看二者的關係。
await():是能夠被中斷的,調用這個方法的線程會阻塞,直到調用Signal()或者被中斷。
awaitUninterruptibly():不可被中斷,就是說有中斷信號來了但不會響應。
public final void await() throws InterruptedException {
//剛開始的時候看看有沒有被中斷過,若是有就響應唄。
if (Thread.interrupted())
throw new InterruptedException();
// 把當前線程封裝爲Node節點,添加到條件隊列的隊尾。
Node node = addConditionWaiter();
//由於在await以前,這個線程確定是擁有鎖的,因此這裏徹底釋放鎖,爲何要徹底釋放鎖,是由於考慮到重入的問題。
//這裏的saveState返回的是釋放鎖以前的state值。
int savedState = fullyRelease(node);
int interruptMode = 0;
//這裏判斷的await的節點有沒有進入到阻塞隊列,也叫同步隊列。
//若是進入,返回true,不會執行while循環
//若是沒進入,就等着唄,等待其餘線程叫醒你或者中斷信號來臨唄。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//到了這一步,已經說明node節點在阻塞隊列了。那麼就爭搶資源,也就是AQS的爭搶資源的方式。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
複製代碼
private Node addConditionWaiter() {
Node t = lastWaiter; //指向條件隊列的尾節點
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//若是進入到這裏,就說明條件隊列這時候不爲空,可是尾節點取消了
//由於在尾節點加入的時候,會把ws設施爲CONDITION的。
//先把取消等待的節點清除,而後再找到真正的尾節點
unlinkCancelledWaiters();
t = lastWaiter;
}
//以前說過,加入條件隊列的時候會把ws設置爲CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)//這裏說明條件隊列爲空,也就是說第一次加入
//設置好頭指針
firstWaiter = node;
else
//那麼這裏就是條件隊列不爲空唄,那就加入真正尾節點後頭就行了
t.nextWaiter = node;
lastWaiter = node;
return node;
}
複製代碼
在addConditionWaiter中,有一個unlinkCancelledWaiters()方法,顧名思義就是取消不等待的節點,由於條件隊列是單向鏈表,因此涉及到鏈表的操做。
就是說若是在await的時候,節點取消等待或者是說節點入隊的時候,發現最後一個節點已經取消了,那麼就調用一次這個方法
// 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去
// 純屬鏈表操做,很好理解,看不懂多看幾遍就能夠了
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 若是節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
複製代碼
在await方法中,把節點加入到條件隊列以後,就須要徹底釋放鎖了。由於考慮到重入的問題,因此必需要徹底釋放。
例子:condition.await以前,當前節點就已經執行了兩次lock()操做,那麼state爲2,也就是說擁有兩把鎖。那麼fullyRelease就應該設置state爲0,返回2,返回沒釋放鎖以前所擁有的鎖的數量。而後再進行掛起。當被喚醒的時候,依舊須要兩把鎖才能進行執行下去。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState; //若是徹底釋放鎖成功,那麼這裏返回的是釋放鎖以前所用有鎖的個數
} else {
//到這一步說明釋放鎖不成功,好比說有個沒有鎖的線程可是執行了釋放鎖,那麼確定會到達這一步,因此會跑出異常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
//到達這裏說明faile = true,也就是說release(savedState)返回的是false,拋出了異常,說明該節點操做不當,那麼就得設置爲取消,方便後面有節點加入的時候,觸發unlinkCancelledWaiters,把這個節點請出去
node.waitStatus = Node.CANCELLED;
}
}
複製代碼
在await方法中,徹底釋放鎖以後,這個方法會判斷該線程表明的隊列是否在阻塞隊列中,注意是阻塞隊列。
前面咱們也說過,在節點加入阻塞隊列的時候,會把ws 設置爲Node.Condition
final boolean isOnSyncQueue(Node node) {
//若是在條件隊列的節點被移到阻塞隊列,那麼waitStatus會被設置爲0,也就是在signal的時候,這個後面說
//因此,若是當前節點的waitStatus若是仍是Condition的話,那麼確定不在阻塞隊列中
//由於是把條件隊列的firstWaiter移動到阻塞隊列的,因此firstWaiter的pre是null,那麼確定不在阻塞隊列中。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
//這裏很簡單理解啦,在上面的前提下,若是next不爲null,那麼確定在阻塞隊列中啦,由於條件隊列用的是nextWaiter,阻塞隊列用的是next屬性
return true;
//上面曾經試過用node.pre == null 判斷不在阻塞隊列中,那麼node.pre != null就能夠判斷在阻塞隊列中了嗎?
//答案非也,由於在節點加入AQS的阻塞隊列的時候,會先將節點的pre設置爲tail,而後再進行CAS操做將當前節點設置爲tail節點,若是這個CAS操做失敗了,那麼此時tail節點並非當前節點,那麼也說明該節點並不在隊列中
//因此這裏有一個從尾節點往前找的函數
return findNodeFromTail(node);
}
//這個函數很簡單,就是若是沒在隊列的話,那麼t的最終結果爲null,返回false,若是找到就返回true唄
//這裏的不在隊列有種狀況,就是上面說的CAS將當前節點設置爲tail節點失敗,讀者能夠整理下思路。
private boolean findNodeFromTail(Node node) Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
複製代碼
若是isOnSynchronized返回false的話,那麼就先掛起等待唄,也就是等待其餘線程signal或者中斷 喚醒線程,將這個線程加入阻塞隊列!
前面說到線程掛起了,就等待着被喚醒唄!那麼這裏先說下signal函數,方便後面理解。
喚醒操做通常是由另一個線程操做的,調用同一個對象的signal對象喚醒就好,其實喚醒的操做就是將節點從條件隊列移動到阻塞隊列中。
public final void signal() {
if (!isHeldExclusively())//若是線程沒有擁有排他鎖,也就是沒有獨佔鎖,拋出異常唄
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//到這裏就是找到條隊列的第一個節點,而後從這個節點找並無取消等待的線程!由於可能這時候firstWaiter這個節點已經取消等待了唄。
doSignal(first);
}
//----------------------------------------doSignal--------------------------------------
//從隊友找到第一個加入阻塞隊列的節點
//由於以前說過條件隊列的節點取消放棄等待
private void doSignal(Node first) {
do {
//將firstWaiter指向first後面一個節點,由於如今這個firstWaiter將要離開隊列啦!
if ( (firstWaiter = first.nextWaiter) == null)
//到這裏說明若是firstWaiter離開以後條件隊列都爲空了,那麼也lastWaiter指針也置爲null啦
lastWaiter = null;
//斷開firstWaiter啦!
first.nextWaiter = null;
} while (!transferForSignal(first) && //這個while循環就是判斷這個first指向的節點是否轉移成功,若是返回false的話,那麼就是轉移不成功,也就是說取消了,那麼就查詢下一個節點。返回true就是說加入阻塞隊列成功啦,那麼這個函數就完成使命啦!
(first = firstWaiter) != null);
}
//-----------------------------------------------transferForSignal---------------------
final boolean transferForSignal(Node node) {
//這裏以前說過加入條件隊列的時候節點的waitStatus會初始化爲Condition,若是這裏CAS更新失敗,說明狀態不爲Condition,就說明取消了唄。那麼返回false,繼續尋找下一個想加入阻塞隊列的節點。
//若是更新成功就把節點的ws設置爲0,爲初始態,就是爲了加入阻塞隊列。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//到達這裏說明CAS成功,ws也爲0,那麼說明就能夠自旋進入阻塞隊列啦!
Node p = enq(node);//返回加入阻塞隊列後的前驅節點。
int ws = p.waitStatus;
//這裏是根據前驅節點的狀態,若是ws>0,說明就是取消等待啦
// 若是ws < 0,就是說該節點沒有取消等待進入阻塞隊列。就先把加入阻塞隊列的節點的ws設置爲SIGNAL,這個就很少說啦,具體看上一篇文章便可!
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//到這裏說明節點要麼取消,要麼CAS想要加入阻塞隊列的節點的狀態失敗,那麼就先喚醒,爲啥要喚醒呢?後面講!
LockSupport.unpark(node.thread);
// 無論有沒有執行上一個if語句,都說明進入阻塞隊列已經成功啦!
return true;
}
複製代碼
在signal以後,線程所表明的node節點確定已經加入到阻塞隊列中啦!而後就準備去獲取鎖。
以前不是說到線程已經掛起了嗎?signal以後或者被中斷就已經喚醒啦
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 線程掛起
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
複製代碼
有如下幾種狀況會讓線程從LockSupport.park(this);這步往下走
線程被喚醒以後,第一步就是執行checkInterruptWhileWaiting(node)這個函數啦,若是返回非0,就是說await的線程在park期間或者說signal以後(unpark以後)被中斷過,因此就要區分究竟是signal以前中斷仍是signal以後中斷。
若是返回0,那麼就是說線程在park的時候,沒有被中斷,被unpark就是在阻塞隊列正常被喚醒的!
checkInterruptWhileWaiting返回的值
- 若是在signal以前已經進行中斷,那麼返回THROW_IE -1
- 若是在signal以後已經進行中斷,那麼返回REINTERRUPT 1
- 若是返回0,那麼表示在park期間沒有進行中斷
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
複製代碼
主要在transferAfterCancelledWait這個方法裏,咱們下面來看一下它的代碼
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//若是進行到這裏,那麼說明是signal以前就被中斷的。由於signal的時候,會把await在條件隊列的線程的waitStatus置爲0,if判斷裏面的那個CAS操做就會失敗了。因此就說明若是到這裏,那麼確定還在條件對了,而且沒有被signal
enq(node);//從條件隊列進入阻塞隊列,注意這裏的nextWaiter並無置位空,若是後面還有節點,那麼也會一塊兒帶去阻塞隊列
return true;
}
/* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */
//到達這裏說明CAS失敗,有兩種狀況
// 1. 是已經signal完成,已經把節點移到阻塞隊列中了,就不會進入下面那個while循環
// 2. 尚未徹底轉移到阻塞隊列中,那麼就進入while循壞咯,自旋等待,直到進入阻塞隊列爲止,可是這種狀況如翻譯所示,是很是罕見和稀少的
while (!isOnSyncQueue(node))
Thread.yield();
return false;
複製代碼
因此通過上面的分析,只要是某個線程被中斷,那麼無論這個線程所表明的node節點是否是firstWaiter,也就是是否是會被signal,都會被喚醒,而後進入阻塞隊列。
咱們回到await函數,在上面的while退出以後,也就是咱們的節點不論是被signal也好仍是被中斷也好,都已經進入到阻塞隊列了,這點是毋庸置疑的。進入阻塞隊列後幹嗎呢?固然是爲了獲取鎖啦,那麼如何獲取鎖呢?還記得上一節咱們講的進入阻塞隊列以後如何獲取鎖麼?若是不記得就去看看吧!這裏放出鏈接
// acquireQueued上節已經分析過,再也不綴訴。這裏判斷中斷是否是signal以後中斷的
//若是是signal前的,到後面就直接拋出異常,不會進行這一步。
//若是是signal以後,可是在阻塞隊列等待的時候被中斷了,也就是說acquireQueued的時候被中斷過。前面退出while循環的時候多是沒有中斷退出(interruptMode == 0),也多是中斷退出,只要acquireQueued的時候被中斷過,而且是signal以後,都要從新設置下interruptMode,這裏讀者能夠好好捋一下!
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
複製代碼
if (node.nextWaiter != null) // clean up if cancelled
//若是執行到這裏,那麼就是說是這個線程時被中斷加入阻塞隊列的,不是經過signal加入的,由於signal的話的nextWaiter早就已經被設置爲null啦!
unlinkCancelledWaiters();
複製代碼
if (interruptMode != 0) // 若是到這裏確定是被中斷啦,管你是signal以前仍是以後
reportInterruptAfterWait(interruptMode);
//-----------------reportInterruptAfterWait------------------------------------------------------+
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE) //就是這裏,若是signal以前就被中斷,拋出異常!!
throw new InterruptedException();
else if (interruptMode == REINTERRUPT) //若是是signal以後中斷,那麼就設置下標誌位就好啊!讓開發者本身去寫外層邏輯去檢測!
selfInterrupt();
}
//---------------------------selfInterrupt------------------------------------
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
複製代碼
下面咱們來分析下,如何取消鎖之間的競爭,當有幾個線程想競爭某個鎖的時候,咱們但願有一個線程不去競爭這個鎖了。
在第一篇文章中咱們用的lock,若是有中斷是不會拋出異常的,只是標記一下狀態而已,具體的由外層函數決定。
在ReentrantLock中,有這樣的一個lock方法,能夠檢測到中斷而且拋出異常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//------------------------------------------------------------
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //若是還沒開始競爭就中斷,也會拋出異常返回
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//-------------------------------doAcquireInterruptibly--------------------------------
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 就是這裏,一旦異常,立刻結束這個方法,拋出異常。
// 這裏再也不只是標記這個方法的返回值表明中斷狀態
// 而是直接拋出異常,並且外層也不捕獲,一直往外拋到 lockInterruptibly,
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
上面咱們看到,acquireInterruptibly裏面的doAcquireInterruptibly在競爭鎖的時候,一旦其餘線程對其產生了中斷,那麼這個線程就立刻結束,再也不去競爭。若是是acquired的話,若是在競爭鎖期間其餘線程對其產生中斷,那麼先先搶得鎖再說,中斷後面再處理!這就是他們的區別啦!
經過寫這篇博客,對AQS的源碼有了進一步的瞭解,本文主要花了大量的篇幅去寫ConditionObject,主要是爲了弄明白Node節點在條件隊列和阻塞隊列的轉換的過程,同時本身因爲以前對中斷不是很熟悉,因此補了一些知識,起碼比以前熟悉了一點,也算是一點小進步吧!後面會繼續瞭解AQS,下次見!