AQS實現的原理及一個實例分析(ReentrantLock)

1、 AQS是什麼?

1.1 介紹:

AQS(Abstracting Queue Sychronizer),望文生義,即一個抽象隊列 + 一個須要同步的狀態,所謂抽象隊列即這個隊列並非真是存在的(通俗的講,不是一個LinkedList對象),而是像HashMap中的鏈表同樣,只存在Node之間的關係中,每一個Node負責維護前置與後置節點,以及持有一個請求線程(能夠理解爲將一個請求線程封裝成Node);node

1.2 爲何經過上述的方式實現:

  1. 爲何不能夠經過LinkedList去維護呢?緣由是在多線程尾插List時線程不安全,咱們都知道LinkedList並非一個線程安全的類,因此AQS採用了CAS+死循環的方式實現了插入的串行化,不知道在看的你看沒看過 《高性能Mysql》 ,其中就提到一個郵箱系統,其實與這個場景相似,在事務隔離級別爲第四級串行化時是能夠保證線程安全的;
  2. 共享的狀態是什麼?是一個Volatile值,用Volatile保證每一個請求的線程均可以看見當前最新的狀態,以避免產生線程衝突;
/**
     * The synchronization state.
     */
    private volatile int state;
複製代碼

這個值當你本身去實現鎖的時候你能夠本身定義規則, 《Java併發編程的藝術》 一書中本身定義了一個能夠同時被兩個線程持有的鎖(共享式),而且將state值設置爲2,每當有一個線程獲取到鎖後,將該值減1,當state值再減去一便小於零時,這個線程便只能加入同步隊列而且開始自旋等待鎖。sql

2、爲何須要AQS?

  1. 由於安全的在多線程下訪問共享資源的需求在JAVA1.5的時候愈演愈烈,因此架包的實現者就想提供一個能夠實現同步的基礎框架; AQS(Abstracting Queue Sychronizer)面向的是鎖的實現者,它簡化了實現鎖的方式,屏蔽了同步狀態管理,FIFO隊列管理,線程的等待與喚醒等底層操做,讓鎖的實現者更多的去關注鎖須要實現的功能,並經過模板設計模式提供了比較好的靈活性;而鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現的細節。

3、AQS是如何實現的?

3.1 AQS是如何將請求的線程封裝成Node的呢?又是如何將Node鏈接成隊列的呢?

  1. 既然是封裝,那Node中便會持有一個請求Thread對象,而且爲了創建Node之間的聯繫Node中會維護前置與後置節點指針,而AQS中會維護頭尾節點指針,此時注意這裏維護的是同步隊列(在這個隊列上的線程都在不斷嘗試是否能夠獲取到鎖,由於在同步隊列上即可以調用AQS的acquireQueue方法,而這個方法使得爲獲取到鎖的線程檢查本身是否有資格獲取鎖,若是沒有,則調用LockSupport().park()方法將Node中的線程狀態改成WAITING,等待被喚醒或被中斷);

強調同步隊列是由於,還有多個等待隊列(與synchronized中Monitor對象的WaitSet一個意思,不過Monitor對象只有一個,而AQS能夠有多個等待隊列,視Conditon的數量爲定),而且在Node節點中經過Condition指針維護,由於Node是同步隊列與等待隊列複用的,因此不可避免的產生了一些冗餘;

2. 注意:此時最好要將synchronized的monitor機制與這裏的AQS機制聯繫起來看: 在monitor機制中得到鎖的線程若是調用 wait()方法,該線程所持有的鎖會被釋放並將該線程加入等待隊列中,而Condition是調用 await()方法將該線程放入對應的Condition所持有的等待隊列中去(我以爲能夠把Condition理解成操做系統中定義的線程喚醒條件),因此有幾個Condition就會有幾個對應的等待隊列;

3.2 AQS是如何維護共享變量的可訪問性呢?

  1. 在獨佔鎖中,只有在同步隊列的首節點的next節點能夠嘗試獲取共享變量,由於在acquireQueue()方法中是這樣定義判斷條件的
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取當前節點的前置節點
                final Node p = node.predecessor();
                //注意這個與判斷條件,第一個就是當前節點的前置節點是不是頭節點,而做爲第一個判斷條件是由於與判斷有一個爲非即爲非,就不會進行第二個條件的判斷,這個在之後的編程中也是值得學習的
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

這樣也就保證了獲取共享資源的順序性(即按照插入到隊列的時間來定)編程

  1. 那AQS只能用來實現獨佔且公平鎖嗎?顯然不是,AQS又是如何實現非公平鎖和共享鎖的呢?其實AQS不管用來實現什麼鎖,這些鎖本質的區別就是在於獲取共享資源訪問權的方式不一樣,而獨佔且公平的鎖很明顯獲取訪問權的方式是經過FIFO隊列的順序(即請求訪問共享資源的順序),而共享鎖也是同樣,只是能夠獲取訪問權的線程數多了些;那麼非公平鎖是如何實現的呢?其實也很簡單,就是捨棄隊列的FIFO特性,只要持有共享資源的線程釋放了鎖,全部的在同步隊列中的線程都會經過CAS操做去競爭鎖;

4、AQS提供給鎖實現者的API:

4.1 用於獲取與設置共享資源的API:

  1. getState():獲取當前同步狀態
  2. setState():設置當前同步狀態
  3. compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法可以保證狀態設置的原子性

4.2 同步器可重寫的方法:

這裏AQS的設計者採用了模板設計模式將對同步狀態的操做定義好過程,而將其中能夠改變的過程交由每一個具體的同步器(即鎖)來實現,保證了每一個同步器的特殊性; 上面講的可能有點籠統,那咱們不妨分析一下AQS定義的模板是什麼?但在此以前,咱們必定要牢記於心的是AQS是一個同步框架,即它全部的操做都是爲了保證共享變量的安全!設計模式

  1. (以獨佔鎖爲例)在多線程的場景下,可能會有多個線程想要去訪問共享變量,那麼它們首先要作的是去看看本身有沒有資格,即調用AQS的acquire()方法
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

能夠看到這個方法會調用tryAcquire()方法 和咱們以前提到的 acquireQueued()方法,前者就是一個須要子類實現的模板方法,爲何必定要子類去實現,由於每種鎖都應該本身去定義當前共享變量處在一個什麼狀態下時,請求線程能夠得到共享資源的訪問權(舉個例子,獨佔鎖時,只要當前共享資源有線程在訪問,那麼以後全部請求線程都不能夠再獲取到鎖;而若是是共享鎖,那麼這個方法就要再共享資源狀態可訪問數容許的狀況下讓該請求線程獲取到鎖);而若是子類定義的tryAcquire() 認爲當前線程獲取不到鎖,就應該調用acquireQueued() 方法去死循環+CAS嘗試獲取鎖安全

  1. 而後線程對共享資源操做完了,那它就會去釋放共享資源,就會調用AQS的release(int arg)方法
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

同acquire()同樣,如何去判斷是否能夠釋放對共享資源的訪問權也是須要不一樣的鎖本身去經過覆蓋AQS中的tryRelease()方法去本身定義;bash

  1. 總結:因此模板方法即將框架搭好,但具備特殊性但又具備一致抽象的方法須要在子類中進行特殊化的實現;

5、 一個實例ReentrantLock(這裏只看了公平鎖的實現):

  1. 咱們看一下ReentrantLock()的具體實現,通常來說建議是將繼承自AQS的實現類作爲鎖類的靜態內部類;
  2. FairSync.lock()方法
//直接調用AQS的acquire()方法,這個輸入參數在此處沒有意義)
        final void lock() {
            acquire(1);
        }
複製代碼

而這個acquire()方法利用Java多態實則是調用了FairSync的tryAcquires()方法多線程

/**
         * Fair version of tryAcquire.  Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires){ //省略了具體邏輯 } } 複製代碼

6、 線程打斷時隊列的維護(我的以爲比較難的一個點)

  1. 咱們都知道ReentrantLock() 可讓線程在等待時間過長時放棄等待轉而去作其餘事情,那若是此時這個線程在同步隊列上這麼辦?因此咱們須要將線程的當前狀態信息同步到Node節點中去;
  2. 簡單介紹一下線程的打斷機制: 寫在前面,必定要明白interrupt()方法只是改變了線程的中斷標誌位爲True,並不能讓線程直接死掉!而要等待線程自身自我kill** 當線程處於WAITING/TIMED_WAITING(無限期等待/限期等待)或者BLOCKED(等待獲取一個排他鎖)狀態時,若是此時線程對象調用了interrupt()方法,就會拋出一個受檢異常InterruptedException,並設置線程的中斷標誌位;
public class InterruptRunnableDemo extends Thread{

    @Override
    public void run() {
        //while循環的條件是當前線程的中斷標誌位是否爲True
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("running");
        }
        System.out.println("done ");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new InterruptRunnableDemo();
        thread.start();
        Thread.sleep(1000);
        //打斷子線程,並設置線程的中斷標誌位
        thread.interrupt();
    }
}
複製代碼
輸出結果爲:
running
running
running
done 
複製代碼

參考文章: zhuanlan.zhihu.com/p/27857336併發

  1. Node的屬性:waitStatus : (1) CANCELLED :因爲在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化;(2)SIGNAL:當前持有同步狀態的節點釋放或被取消時,在這個狀態下,會通知後繼處於等待狀態的節點,使得後繼節點的線程得以運行 (3)CONDITION:節點在等待隊列中時節點處於此狀態 (4)PROPAGATE :表示下一次共享式同步狀態將會無條件被傳播下去 (5)INITAL: 初始化狀態 此時咱們須要關注的即是CANCELLED狀態,節點是如何從其餘狀態變爲CANCELLED狀態的,而且進入這個狀態對於維護等待隊列有什麼幫助?
  2. 當線程被另外一個線程改變了中斷標誌位時,AQS是如何改變Node的waitStatus狀態的呢? AQS的acquireInterruptibly(int arg)方法,這個方法與acquire()方法相同,可是該方法響應中斷,而且當前線程未獲取到同步狀態而進入同步隊列中時,若是這個線程被中斷,那麼該方法會拋出InterruptedException並返回; 而acquire()方法是當節點經過tryAcquire()方法成功拿到訪問共享資源的權力時,再去校驗當前線程的中斷標誌位,若是爲True則將Node的waitStatus狀態改成CANCELLED,而且seltInterrupt()
public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //調用時也會判斷此時這個線程的中斷標誌位是否以及爲True,是則直接拋出異常,並讓調用者進行處理
        if (Thread.interrupted())
            throw new InterruptedException();
        //若是此時該節點已經獲取到鎖,但若是這個節點中的線程的中斷標誌位爲True則也會拋出異常,而後調用doAcquireInterruptibly()中的finally代碼塊中的cancelAcquire()方法,將waitStatus狀態改成CANCELLED
        if (!tryAcquire(arg)
            doAcquireInterruptibly(arg);
    }
複製代碼
  1. 那改完這個狀態有什麼用呢? 關鍵在於unparkSuccessor(Node)方法,這個方法會將全部狀態位CANCELLED的Node設置位null釋放掉,不會再影響其後活躍線程競爭共享資源的訪問權!

7、參考

  1. 參考的博客比較多,有用到圖的都已經標註
  2. 《Java併發編程之美》
相關文章
相關標籤/搜索