Java AQS源碼解讀

一、先聊點別的

說實話,關於AQS的設計理念、實現、使用,我有打算寫過一篇技術文章,可是在寫完初稿後,發現掌握的仍是模模糊糊的,模棱兩可。
痛定思痛,腳踏實地從新再來一遍。此次以 Java 8源碼爲基礎進行解讀。java

二、AQS簡介

java.util.concurrent.locks包下,有兩個這樣的類:node

  • AbstractQueuedSynchronizer
  • AbstractQueuedLongSynchronizer

這兩個類的惟一區別就是:web

  • AbstractQueuedSynchronizer內部維護的state變量是int類型
  • AbstractQueuedLongSynchronizer內部維護的state變量是long類型

咱們常說的AQS其實泛指的就是這兩個類,即抽象隊列同步器編程

抽象隊列同步器AbstractQueuedSynchronizer (如下都簡稱AQS),是用來構建鎖或者其餘同步組件的骨架類,減小了各功能組件實現的代碼量,也解決了在實現同步器時涉及的大量細節問題,例如等待線程採用FIFO隊列操做的順序。在不一樣的同步器中還能夠定義一些靈活的標準來判斷某個線程是應該經過仍是等待。ruby

AQS採用模板方法模式,在內部維護了n多的模板的方法的基礎上,子類只須要實現特定的幾個方法(不是抽象方法!不是抽象方法!不是抽象方法!),就能夠實現子類本身的需求。多線程

基於AQS實現的組件,諸如:併發

  • ReentrantLock 可重入鎖(支持公平和非公平的方式獲取鎖)
  • Semaphore 計數信號量
  • ReentrantReadWriteLock 讀寫鎖

AQS是Doug Lea的大做之一,在維基百科查關於他的資料時,偶然發現老爺子喜歡紅色或淡粉色襯衫?app

三、AQS設計思路

AQS內部維護了一個int成員變量來表示同步狀態,經過內置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。ide

咱們能夠猜想出,AQS其實主要作了這麼幾件事情:函數

  • 同步狀態(state)的維護管理
  • 等待隊列的維護管理
  • 線程的阻塞與喚醒

ps: 固然了,其內部還維護了一個ConditionObject 內部類,主要用做線程的協做與通訊,咱們暫時先不講這個帥哥。

經過AQS內部維護的int型的state,能夠用於表示任意狀態!

  • ReentrantLock用它來表示鎖的持有者線程已經重複獲取該鎖的次數,而對於非鎖的持有者線程來講,若是state大於0,意味着沒法獲取該鎖,將該線程包裝爲Node,加入到同步等待隊列裏。
  • Semaphore用它來表示剩餘的許可數量,當許可數量爲0時,對未獲取到許可但正在努力嘗試獲取許可的線程來講,會進入同步等待隊列,阻塞,直到一些線程釋放掉持有的許可(state+1),而後爭用釋放掉的許可。
  • FutureTask用它來表示任務的狀態(未開始、運行中、完成、取消)。
  • ReentrantReadWriteLock在使用時,稍微有些不一樣,int型state用二進制表示是32位,前16位(高位)表示爲讀鎖,後面的16位(低位)表示爲寫鎖。
  • CountDownLatch使用state表示計數次數,state大於0,表示須要加入到同步等待隊列並阻塞,直到state等於0,纔會逐一喚醒等待隊列裏的線程。

3.1 僞代碼之獲取鎖:

boolean acquire() throws InterruptedException {
  while(當前狀態不容許獲取操做) {
    if(須要阻塞獲取請求) {
      若是當前線程不在隊列中,則將其插入隊列
      阻塞當前線程
    }
    else
      返回失敗
  }
  可能更新同步器的狀態
  若是線程位於隊列中,則將其移出隊列
  返回成功
}
複製代碼

3.2 僞代碼之釋放鎖:

void release() {
  更新同步器的狀態
  if (新的狀態容許某個被阻塞的線程獲取成功)
    解除隊列中一個或多個線程的阻塞狀態
}
複製代碼

大概就是闡述這麼個思路。

3.3 提供的方法

3.3.1 共通方法

如下三個方法,均爲protected final修飾,每一個繼承AQS的類均可以調用這三個方法。

  • protected final int getState() 獲取同步狀態
  • protected final void setState(int newState) 設置同步狀態
  • protected final boolean compareAndSetState(int expect, int update) 若是當前狀態值等於預期值,原子性地將同步狀態設置爲給定的更新值,並返回true;不然返回false
3.3.2 子類須要實現的方法

如下五個方法,在AQS內部並未實現,而是交由子類去實現,而後AQS再調用子類的實現方法,完成邏輯處理。

  • protected boolean tryAcquire(int) 嘗試以獨佔模式獲取操做,應查詢對象的狀態是否容許以獨佔模式獲取它,若是容許則獲取它。
  • protected boolean tryRelease(int) 嘗試釋放同步狀態
  • protected int tryAcquireShared(int) 共享的方式嘗試獲取操做
  • protected boolean tryReleaseShared(int) 共享的方式嘗試釋放
  • protected boolean isHeldExclusively() 調用此方法的線程,是不是獨佔鎖的持有者

子類無須實現上述的全部方法,能夠選擇其中一部分進行覆寫,可是要保持實現邏輯完整,不能穿插實現。根據實現方式不一樣,分爲獨佔鎖策略實現和共享鎖策略實現。

這也是爲何上述方法沒有定義爲抽象方法的緣由。若是定義爲抽象方法,子類必須實現全部的五個方法,哪怕你壓根就用不到。

獨佔鎖:

  • ReentrantLock
  • ReentrantReadWriteLock.WriteLock
    實現策略:
  • tryAcquire(int)
  • tryRelease(int)
  • isHeldExclusively()

共享鎖:

  • CountDownLatch
  • ReentrantReadWriteLock.ReadLock
  • Semaphore
    實現策略:
  • tryAcquireShared(int)
  • tryReleaseShared(int)

AQS還有不少內部模板方法,就不一一舉例了,以後的源碼解讀,會展現一部分,並會配上騷氣的註釋。

四、AQS內部屬性

4.1 CLH隊列

AQS經過內置的FIFO(first-in-first-out)同步隊列來控制獲取共享資源的線程。CLH隊列是FIFO的雙端雙向隊列,AQS的同步機制就是依靠這個CLH隊列完成的。隊列的每一個節點,都有前驅節點指針和後繼節點指針。

頭結點並不在阻塞隊列內!

AQS-Node.jpg
AQS-Node.jpg

Node源碼:

static final class Node {
    // 共享模式下等待標記
    static final Node SHARED = new Node();

    // 獨佔模式下等待標記
    static final Node EXCLUSIVE = null;

    // 表示當前的線程被取消
    static final int CANCELLED = 1;

    // 表示當前節點的後繼節點包含的線程須要運行,也就是unpark
    static final int SIGNAL = -1;

    // 表示當前節點在等待condition,也就是在condition隊列中
    static final int CONDITION = -2;

    // 表示當前場景下後續的acquireShared可以得以執行
    static final int PROPAGATE = -3;
    /**
     * CANCELLED =  1 // 當前線程由於超時或者中斷被取消。這是一個終結態,也就是狀態到此爲止。
     * SIGNAL    = -1 // 表示當前線程的後繼線程被阻塞或即將被阻塞,當前線程釋放鎖或者取消後須要喚醒後繼線程。這個狀態通常都是後繼節點設置前驅節點的
     * CONDITION = -2 // 表示當前線程在Condition隊列中
     * PROPAGATE = -3 // 用於將喚醒後繼線程傳遞下去,這個狀態的引入是爲了完善和加強共享鎖的喚醒機制
     * 0              // 表示無狀態或者終結狀態!
     */

    volatile int waitStatus;

    // 前驅節點
    volatile Node prev;

    // 後繼節點
    volatile Node next;

    // 當前節點的線程,初始化使用,在使用後失效
    volatile Thread thread;

    // 存儲condition隊列中的後繼節點
    Node nextWaiter;

    // 若是該節點處於共享模式下等待,返回true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回當前節點的前驅節點,若是爲空,直接拋出空指針異常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 指定線程和模式的構造方法
    Node(Thread thread, Node mode) {     // Used by addWaiter
        // SHARED和EXCLUSIVE 用於表示當前節點是共享仍是獨佔
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 指定線程和節點狀態的構造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
複製代碼

4.2 volatile state

最爲重要的屬性,這個整數能夠用於表示任意狀態!在上面有說過。

4.2 volatile head & volatile tail

head 頭結點,可是這個頭節點只是個虛節點,只是邏輯上表明持有鎖的線程節點,且head節點是不存儲thread線程信息和前驅節點信息的。

tail 尾節點,每一個新節點都會進入隊尾。不存儲後繼節點信息。

  • 這兩個屬性是延遲初始化的,在第一次且第一個線程持有鎖時,第二個線程由於獲取失敗,進入同步隊列時會對head和tail進行初始化,也就是說在全部線程都能獲取到鎖時,其內部的head和tail都爲null,一旦head 和 tail被初始化後,即便後來沒有線程持有鎖,其內部的head 和 tail 依然保留最後一個持有鎖的線程節點!(head 和 tail都指向一個內存地址)
  • 當一個線程獲取鎖失敗而被加入到同步隊列時,會用CAS來設置尾節點tail爲當前線程對應的Node節點。
  • AQS內部的cas操做,都是依賴Unsafe類的,自Java9以後的版本,Unsafe類被移除,取而代之的是VarHandle類。

這兩個屬性均爲volatile所修飾(保證了變量具備有序性和可見性)

4.3 spinForTimeoutThreshold

自旋超時閥值,在doAcquireSharedNanos()等方法中有使用到。

  • 若是用戶定義的等待時間超過這個閥值,那麼線程將阻塞,在阻塞期間若是可以等到喚醒的機會並tryAcquireShared成功,則返回true,不然返回false,超時也返回false。
  • 若是用戶定義的等待時間小於等於這個閥值,則會無限循環,線程不阻塞,直到有線程釋放同步狀態或者超時,而後返回對應的結果。

4.4 exclusiveOwnerThread

這是AQS經過繼承AbstractOwnableSynchronizer類,得到的屬性,表示獨佔模式下的同步器持有者。

五、AQS具體實現

5.1 獨佔鎖實現思路

5.1.1 獲取鎖 ReentrantLock.lock()
/**
 * 獲取獨佔鎖,忽略中斷。
 * 首先嚐試獲取鎖,若是成功,則返回true;不然會把當前線程包裝成Node插入到隊尾,在隊列中會檢測是否爲head的直接後繼,並嘗試獲取鎖,
 * 若是獲取失敗,則會經過LockSupport阻塞當前線程,直至被釋放鎖的線程喚醒或者被中斷,隨後再次嘗試獲取鎖,如此反覆。被喚醒後繼續以前的代碼執行
 */

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
---------------------------------------------------------------------------------------
其中tryAcquire()方法須要由子類實現,ReentrantLock經過覆寫這個方法實現了公平鎖和非公平鎖
---------------------------------------------------------------------------------------

/**
 * 在同步等待隊列中插入節點
 */

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 判斷尾節點是否爲null
    if (pred != null) {
        node.prev = pred;
        // 經過CAS在隊尾插入當前節點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // tail節點爲null,則將新節點插入隊尾,必要時進行初始化
    enq(node);
    return node;
}

/**
 * 經過無限循環和CAS操做在隊列中插入一個節點成功後返回。
 * 將節點插入隊列,必要時進行初始化
 */

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 初始化head和tail
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            /*
             CAS設置tail爲node
             表面上看是把老tail的next鏈接到node。
             若是同步隊列head節點和tail節點剛剛被這個線程初始化,實際上也把head的next也鏈接到了node,而老tail節點被node取締。
             反之則是,把老tail的next鏈接到node,head並無與node產生鏈接,這樣就造成了鏈表 head <-> old_tail <-> tail
             */

            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 在隊列中的節點經過此方法獲取鎖,忽略中斷。
 * 這個方法很重要,若是上述沒有獲取到鎖,將線程包裝成Node節點加入到同步隊列的尾節點,而後看代碼裏的註釋
 */

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /*
             * 檢測當前節點前驅是否head,這是試獲取鎖。
             * 若是是的話,則調用tryAcquire嘗試獲取鎖,
             * 成功,則將head置爲當前節點。原head節點的next被置爲null等待GC垃圾回收
             */

            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null// help GC
                failed = false;
                return interrupted;
            }
            /*
             * 若是未成功獲取鎖則根據前驅節點判斷是否要阻塞。
             * 若是阻塞過程當中被中斷,則置interrupted標誌位爲true。
             * shouldParkAfterFailedAcquire方法在前驅狀態不爲SIGNAL的狀況下都會循環重試獲取鎖。
             * 若是shouldParkAfterFailedAcquire返回true,則會將當前線程阻塞並檢查是否被中斷
             */

            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 根據前驅節點中的waitStatus來判斷是否須要阻塞當前線程。
 */

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驅節點設置爲SIGNAL狀態,在釋放鎖的時候會喚醒後繼節點,
         * 因此後繼節點(也就是當前節點)如今能夠阻塞本身。
         */

        return true;
    if (ws > 0) {
        /*
         * 前驅節點狀態爲取消,向前遍歷,更新當前節點的前驅爲往前第一個非取消節點。
         * 當前線程會以後會再次回到循環並嘗試獲取鎖。
         */

        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         /**
          * 等待狀態爲0或者PROPAGATE(-3),設置前驅的等待狀態爲SIGNAL,
          * 而且以後會回到循環再次重試獲取鎖。
          */

        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 該方法實現某個node取消獲取鎖。
 */

private void cancelAcquire(Node node) {
   if (node == null)
       return;

   node.thread = null;

   // 遍歷並更新節點前驅,把node的prev指向前部第一個非取消節點。
   Node pred = node.prev;
   while (pred.waitStatus > 0)
       node.prev = pred = pred.prev;

   // 記錄pred節點的後繼爲predNext,後續CAS會用到。
   Node predNext = pred.next;

   // 直接把當前節點的等待狀態置爲取消,後繼節點調用cancelAcquire方法時,也能夠跨過該節點
   node.waitStatus = Node.CANCELLED;

   // 若是當前節點是尾節點,則將尾節點置爲當前節點的前驅節點
   if (node == tail && compareAndSetTail(node, pred)) {
       compareAndSetNext(pred, predNext, null);
   } else {
       // 若是node還有後繼節點,這種狀況要作的是把pred和後繼非取消節點拼起來。
       int ws;
       if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
           Node next = node.next;
           /* 
            * 若是node的後繼節點next非取消狀態的話,則用CAS嘗試把pred的後繼置爲node的後繼節點
            * 這裏if條件爲false或者CAS失敗都不要緊,這說明可能有多個線程在取消,總歸會有一個能成功的。
            */

           if (next != null && next.waitStatus <= 0)
               compareAndSetNext(pred, predNext, next);
       } else {
           unparkSuccessor(node);
       }

       /*
        * 在GC層面,和設置爲null具備相同的效果
        */

       node.next = node; 
   }
}
複製代碼

獲取獨佔鎖的執行過程大體以下:
假設當前鎖已經被線程A持有,且持有鎖的時間足夠長(方便咱們講解,也防止擡槓),線程B、C獲取鎖失敗。

線程B:

  • 一、將線程B包裝成Node節點(簡稱BN),加入到同步等待隊列,此時BN的waitStatus=0
  • 二、將tail節點設置爲BN,且與head節點相連,造成鏈表
  • 三、head節點是個虛擬節點,也就是持有鎖的線程(但並不包含有線程信息),tail節點就是BN
  • 四、線程B進入"無限循環",判斷前驅節點是否爲頭節點(true)並再次嘗試獲取鎖(false,獲取鎖失敗)
  • 五、線程B將進入shouldParkAfterFailedAcquire方法,在方法內部,將BN的前驅節點(也就是頭結點)的waitStatus設置爲 -1,此方法返回false
  • 六、由於是無限循環,因此線程B再次進入shouldParkAfterFailedAcquire方法,因爲BN的前驅節點(也就是頭結點)的waitStatus爲 -1,因此直接返回true
  • 七、調用parkAndCheckInterrupt,當前線程B被阻塞,等待喚醒。

線程C:

  • 一、將線程C包裝成Node節點(簡稱CN),加入到同步等待隊列,此時CN的waitStatus=0
  • 二、將tail節點設置爲CN,且與原tail節點(BN節點)相連
  • 三、線程C進入"無限循環",判斷前驅節點是否爲頭節點(false)
  • 四、線程C將進入shouldParkAfterFailedAcquire方法,在方法內部,將CN的前驅節點(也就是BN結點)的waitStatus設置爲 -1,此方法返回false
  • 五、由於是無限循環,因此線程C再次進入shouldParkAfterFailedAcquire方法,因爲CN的前驅節點(也就是BN結點)的waitStatus爲 -1,因此直接返回true
  • 六、調用parkAndCheckInterrupt,線程C被阻塞,等待喚醒。

最終的隊列以下:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  BN  |        | tail |
|  AN  |  --->  |      |  --->  | (CN) |
+------+        +------+        +------+
複製代碼
5.1.2 釋放鎖 ReentrantLock.unlock()

對於釋放獨佔鎖,會調用tryRelaes(int)方法,該方法由子類實現,在徹底釋放掉鎖後,釋放掉鎖的線程會將後繼線程喚醒,後繼線程進行鎖爭用(非公平鎖)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 頭結點不爲null且後繼節點是須要被喚醒的
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

釋放獨佔鎖的執行過程大體以下(假設有後繼節點須要喚醒):

  • 將head節點的waitStatus設置爲0
  • 喚醒後繼節點
  • 後繼節點線程被喚醒後,會將後繼節點設置爲head,並對後繼節點內的prev和thread屬性設置爲null
  • 對原head節點的next指針設置爲null,等待GC回收原head節點。
+------+        +------+        +------+
| old  |  <-X-  | new  |  <---  |      |
| head |        | head |        | tail |
|  AN  |  -X->  |  BN  |  --->  | (CN) |
+------+        +------+        +------+
複製代碼

如上所示,AN節點(原head節點)等待被GC垃圾回收。

5.2 共享鎖實現思路

5.2.1 獲取鎖

與獲取獨佔鎖不一樣,關鍵在於,共享鎖能夠被多個線程持有。

若是須要AQS實現共享鎖,在實現tryAcquireShared()方法時:

  • 返回負數,表示獲取失敗
  • 返回0,表示獲取成功,可是後繼爭用線程不會成功
  • 返回正數,表示獲取成功,表示後繼爭用線程也可能成功
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 一旦共享獲取成功,設置新的頭結點,而且喚醒後繼線程
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null// help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 這個函數作的事情有兩件:
 * 1. 在獲取共享鎖成功後,設置head節點
 * 2. 根據調用tryAcquireShared返回的狀態以及節點自己的等待狀態來判斷是否要須要喚醒後繼線程
 */

private void setHeadAndPropagate(Node node, int propagate) {
    // 把當前的head封閉在方法棧上,用如下面的條件檢查
    Node h = head;
    setHead(node);
    /*
     * propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據之一
     * h.waitStatus爲SIGNAL或者PROPAGATE時也根據node的下一個節點共享來決定是否傳播喚醒
     */

    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/**
 * 這是共享鎖中的核心喚醒函數,主要作的事情就是喚醒下一個線程或者設置傳播狀態。
 * 後繼線程被喚醒後,會嘗試獲取共享鎖,若是成功以後,則又會調用setHeadAndPropagate,將喚醒傳播下去。
 * 這個函數的做用是保障在acquire和release存在競爭的狀況下,保證隊列中處於等待狀態的節點可以有辦法被喚醒。
 */

private void doReleaseShared() {
    /*
     * 如下的循環作的事情就是,在隊列存在後繼線程的狀況下,喚醒後繼線程;
     * 或者因爲多線程同時釋放共享鎖因爲處在中間過程,讀到head節點等待狀態爲0的狀況下,
     * 雖然不能unparkSuccessor,但爲了保證喚醒可以正確穩固傳遞下去,設置節點狀態爲PROPAGATE。
     * 這樣的話獲取鎖的線程在執行setHeadAndPropagate時能夠讀到PROPAGATE,從而由獲取鎖的線程去釋放後繼等待線程。
     */

    for (;;) {
        Node h = head;
        // 若是隊列中存在後繼線程。
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);
            }
            // 若是h節點的狀態爲0,須要設置爲PROPAGATE用以保證喚醒的傳播。
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 檢查h是否仍然是head,若是不是的話須要再進行循環。
        if (h == head)
            break;
    }
}
複製代碼
5.2.1 釋放鎖

釋放共享鎖與獲取共享鎖的代碼都使用了doReleaseShared(int)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // doReleaseShared的實現上面獲取共享鎖已經介紹
        doReleaseShared();
        return true;
    }
    return false;
}
複製代碼

我以爲你們應該都能看懂,仍是簡單說一下吧(手動狗頭~):

同步等待隊列中,在喚醒由於獲取共享鎖失敗而阻塞的後繼節點線程後,後繼節點線程會依次喚醒其後繼節點!依次類推。

再換種說法?

這種狀況有多是:寫鎖致使獲取讀鎖的一些線程阻塞,而寫鎖釋放後,會喚醒後繼節點線程,若是該後繼節點,剛好是由於獲取讀鎖失敗而阻塞的線程,那麼該後繼節點線程會喚醒其後繼節點…直到所有獲取讀鎖成功,或者某一節點獲取寫鎖成功。

六、拓展

6.1 不得不說的PROPAGATE

有個關於AQS的bug,真的值得你們看一看

在共享鎖獲取與釋放的操做中,我以爲有個特別的重要的waitStatus狀態值,要和你們說一說,就是PROPAGATE,這個屬性值的意思是,用於將喚醒後繼線程傳遞下去,這個狀態的引入是爲了完善和加強共享鎖的喚醒機制。

以前翻閱了不少關於AQS的文章,講到這個狀態值的少之又少,哪怕是《Java併發編程實戰》這本書,也是沒有說起,最終我看到有一位博客園的做者很是詳實的闡述了這個PEOPAGATE狀態,也是給了我很大的啓發。

沒錯,我第一次看AQS的源碼的時候,甚至直接把這個PROPAGATE狀態值忽略掉了。事實上,不只僅閱讀源碼的人,容易把這個PROPAGATE狀態值忽略掉,哪怕是Doug Lea老爺子本人,在開發時也沒有意識到,若是沒有這個狀態值會致使什麼樣的後果,直到上面連接的bug出現後,老爺子才加上了這個狀態,完全修復了這個bug。

復現該bug的代碼:

import java.util.concurrent.Semaphore;

public class TestSemaphore {

    private static Semaphore sem = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            sem.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            sem.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}
複製代碼

程序執行時,會偶發線程hang住。

咱們再來看看以前的setHeadAndPropagate方法是什麼樣的。

private void setHeadAndPropagate(Node node, int propagate{
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}
複製代碼

而後Semaphore.release()調用的是AQS的releaseShared,看看當時的releaseShared長什麼樣:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼

再看看當時的Node:

static final class Node {
    // 忽略掉無關的代碼,只展現waitStatus的狀態值

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
}
複製代碼

setHeadAndPropagate方法和releaseShared方法,設計的也是很簡單。

當時源碼裏,Node的waitStatus是沒有PROPAGATE=-3這個狀態值的。

爲了方便你們對照,我把當時unparkSuccessor方法的源碼,也一併展現出來:

private void unparkSuccessor(Node node{

    // 將node的waitStatus設置爲0
    compareAndSetWaitStatus(node, Node.SIGNAL, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

接下來,咱們慢慢聊~

ps: 說真的,如今老闆的位置離個人位置不遠,雖然個人工做已經提早數日完成了,但,仍是有點慌~,冒着風險還要繼續寫!

在AQS獲取共享鎖的操做中,進入同步等待的線程(被阻塞掉),有兩種途徑能夠被喚醒:

  • 其餘線程釋放信號量後,調用unparkSuccessor(releaseShared方法中)
  • 其餘線程獲取共享鎖成功後,會經過傳播機制來喚醒後繼節點(也就是在setHeadAndPropagate方法中)。

bug重現的例子,很簡單,就是在循環中重複不斷的實例化4個線程,前兩個線程獲取信號量,兩個線程釋放信號量,主線程等待4個線程全都執行完畢再執行打印。

在後兩個線程沒有進行釋放信號量的操做時,AQS內部的同步等待隊列是下面這種狀況:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  t1  |        |  t2  |
|      |  --->  |      |  --->  |      |
+------+        +------+        +------+
複製代碼
  • 一、t3釋放信號量,調用releaseShared,喚醒後繼節點裏的線程t1,同時,head的waitStatus變爲0
  • 二、t1被喚醒,調用Semaphore.NonfairSync的tryAcquireShared方法,返回0
  • 三、t4釋放信號量,調用releaseShared,在releaseShared方法中讀到的head仍是原head,可是此時head的waitStatus已經變爲0,因此不會調用unparkSuccessor方法
  • 四、t1被喚醒了,因爲在步驟2裏,調用Semaphore.NonfairSync的tryAcquireShared方法,返回的是0,因此它也不會調用unparkSuccessor方法

至此,兩種途徑所有被封死,沒有任何線程去喚醒t2了,線程被hang住…

ps:Doug Lea 黑人問號臉,哈哈~

老爺子爲了修復這個bug,作出了以下改進:
一、增長一個waitStatus的狀態,即PROPAGATE
二、在releaseShared方法中抽取提煉出了doReleaseShared()(上面有展現)在doReleaseShared方法中,若是head節點的狀態爲0,須要設置爲PROPAGATE用以保證喚醒的傳播。
三、在setHeadAndPropagate方法中也多了一些判斷,其中就有head節點的waitStatus若是小於0,就喚醒後繼節點(PROPAGATE = -3)。

經過改進以後的代碼,咱們再來複盤一下:

  • 一、t3釋放信號量,調用releaseShared,喚醒後繼節點裏的線程t1,同時,head的waitStatus變爲0
  • 二、t1被喚醒,調用Semaphore.NonfairSync的tryAcquireShared方法,返回0
  • 三、此步驟和2和同一時刻發生,t4釋放信號量,調用releaseShared,在doReleaseShared方法中讀到的head仍是原head,可是此時head的waitStatus已經變爲0,將head的waitStatus設置爲PROPAGATE(-3)
  • 四、t1被喚醒了,調用setHeadAndPropagate方法,將t1設置爲head,符合條件判斷,進入分支語句,調用doReleaseShared方法,繼而喚醒t2節點線程。

6.2 unparkSuccessor的一點思考

private void unparkSuccessor(Node node{
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 一般狀況下,要喚醒的線程都是當前節點的後繼線程
     * 可是,若是當前節點的後繼節點被取消了,則從隊列尾部向前遍歷,直到找到未被取消的後繼節點
     */

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

unparkSuccessor方法中,若是當前節點的後繼節點被取消了,則從隊列尾部向前遍歷,直到找到未被取消的後繼節點。

這個問題,你們也能夠本身思考一下,爲何要從tail節點開始向前遍歷?

假設,CLH隊列以下圖所示:

+------+        +------+        +------+
|      |  <---  |      |  <---  |      |
| head |        |  t1  |        | tail |
|      |  --->  |      |  --->  |      |
+------+        +------+        +------+
複製代碼

t1.waitStatus = 1 且 tail.waitStatus = 1

head嘗試喚醒後繼節點t1,發現t1是被取消狀態,遂找出t1的後繼節點tail,發現tail也是被取消狀態,可是tail.next == null。

與此同時,有個新節點加入到隊列尾部,可是尚未將原tail.next指向新節點。

也就是說,tail.next 若是剛好處在步驟1和步驟2中間的話,遍歷就會中斷。

摘錄addWaiter部分代碼:

node.prev = pred;
// 經過CAS在隊尾插入當前節點
if (compareAndSetTail(pred, node)) { // 步驟1
    pred.next = node; // 步驟2
    return node;
}
複製代碼

6.3 acquireQueued 方法裏,爲何還要再tryAcquire?

以獨佔模式來講,對於這個問題,我是這麼想的:

時刻1:線程B嘗試獲取鎖,可是,因爲鎖被線程A持有,因此,線程B準備調用addWaiter,將本身入到隊列(但尚未和head節點產生指針鏈接)

時刻1:同一時刻,線程A嘗試釋放鎖,進入release方法,調用子類的tryRelease(),將表明鎖持有次數的state置爲0(表明鎖沒有被任何線程持有),進入unparkSuccessor方法,發現並無後繼節點(由於新節點還未入隊),因此不會喚醒任何線程,到這裏,線程A釋放鎖操做完成。

時刻2:線程B調用addWaiter方法完畢,已經入隊,並和head節點產生指針鏈接

時刻3:線程B調用acquireQueued方法(以下方代碼展現),若是在這個方法裏面不調用tryAcquire,就會發生這樣的狀況:明明能夠獲取鎖,可是線程卻被休眠了,進而致使整個同步隊列不可用

因此,再次調用tryAcquire是爲了防止新節點還未入隊,可是頭結點已經釋放了鎖,致使整個同步隊列癱瘓的狀況發生。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 防止新節點還未入隊,可是頭結點已經釋放了鎖,致使整個同步隊列中斷癱瘓的狀況發生
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null// help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

結束

經過閱讀AQS的源碼,對於咱們學習和掌握基於AQS實現的組件,是有很大幫助的。

尤爲是它的設計理念和思想,更是咱們學習的重點!

Doug Lea的AQS論文,英語較好的朋友,不妨去讀一讀

相關文章
相關標籤/搜索