AbstractQueuedSynchronizer AQS源碼分析

申明:jdk版本爲1.8java

AbstractQueuedSynchronizer是jdk中實現鎖的一個抽象類,有排他和共享兩種模式。node

咱們這裏先看排他模式,共享模式後面結合java.util.concurrent.locks.ReentrantReadWriteLock單獨寫一篇隨筆。併發

後面還會分析可重入鎖java.util.concurrent.locks.ReentrantLock。ui

言歸正傳,一塊兒來看看吧。this

AQS中主要是經過state變量來控制鎖狀態的,子類經過重寫下面的某些方法並控制state值來達到獲取鎖或者解鎖效果:spa

通常狀況下要實現本身的鎖,會實現java.util.concurrent.locks.Lock接口,並經過內部類繼承自java.util.concurrent.locks.AbstractQueuedSynchronizer實現。形以下面這樣:線程

好了,AQS的擴展就先到這裏,後面講具體的鎖時再詳細分析。3d

下面分析AQS自己的代碼實現。code

核心方法是java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(int)獲取指定數量的鎖(實際就是給state加多少的問題)以及java.util.concurrent.locks.AbstractQueuedSynchronizer.release(int)釋放指定數量的鎖(實際就是給state減多少的問題)對象

acquire

代碼以及註釋說明以下;

看註釋就明白了,這是以排他模式獲取鎖,並忽略線程中斷,與acquireInterruptibly相對應。acquireInterruptibly在遇到線程被中斷時會拋出InterruptedException異常。

這裏會先調用一次tryAcquire嘗試獲取鎖,若是獲取失敗,就會新建一個Node並加到雙向鏈表尾部,該Node會在自旋里面重復地嘗試直到得到鎖爲止,過程當中線程可能會阻塞或者不阻塞,這點徹底取決於Node中的waitStatus狀態值,這一點後面會詳細分析。

tryAcquire是子類實現的,負責操做state值,經過state值的控制,子類能夠實現各類各樣的鎖,父類的主要邏輯就是控制好Node的鏈表,線程的阻塞、喚醒等。

下面看下addWaiter方法,方法的邏輯就是用一個Node對象包裝當前線程,並將Node加入到雙向鏈表的尾部:

acquire採用的是排他模式,這裏的入參是Node.EXCLUSIVE,建立一個排他模式的Node。大部分狀況下tail是不爲null的,看註釋就知道了,

這裏外部加了層判斷嘗試以最快方式將新建的Node掛到鏈表尾部。

由於是併發環境,因此compareAndSetTail有可能失敗,失敗的話就進入enq方法,以自旋方式往鏈表尾部添加。

addWaiter完以後將剛剛新建的Node傳入acquireQueued,自旋嘗試得到鎖:

經過上面代碼可知,若是node的前任node是head,而且tryAcquire得到鎖成功,就將當前node設爲head並返回是不是由於線程中斷而從阻塞狀態喚醒的。

不然,shouldParkAfterFailedAcquire,先檢查是否要阻塞當前線程,若是須要阻塞,則調用parkAndCheckInterrupt,阻塞線程,

並在喚醒後檢查是不是終端致使的線程喚醒,同時設置interrupted = true

下面分別看下這兩個方法:

shouldParkAfterFailedAcquire:

根據前任節點的狀態決定是否應該阻塞,若是前任節點狀態爲Node.SIGNAL就直接阻塞當前節點對應的線程,不然檢查前任節點狀態是否爲cancelled,

若是是的話,就將前面全部狀態爲cancelled的節點剔除,剩下的邏輯就是前任狀態設置爲Node.SIGNAL,總結一句話,在自旋過程當中若是沒有成功得到鎖

的狀況下,兜兜轉轉最終會返回true,阻塞當前線程。

node狀態描述以下:

返回true後就會執行parkAndCheckInterrupt方法,以下:

調用LockSupport.park阻塞當前線程,LockSupport的分析見個人另一篇隨筆。

這裏喚醒通常經過LockSupport.unpark喚醒,注意,線程的interrupt方法也會喚醒該線程,因此這裏調用了Thread.interrupted()靜態方法

判斷喚醒方式。喚醒後就會繼續進入自旋嘗試得到鎖。

release

釋放指定數量的鎖,調用子類tryRelease,將State減去入參對應的數值,若是爲0,則表示鎖完全釋放,進入unparkSuccessor方法,喚醒head後面節點對應的線程。

 正常狀況下喚醒head後面一個節點的thread,可是,若是nextNode爲cancel狀態,就從tail往前找最前面符合條件的node。

至此,鎖的獲取、釋放都講完了,還有一個涉及到共享模式的那一路邏輯後面結合java.util.concurrent.locks.ReentrantReadWriteLock再分析。

ConditionObject

首先,一把AQS鎖能夠new多個ConditionObject對象,調用await和signal必須在鎖的lock和unlock中間,也就是必須得到鎖的狀況下才能調用,不然會拋出異常。

相似synchronized中調用監視器的wait和notify,不一樣的是AQS同一把鎖能夠有多個conditionObject。

下面詳細分析下AQS的await和signal。

await

/**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
       // 檢查線程是否已經中斷
if (Thread.interrupted()) throw new InterruptedException();
       // 將當前線程包裝進Node,狀態爲Node.CONDITION,並加到條件隊列末尾 Node node
= addConditionWaiter();
       // 釋放掉該線程獲取的全部鎖狀態
int savedState = fullyRelease(node); int interruptMode = 0;
       // 循環等待當前node直到該node從條件隊列轉入等待隊列
       // 或者該線程被中斷,也就是checkInterruptWhileWaiting返回0
while (!isOnSyncQueue(node)) {
         // 若是尚未轉移到等待隊列則阻塞當前線程
         
// 這裏的線程喚醒有一下幾種狀況:
         // 1.調用signal後轉移到等待隊列並排隊到該node正常喚醒
         // 2.線程中斷喚醒
         // 3.signal時調用transferForSignal入隊後檢查發現前驅節點取消了,或者對前驅節點設置狀態爲Node.SIGNAL時CAS失敗
         LockSupport.park(this);
         // 喚醒後檢查狀態,是否被中斷喚醒,若是不是返回0,若是是,再看是在signal以前仍是以後,
         // 以前的話返回THROW_IE,await結束後拋出InterruptedException異常
         // 以後的話返回REINTERRUPT,須要從新設置中斷標誌
         // 值得說明的是,線程被中斷喚醒後,無論處在條件隊列的什麼位置,都會直接將對應node轉移到等待隊列
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
}
       // 分析checkInterruptWhileWaiting可知,即便線程中斷也會轉移到等待隊列,因此這裏acquireQueued自旋排隊獲取鎖
       // 若是acquireQueued返回true則說明等待過程當中發生了中斷,若是不是signal以前發生的中斷,須要從新設置中斷狀態以便外部邏輯感知到
       // 由於checkInterruptWhileWaiting和parkAndCheckInterrupt中都是調用的Thread.interrupted靜態方法,這個方法會清除中斷狀態
       if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
       // 正常signal的node會doSignal的時候設置nextWaiter = null,
       //因此這裏的條件知足狀況就是在直到得到鎖以前都沒有調用signal(由於signal開始執行就設置了
nextWaiter = null)
       if (node.nextWaiter != null) // clean up if cancelled
         // 在上述狀況下,將node從條件隊列中移除
          unlinkCancelledWaiters();
      // 若是發生中斷,則根據中斷狀況向外反饋
      if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
   }

整個主線邏輯就是上面分析的那樣,下面詳解下關鍵的方法:

checkInterruptWhileWaiting,檢查是不是中斷致使的線程喚醒並返回後續如何處理的標誌:

THROW_IE:拋出異常

REINTERRUPT:從新設置中斷標誌

從上面代碼可知,若是不是中斷,則返回0,從新進入while循環判斷是否在等待隊列中,若是是中斷,則判斷中斷時機:

這裏的中斷時機主要針對的signal以前仍是以後,由於在signal中會先將node狀態設置爲0再將node轉移到等待隊列,以下圖所示:

由於都是CAS操做,因此這裏經過判斷可否將狀態由CONDITION設置爲0判斷是在signal以前仍是以後,若是設置成功,說明在signal操做以前,則將node加入到等待隊列並返回true,對應的狀態是THROW_IE,若是設置失敗,說明signal中的CAS操做搶先了,那就while循環等待調用signal的線程將該node轉移到等待隊列,對應的狀態是REINTERRUPT。

備註:ConditionObject在jdk的BlockingQueue中有很好應用,能夠結合一塊兒看下效果更佳。

相關文章
相關標籤/搜索