解讀AbstractQueuedSynchronizer

前言

在咱們學校java併發編程的時候,併發包絕對是最值得咱們研究的資源。在分析併發包源碼的時候,少不了須要AbstractQueuedSynchronizer(如下簡寫AQS),它是整個併發包的基礎工具類,是實現ReentranLock、CountDownLatch、Semaphore、FutrureTask等類的基礎。
本文從ReentranLock的公平鎖源碼出發,分析下AQS怎麼工做的,但願能給你們一點幫助。java

AQS結構

先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什麼套路了,畢竟能夠猜嘛!node

// 頭結點,你直接把它當作 當前持有鎖的線程 多是最好理解的
private transient volatile Node head;
// 阻塞的尾節點,每一個新的節點進來,都插入到最後,也就造成了一個隱視的鏈表
private transient volatile Node tail;
// 這個是最重要的,不過也是最簡單的,表明當前鎖的狀態,0表明沒有被佔用,大於0表明有線程持有當前鎖
// 之因此說大於0,而不是等於1,是由於鎖能夠重入嘛,每次重入都加上1
private volatile int state;
// 表明當前持有獨佔鎖的線程,舉個最重要的使用例子,由於鎖能夠重入
// reentrantLock.lock()能夠嵌套調用屢次,因此每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

怎麼樣,看樣子應該是很簡單的吧,畢竟也就四個屬性啊。web

AbstractQueuedSynchronizer 的等待隊列示意以下所示,注意了,以後分析過程當中所說的 queue,也就是阻塞隊列不包含 head,不包含 head,不包含 head。
圖片描述spring

等待隊列中每一個線程被包裝成一個 node,數據結構是鏈表,一塊兒看看源碼吧:編程

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 標識節點當前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 標識節點當前在獨佔模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭搶這個鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示當前node的後繼節點對應的線程須要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,因此略過吧,下一篇文章會介紹這個
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 一樣的不分析,略過吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值爲上面的一、-一、-二、-3,或者0(之後會講到)
    // 這麼理解,暫時只須要知道若是這個值 大於0 表明此線程取消了等待,
    // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是能夠指定timeouot的。。。
    volatile int waitStatus;
    // 前驅節點的引用
    volatile Node prev;
    // 後繼節點的引用
    volatile Node next;
    // 這個就是線程本尊
    volatile Thread thread;

}

Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,你們先要有這個概念在內心。數據結構

上面的是基礎知識,後面會屢次用到,內心要時刻記着它們,內心想着這個結構圖就能夠了。下面,咱們開始說 ReentrantLock 的公平鎖。多嘴一下,我說的阻塞隊列不包含 head 節點。
圖片描述多線程

首先,咱們先看下 ReentrantLock 的使用方式。併發

// 我用個web開發中的service概念吧
public class OrderService {
    // 使用static,這樣每一個線程拿到的是同一把鎖,固然,spring mvc中service默認就是單例,別糾結這個
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // 好比咱們同一時間,只容許一個線程建立訂單
        reentrantLock.lock();
        // 一般,lock 以後緊跟着 try 語句
        try {
            // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程),
            // 其餘的線程在lock()方法上阻塞,等待獲取到鎖,再進來
            // 執行代碼...
            // 執行代碼...
            // 執行代碼...
        } finally {
            // 釋放鎖
            reentrantLock.unlock();
        }
    }
}

ReentrantLock 在內部用了內部類 Sync 來管理鎖,因此真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。Sync 有兩個實現,分別爲 NonfairSync(非公平鎖)和 FairSync(公平鎖),咱們看 FairSync 部分。mvc

abstract static class Sync extends AbstractQueuedSynchronizer {
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
}

線程搶鎖

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
      // 爭鎖
    final void lock() {
        acquire(1);
    }
      // 來自父類AQS,我直接貼過來這邊,下面分析的時候一樣會這樣作,不會給讀者帶來閱讀壓力
    // 咱們看到,這個方法,若是tryAcquire(arg) 返回true, 也就結束了。
    // 不然,acquireQueued方法會將線程壓到隊列中
    public final void acquire(int arg) { // 此時 arg == 1
        // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試
        // 由於有可能直接就成功了呢,也就不須要進隊列排隊了,
        // 對於公平鎖的語義就是:原本就沒人持有鎖,根本不必進隊列等待(又是掛起,又是等待被喚醒的)
        if (!tryAcquire(arg) &&
            // tryAcquire(arg)沒有成功,這個時候須要把當前線程掛起,放到阻塞隊列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
              selfInterrupt();
        }
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    // 嘗試直接獲取鎖,返回值是boolean,表明是否獲取到鎖
    // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程原本就持有鎖,也就能夠理所固然能夠直接獲取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此時此刻沒有線程持有鎖
        if (c == 0) {
            // 雖然此時此刻鎖是能夠用的,可是這是公平鎖,既然是公平,就得講究先來後到,
            // 看看有沒有別人在隊列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 若是沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了,
                // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_=
                // 由於剛剛還沒人的,我判斷過了???
                compareAndSetState(0, acquires)) {

                // 到這裏就是獲取到鎖了,標記一下,告訴你們,如今是我佔用了鎖
                setExclusiveOwnerThread(current);
                return true;
            }
        }
          // 會進入這個else if分支,說明是重入了,須要操做:state=state+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 若是到這裏,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖
        // 回到上面一個外層調用方法繼續看:
        // if (!tryAcquire(arg) 
        //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        //     selfInterrupt();
        return false;
    }

    // 假設tryAcquire(arg) 返回false,那麼代碼將執行:
      //        acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
    // 這個方法,首先須要執行:addWaiter(Node.EXCLUSIVE)

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    // 此方法的做用是把線程包裝成node,同時進入到隊列中
    // 參數mode此時是Node.EXCLUSIVE,表明獨佔模式
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 如下幾行代碼想把當前node加到鏈表的最後面去,也就是進到阻塞隊列的最後
        Node pred = tail;

        // tail!=null => 隊列不爲空(tail==head的時候,其實隊列是空的,不過無論這個吧)
        if (pred != null) { 
            // 設置本身的前驅 爲當前的隊尾節點
            node.prev = pred; 
            // 用CAS把本身設置爲隊尾, 若是成功後,tail == node了
            if (compareAndSetTail(pred, node)) { 
                // 進到這裏說明設置成功,當前node==tail, 將本身與以前的隊尾相連,
                // 上面已經有 node.prev = pred
                // 加上下面這句,也就實現了和以前的尾節點雙向鏈接了
                pred.next = node;
                // 線程入隊了,能夠返回了
                return node;
            }
        }
        // 仔細看看上面的代碼,若是會到這裏,
        // 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊)
        // 讀者必定要跟上思路,若是沒有跟上,建議先不要往下讀了,往回仔細看,不然會浪費時間的
        enq(node);
        return node;
    }

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    // 採用自旋的方式入隊
    // 以前說過,到這個方法只有兩種可能:等待隊列爲空,或者有線程競爭入隊,
    // 自旋在這邊的語義是:CAS設置tail過程當中,競爭一次競爭不到,我就屢次競爭,總會排到的
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 以前說過,隊列爲空也會進來這裏
            if (t == null) { // Must initialize
                // 初始化head節點
                // 細心的讀者會知道原來head和tail初始化的時候都是null,反正我不細心
                // 仍是一步CAS,你懂的,如今多是不少線程同時進來呢
                if (compareAndSetHead(new Node()))
                    // 給後面用:這個時候head節點的waitStatus==0, 看new Node()構造方法就知道了

                    // 這個時候有了head,可是tail仍是null,設置一下,
                    // 把tail指向head,放心,立刻就有線程要來了,到時候tail就要被搶了
                    // 注意:這裏只是設置了tail=head,這裏可沒return哦,沒有return,沒有return
                    // 因此,設置完了之後,繼續for循環,下次就到下面的else分支了
                    tail = head;
            } else {
                // 下面幾行,和上一個方法 addWaiter 是同樣的,
                // 只是這個套在無限循環裏,反正就是將當前線程排到隊尾,有線程競爭的話排不上重複排
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    // 如今,又回到這段代碼了
    // if (!tryAcquire(arg) 
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    //     selfInterrupt();

    // 下面這個方法,參數node,通過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列
    // 注意一下:若是acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話,
    // 意味着上面這段代碼將進入selfInterrupt(),因此正常狀況下,下面應該返回false
    // 這個方法很是重要,應該說真正的線程掛起,而後被喚醒後去獲取鎖,都在這個方法裏了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // p == head 說明當前節點雖然進到了阻塞隊列,可是是阻塞隊列的第一個,由於它的前驅是head
                // 注意,阻塞隊列不包含head節點,head通常指的是佔有鎖的線程,head後面的才稱爲阻塞隊列
                // 因此當前節點能夠去試搶一下鎖
                // 這裏咱們說一下,爲何能夠去試試:
                // 首先,它是隊頭,這個是第一個條件,其次,當前的head有多是剛剛初始化的node,
                // enq(node) 方法裏面有提到,head是延時初始化的,並且new Node()的時候沒有設置任何線程
                // 也就是說,當前的head不屬於任何一個線程,因此做爲隊頭,能夠去試一試,
                // tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操做一下state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 到這裏,說明上面的if分支沒有成功,要麼當前node原本就不是隊頭,
                // 要麼就是tryAcquire(arg)沒有搶贏別人,繼續往下看
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    // 剛剛說過,會到這裏就是沒有搶到鎖唄,這個方法說的是:"當前線程沒有搶到鎖,是否須要掛起當前線程?"
    // 第一個參數是前驅節點,第二個參數纔是表明當前線程的節點
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程須要掛起,直接能夠返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;

        // 前驅節點 waitStatus大於0 ,以前說過,大於0 說明前驅節點取消了排隊。這裏須要知道這點:
        // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操做是由前驅節點完成的。
        // 因此下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點,
        // 簡單說,就是爲了找個好爹,由於你還得依賴它來喚醒呢,若是前驅節點取消了排隊,
        // 找前驅節點的前驅節點作爹,往前循環總能找到一個好爹的
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 仔細想一想,若是進入到這個分支意味着什麼
            // 前驅節點的waitStatus不等於-1和1,那也就是隻多是0,-2,-3
            // 在咱們前面的源碼中,都沒有看到有設置waitStatus的,因此每一個新的node入隊時,waitStatu都是0
            // 用CAS將前驅節點的waitStatus設置爲Node.SIGNAL(也就是-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
    // 這個方法結束根據返回值咱們簡單分析下:
    // 若是返回true, 說明前驅節點的waitStatus==-1,是正常狀況,那麼當前線程須要被掛起,等待之後被喚醒
    //        咱們也說過,之後是被前驅節點喚醒,就等着前驅節點拿到鎖,而後釋放鎖的時候叫你好了
    // 若是返回false, 說明當前不須要被掛起,爲何呢?日後看

    // 跳回到前面是這個方法
    // if (shouldParkAfterFailedAcquire(p, node) &&
    //                parkAndCheckInterrupt())
    //                interrupted = true;

    // 1. 若是shouldParkAfterFailedAcquire(p, node)返回true,
    // 那麼須要執行parkAndCheckInterrupt():

    // 這個方法很簡單,由於前面返回true,因此須要掛起線程,這個方法就是負責掛起線程的
    // 這裏用了LockSupport.park(this)來掛起線程,而後就停在這裏了,等待被喚醒=======
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    // 2. 接下來講說若是shouldParkAfterFailedAcquire(p, node)返回false的狀況

   // 仔細看shouldParkAfterFailedAcquire(p, node),咱們能夠發現,其實第一次進來的時候,通常都不會返回true的,緣由很簡單,前驅節點的waitStatus=-1是依賴於後繼節點設置的。也就是說,我都還沒給前驅設置-1呢,怎麼多是true呢,可是要看到,這個方法是套在循環裏的,因此第二次進來的時候狀態就是-1了。

    // 解釋下爲何shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起線程:
    // => 是爲了應對在通過這個方法後,node已是head的直接後繼節點了。剩下的讀者本身想一想吧。

說到這裏,也就明白了,多看幾遍 final boolean acquireQueued(final Node node, int arg) 這個方法吧。本身推演下各個分支怎麼走,哪一種狀況下會發生什麼,走到哪裏。app

解鎖操做

最後,就是還須要介紹下喚醒的動做了。咱們知道,正常狀況下,若是線程沒獲取到鎖,線程會被 LockSupport.park(this); 掛起中止,等待被喚醒。

// 喚醒的代碼仍是比較簡單的,你若是上面加鎖的都看懂了,下面都不須要看就知道怎麼回事了
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    // 日後看吧
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否徹底釋放鎖
    boolean free = false;
    // 其實就是重入的問題,若是c==0,也就是說沒有嵌套鎖了,能夠釋放了,不然還不能釋放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// 喚醒後繼節點
// 從上面調用處知道,參數node是head頭結點
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 若是head節點當前waitStatus<0, 將其修改成0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 下面的代碼就是喚醒後繼節點,可是有可能後繼節點取消了等待(waitStatus==1)
    // 從隊尾往前找,找到waitStatus<=0的全部節點中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從後往前找,仔細看代碼,沒必要擔憂中間有節點取消(waitStatus==1)的狀況
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒線程
        LockSupport.unpark(s.thread);
}

喚醒線程之後,被喚醒的線程將從如下代碼中繼續往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 剛剛線程被掛起在這裏了
    return Thread.interrupted();
}
// 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了

好了,後面就不分析源碼了,剩下的還有問題本身去仔細看看代碼吧。

總結

總結一下吧。

在併發環境下,加鎖和解鎖須要如下三個部件的協調:

鎖狀態。咱們要知道鎖是否是被別的線程佔有了,這個就是 state 的做用,它爲 0 的時候表明沒有線程佔有鎖,能夠去爭搶這個鎖,用 CAS 將 state 設爲 1,若是 CAS 成功,說明搶到了鎖,這樣其餘線程就搶不到了,若是鎖重入的話,state進行+1 就能夠,解鎖就是減 1,直到 state 又變爲 0,表明釋放鎖,因此 lock() 和 unlock() 必需要配對啊。而後喚醒等待隊列中的第一個線程,讓其來佔有鎖。線程的阻塞和解除阻塞。AQS 中採用了 LockSupport.park(thread) 來掛起線程,用 unpark 來喚醒線程。阻塞隊列。由於爭搶鎖的線程可能不少,可是隻能有一個線程拿到鎖,其餘的線程都必須等待,這個時候就須要一個 queue 來管理這些線程,AQS 用的是一個 FIFO 的隊列,就是一個鏈表,每一個 node 都持有後繼節點的引用。AQS 採用了 CLH 鎖的變體來實現,感興趣的讀者能夠參考這篇文章關於CLH的介紹,寫得簡單明瞭。

相關文章
相關標籤/搜索