溫故知新-多線程-深刻剖析AQS



摘要

本文經過ReentrantLock來窺探AbstractQueuedSynchronizer(AQS)的實現原理,在看此文以前。你須要瞭解一下park、unpark的功能,請移步至上一篇《深刻剖析park、unpark》;html

AbstractQueuedSynchronizer實現一把鎖

根據AbstractQueuedSynchronizer的官方文檔,若是想實現一把鎖的,須要繼承AbstractQueuedSynchronizer,並須要重寫tryAcquire、tryRelease、可選擇重寫isHeldExclusively提供locked state、由於支持序列化,因此須要重寫readObject以便反序列化時恢復原始值、newCondition提供條件;官方提供的java代碼以下(官方文檔見參考鏈接);java

public class MyLock implements Lock, java.io.Serializable {
    private static class Sync extends AbstractQueuedSynchronizer {
      
        // Acquires the lock if state is zero
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
      
       // Reports whether in locked state
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }

    /**
     * The sync object does all the hard work. We just forward to it.
     */
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }


    private static volatile Integer value = 0;

    public static void main(String[] args) {

        MyLock myLock = new MyLock();
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                myLock.lock();
                value ++;
                myLock.unlock();
            }).start();
        }
        System.out.println(value);
    }
}

上面是一個不可重入的鎖,它實現了一個鎖基礎功能,目的是爲了跟ReentrantLock的實現作對比;node

ReentrantLock

ReentrantLock的特色

ReentrantLock意思爲可重入鎖,指的是一個線程可以對一個臨界資源重複加鎖。ReentrantLock跟經常使用的Synchronized進行比較;c#

image-20200603140814563

Synchronized的基礎用法

Synchronized的分析能夠參考《深刻剖析synchronized關鍵詞》,ReentrantLock能夠建立公平鎖、也能夠建立非公平鎖,接下來看一下ReentrantLock的簡單用法,非公平鎖實現比較簡單,今天重點是公平鎖;api

public class ReentrantLockTest {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock(true);
        reentrantLock.lock();
        try {
            log.info("lock");
        } catch (Exception e) {
            log.error(e);
        } finally {
            reentrantLock.unlock();
            log.info("unlock");
        }
    }
}

ReentrantLock與AQS的關聯

先看一下加鎖方法lock安全

  • 非公平鎖lock方法數據結構

    compareAndSetState很好理解,經過CAS加鎖,若是加鎖失敗調用acquire;架構

/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
  • 公平鎖lock方法
final void lock() {
    acquire(1);
}
  • AQS框架的處理流程

​ 線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續,分析實現原理oracle

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

總結:公平鎖的上鎖是必須判斷本身是否是須要排隊;而非公平鎖是直接進行CAS修改計數器看能不能加鎖成功;若是加鎖不成功則乖乖排隊(調用acquire);因此無論公平仍是不公平;只要進到了AQS隊列當中那麼他就會排隊;app

AQS架構圖

美團畫的AQS的架構圖,很詳細,當有自定義同步器接入時,只需重寫第一層所須要的部分方法便可,不須要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操做時,先通過第一層的API進入AQS內部方法,而後通過第二層進行鎖的獲取,接着對於獲取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依賴於第五層的基礎數據提供層。

82077ccf14127a87b77cefd1ccf562d3253591

AQS核心思想是,若是被請求的共享資源空閒,那麼就將當前請求資源的線程設置爲有效的工做線程,將共享資源設置爲鎖定狀態;若是共享資源被佔用,就須要必定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。

CLH:Craig、Landin and Hagersten隊列,是單向鏈表,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS是經過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。

  • 非公平鎖的加鎖流程
img
  • 公平鎖的加鎖流程
image-20200604172945029
  • 解鎖公平鎖和非公平鎖邏輯一致
image-20200604173023011

加鎖:

  • 經過ReentrantLock的加鎖方法Lock進行加鎖操做。
  • 會調用到內部類Sync的Lock方法,因爲Sync#lock是抽象方法,根據ReentrantLock初始化選擇的公平鎖和非公平鎖,執行相關內部類的Lock方法,本質上都會執行AQS的Acquire方法。
  • AQS的Acquire方法會執行tryAcquire方法,可是因爲tryAcquire須要自定義同步器實現,所以執行了ReentrantLock中的tryAcquire方法,因爲ReentrantLock是經過公平鎖和非公平鎖內部類實現的tryAcquire方法,所以會根據鎖類型不一樣,執行不一樣的tryAcquire。
  • tryAcquire是獲取鎖邏輯,獲取失敗後,會執行框架AQS的後續邏輯,跟ReentrantLock自定義同步器無關。
  • 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire)

解鎖:

  • 經過ReentrantLock的解鎖方法Unlock進行解鎖。
  • Unlock會調用內部類Sync的Release方法,該方法繼承於AQS。
  • Release中會調用tryRelease方法,tryRelease須要自定義同步器實現,tryRelease只在ReentrantLock中的Sync實現,所以能夠看出,釋放鎖的過程,並不區分是否爲公平鎖。
  • 釋放成功後,全部處理由AQS框架完成,與自定義同步器無關。
  • 流程:unlock -> release -> tryRelease

acquire獲取鎖

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        selfInterrupt();
    }
}

tryAcquire

acquire方法首先會調tryAcquire方法,須要注意的是tryAcquire的結果作取反;根據前面分析,tryAcquire會調用子類的實現,ReentrantLock有兩個內部類,FairSync,NonfairSync,都繼承自Sync,Sync繼承AbstractQueuedSynchronizer;

實現方式差異在是否有hasQueuedPredecessors() 的判斷條件

  • 公平鎖實現
/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 獲取lock對象的上鎖狀態,若是鎖是自由狀態則=0,若是被上鎖則爲1,大於1表示重入  
    int c = getState();
    if (c == 0) {
      	// hasQueuedPredecessors,判斷本身是否須要排隊
        // 下面我會單獨介紹,若是不須要排隊則進行cas嘗試加鎖
        // 若是加鎖成功則把當前線程設置爲擁有鎖的線程
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
  	// 若是C不等於0,可是當前線程等於擁有鎖的線程則表示這是一次重入,那麼直接把狀態+1表示重入次數+1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖

/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
  • Node

先來看下AQS中最基本的數據結構——Node,Node即爲上面CLH變體隊列中的節點。

static final class Node {
    static final Node SHARED = new Node(); // 表示線程以共享的模式等待鎖
    static final Node EXCLUSIVE = null; // 表示線程正在以獨佔的方式等待鎖
    static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經取消了
    static final int SIGNAL    = -1; // 表示線程已經準備好了,就等資源釋放了
    static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒
    static final int PROPAGATE = -3; // 當前線程處在SHARED狀況下,該字段纔會使用
    volatile int waitStatus; // 當前節點在隊列中的狀態
    volatile Node prev; // 前驅指針
    volatile Node next; // 後繼指針
    volatile Thread thread; // 表示處於該節點的線程
    Node nextWaiter; // 指向下一個處於CONDITION狀態的節點
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 返回前驅節點,沒有的話拋出npe
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

再看hasQueuedPredecessors,整個方法若是最後返回false,則去加鎖,若是返回true則不加鎖,由於這個方法被取反操做;hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節點的方法。若是返回False,說明當前線程能夠爭取共享資源;若是返回True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。

  • h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

雙向鏈表中,第一個節點爲虛節點,其實並不存儲任何信息,只是佔位。真正的第一個有數據的節點,是在第二個節點開始的。

  • 當h != t時: 若是(s = h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,須要返回True(這塊具體見下邊代碼分析)。
  • 若是(s = h.next) != null,說明此時隊列中至少有一個有效節點。
  • 若是此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是能夠獲取資源的;
  • 若是s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不一樣,當前線程必須加入進等待隊列。

若是這上面沒有看懂,沒有關係,先來分析一下構建整個隊列的過程;

  • addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // tail爲對尾,賦值給pred
    Node pred = tail;
    // 判斷pred是否爲空,其實就是判斷對尾是否有節點,其實只要隊列被初始化了對尾確定不爲空
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • enq
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

用一張圖來分析一下,整個隊列構建過程;

image-20200604203002973

  • (1)經過Node(Thread thread, Node mode) 方法構建一個node節點(node2),此時的nextWaiter爲空,線程不爲空,是當前線程;

  • (2)若是隊尾爲空,則說明隊列未創建,調用enq構建第一個虛擬節點(node1),經過compareAndSetHead方法構建一個頭節點,須要注意的是該頭節點thread是null,後續不少都是用線程是否爲null來判讀是否爲第一個虛擬節點;

  • (3)將node1 cas設置爲head

  • (4)將頭節點賦值爲tail = head

  • (5)進入下一次for循環時,會走到else分支,會將傳入的node的指向頭部節點的next,此時node2的prev指向node1(tail)

  • (6)將node2 cas設置爲tail;

  • (7)將node2指向node1的next;

    通過上面的步驟,就構建了一個長度爲2的隊列;

添加第二個隊列時,走的是這段代碼,流程就簡單多了,代碼以下

if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}

image-20200604202859302

再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());由於整個構建過程並非原子操做,因此這個條件判斷,如今再是否是就看明白了?

  • 當h != t時(3)步驟已經完成: 若是(s = h.next) == null 此時步驟(4)未完成,等待隊列正在有線程進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時隊列中有元素,須要返回True
  • 若是(s = h.next) != null,說明此時隊列中至少有一個有效節點。
  • 若是此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節點中的線程與當前線程相同,那麼當前線程是能夠獲取資源的;
  • 若是s.thread != Thread.currentThread(),說明等待隊列的第一個有效節點線程與當前線程不一樣,當前線程必須加入進等待隊列。

acquireQueued

addWaiter方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列裏,返回的是一個包含該線程的Node。而這個Node會做爲參數,進入到acquireQueued方法中。acquireQueued方法能夠對排隊中的線程進行「獲鎖」操做。總的來講,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者再也不須要獲取(中斷)。

下面經過代碼從「什麼時候出隊列?」和「如何出隊列?」兩個方向來分析一下acquireQueued源碼:

final boolean acquireQueued(final Node node, int arg) {
    // 標記是否成功拿到資源
    boolean failed = true;
    try {
        // 標記等待過程當中是否中斷過
        boolean interrupted = false;
        for (;;) {
            // 獲取當前節點的前驅節點,有兩種狀況;一、上一個節點爲頭部;2上一個節點不爲頭部
            final Node p = node.predecessor();
            // 若是p是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(頭結點是虛節點)
            // 由於第一次tryAcquire判斷是否須要排隊,若是須要排隊,那麼我就入隊,此處再重試一次
            if (p == head && tryAcquire(arg)) {
                // 獲取鎖成功,頭指針移動到當前node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 說明p爲頭節點且當前沒有獲取到鎖(多是非公平鎖被搶佔了)或者是p不爲頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus爲-1),防止無限循環浪費資源。具體兩個方法下面細細分析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
       // 成功拿到資源,準備釋放
        if (failed)
            cancelAcquire(node);
    }
}

setHead

設置當前節點爲頭節點,而且將node.thread爲空(剛纔提到判斷是否爲頭部虛擬節點的條件就是node.thread == null。waitStatus狀態併爲修改,等下咱們再分析;

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

shouldParkAfterFailedAcquire

接下來看shouldParkAfterFailedAcquire代碼,須要注意的是,每個新建立Node的節點是被下一個排隊的node設置爲等待狀態爲SIGNAL, 這裏比較難以理解爲何須要去改變上一個節點的park狀態?

每一個node都有一個狀態,默認爲0,表示無狀態,-1表示在park;當時不能本身把本身改爲-1狀態?由於你得肯定你本身park了纔是能改成-1;因此只能先park;在改狀態;可是問題你本身都park了;徹底釋放CPU資源了,故而沒有辦法執行任何代碼了,因此只能別人來改;故而能夠看到每次都是本身的後一個節點把本身改爲-1狀態;

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅節點的狀態 
    int ws = pred.waitStatus;
    // 說明頭結點處於喚醒狀態
    if (ws == Node.SIGNAL)
        return true;
    // static final int CANCELLED =  1; // 表示線程獲取鎖的請求已經取消了
    // static final int SIGNAL    = -1; // 表示線程已經準備好了,就等資源釋放了
    // static final int CONDITION = -2; // 表示節點在等待隊列中,節點線程等待喚醒
    // static final int PROPAGATE = -3; // 當前線程處在SHARED狀況下,該字段纔會使用
    if (ws > 0) {
        do {
            // 把取消節點從隊列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 設置前任節點等待狀態爲SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

調用LockSupport.park掛起當前線程,本身已經park,沒法再修改狀態了!

private final boolean parkAndCheckInterrupt() {
    // 調⽤用park()使線程進⼊入waiting狀態
    LockSupport.park(this);
    // 若是被喚醒,查看⾃自⼰己是不不是被中斷的,這⾥裏裏先清除⼀下標記位
    return Thread.interrupted(); 
}

shouldParkAfterFailedAcquire的整個流程仍是比較清晰的,若是不清楚,能夠參考美團畫的流程圖;

cancelAcquire

經過上面的分析,當failed爲true時,也就意味着park結束,線程被喚醒了,for循環已經跳出,開始執行cancelAcquire,經過cancelAcquire方法,將Node的狀態標記爲CANCELLED;代碼以下:

private void cancelAcquire(Node node) {
    // 將無效節點過濾
    if (node == null)
        return;
    // 設置該節點不關聯任何線程,也就是虛節點(上面已經提到,node.thread = null是判讀是不是頭節點的條件)
    node.thread = null;
    Node pred = node.prev;
    // 經過前驅節點,處理waitStatus > 0的node
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 把當前node的狀態設置爲CANCELLED,當下一個node排隊結束時,本身就會被上一行代碼處理掉;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;
    // 若是當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置爲尾節點,更新失敗的話,則進入else,若是更新成功,將tail的後繼節點設置爲null
    if (node == tail && compareAndSetTail(node, pred)) {
        // 把本身設置爲null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 若是當前節點不是head的後繼節點
        // 1:判斷當前節點前驅節點的是否爲SIGNAL
        // 2:若是不是,則把前驅節點設置爲SINGAL看是否成功
        // 若是1和2中有一個爲true,再判斷當前節點的線程是否爲null
        // 若是上述條件都知足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點 
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 若是當前節點是head的後繼節點,或者上述條件不知足,那就喚醒當前節點的後繼節點
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

當前的流程:

  • 獲取當前節點的前驅節點,若是前驅節點的狀態是CANCELLED,那就一直往前遍歷,找到第一個waitStatus <= 0的節點,將找到的Pred節點和當前Node關聯,將當前Node設置爲CANCELLED。

  • 根據當前節點的位置,考慮如下三種狀況:

    (1) 當前節點是尾節點。

    (2) 當前節點是Head的後繼節點。

    (3) 當前節點不是Head的後繼節點,也不是尾節點。

(1)當前節點時尾節點

image-20200607180254816

(2)當前節點是Head的後繼節點。

這張圖描述的是這段代碼:unparkSuccessor

Node s = node.next;
if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
            s = t;
}
image-20200607180140169

(3)當前節點不是Head的後繼節點,也不是尾節點。

這張圖描述的是這段代碼跟(2)同樣;

image-20200607180034112

經過上面的圖,你會發現全部的變化都是對Next指針進行了操做,而沒有對Prev指針進行操做,緣由是執行cancelAcquire的時候,當前節點的前置節點可能已經從隊列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),也就是下圖中代碼1和代碼2直接的間隙就會出現這種狀況,此時修改Prev指針,有可能會致使Prev指向另外一個已經移除隊列的Node,所以這塊變化Prev指針不安全。

image-20200607180655529

unlock解鎖

解鎖時並不區分公平和不公平,由於ReentrantLock實現了鎖的可重入,能夠進一步的看一下時如何處理的,上代碼:

public void unlock() {
    sync.release(1);
}

release

public final boolean release(int arg) {
    // 自定義的tryRelease若是返回true,說明該鎖沒有被任何線程持有
    if (tryRelease(arg)) {
        // 獲取頭結點
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 頭結點不爲空而且頭結點的waitStatus不是初始化節點狀況,解除線程掛起狀態
            unparkSuccessor(h);
        return true;
    }
    return false;
}

這裏的判斷條件爲何是h != null && h.waitStatus != 0

  1. h == null Head還沒初始化。初始狀況下,head == null,第一個節點入隊,Head會被初始化一個虛擬節點。若是還沒來得及入隊,就會出現head == null 的狀況。
  2. h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒。
  3. h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒,(還記得一個node是在shouldParkAfterFailedAcquire方法中被設置爲SIGNAL = -1的吧?不記得翻看一下上面吧)

tryRelease

protected final boolean tryRelease(int releases) {
    // 減小可重入次數,setState(c);
    int c = getState() - releases;
    // 當前線程不是持有鎖的線程,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是持有線程所有釋放,將當前獨佔鎖全部線程設置爲null,並更新state
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor

這個方法在cancelAcquire其實也用到了,簡單分析一下

// 若是當前節點是head的後繼節點,或者上述條件不知足,就喚醒當前節點的後繼節點unparkSuccessor(node);

private void unparkSuccessor(Node node) {
    // 獲取結點waitStatus,CAS設置狀態state=0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的下一個節點
    Node s = node.next;
    // 若是下個節點是null或者下個節點被cancelled,就找到隊列最開始的非cancelled的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 就從尾部節點開始找,到隊首,找到隊列第一個waitStatus<0的節點。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 若是當前節點的下個節點不爲空,並且狀態<=0,就把當前節點unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}

爲何要從後往前找第一個非Cancelled的節點呢?

緣由1:addWaiter方法並不是原子,構建鏈表結構時以下圖中 一、2間隙執行unparkSuccessor,此時鏈表是不完整的,沒辦法從前日後找了;

image-20200607184934087

緣由2:還有一點緣由,在產生CANCELLED狀態節點的時候,先斷開的是Next指針,Prev指針並未斷開,所以也是必需要從後往前遍歷纔可以遍歷徹底部的Node;

中斷恢復

喚醒後,會執行return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,並清除。

private final boolean parkAndCheckInterrupt() {
	LockSupport.park(this);
	return Thread.interrupted();
}

acquireQueued代碼,當parkAndCheckInterrupt返回True或者False的時候,interrupted的值不一樣,但都會執行下次循環。若是這個時候獲取鎖成功,就會把當前interrupted返回。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		boolean interrupted = false;
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
			}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}

若是acquireQueued爲True,就會執行selfInterrupt方法。

該方法實際上是爲了中斷線程。但爲何獲取了鎖之後還要中斷線程呢?這部分屬於Java提供的協做式中斷知識內容,感興趣同窗能夠查閱一下。這裏簡單介紹一下:

  1. 當中斷線程被喚醒時,並不知道被喚醒的緣由,多是當前線程在等待中被中斷,也多是釋放了鎖之後被喚醒。所以咱們經過Thread.interrupted()方法檢查中斷標記(該方法返回了當前線程的中斷狀態,並將當前線程的中斷標識設置爲False),並記錄下來,若是發現該線程被中斷過,就再中斷一次。
  2. 線程在等待資源的過程當中被喚醒,喚醒後仍是會不斷地去嘗試獲取鎖,直到搶到鎖爲止。也就是說,在整個流程中,並不響應中斷,只是記錄中斷記錄。最後搶到鎖返回了,那麼若是被中斷過的話,就須要補充一次中斷。

這裏的處理方式主要是運用線程池中基本運做單元Worder中的runWorker,經過Thread.interrupted()進行額外的判斷處理,能夠看下ThreadPoolExecutor源碼的判斷條件;

image-20200607210022068

其它

AQS在JUC中有⽐比較⼴普遍的使⽤用,如下是主要使⽤用的地⽅方:

  • ReentrantLock:使⽤用AQS保存鎖重複持有的次數。當⼀一個線程獲取鎖時, ReentrantLock記錄當
    前得到鎖的線程標識,⽤用於檢測是否重複獲取,以及錯誤線程試圖解鎖操做時異常狀況的處理理。
  • Semaphore:使⽤用AQS同步狀態來保存信號量量的當前計數。 tryRelease會增長計數,
    acquireShared會減小計數。
  • CountDownLatch:使⽤用AQS同步狀態來表示計數。計數爲0時,全部的Acquire操做
    (CountDownLatch的await⽅方法)才能夠經過。
  • ReentrantReadWriteLock:使⽤用AQS同步狀態中的16位保存寫鎖持有的次數,剩下的16位⽤用於保
    存讀鎖的持有次數。
  • ThreadPoolExecutor: Worker利利⽤用AQS同步狀態實現對獨佔線程變量量的設置(tryAcquire和
    tryRelease)。

至此,經過ReentrantLock分析AQS的實現原理一家完畢,須要說明的是,此文深度參考了美團分析的ReentrantLock,是參考連接的第三個,有興趣能夠對比差別,感謝!

參考

JDK API 文檔

Java的LockSupport.park()實現分析

[從ReentrantLock的實現看AQS的原理及應用

[Thread的中斷機制(interrupt)


你的鼓勵也是我創做的動力

打賞地址

相關文章
相關標籤/搜索