隊列同步器(AQS)源碼分析

掃描下方二維碼或者微信搜索公衆號菜鳥飛呀飛,便可關注微信公衆號,閱讀更多Spring源碼分析Java併發編程文章。java

微信公衆號

在上一篇文章中分析了隊列同步器(AQS)的設計原理,這一篇文章將結合具體的源代碼來分析AQS。若是理解了上一篇文章中介紹的同步隊列的數據結構,那麼源碼看起來相對比較好理解。讀過Spring源碼的朋友應該能感覺到Spring中方法和變量的命名是很是規範的,見名知意。與Spring相比,JDK中某些類的源碼,命名則不是那麼規範,裏面存在各類簡寫,單個字母命名變量,有時候看起來十分燒腦。其中AQS中就存在這種狀況,因此若是不瞭解AQS的設計原理,直接看源碼,就會顯得比較困難。node

  • AQS中獲取鎖和釋放鎖的方法不少,今天以acquire()和release()爲例來進行源碼分析,其餘方法的邏輯與這兩個方法邏輯基本同樣。
public static void main(String[] args) {
    // 公平鎖
    ReentrantLock lock = new ReentrantLock(true);
    try{
        // 加鎖
        lock.lock();
        System.out.println("Hello World");
    }finally {
        // 釋放鎖
        lock.unlock();
    }
}
複製代碼
  • 以上面demo爲例,先建立了一個公平鎖,而後調用lock()方法來加鎖,最後調用unlock()方法釋放鎖。在lock()方法中會調用到Sync的lock()方法,因爲咱們建立的是公平鎖,因此這個時候調用的是FailSync的lock()方法。
static final class FairSync extends Sync {

    final void lock() {
        // 調用acquire()獲取同步狀態
        acquire(1);
    }
}
複製代碼

獲取鎖:acquire()

  • 在FailSync的lock()方法中就會調用到AQS的模板方法:acquire()。最終的加鎖邏輯就是在acquire()方法中實現的。acquire()的源碼以下:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 調用selfInterrupt()方法從新設置中斷標識
        selfInterrupt();
}
複製代碼
  • 這個方法就3行代碼,可是邏輯至關複雜。能夠拆分爲四段邏輯:第一步:調用tryAcquire()方法;第二步:執行addWaiter()方法;第三步:執行acquireQueued()方法;第四步:執行selfInterrupt()方法。下面將圍繞這4步展開分析。我在acquire()這個方法上簡單寫了些註釋,也能夠直接參考。
public final void acquire(int arg) {
    /* * acquire()的做用是獲取同步狀態,這裏同步狀態的含義等價於鎖 * tryAcquire()方法是嘗試獲取同步狀態,若是該方法返回true,表示獲取同步狀態成功;返回false表示獲取同步狀態失敗 * 第1種狀況:當tryAcquire()返回true時,acquire()方法會直接結束。 * 由於此時!tryAcquire(arg) 就等於false,而&&判斷具備短路做用,當由於&&判斷前面的條件判斷爲false,&&後面的判斷就不會進行了,因此此時就不會執行後面的if語句,此時方法就直接返回了。 * 這種狀況表示線程獲取鎖成功了 * 第2中狀況:當tryAcquire()返回false時,表示線程沒有獲取到鎖, * 這個時候就須要將線程加入到同步隊列中了。此時 !tryAcquire() == true,所以會進行&&後面的判斷,即acquireQueued()方法的判斷,在進行acquireQueued()方法判斷以前會先執行addWaiter()方法。 * * addWaiter()方法返回的是當前線程表明的節點,這個方法的做用是將當前線程放入到同步隊列中。 * 而後再調用acquireQueued()方法,在這個方法中,會先判斷當前線程表明的節點是否是第二個節點,若是是就會嘗試獲取鎖,若是獲取不到鎖,線程就會被阻塞;若是獲取到鎖,就會返回。 * 若是當前線程表明的節點不是第二個節點,那麼就會直接阻塞,只有當獲取到鎖後,acquireQueued()方法纔會返回 * * acquireQueued()方法若是返回的是true,表示線程是被中斷後醒來的,此時if的條件判斷成功,就會執行selfInterrupt()方法,該方法的做用就是將當前線程的中斷標識位設置爲中斷狀態。 * 若是acquireQueued()方法返回的是false,表示線程不是被中斷後醒來的,是正常喚醒,此時if的條件判斷不會成功。acquire()方法執行結束 * * 總結:只有當線程獲取到鎖時,acquire()方法纔會結束;若是線程沒有獲取到鎖,那麼它就會一直阻塞在acquireQueued()方法中,那麼acquire()方法就一直不結束。 * */

    // tryAcquire()方法是AQS中定義的一個方法,它須要同步組件的具體實現類來重寫該方法。所以在公平鎖的同步組件FairSync和非公平鎖的同步組NonfairSync中,tryAcquire()方法的實現代碼邏輯是不同的。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 調用selfInterrupt()方法從新設置中斷標識
        selfInterrupt();
}
複製代碼

第一步:tryAcquire()

  • tryAcquire()是由AQS的子類重寫的方法,因此代碼邏輯和具體的鎖的實現有關係,因爲本次Demo中使用的是公平鎖,因此它最終會調用FairSync類的tryAcquire()方法。tryAcquire()方法的做用就是獲取同步狀態(也就是獲取鎖),若是當前線程成功獲取到鎖,那麼就會將AQS中的同步狀態state加1,而後返回true,若是沒有獲取到鎖,將會返回false。FairSync類上tryAcquire()方法的源碼以下:
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 獲取同步狀態state的值(在AQS中,state就至關於鎖,若是線程能成功修改state的值,那麼就表示該線程獲取到了鎖)
        int c = getState();
        if (c == 0) {
            // 若是c等於0,表示尚未任何線程獲取到鎖

            /** * 此時可能存在多個線程同時執行到這兒,均知足c==0這個條件。 * 在if條件中,會先調用hasQueuedPredecessors()方法來判斷隊列中是否已經有線程在排隊,該方法返回true表示有線程在排隊,返回false表示沒有線程在排隊 * 第1種狀況:hasQueuedPredecessors()返回true,表示有線程排隊, * 此時 !hasQueuedPredecessors() == false,因爲&& 運算符的短路做用,if的條件判斷爲false,那麼就不會進入到if語句中,tryAcquire()方法就會返回false * * 第2種狀況:hasQueuedPredecessors()返回false,表示沒有線程排隊 * 此時 !hasQueuedPredecessors() == true, 那麼就會進行&&後面的判斷,就會調用compareAndSetState()方法去進行修改state字段的值 * compareAndSetState()方法是一個CAS方法,它會對state字段進行修改,它的返回值結果又須要分兩種狀況 * 第 i 種狀況:對state字段進行CAS修改爲功,就會返回true,此時if的條件判斷就爲true了,就會進入到if語句中,同時也表示當前線程獲取到了鎖。那麼最終tryAcquire()方法會返回true * 第 ii 種狀況:若是對state字段進行CAS修改失敗,說明在這一瞬間,已經有其餘線程獲取到了鎖,那麼if的條件判斷就爲false了,就不會進入到if語句塊中,最終tryAcquire()方法會返回false。 */
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 當前線程成功修改了state字段的值,那麼就表示當前線程獲取到了鎖,那麼就將AQS中鎖的擁有者設置爲當前線程,而後返回true。
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 若是c等於0,則表示已經有線程獲取到了鎖,那麼這個時候,就須要判斷獲取到鎖的線程是否是當前線程
        else if (current == getExclusiveOwnerThread()) {
            // 若是是當前線程,那麼就將state的值加1,這就是鎖的重入
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // 由於此時確定只有一個線程獲取到了鎖,只有獲取到鎖的線程纔會執行到這行代碼,因此能夠直接調用setState(nextc)方法來修改state的值,這兒不會存在線程安全的問題。
            setState(nextc);
            // 而後返回true,表示當前線程獲取到了鎖
            return true;
        }
        // 若是state不等於0,且當前線程也等於已經獲取到鎖的線程,那麼就返回false,表示當前線程沒有獲取到鎖
        return false;
    }
}
複製代碼
  • 在tryAcquire()方法中,先判斷了同步狀態state是否是等於0。
  • 1. 若是等於0,就表示目前尚未線程持有到鎖,那麼這個時候就會先調用hasQueuedPredecessors()方法判斷同步隊列中有沒有等待獲取鎖的線程,若是有線程在排隊,那麼當前線程確定獲取鎖失敗(由於AQS的設計的原則是FIFO,既然前面有人已經在排隊了,你就不能插隊,老老實實去後面排隊去),那麼tryAcquire()方法會返回false。若是同步隊列中沒有線程排隊,那麼就讓當前線程對state進行CAS操做,若是設置成功,就表示當前獲取到鎖了,返回true;若是CAS失敗,表示在這一瞬間,鎖被其餘線程搶走了,那麼當前線程就獲取鎖失敗,就返回false。
  • 2. 若是不等於0,就表示已經有線程獲取到鎖了,那麼此時就會去判斷當前線程是否是等於已經持有鎖的線程(getExclusiveOwnerThread()方法的做用就是返回已經持有鎖的線程),若是相等,就讓state加1,這就是所謂的重入鎖,重入鎖就是容許同一個線程屢次獲取到鎖。
  • 3. state既不等於0,當前線程也不等於持有鎖的線程,那麼就返回false,表示當前線程獲取到鎖失敗。
  • 就這樣,經過tryAcquire()方法,就實現了線程可否獲取到鎖的功能和邏輯。在不一樣的鎖的實現中,tryAcquire()方法的邏輯是不同的,上面的示例是以公平鎖的實現舉例的。

第二步:addWaiter()

  addWaiter()方法不必定會執行到,它依賴第一步tryAcquire()的執行結果。若是線程獲取鎖成功,那麼tryAcquire()方法就會返回true,此時就不會進行後面的第二步、第三步、第四步了。只有當線程獲取鎖失敗了,tryAcquire()方法會返回false,這個時候須要將線程加入到同步隊列中,因此須要執行後面的步驟。編程

  • addWaiter()方法的邏輯相對比較簡單,它的做用就是將當前線程封裝成一個Node,而後加入到同步隊列中。若是同步隊列此時尚未初始化(也就是AQS的head、tail屬性均爲null),那麼它還會進行同步隊列的初始化。源碼以下:
private Node addWaiter(Node mode) {
    // 先根據當前線程,構建一個Node節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 而後須要把當前線程表明的節點添加到隊列的尾部,此時隊尾多是null,由於可能這是第一次出現多個線程爭搶鎖,隊列尚未初始化,即頭尾均爲null
    Node pred = tail;
    if (pred != null) {
        // 若是隊尾不爲null,就直接將當前線程表明的node添加到隊尾,修改node節點中prev的指針指向
        node.prev = pred;
        // 利用CAS操做設置隊尾,若是設置成功,就修改倒數第二個節點的next的指針指向,而後方法return結束
        // 在多個線程併發的狀況下,CAS操做可能會失敗,若是失敗,就會進入到後面的邏輯
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若是隊尾爲null或者CAS設置隊尾失敗,就調用enq()方法將當前線程表明的節點加入到同步隊列當中
    enq(node);
    // 將當前線程表明的節點返回
    return node;
}
複製代碼
  • 在addWaiter()方法中會先判斷尾結點是否是null,若是是null,就表示同步隊列尚未被初始化,那麼就不會進入到if的邏輯中了,將會調用enq(node)方法進行同步隊列的初始化和當前線程的入隊操做。若是尾結點不爲null,就表示同步隊列已經被初始化了,只須要將當前線程所表明的節點加入到同步隊列隊尾便可。因爲加入到隊尾這個操做存在併發的可能性,因此須要利用CAS操做(即compareAndSetTail()方法)來保證原子性,若是CAS成功,就將舊的隊尾節點的next指針指向新的隊尾節點,這樣就完成了節點加入到隊尾的操做了,同時addWaiter()方法return結束。若是CAS操做失敗,那麼就會調用enq(node)方法來實現入隊操做。
  • enq(node)方法的源碼以下:
private Node enq(final Node node) {
    // 無限循環
    for (;;) {
        Node t = tail;
        // 若是尾結點爲null,說明這是第一次出現多個線程同時爭搶鎖,所以同步隊列沒有被初始化,tail和head此時均爲null
        if (t == null) { // Must initialize
            // 設置head節點,注意此時是new了一個Node節點,節點的next,prev,thread屬性均爲null
            // 這是由於對於頭結點而言,做爲隊列的頭部,它的next指向原本就應該是null
            // 而頭結點的thread屬性爲空,這是AQS同步器特地爲之的
            // 頭結點的prev屬性此時爲空,當進行第二次for循環的時候,tail結點就不爲空了,所以不會進入到if語句中,而是進入到else語句中,在這裏對頭結點的prev進行了賦值
            if (compareAndSetHead(new Node()))
                // 第一次初始化隊列時,頭結點和尾結點是同一個結點
                tail = head;
        } else {
            // 隊尾結點不爲null,說明隊列已經進行了初始化,這個時候只須要將當前線程表示的node結點加入到隊列的尾部

            // 設置當前線程所表明的節點的prev的指針,使其指向尾結點
            node.prev = t;
            // 利用CAS操做設置隊尾結點
            if (compareAndSetTail(t, node)) {
                // 而後修改隊列中倒數第二個節點的next指針,使其指向隊尾結點
                t.next = node;
                // 而後返回的是隊列中倒數第二個節點(也就是舊隊列中的尾結點)
                return t;
            }
        }
    }
}
複製代碼
  • enq(node)方法中進行了一個for(;;)的無限循環,在循環中會先判斷隊尾是否是null,是null就表示同步隊列沒有被初始化,就採用CAS操做(即compareAndSetHead()方法)來初始化隊列,並讓tail = head。注意:在建立頭結點時,頭結點的thread、prev、next屬性均爲null,在進行第二次循環時,next屬性會被賦值,可是thread、prev屬性始終是null。
  • 當同步隊列初始化完成後,後面進入for循環的時候,就不會進入到if語句塊中了,而是進入到else中,此時也是利用CAS操做(即compareAndSetTail()方法)將當前線程所表明的Node節點設置爲隊尾,而後修改以前隊尾的next指針,維護好隊列節點中的雙向關聯關係,就將舊的隊尾節點返回。
  • 在執行完enq()方法後,就會回到addWaiter()方法中,而後addWaiter()方法就會將當前線程所表明的Node節點返回。

第三步:acquireQueued()

  • 在執行完addWaiter()方法後,就會執行acquireQueued()方法。該方法的做用就是讓線程以不間斷的方式獲取鎖,若是獲取不到,就會一直阻塞在這個方法裏面,直到獲取到鎖,纔會從該方法裏面返回。
  • acquireQueued()方法的源碼以下:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 無限for循環
        for (;;) {
            // 獲取當前線程所表明節點的前一個節點
            final Node p = node.predecessor();
            // 若是前一個節點是頭結點(即當前線程是隊列中的第二個節點),那麼就調用tryAcquire()方法讓當前線程去嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                // 若是當前線程獲取鎖成功,那麼就將當前線程所表明的節點設置爲頭結點(由於AQS的設計原則就是:隊列中佔有鎖的線程必定是頭結點)
                setHead(node);
                // 而後將原來的頭節點的next指針設置爲null,這樣頭節點就沒有任何引用了,有助於GC回收。
                p.next = null; // help GC
                failed = false;
                // 返回interrupted所表明的值,表示當前線程是否是經過中斷而醒來的
                return interrupted;
            }
            // 若是前一個節點不是頭結點 或者 前一個節點是頭結點,可是當前節點調用tryAcquire()方法獲取鎖失敗,那麼就會執行到下面的if判斷
            /** * 在if判斷中,先調用了shouldParkAfterFailedAcquire()方法。 * 在第一次調用shouldParkAfterFailedAcquire()時,確定返回false(爲何會返回false,能夠看下shouldParkAfterFailedAcquire()方法的源碼) * 因爲當前代碼是處於無限for循環當中的,因此當後面出現第二次代碼執行到這兒時,會再次調用shouldParkAfterFailedAcquire()方法,此時這個方法會返回true。 * 當shouldParkAfterFailedAcquire()返回true時,if判斷中就會再調用parkAndCheckInterrupt()方法,該方法會將當前線程進行阻塞,直到這個線程被喚醒或者被中斷。 * 所以當線程獲取不到鎖時,就會一直阻塞到這兒。直到被其餘線程喚醒,纔會繼續向下執行,當線程想來後,再次進入到當前代碼的無限for循環中,除非線程獲取到鎖,纔會return返回 */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 若是在try中出現異常,那麼此時failed就會爲true,就會取消獲取鎖
        // failed的初始值爲true,若是try語句中不出現異常,最終線程獲取到鎖後,就會將failed置爲false,那麼下面的if判斷條件就不會成立,也就不執行cancelAcquire()方法了
        // 若是上面的try語句存在異常,那麼就不會將failed設置爲false,那麼就會執行cancelAcquire()方法
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼
  • 在acquireQueued()方法中也是使用了無限for循環:for(;;)。在該方法中會先獲取到當前線程的前一個節點。
    1. 若是前一個節點是頭結點,那麼就會讓當前線程調用tryAcquired()方法嘗試獲取鎖。若是獲取鎖成功,就將當前線程設置爲頭結點,這裏設置頭結點時只須要調用setHead(head)方法便可,由於確定只有一個線程獲取到鎖,因此不用考慮併發的問題。而後舊的頭結點的next引用置爲null,便於GC。接着acquireQueued()方法直接返回。若是當前線程調用tryAcquire()方法獲取鎖失敗,就會進入到下面2中的邏輯。
    1. 若是前一個節點不是頭結點或者是頭結點可是獲取鎖失敗,就會進入到後面的邏輯中,即執行到if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  • 在if判斷中,會先執行shouldParkAfterFailedAcquire()方法,該方法用來判斷當前線程是否應該park,便是否應該將當前線程阻塞。若是返回true,就表示須要阻塞,那麼就會接着執行parkAndCheckInterrupt()方法,在parkAndCheckInterrupt()方法中,就會將當前線程阻塞起來,直到當前線程被喚醒並獲取到鎖之後,acquireQueued()方法纔會返回。parkAndCheckInterrupt()方法的源碼以下:
private final boolean parkAndCheckInterrupt() {
    // 將當前線程掛起
    LockSupport.park(this);
    return Thread.interrupted();
}
複製代碼
  • shouldParkAfterFailedAcquire()方法是一個比較有意思的方法,在第一次調用它的時候,它會返回false,因爲acquireQueued()方法裏有一個無限for循環,因此會進行第二次調用shouldParkAfterFailedAcquire()方法,這個時候它會返回true。shouldParkAfterFailedAcquire()方法的源碼以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 在初始狀況下,全部的Node節點的waitStatus均爲0,由於在初始化時,waitStatus字段是int類型,咱們沒有顯示給它賦值,因此它默認是0
    // 後面waitStatus字段會被修改

    // 在第一進入到shouldParkAfterFailedAcquire()時,因此waitStatus是默認值0(由於此時沒有任何地方對它進行修改),因此ws=0
    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)
        // 若是ws = -1,就返回true
        // 若是第一次進入shouldParkAfterFailedAcquire()方法時,waitStatus=0,那麼就會進入到後面的else語句中
        // 在else語句中,會將waitStatus設置爲-1,那麼當後面第二次進入到shouldParkAfterFailedAcquire()方法時,就會返回true了
        return true;
    if (ws > 0) {
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        // 若是ws既不等於-1,也不大於0,就會進入到當前else語句中
        // 此時會調用CAS方法將pred節點的waitStatus字段的值改成-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
複製代碼
  • 通過shouldParkAfterFailedAcquire()方法後,隊列中全部節點的狀態的示意圖以下。

等待示意圖

  • 最終對於acquireQueued()方法而言,只有線程獲取到了鎖或者被中斷,線程纔會從這個方法裏面返回,不然它會一直阻塞在裏面。

第四步:selfInterrupt()

  • 當線程從acquireQueued()方法處返回時,返回值有兩種狀況,若是返回false,表示線程不是被中斷才喚醒的,因此此時在acquire()方法中,if判斷不成立,就不會執行selfInterrupt()方法,而是直接返回。若是返回true,則表示線程是被中斷才喚醒的,因爲在parkAndCheckInterrupt()方法中調用了Thread.interrupted()方法,這會將線程的中斷標識重置,因此此時須要返回true,使acquire()方法中的if判斷成立,而後這樣就會調用selfInterrupt()方法,將線程的中斷標識從新設置一下。最後acquire()方法返回。

釋放鎖:release()

  • 釋放鎖的代碼實現相對比較簡單,當持有鎖的線程釋放鎖後,會喚醒同步隊列中下一個處於等待狀態的節點去嘗試獲取鎖。仍是以上面的Demo爲例,當調用lock.unlock()時,最終會調用到AQS中的release()方法,release()方法的源碼以下。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 當釋放鎖成功之後,須要喚醒同步隊列中的其餘線程
        Node h = head;
        // 當waitStatus!=0時,表示同步隊列中還有其餘線程在等待獲取鎖
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
  • 在release()方法中會先調用AQS子類的tryRelease()方法,在本文的示例中,就是調用ReentrantLock類中Sync的tryRelease()方法,該方法就是讓當前線程釋放鎖。方法源碼以下。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 判斷當前線程是否是持有鎖的線程,若是不是就拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 由於上面一步已經確認了是當前線程持有鎖,因此在修改state時,確定是線程安全的
    boolean free = false;
    // 由於可能鎖被重入了,重入了幾回就須要釋放幾回鎖,因此這個地方須要判斷,只有當state=0時才表示徹底釋放了鎖。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製代碼
  • 在tryRelease()方法中,會先檢驗當前線程是否是持有鎖的線程,若是不是持有鎖的線程,就會拋出異常(這一點很容易理解,沒有獲取到鎖,卻去調用釋放鎖的方法,這個時候就告訴你出錯了)。而後再判斷state減去1後是否等於0,若是等於0,就表示鎖被釋放了,最終tryRelease()方法就會返回true。若是state減去1後不爲0,表示鎖尚未被釋放,最終tryRelease()方法就會返回false。由於對於重入鎖而言,每獲取一次鎖,state就會加1,獲取了幾回鎖,就應該釋放幾回,這樣當徹底釋放完時,state纔會等於0。
  • 當tryRelease()返回true時,在release()方法中,就會進入到if語句塊中。在if語句塊中,會先判斷同步隊列中是否有線程在等待獲取鎖,若是有,就調用unparkSuccessor()方法來喚醒下一個處於等待狀態的線程;若是沒有,tryRelease()就會直接返回true。
  • 如何判斷同步隊列中是否有線程在等待獲取鎖的呢?是經過h != null && h.waitStatus != 0這一個判斷條件來判斷的。
    1. 當頭節點等於null時,這個時候確定沒有線程在等待獲取鎖,由於同步隊列的初始化是當有線程獲取不到鎖之後,將本身加入到同步隊列的時候才初始化同步隊列的(即給head、tail賦值),若是頭結點爲null,說明同步隊列沒有被初始化,即沒有線程在等待獲取鎖。
    1. 在acquire()方法中咱們提到了shouldParkAfterFailedAcquire()方法,該方法會讓等待獲取鎖的節點的waitStatus的值等於-1,因此當h != null && h.waitStatus != 0時,能夠認爲同步隊列中有線程在等待獲取鎖。
  • 若是同步隊列中有線程在等待獲取鎖,那麼此時在release()方法中調用unparkSuccessor()方法去喚醒下一個等待狀態的節點。unparkSuccessor()方法的源碼以下
private void unparkSuccessor(Node node) {
    /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        /** * 同步隊列中的節點鎖表明的線程,可能被取消了,此時這個節點的waitStatus=1 * 所以這個時候利用for循環從同步隊列尾部開始向前遍歷,判斷節點是否是被取消了 * 正常狀況下,當頭結點釋放鎖之後,應該喚醒同步隊列中的第二個節點,可是若是第二個節點的線程被取消了,此時就不能喚醒它了, * 就應該判斷第三個節點,若是第三個節點也被取消了,再依次日後判斷,直到第一次出現沒有被取消的節點。若是都被取消了,此時s==null,因此不會喚醒任何線程 */
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼
  • 在unparkSuccessor()方法中,會找到頭結點後第一個waitStatus小於0的節點,而後將其喚醒。爲何是waitStatus小於等於0?由於waitStatus=1表示線程是取消狀態,沒資格獲取鎖。最後調用LockSupport.unpark()方法喚醒符合條件的線程,此時線程被喚醒後,就回到上面獲取鎖流程中的parkAndCheckInterrupt()方法處,接着就執行後面的邏輯了。

總結

  • 本文經過分析獲取鎖acquire()和釋放鎖release()方法的源碼,講解了線程獲取鎖和入隊、出隊的流程。對於AQS中其餘獲取鎖和釋放鎖的方法,如可響應中斷的獲取鎖,支持超時的獲取鎖以及共享式的獲取鎖,其方法的源碼和邏輯和acquire()方法的邏輯很類似,有興趣的朋友能夠本身去閱讀下源碼。
  • 在本文中提到了公平鎖、非公平鎖以及重入鎖的概念,關於這些會在下一篇文章ReentrantLock的源碼分析中詳細介紹。

微信公衆號
相關文章
相關標籤/搜索