併發Lock之AQS(AbstractQueuedSynchronizer)詳解

1. J.U.C的lock包結構

上一篇文章講了併發編程的鎖機制:synchronized和lock,主要介紹了Java併發編程中經常使用的鎖機制。Lock是一個接口,而synchronized是Java中的關鍵字,synchronized是基於jvm實現。Lock鎖能夠被中斷,支持定時鎖等。Lock的實現類,可重入鎖ReentrantLock,咱們有講到其具體用法。而談到ReentrantLock,不得不談抽象類AbstractQueuedSynchronizer(AQS)。抽象的隊列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如經常使用的ReentrantLock、ThreadPoolExecutor。html

lock
Lock包結構

2. AQS介紹

AQS是一個抽象類,主是是以繼承的方式使用。AQS自己是沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。AQS抽象類包含以下幾個方法:java

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。共享模式時只用 Sync Queue, 獨佔模式有時只用 Sync Queue, 但若涉及 Condition, 則還有 Condition Queue。在子類的 tryAcquire, tryAcquireShared 中實現公平與非公平的區分。node

不一樣的自定義同步器爭用共享資源的方式也不一樣。自定義同步器在實現時只須要實現共享資源state的獲取與釋放方式便可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。編程

整個 AQS 分爲如下幾部分:安全

  • Node 節點, 用於存放獲取線程的節點, 存在於 Sync Queue, Condition Queue, 這些節點主要的區分在於 waitStatus 的值(下面會詳細敘述)
  • Condition Queue, 這個隊列是用於獨佔模式中, 只有用到 Condition.awaitXX 時纔會將 node加到 tail 上(PS: 在使用 Condition的前提是已經獲取 Lock)
  • Sync Queue, 獨佔 共享的模式中均會使用到的存放 Node 的 CLH queue(主要特色是, 隊列中總有一個 dummy 節點, 後繼節點獲取鎖的條件由前繼節點決定, 前繼節點在釋放 lock 時會喚醒sleep中的後繼節點)
  • ConditionObject, 用於獨佔的模式, 主要是線程釋放lock, 加入 Condition Queue, 並進行相應的 signal 操做。
  • 獨佔的獲取lock (acquire, release), 例如 ReentrantLock。
  • 共享的獲取lock (acquireShared, releaseShared), 例如 ReeantrantReadWriteLock, Semaphore, CountDownLatch

下面咱們具體來分析一下AQS實現的源碼。微信

3. 內部類 Node

Node 節點是表明獲取lock的線程, 存在於 Condition Queue, Sync Queue 裏面, 而其主要就是 nextWaiter (標記共享仍是獨佔),waitStatus 標記node的狀態。多線程

node
內部類 Node

static final class Node {
    /** 標識節點是不是 共享的節點(這樣的節點只存在於 Sync Queue 裏面) */
    static final Node SHARED = new Node();
    //獨佔模式
    static final Node EXCLUSIVE = null;
    /** * CANCELLED 說明節點已經 取消獲取 lock 了(通常是因爲 interrupt 或 timeout 致使的) * 不少時候是在 cancelAcquire 裏面進行設置這個標識 */
    static final int CANCELLED = 1;

    /** * SIGNAL 標識當前節點的後繼節點須要喚醒(PS: 這個一般是在 獨佔模式下使用, 在共享模式下有時用 PROPAGATE) */
    static final int SIGNAL = -1;
    
    //當前節點在 Condition Queue 裏面
    static final int CONDITION = -2;
    
    /** * 當前節點獲取到 lock 或進行 release lock 時, 共享模式的最終狀態是 PROPAGATE(PS: 有可能共享模式的節點變成 PROPAGATE 以前就被其後繼節點搶佔 head 節點, 而從Sync Queue中被踢出掉) */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    /** * 節點在 Sync Queue 裏面時的前繼節點(主要來進行 skip CANCELLED 的節點) * 注意: 根據 addWaiter方法: * 1. prev節點在隊列裏面, 則 prev != null 確定成立 * 2. prev != null 成立, 不必定 node 就在 Sync Queue 裏面 */
    volatile Node prev;

    /** * Node 在 Sync Queue 裏面的後繼節點, 主要是在release lock 時進行後繼節點的喚醒 * 然後繼節點在前繼節點上打上 SIGNAL 標識, 來提醒他 release lock 時須要喚醒 */
    volatile Node next;

    //獲取 lock 的引用
    volatile Thread thread;

    /** * 做用分紅兩種: * 1. 在 Sync Queue 裏面, nextWaiter用來判斷節點是 共享模式, 仍是獨佔模式 * 2. 在 Condition queue 裏面, 節點主要是連接且後繼節點 (Condition queue是一個單向的, 不支持併發的 list) */
    Node nextWaiter;

    // 當前節點是不是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 獲取 node 的前繼節點
    final Node predecessor() throws NullPointerException{
        Node p = prev;
        if(p == null){
            throw new NullPointerException();
        }else{
            return p;
        }
    }

    Node(){
        // Used to establish initial head or SHARED marker
    }

    // 初始化 Node 用於 Sync Queue 裏面
    Node(Thread thread, Node mode){     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //初始化 Node 用於 Condition Queue 裏面
    Node(Thread thread, int waitStatus){ // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
複製代碼

waitStatus的狀態變化:併發

  1. 線程剛入 Sync Queue 裏面, 發現獨佔鎖被其餘人獲取, 則將其前繼節點標記爲 SIGNAL, 而後再嘗試獲取一下鎖(調用 tryAcquire 方法)
  2. 若調用 tryAcquire 方法獲取失敗, 則判斷一下是否前繼節點被標記爲 SIGNAL, 如果的話 直接 block(block前會確保前繼節點被標記爲SIGNAL, 由於前繼節點在進行釋放鎖時根據是否標記爲 SIGNAL 來決定喚醒後繼節點與否 <- 這是獨佔的狀況下)
  3. 前繼節點使用完lock, 進行釋放, 由於本身被標記爲 SIGNAL, 因此喚醒其後繼節點

waitStatus 變化過程:框架

  1. 獨佔模式下: 0(初始) -> signal(被後繼節點標記爲release須要喚醒後繼節點) -> 0 (等釋放好lock, 會恢復到0)
  2. 獨佔模式 + 使用 Condition狀況下: 0(初始) -> signal(被後繼節點標記爲release須要喚醒後繼節點) -> 0 (等釋放好lock, 會恢復到0)其上可能涉及 中斷與超時, 只是多了一個 CANCELLED, 當節點變成 CANCELLED, 後就等着被清除。
  3. 共享模式下: 0(初始) -> PROPAGATE(獲取 lock 或release lock 時) (獲取 lock 時會調用 setHeadAndPropagate 來進行 傳遞式的喚醒後繼節點, 直到碰到 獨佔模式的節點)
  4. 共享模式 + 獨佔模式下: 0(初始) -> signal(被後繼節點標記爲release須要喚醒後繼節點) -> 0 (等釋放好lock, 會恢復到0)

其上的這些狀態變化主要在: doReleaseShared , shouldParkAfterFailedAcquire 裏面。jvm

4. Condition Queue

Condition Queue 是一個併發不安全的, 只用於獨佔模式的隊列(PS: 爲何是併發不安全的呢? 主要是在操做 Condition 時, 線程必需獲取 獨佔的 lock, 因此不須要考慮併發的安全問題); 而當Node存在於 Condition Queue 裏面, 則其只有 waitStatus, thread, nextWaiter 有值, 其餘的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 表明node進行轉移到 Sync Queue裏面, 或被中斷/timeout)); 這裏有個注意點, 就是當線程被中斷或獲取 lock 超時, 則一瞬間 node 會存在於 Condition Queue, Sync Queue 兩個隊列中.

ConditionQueue
Condition Queue
節點 Node4, Node5, Node6, Node7 都是調用 Condition.awaitXX 方法加入 Condition Queue(PS: 加入後會將原來的 lock 釋放)。

4.1 入隊列方法 addConditionWaiter

將當前線程封裝成一個 Node 節點放入到 Condition Queue 裏面你們能夠注意到, 下面對 Condition Queue 的操做都沒考慮到 併發(Sync Queue 的隊列是支持併發操做的), 這是爲何呢? 由於在進行操做 Condition 是當前的線程已經獲取了AQS的獨佔鎖, 因此不須要考慮併發的狀況。

private Node addConditionWaiter(){
    Node t = lastWaiter;                                
    // Condition queue 的尾節點 
	// 尾節點已經Cancel, 直接進行清除,
    /** * 當Condition進行 awiat 超時或被中斷時, Condition裏面的節點是沒有被刪除掉的, 須要其 * 他await 在將線程加入 Condition Queue 時調用addConditionWaiter而進而刪除, 或 await 操做差很少結束時, 調用 "node.nextWaiter != null" 進行判斷而刪除 (PS: 經過 signal 進行喚 * 醒時 node.nextWaiter 會被置空, 而中斷和超時時不會) */
    if(t != null && t.waitStatus != Node.CONDITION){
    	/** * 調用 unlinkCancelledWaiters 對 "waitStatus != Node.CONDITION" 的節點進行 * 刪除(在Condition裏面的Node的waitStatus 要麼是CONDITION(正常), 要麼就是 0 * (signal/timeout/interrupt)) */
        unlinkCancelledWaiters();                     
        t = lastWaiter;                     
    }
    //將線程封裝成 node 準備放入 Condition Queue 裏面
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if(t == null){
    	//Condition Queue 是空的
        firstWaiter = node;                           
    } else {
    	// 追加到 queue 尾部
        t.nextWaiter = node;                          
    }
    lastWaiter = node;                               
    return node;
}
複製代碼

4.2 刪除Cancelled節點的方法 unlinkCancelledWaiters

當Node在Condition Queue 中, 若狀態不是 CONDITION, 則必定是被中斷或超時。在調用 addConditionWaiter 將線程放入 Condition Queue 裏面時或 awiat 方法獲取結束時 進行清理 Condition queue 裏面的因 timeout/interrupt 而還存在的節點。這個刪除操做比較巧妙, 其中引入了 trail 節點, 能夠理解爲traverse整個 Condition Queue 時遇到的最後一個有效的節點。

private void unlinkCancelledWaiters(){
    Node t = firstWaiter;
    Node trail = null;
    while(t != null){
        Node next = t.nextWaiter;               // 1. 先初始化 next 節點
        if(t.waitStatus != Node.CONDITION){   // 2. 節點不有效, 在Condition Queue 裏面 Node.waitStatus 只有多是 CONDITION 或是 0(timeout/interrupt引發的)
            t.nextWaiter = null;               // 3. Node.nextWaiter 置空
            if(trail == null){                  // 4. 一次都沒有遇到有效的節點
                firstWaiter = next;            // 5. 將 next 賦值給 firstWaiter(此時 next 可能也是無效的, 這只是一個臨時處理)
            } else {
                trail.nextWaiter = next;       // 6. next 賦值給 trail.nextWaiter, 這一步其實就是刪除節點 t
            }
            if(next == null){                  // 7. next == null 說明 已經 traverse 完了 Condition Queue
                lastWaiter = trail;
            }
        }else{
            trail = t;                         // 8. 將有效節點賦值給 trail
        }
        t = next;
    }
}
複製代碼

4.3 轉移節點的方法 transferForSignal

transferForSignal只有在節點被正常喚醒才調用的正常轉移的方法。
將Node 從Condition Queue 轉移到 Sync Queue 裏面在調用transferForSignal以前, 會 first.nextWaiter = null;而咱們發現若節點是由於 timeout / interrupt 進行轉移, 則不會進行這步操做; 兩種狀況的轉移都會把 wautStatus 置爲 0

final boolean transferForSignal(Node node){
    /** * If cannot change waitStatus, the node has been cancelled */
    if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已經 cancelled 則失敗
        return false;
    }

    Node p = enq(node);                                 // 2. 加入 Sync Queue
    int ws = p.waitStatus;
    if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 這裏的 ws > 0 指Sync Queue 中node 的前繼節點cancelled 了, 因此, 喚醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失敗, 則說明 前繼節點已經變成 SIGNAL 或 cancelled, 因此也要 喚醒
        LockSupport.unpark(node.thread);
    }
    return true;
}
複製代碼

4.4 轉移節點的方法 transferAfterCancelledWait

transferAfterCancelledWait 在節點獲取lock時被中斷或獲取超時才調用的轉移方法。將 Condition Queue 中因 timeout/interrupt 而喚醒的節點進行轉移

final boolean transferAfterCancelledWait(Node node){
    if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 沒有 node 沒有 cancelled , 直接進行轉移 (轉移後, Sync Queue , Condition Queue 都會存在 node)
        enq(node);
        return true;
    }
    
    while(!isOnSyncQueue(node)){                // 2.這時是其餘的線程發送signal,將本線程轉移到 Sync Queue 裏面的工程中(轉移的過程當中 waitStatus = 0了, 因此上面的 CAS 操做失敗)
        Thread.yield();                         // 這裏調用 isOnSyncQueue判斷是否已經 入Sync Queue 了
    }
    return false;
}
複製代碼

5. Sync Queue

AQS內部維護着一個FIFO的CLH隊列,因此AQS並不支持基於優先級的同步策略。至於爲什麼要選擇CLH隊列,主要在於CLH鎖相對於MSC鎖,他更加容易處理cancel和timeout,同時他具有進出隊列快、無所、暢通無阻、檢查是否有線程在等待也很是容易(head != tail,頭尾指針不一樣)。固然相對於原始的CLH隊列鎖,ASQ採用的是一種變種的CLH隊列鎖:

  1. 原始CLH使用的locked自旋,而AQS的CLH則是在每一個node裏面使用一個狀態字段來控制阻塞,而不是自旋。
  2. 爲了能夠處理timeout和cancel操做,每一個node維護一個指向前驅的指針。若是一個node的前驅被cancel,這個node能夠前向移動使用前驅的狀態字段。
  3. head結點使用的是傀儡結點。

SyncQueue
Sync Queue

這個圖表明有個線程獲取lock, 而 Node1, Node2, Node3 則在Sync Queue 裏面進行等待獲取lock(PS: 注意到 dummy Node 的SINGNAL 這是叫獲取 lock 的線程在釋放lock時通知後繼節點的標示)

5.1 Sync Queue 節點入Queue方法

這裏有個地方須要注意, 就是初始化 head, tail 的節點, 不必定是 head.next, 由於期間可能被其餘的線程進行搶佔了。將當前的線程封裝成 Node 加入到 Sync Queue 裏面。

private Node addWaiter(Node mode){
    Node node = new Node(Thread.currentThread(), mode);      // 1. 封裝 Node
    Node pred = tail;
    if(pred != null){                           // 2. pred != null -> 隊列中已經有節點, 直接 CAS 到尾節點
        node.prev = pred;                       // 3. 先設置 Node.pre = pred (PS: 則當一個 node在Sync Queue裏面時 node.prev 必定 != null(除 dummy node), 可是 node.prev != null 不能說明其在 Sync Queue 裏面, 由於如今的CAS可能失敗 )
        if(compareAndSetTail(pred, node)){      // 4. CAS node 到 tail
            pred.next = node;                  // 5. CAS 成功, 將 pred.next = node (PS: 說明 node.next != null -> 則 node 必定在 Sync Queue, 但若 node 在Sync Queue 裏面不必定 node.next != null)
            return node;
        }
    }
    enq(node);                                 // 6. 隊列爲空, 調用 enq 入隊列
    return node;
}


/** * 這個插入會檢測head tail 的初始化, 必要的話會初始化一個 dummy 節點, 這個和 ConcurrentLinkedQueue 同樣的 * 將節點 node 加入隊列 * 這裏有個注意點 * 狀況: * 1. 首先 queue是空的 * 2. 初始化一個 dummy 節點 * 3. 這時再在tail後面添加節點(這一步可能失敗, 可能發生競爭被其餘的線程搶佔) * 這裏爲何要加入一個 dummy 節點呢? * 這裏的 Sync Queue 是CLH lock的一個變種, 線程節點 node 可否獲取lock的判斷經過其前繼節點 * 並且這裏在當前節點想獲取lock時一般給前繼節點 打上 signal 的標識(表示前繼節點釋放lock須要通知我來獲取lock) * 若這裏不清楚的同窗, 請先看看 CLH lock的資料 (這是理解 AQS 的基礎) */
private Node enq(final Node node){
    for(;;){
        Node t = tail;
        if(t == null){ // Must initialize // 1. 隊列爲空 初始化一個 dummy 節點 其實和 ConcurrentLinkedQueue 同樣
            if(compareAndSetHead(new Node())){  // 2. 初始化 head 與 tail (這個CAS成功後, head 就有值了, 詳情將 Unsafe 操做)
                tail = head;
            }
        }else{
            node.prev = t;                      // 3. 先設置 Node.pre = pred (PS: 則當一個 node在Sync Queue裏面時 node.prev 必定 != null, 可是 node.prev != null 不能說明其在 Sync Queue 裏面, 由於如今的CAS可能失敗 )
            if(compareAndSetTail(t, node)){     // 4. CAS node 到 tail
                t.next = node;                  // 5. CAS 成功, 將 pred.next = node (PS: 說明 node.next != null -> 則 node 必定在 Sync Queue, 但若 node 在Sync Queue 裏面不必定 node.next != null)
                return t;
            }
        }
    }
}
複製代碼

5.2 Sync Queue 節點出Queue方法

這裏的出Queue的方法其實有兩個: 新節點獲取lock, 調用setHead搶佔head, 而且剔除原head;節點因被中斷或獲取超時而進行 cancelled, 最後被剔除。

/** * 設置 head 節點(在獨佔模式沒有併發的可能, 當共享的模式有可能) */
private void setHead(Node node){
    head = node;
    node.thread = null; // 清除線程引用
    node.prev = null; // 清除原來 head 的引用 <- 都是 help GC
}

// 清除因中斷/超時而放棄獲取lock的線程節點(此時節點在 Sync Queue 裏面)
private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;                 // 1. 線程引用清空

    Node pred = node.prev;
    while (pred.waitStatus > 0)       // 2. 若前繼節點是 CANCELLED 的, 則也一併清除
        node.prev = pred = pred.prev;
        
    Node predNext = pred.next;         // 3. 這裏的 predNext也是須要清除的(只不過在清除時的 CAS 操做須要 它)

    node.waitStatus = Node.CANCELLED; // 4. 標識節點須要清除

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // 5. 若須要清除額節點是尾節點, 則直接 CAS pred爲尾節點
        compareAndSetNext(pred, predNext, null);    // 6. 刪除節點predNext
    } else {
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL || // 7. 後繼節點須要喚醒(但這裏的後繼節點predNext已經 CANCELLED 了)
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8. 將 pred 標識爲 SIGNAL
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // 8. next.waitStatus <= 0 表示 next 是個一個想要獲取lock的節點
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node); // 若 pred 是頭節點, 則此刻可能有節點剛剛進入 queue ,因此進行一下喚醒
        }

        node.next = node; // help GC
    }
}
複製代碼

6. 獨佔Lock

6.1 獨佔方式獲取lock主要流程

  1. 調用 tryAcquire 嘗試性的獲取鎖(通常都是由子類實現), 成功的話直接返回
  2. tryAcquire 調用獲取失敗, 將當前的線程封裝成 Node 加入到 Sync Queue 裏面(調用addWaiter), 等待獲取 signal 信號
  3. 調用 acquireQueued 進行自旋的方式獲取鎖(有可能會 repeatedly blocking and unblocking)
  4. 根據acquireQueued的返回值判斷在獲取lock的過程當中是否被中斷, 若被中斷, 則本身再中斷一下(selfInterrupt), 如果響應中斷的則直接拋出異常

6.2 獨佔方式獲取lock主要分紅3類

  1. acquire 不響應中斷的獲取lock, 這裏的不響應中斷指的是線程被中斷後會被喚醒, 而且繼續獲取lock,在方法返回時, 根據剛纔的獲取過程是否被中斷來決定是否要本身中斷一下(方法 selfInterrupt)
  2. doAcquireInterruptibly 響應中斷的獲取 lock, 這裏的響應中斷, 指在線程獲取 lock 過程當中若被中斷, 則直接拋出異常
  3. doAcquireNanos 響應中斷及超時的獲取 lock, 當線程被中斷, 或獲取超時, 則直接拋出異常, 獲取失敗

6.3 獨佔的獲取lock 方法 acquire

acquire(int arg):以獨佔模式獲取對象,忽略中斷。

public final void acquire(int arg){
    if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}
複製代碼
  1. 調用 tryAcquire 嘗試性的獲取鎖(通常都是又子類實現), 成功的話直接返回
  2. tryAcquire 調用獲取失敗, 將當前的線程封裝成 Node 加入到 Sync Queue 裏面(調用addWaiter), 等待獲取 signal 信號
  3. 調用 acquireQueued 進行自旋的方式獲取鎖(有可能會 repeatedly blocking and unblocking)
  4. 根據acquireQueued的返回值判斷在獲取lock的過程當中是否被中斷, 若被中斷, 則本身再中斷一下(selfInterrupt)。

6.4 循環獲取lock 方法 acquireQueued

final boolean acquireQueued(final Node node, int arg){
        boolean failed = true;
        try {
            boolean interrupted = false;
            for(;;){
                final Node p = node.predecessor();      // 1. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
                if(p == head && tryAcquire(arg)){       // 2. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
                    setHead(node);                       // 3. 獲取 lock 成功, 直接設置 新head(原來的head可能就直接被回收)
                    p.next = null; // help GC // help gc
                    failed = false;
                    return interrupted;                // 4. 返回在整個獲取的過程當中是否被中斷過 ; 但這又有什麼用呢? 若整個過程當中被中斷過, 則最後我在 自我中斷一下 (selfInterrupt), 由於外面的函數可能須要知道整個過程是否被中斷過
                }
                if(shouldParkAfterFailedAcquire(p, node) && // 5. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                        parkAndCheckInterrupt()){      // 6. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
                    interrupted = true;
                }
            }
        }finally {
            if(failed){                             // 7. 在整個獲取中出錯
                cancelAcquire(node);                // 8. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
            }
        }
    }
複製代碼

主邏輯:

  1. 當前節點的前繼節點是head節點時,先 tryAcquire獲取一下鎖, 成功的話設置新 head, 返回
  2. 第一步不成功, 檢測是否須要sleep, 須要的話就sleep, 等待前繼節點在釋放lock時喚醒或經過中斷來喚醒
  3. 整個過程可能須要blocking nonblocking 幾回

6.5 支持中斷獲取lock 方法 doAcquireInterruptibly

private void doAcquireInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
    boolean failed = true;
    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 4. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                    parkAndCheckInterrupt()){           // 5. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
                throw new InterruptedException();       // 6. 線程此時喚醒是經過線程中斷, 則直接拋異常
            }
        }
    }finally {
        if(failed){                 // 7. 在整個獲取中出錯(好比線程中斷)
            cancelAcquire(node);    // 8. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
        }
    }
}
複製代碼

acquireInterruptibly(int arg): 以獨佔模式獲取對象,若是被中斷則停止。

public final void acquireInterruptibly(int arg) throws InterruptedException {    
        if (Thread.interrupted())    
            throw new InterruptedException();    
        if (!tryAcquire(arg))       
            doAcquireInterruptibly(arg);     
    }
複製代碼

經過先檢查中斷的狀態,而後至少調用一次tryAcquire,返回成功。不然,線程在排隊,不停地阻塞與喚醒,調用tryAcquire直到成功或者被中斷。

6.6 超時&中斷獲取lock 方法

tryAcquireNanos(int arg, long nanosTimeout):獨佔且支持超時模式獲取: 帶有超時時間,若是通過超時時間則會退出。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{
    if(nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout; // 0. 計算截至時間
    final Node node = addWaiter(Node.EXCLUSIVE);  // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor(); // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
            if(p == head && tryAcquire(arg)){  // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquire嘗試獲取一下
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }

            nanosTimeout = deadline - System.nanoTime(); // 4. 計算還剩餘的時間
            if(nanosTimeout <= 0L){                      // 5. 時間超時, 直接返回
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) && // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                    nanosTimeout > spinForTimeoutThreshold){ // 7. 若沒超時, 而且大於spinForTimeoutThreshold, 則線程 sleep(小於spinForTimeoutThreshold, 則直接自旋, 由於效率更高 調用 LockSupport 是須要開銷的)
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 8. 線程此時喚醒是經過線程中斷, 則直接拋異常
                throw new InterruptedException();
            }
        }
    }finally {
        if(failed){                 // 9. 在整個獲取中出錯(好比線程中斷/超時)
            cancelAcquire(node);    // 10. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
        }
    }
}
複製代碼

嘗試以獨佔模式獲取,若是中斷和超時則放棄。實現時先檢查中斷的狀態,而後至少調用一次tryAcquire。

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {    
     if (Thread.interrupted())    
         throw new InterruptedException();    
     return tryAcquire(arg)|| doAcquireNanos(arg, nanosTimeout);    
}
複製代碼

6.7 釋放lock方法

釋放 lock 流程:

  • 調用子類的 tryRelease 方法釋放獲取的資源
  • 判斷是否徹底釋放lock(這裏有 lock 重複獲取的狀況)
  • 判斷是否有後繼節點須要喚醒, 須要的話調用unparkSuccessor進行喚醒
public final boolean release(int arg){
    if(tryRelease(arg)){   // 1. 調用子類, 若徹底釋放好, 則返回true(這裏有lock重複獲取)
        Node h = head;
        if(h != null && h.waitStatus != 0){ // 2. h.waitStatus !=0 其實就是 h.waitStatus < 0 後繼節點須要喚醒
            unparkSuccessor(h);   // 3. 喚醒後繼節點
        }
        return true;
    }
    return false;
}

/** * 喚醒 node 的後繼節點 * 這裏有個注意點: 喚醒時會將當前node的標識歸位爲 0 * 等於當前節點標識位 的流轉過程: 0(剛加入queue) -> signal (被後繼節點要求在釋放時須要喚醒) -> 0 (進行喚醒後繼節點) */
private void unparkSuccessor(Node node) {
    logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName());
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);       // 1. 清除前繼節點的標識
    Node s = node.next;
    logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName());
    if (s == null || s.waitStatus > 0) {         // 2. 這裏若在 Sync Queue 裏面存在想要獲取 lock 的節點,則必定須要喚醒一下(跳過取消的節點)&emsp;(PS: s == null發生在共享模式的競爭釋放資源)
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)              // 3. 找到 queue 裏面最前面想要獲取 Lock 的節點
                s = t;
    }
    logger.info("unparkSuccessor s:"+s);
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

7. 共享Lock

7.1 共享方式獲取lock流程

  1. 調用 tryAcquireShared 嘗試性的獲取鎖(通常都是由子類實現), 成功的話直接返回
  2. tryAcquireShared 調用獲取失敗, 將當前的線程封裝成 Node 加入到 Sync Queue 裏面(調用addWaiter), 等待獲取 signal 信號
  3. 在 Sync Queue 裏面進行自旋的方式獲取鎖(有可能會 repeatedly blocking and unblocking
  4. 當獲取失敗, 則判斷是否能夠 block(block的前提是前繼節點被打上 SIGNAL 標示)
  5. 共享與獨佔獲取lock的區別主要在於 在共享方式下獲取 lock 成功會判斷是否須要繼續喚醒下面的繼續獲取共享lock的節點(及方法 doReleaseShared)

7.2 共享方式獲取lock主要分紅3類

  1. acquireShared 不響應中斷的獲取lock, 這裏的不響應中斷指的是線程被中斷後會被喚醒, 而且繼續獲取lock,在方法返回時, 根據剛纔的獲取過程是否被中斷來決定是否要本身中斷一下(方法 selfInterrupt)
  2. doAcquireSharedInterruptibly 響應中斷的獲取 lock, 這裏的響應中斷, 指在線程獲取 lock 過程當中若被中斷, 則直接拋出異常
  3. doAcquireSharedNanos 響應中斷及超時的獲取 lock, 當線程被中斷, 或獲取超時, 則直接拋出異常, 獲取失敗

7.3 獲取共享lock 方法 acquireShared

public final void acquireShared(int arg){
    if(tryAcquireShared(arg) < 0){  // 1. 調用子類, 獲取共享 lock 返回 < 0, 表示失敗
        doAcquireShared(arg);       // 2. 調用 doAcquireShared 當前 線程加入 Sync Queue 裏面, 等待獲取 lock
    }
}
複製代碼

7.4 獲取共享lock 方法 doAcquireShared

private void doAcquireShared(int arg){
    final Node node = addWaiter(Node.SHARED);       // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
    boolean failed = true;

    try {
        boolean interrupted = false;
        for(;;){
            final Node p = node.predecessor();      // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
            if(p == head){
                int r = tryAcquireShared(arg);      // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);   // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
                    p.next = null; // help GC
                    if(interrupted){               // 5. 在獲取 lock 時, 被中斷過, 則本身再自我中斷一下(外面的函數可能須要這個參數)
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                    parkAndCheckInterrupt()){           // 7. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
                interrupted = true;
            }
        }
    }finally {
        if(failed){             // 8. 在整個獲取中出錯(好比線程中斷/超時)
            cancelAcquire(node);  // 9. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
        }
    }
}
複製代碼

7.5 獲取共享lock 方法 doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{
    final Node node = addWaiter(Node.SHARED);            // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 5. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                    parkAndCheckInterrupt()){           // 6. 如今lock仍是被其餘線程佔用 那就睡一會, 返回值判斷是否此次線程的喚醒是被中斷喚醒
                throw new InterruptedException();     // 7. 若這次喚醒是 經過線程中斷, 則直接拋出異常
            }
        }
    }finally {
        if(failed){              // 8. 在整個獲取中出錯(好比線程中斷/超時)
            cancelAcquire(node); // 9. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
        }
    }
}
複製代碼

7.6 獲取共享lock 方法 doAcquireSharedNanos

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{
    if (nanosTimeout <= 0L){
        return false;
    }

    final long deadline = System.nanoTime() + nanosTimeout;  // 0. 計算超時的時間
    final Node node = addWaiter(Node.SHARED);               // 1. 將當前的線程封裝成 Node 加入到 Sync Queue 裏面
    boolean failed = true;

    try {
        for(;;){
            final Node p = node.predecessor();          // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裏面, 而且沒有獲取 lock 的 node 的前繼節點不多是 null)
            if(p == head){
                int r = tryAcquireShared(arg);          // 3. 判斷前繼節點是不是head節點(前繼節點是head, 存在兩種狀況 (1) 前繼節點如今佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 如今有機會獲取 lock); 則再次調用 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);       // 4. 獲取 lock 成功, 設置新的 head, 並喚醒後繼獲取 readLock 的節點
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }

            nanosTimeout = deadline - System.nanoTime(); // 5. 計算還剩餘的 timeout , 若小於0 則直接return
            if(nanosTimeout <= 0L){
                return false;
            }
            if(shouldParkAfterFailedAcquire(p, node) &&         // 6. 調用 shouldParkAfterFailedAcquire 判斷是否須要中斷(這裏可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是不是 signal))
                    nanosTimeout > spinForTimeoutThreshold){// 7. 在timeout 小於 spinForTimeoutThreshold 時 spin 的效率, 比 LockSupport 更高
                LockSupport.parkNanos(this, nanosTimeout);
            }
            if(Thread.interrupted()){                           // 7. 若這次喚醒是 經過線程中斷, 則直接拋出異常
                throw new InterruptedException();
            }
        }
    }finally {
        if (failed){                // 8. 在整個獲取中出錯(好比線程中斷/超時)
            cancelAcquire(node);    // 10. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 而後再刪除)
        }
    }
}
複製代碼

7.7 釋放共享lock

當 Sync Queue中存在連續多個獲取 共享lock的節點時, 會出現併發的喚醒後繼節點(由於共享模式下獲取lock後會喚醒近鄰的後繼節點來獲取lock)。首先調用子類的 tryReleaseShared來進行釋放 lock,而後判斷是否須要喚醒後繼節點來獲取 lock

private void doReleaseShared(){
    for(;;){
        Node h = head;                      // 1. 獲取 head 節點, 準備 release
        if(h != null && h != tail){        // 2. Sync Queue 裏面不爲 空
            int ws = h.waitStatus;
            if(ws == Node.SIGNAL){         // 3. h節點後面多是 獨佔的節點, 也多是 共享的, 而且請求了喚醒(就是給前繼節點打標記 SIGNAL)
                if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ // 4. h 恢復 waitStatus 值置0 (爲啥這裏要用 CAS 呢, 由於這裏的調用多是在 節點剛剛獲取 lock, 而其餘線程又對其進行中斷, 所用cas就出現失敗)
                    continue; // loop to recheck cases
                }
                unparkSuccessor(h);         // 5. 喚醒後繼節點
            }
            else if(ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ //6. h後面沒有節點須要喚醒, 則標識爲 PROPAGATE 表示須要繼續傳遞喚醒(主要是區別 獨佔節點最終狀態0 (獨佔的節點在沒有後繼節點, 而且release lock 時最終 waitStatus 保存爲 0))
                continue; // loop on failed CAS // 7. 一樣這裏可能存在競爭
            }
        }

        if(h == head){ // 8. head 節點沒變化, 直接 return(從這裏也看出, 一個共享模式的 節點在其喚醒後繼節點時, 只喚醒一個, 可是它會在獲取 lock 時喚醒, 釋放 lock 時也進行, 因此或致使競爭的操做)
            break;           // head 變化了, 說明其餘節點獲取 lock 了, 本身的任務完成, 直接退出
        }

    }
}
複製代碼

8. 總結

本文主要講過了抽象的隊列式的同步器AQS的主要方法和實現原理。分別介紹了Node、Condition Queue、 Sync Queue、獨佔獲取釋放lock、共享獲取釋放lock的具體源碼實現。AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它。

訂閱最新文章,歡迎關注個人公衆號

微信公衆號

參考

  1. Java併發之AQS詳解
  2. AbstractQueuedSynchronizer 源碼分析 (基於Java 8)
相關文章
相關標籤/搜索