掃描下方二維碼或者微信搜索公衆號
菜鳥飛呀飛
,便可關注微信公衆號,閱讀更多Spring源碼分析
和Java併發編程
文章。java
在上一篇文章中分析了隊列同步器(AQS)的設計原理,這一篇文章將結合具體的源代碼來分析AQS。若是理解了上一篇文章中介紹的同步隊列的數據結構,那麼源碼看起來相對比較好理解。讀過Spring源碼的朋友應該能感覺到Spring中方法和變量的命名是很是規範的,見名知意。與Spring相比,JDK中某些類的源碼,命名則不是那麼規範,裏面存在各類簡寫,單個字母命名變量,有時候看起來十分燒腦。其中AQS中就存在這種狀況,因此若是不瞭解AQS的設計原理,直接看源碼,就會顯得比較困難。node
public static void main(String[] args) {
// 公平鎖
ReentrantLock lock = new ReentrantLock(true);
try{
// 加鎖
lock.lock();
System.out.println("Hello World");
}finally {
// 釋放鎖
lock.unlock();
}
}
複製代碼
static final class FairSync extends Sync {
final void lock() {
// 調用acquire()獲取同步狀態
acquire(1);
}
}
複製代碼
acquire()
。最終的加鎖邏輯就是在acquire()方法中實現的。acquire()
的源碼以下:public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 調用selfInterrupt()方法從新設置中斷標識
selfInterrupt();
}
複製代碼
acquire()
這個方法上簡單寫了些註釋,也能夠直接參考。public final void acquire(int arg) {
/* * acquire()的做用是獲取同步狀態,這裏同步狀態的含義等價於鎖 * tryAcquire()方法是嘗試獲取同步狀態,若是該方法返回true,表示獲取同步狀態成功;返回false表示獲取同步狀態失敗 * 第1種狀況:當tryAcquire()返回true時,acquire()方法會直接結束。 * 由於此時!tryAcquire(arg) 就等於false,而&&判斷具備短路做用,當由於&&判斷前面的條件判斷爲false,&&後面的判斷就不會進行了,因此此時就不會執行後面的if語句,此時方法就直接返回了。 * 這種狀況表示線程獲取鎖成功了 * 第2中狀況:當tryAcquire()返回false時,表示線程沒有獲取到鎖, * 這個時候就須要將線程加入到同步隊列中了。此時 !tryAcquire() == true,所以會進行&&後面的判斷,即acquireQueued()方法的判斷,在進行acquireQueued()方法判斷以前會先執行addWaiter()方法。 * * addWaiter()方法返回的是當前線程表明的節點,這個方法的做用是將當前線程放入到同步隊列中。 * 而後再調用acquireQueued()方法,在這個方法中,會先判斷當前線程表明的節點是否是第二個節點,若是是就會嘗試獲取鎖,若是獲取不到鎖,線程就會被阻塞;若是獲取到鎖,就會返回。 * 若是當前線程表明的節點不是第二個節點,那麼就會直接阻塞,只有當獲取到鎖後,acquireQueued()方法纔會返回 * * acquireQueued()方法若是返回的是true,表示線程是被中斷後醒來的,此時if的條件判斷成功,就會執行selfInterrupt()方法,該方法的做用就是將當前線程的中斷標識位設置爲中斷狀態。 * 若是acquireQueued()方法返回的是false,表示線程不是被中斷後醒來的,是正常喚醒,此時if的條件判斷不會成功。acquire()方法執行結束 * * 總結:只有當線程獲取到鎖時,acquire()方法纔會結束;若是線程沒有獲取到鎖,那麼它就會一直阻塞在acquireQueued()方法中,那麼acquire()方法就一直不結束。 * */
// tryAcquire()方法是AQS中定義的一個方法,它須要同步組件的具體實現類來重寫該方法。所以在公平鎖的同步組件FairSync和非公平鎖的同步組NonfairSync中,tryAcquire()方法的實現代碼邏輯是不同的。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 調用selfInterrupt()方法從新設置中斷標識
selfInterrupt();
}
複製代碼
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取同步狀態state的值(在AQS中,state就至關於鎖,若是線程能成功修改state的值,那麼就表示該線程獲取到了鎖)
int c = getState();
if (c == 0) {
// 若是c等於0,表示尚未任何線程獲取到鎖
/** * 此時可能存在多個線程同時執行到這兒,均知足c==0這個條件。 * 在if條件中,會先調用hasQueuedPredecessors()方法來判斷隊列中是否已經有線程在排隊,該方法返回true表示有線程在排隊,返回false表示沒有線程在排隊 * 第1種狀況:hasQueuedPredecessors()返回true,表示有線程排隊, * 此時 !hasQueuedPredecessors() == false,因爲&& 運算符的短路做用,if的條件判斷爲false,那麼就不會進入到if語句中,tryAcquire()方法就會返回false * * 第2種狀況:hasQueuedPredecessors()返回false,表示沒有線程排隊 * 此時 !hasQueuedPredecessors() == true, 那麼就會進行&&後面的判斷,就會調用compareAndSetState()方法去進行修改state字段的值 * compareAndSetState()方法是一個CAS方法,它會對state字段進行修改,它的返回值結果又須要分兩種狀況 * 第 i 種狀況:對state字段進行CAS修改爲功,就會返回true,此時if的條件判斷就爲true了,就會進入到if語句中,同時也表示當前線程獲取到了鎖。那麼最終tryAcquire()方法會返回true * 第 ii 種狀況:若是對state字段進行CAS修改失敗,說明在這一瞬間,已經有其餘線程獲取到了鎖,那麼if的條件判斷就爲false了,就不會進入到if語句塊中,最終tryAcquire()方法會返回false。 */
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 當前線程成功修改了state字段的值,那麼就表示當前線程獲取到了鎖,那麼就將AQS中鎖的擁有者設置爲當前線程,而後返回true。
setExclusiveOwnerThread(current);
return true;
}
}
// 若是c等於0,則表示已經有線程獲取到了鎖,那麼這個時候,就須要判斷獲取到鎖的線程是否是當前線程
else if (current == getExclusiveOwnerThread()) {
// 若是是當前線程,那麼就將state的值加1,這就是鎖的重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 由於此時確定只有一個線程獲取到了鎖,只有獲取到鎖的線程纔會執行到這行代碼,因此能夠直接調用setState(nextc)方法來修改state的值,這兒不會存在線程安全的問題。
setState(nextc);
// 而後返回true,表示當前線程獲取到了鎖
return true;
}
// 若是state不等於0,且當前線程也等於已經獲取到鎖的線程,那麼就返回false,表示當前線程沒有獲取到鎖
return false;
}
}
複製代碼
1.
若是等於0,就表示目前尚未線程持有到鎖,那麼這個時候就會先調用hasQueuedPredecessors()
方法判斷同步隊列中有沒有等待獲取鎖的線程,若是有線程在排隊,那麼當前線程確定獲取鎖失敗(由於AQS的設計的原則是FIFO,既然前面有人已經在排隊了,你就不能插隊,老老實實去後面排隊去),那麼tryAcquire()方法會返回false。若是同步隊列中沒有線程排隊,那麼就讓當前線程對state進行CAS操做,若是設置成功,就表示當前獲取到鎖了,返回true;若是CAS失敗,表示在這一瞬間,鎖被其餘線程搶走了,那麼當前線程就獲取鎖失敗,就返回false。2.
若是不等於0,就表示已經有線程獲取到鎖了,那麼此時就會去判斷當前線程是否是等於已經持有鎖的線程(getExclusiveOwnerThread()方法的做用就是返回已經持有鎖的線程
),若是相等,就讓state加1,這就是所謂的重入鎖,重入鎖就是容許同一個線程屢次獲取到鎖。3.
state既不等於0,當前線程也不等於持有鎖的線程,那麼就返回false,表示當前線程獲取到鎖失敗。addWaiter()方法不必定會執行到,它依賴第一步tryAcquire()的執行結果。若是線程獲取鎖成功,那麼tryAcquire()方法就會返回true,此時就不會進行後面的第二步、第三步、第四步了。只有當線程獲取鎖失敗了,tryAcquire()方法會返回false,這個時候須要將線程加入到同步隊列中,因此須要執行後面的步驟。編程
private Node addWaiter(Node mode) {
// 先根據當前線程,構建一個Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 而後須要把當前線程表明的節點添加到隊列的尾部,此時隊尾多是null,由於可能這是第一次出現多個線程爭搶鎖,隊列尚未初始化,即頭尾均爲null
Node pred = tail;
if (pred != null) {
// 若是隊尾不爲null,就直接將當前線程表明的node添加到隊尾,修改node節點中prev的指針指向
node.prev = pred;
// 利用CAS操做設置隊尾,若是設置成功,就修改倒數第二個節點的next的指針指向,而後方法return結束
// 在多個線程併發的狀況下,CAS操做可能會失敗,若是失敗,就會進入到後面的邏輯
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若是隊尾爲null或者CAS設置隊尾失敗,就調用enq()方法將當前線程表明的節點加入到同步隊列當中
enq(node);
// 將當前線程表明的節點返回
return node;
}
複製代碼
enq(node)
方法進行同步隊列的初始化和當前線程的入隊操做。若是尾結點不爲null,就表示同步隊列已經被初始化了,只須要將當前線程所表明的節點加入到同步隊列隊尾便可。因爲加入到隊尾這個操做存在併發的可能性,因此須要利用CAS操做(即compareAndSetTail()
方法)來保證原子性,若是CAS成功,就將舊的隊尾節點的next指針指向新的隊尾節點,這樣就完成了節點加入到隊尾的操做了,同時addWaiter()方法return結束。若是CAS操做失敗,那麼就會調用enq(node)
方法來實現入隊操做。enq(node)
方法的源碼以下:private Node enq(final Node node) {
// 無限循環
for (;;) {
Node t = tail;
// 若是尾結點爲null,說明這是第一次出現多個線程同時爭搶鎖,所以同步隊列沒有被初始化,tail和head此時均爲null
if (t == null) { // Must initialize
// 設置head節點,注意此時是new了一個Node節點,節點的next,prev,thread屬性均爲null
// 這是由於對於頭結點而言,做爲隊列的頭部,它的next指向原本就應該是null
// 而頭結點的thread屬性爲空,這是AQS同步器特地爲之的
// 頭結點的prev屬性此時爲空,當進行第二次for循環的時候,tail結點就不爲空了,所以不會進入到if語句中,而是進入到else語句中,在這裏對頭結點的prev進行了賦值
if (compareAndSetHead(new Node()))
// 第一次初始化隊列時,頭結點和尾結點是同一個結點
tail = head;
} else {
// 隊尾結點不爲null,說明隊列已經進行了初始化,這個時候只須要將當前線程表示的node結點加入到隊列的尾部
// 設置當前線程所表明的節點的prev的指針,使其指向尾結點
node.prev = t;
// 利用CAS操做設置隊尾結點
if (compareAndSetTail(t, node)) {
// 而後修改隊列中倒數第二個節點的next指針,使其指向隊尾結點
t.next = node;
// 而後返回的是隊列中倒數第二個節點(也就是舊隊列中的尾結點)
return t;
}
}
}
}
複製代碼
enq(node)
方法中進行了一個for(;;)的無限循環,在循環中會先判斷隊尾是否是null,是null就表示同步隊列沒有被初始化,就採用CAS操做(即compareAndSetHead()
方法)來初始化隊列,並讓tail = head。注意:在建立頭結點時,頭結點的thread、prev、next屬性均爲null,在進行第二次循環時,next屬性會被賦值,可是thread、prev屬性始終是null。compareAndSetTail()
方法)將當前線程所表明的Node節點設置爲隊尾,而後修改以前隊尾的next指針,維護好隊列節點中的雙向關聯關係,就將舊的隊尾節點返回。final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 無限for循環
for (;;) {
// 獲取當前線程所表明節點的前一個節點
final Node p = node.predecessor();
// 若是前一個節點是頭結點(即當前線程是隊列中的第二個節點),那麼就調用tryAcquire()方法讓當前線程去嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
// 若是當前線程獲取鎖成功,那麼就將當前線程所表明的節點設置爲頭結點(由於AQS的設計原則就是:隊列中佔有鎖的線程必定是頭結點)
setHead(node);
// 而後將原來的頭節點的next指針設置爲null,這樣頭節點就沒有任何引用了,有助於GC回收。
p.next = null; // help GC
failed = false;
// 返回interrupted所表明的值,表示當前線程是否是經過中斷而醒來的
return interrupted;
}
// 若是前一個節點不是頭結點 或者 前一個節點是頭結點,可是當前節點調用tryAcquire()方法獲取鎖失敗,那麼就會執行到下面的if判斷
/** * 在if判斷中,先調用了shouldParkAfterFailedAcquire()方法。 * 在第一次調用shouldParkAfterFailedAcquire()時,確定返回false(爲何會返回false,能夠看下shouldParkAfterFailedAcquire()方法的源碼) * 因爲當前代碼是處於無限for循環當中的,因此當後面出現第二次代碼執行到這兒時,會再次調用shouldParkAfterFailedAcquire()方法,此時這個方法會返回true。 * 當shouldParkAfterFailedAcquire()返回true時,if判斷中就會再調用parkAndCheckInterrupt()方法,該方法會將當前線程進行阻塞,直到這個線程被喚醒或者被中斷。 * 所以當線程獲取不到鎖時,就會一直阻塞到這兒。直到被其餘線程喚醒,纔會繼續向下執行,當線程想來後,再次進入到當前代碼的無限for循環中,除非線程獲取到鎖,纔會return返回 */
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 若是在try中出現異常,那麼此時failed就會爲true,就會取消獲取鎖
// failed的初始值爲true,若是try語句中不出現異常,最終線程獲取到鎖後,就會將failed置爲false,那麼下面的if判斷條件就不會成立,也就不執行cancelAcquire()方法了
// 若是上面的try語句存在異常,那麼就不會將failed設置爲false,那麼就會執行cancelAcquire()方法
if (failed)
cancelAcquire(node);
}
}
複製代碼
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
。shouldParkAfterFailedAcquire()
方法,該方法用來判斷當前線程是否應該park,便是否應該將當前線程阻塞。若是返回true,就表示須要阻塞,那麼就會接着執行parkAndCheckInterrupt()方法,在parkAndCheckInterrupt()
方法中,就會將當前線程阻塞起來,直到當前線程被喚醒並獲取到鎖之後,acquireQueued()方法纔會返回。parkAndCheckInterrupt()方法的源碼以下:private final boolean parkAndCheckInterrupt() {
// 將當前線程掛起
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 在初始狀況下,全部的Node節點的waitStatus均爲0,由於在初始化時,waitStatus字段是int類型,咱們沒有顯示給它賦值,因此它默認是0
// 後面waitStatus字段會被修改
// 在第一進入到shouldParkAfterFailedAcquire()時,因此waitStatus是默認值0(由於此時沒有任何地方對它進行修改),因此ws=0
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 若是ws = -1,就返回true
// 若是第一次進入shouldParkAfterFailedAcquire()方法時,waitStatus=0,那麼就會進入到後面的else語句中
// 在else語句中,會將waitStatus設置爲-1,那麼當後面第二次進入到shouldParkAfterFailedAcquire()方法時,就會返回true了
return true;
if (ws > 0) {
/* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
// 若是ws既不等於-1,也不大於0,就會進入到當前else語句中
// 此時會調用CAS方法將pred節點的waitStatus字段的值改成-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
處於等待狀態
的節點去嘗試獲取鎖。仍是以上面的Demo爲例,當調用lock.unlock()時,最終會調用到AQS中的release()方法,release()方法的源碼以下。public final boolean release(int arg) {
if (tryRelease(arg)) {
// 當釋放鎖成功之後,須要喚醒同步隊列中的其餘線程
Node h = head;
// 當waitStatus!=0時,表示同步隊列中還有其餘線程在等待獲取鎖
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判斷當前線程是否是持有鎖的線程,若是不是就拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 由於上面一步已經確認了是當前線程持有鎖,因此在修改state時,確定是線程安全的
boolean free = false;
// 由於可能鎖被重入了,重入了幾回就須要釋放幾回鎖,因此這個地方須要判斷,只有當state=0時才表示徹底釋放了鎖。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
unparkSuccessor()
方法來喚醒下一個處於等待狀態的線程;若是沒有,tryRelease()就會直接返回true。h != null && h.waitStatus != 0
這一個判斷條件來判斷的。h != null && h.waitStatus != 0
時,能夠認爲同步隊列中有線程在等待獲取鎖。unparkSuccessor()
方法去喚醒下一個等待狀態的節點。unparkSuccessor()方法的源碼以下private void unparkSuccessor(Node node) {
/* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/** * 同步隊列中的節點鎖表明的線程,可能被取消了,此時這個節點的waitStatus=1 * 所以這個時候利用for循環從同步隊列尾部開始向前遍歷,判斷節點是否是被取消了 * 正常狀況下,當頭結點釋放鎖之後,應該喚醒同步隊列中的第二個節點,可是若是第二個節點的線程被取消了,此時就不能喚醒它了, * 就應該判斷第三個節點,若是第三個節點也被取消了,再依次日後判斷,直到第一次出現沒有被取消的節點。若是都被取消了,此時s==null,因此不會喚醒任何線程 */
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
第一個waitStatus小於0
的節點,而後將其喚醒。爲何是waitStatus小於等於0
?由於waitStatus=1表示線程是取消狀態,沒資格獲取鎖。最後調用LockSupport.unpark()方法喚醒符合條件的線程,此時線程被喚醒後,就回到上面獲取鎖流程中
的parkAndCheckInterrupt()方法處,接着就執行後面的邏輯了。