源碼分析JDK8之AbstractQueuedSynchronizer

前言

源碼分析我認爲主要有兩個做用:知足好奇心,我想每個有追求的人都不會知足於僅僅作一個API Caller實現功能就好,咱們也想知道它究竟是怎麼實現的;借鑑與昇華,當咱們明白了一個類的設計原理,在必定的情境下咱們能夠借鑑其設計哲學,甚至針對咱們本身特殊的業務場景對其進行改良與優化。html

下面我就以這篇文章開啓個人源碼閱讀之旅。整體而言,我會從這個類基本結構入手,而後分析原理,再看看已有的應用,並進行分析與理解。java

我以前一篇文章裏提到過java的顯示鎖ReentrantLock。此外,若是你編寫過併發程序那你通常也應該用過CountDownLatch,Semaphore等等,這些都是同步器,而它們都基於AbstractQueuedSynchronizer(簡稱AQS)實現的,那麼咱們今天就來看看這個牛逼的AQS是怎麼實現這麼多功能的。node

首先打開IDEA,隨便新建一個類,而後輸入CountDownLatch,在它上面敲下Ctrl+B,就打開了CountDownLatch的源碼,而後發現有一個很是重要的靜態內部類Sync繼承了AbstractQueuedSynchronizer,再次Ctrl+B,咱們就打開了AQS的源碼,立刻就能夠解開它的神祕面紗了,哼哼。segmentfault

映入眼簾的首先就是大段大段的文檔,大意就是這個類 提供了一個基於FIFO隊列的實現了阻塞鎖和相關同步器(信號量,事件等)的框架...... 讀完了大概就瞭解這個類究竟是怎麼工做的了。下面咱們開始分類型研究源碼,固然不可能所有分析一遍,這裏只把重點的列出來。
實際代碼分析中,我通常先看看這個結構圖:api

而後讀一讀開始的綜述文檔,而後從實例開始,像方法調用那樣依次深刻查看,就能依次看到相關的方法、內部類和屬性,仍是Ctrl+B大法好啊,這屬於自底向上的源碼分析方法。若是直接從上面那張圖開始,對屬性、方法、內部類挨個分析就屬於自頂向下的分析法了。我以爲對一個陌生的東西要想有清晰的認知最好先自底向上捋一遍,便於搞清楚一個個具體功能的實現機制,而後再自頂向下看一遍,便於把控總體架構,宏觀把握。這樣走兩遍再來總結一下就能比較透徹的掌握該技術了。架構

1、方法與屬性

方法中,protected類型的通常要求具體的同步器子類來實現可是有些也能夠直接用,public類型通常都是能夠直接使用的固然也能夠本身實現,private就是AQS本身的內部實現了,與具體子類無關。併發

state相關

一個private volatile int state;屬性表明了線程之間爭用的資源。與之相關的方法有三個oracle

protected final int getState()
protected final void setState(int newState)
protected final boolean compareAndSetState(int expect, int update)//CAS原子性地修改state

都是protected類型,可見咱們能夠進行Override,來定義state的獲取與釋放從而實現咱們自定義的同步器。很是簡單就不把所有源碼擺出來了。框架

同步隊列queue相關

這個queue是一個FIFO的隊列,每一個節點都是下面的內部類Node類型,等待着state這個資源,主要由兩個屬性決定private transient volatile Node head;private transient volatile Node tail; 與之相關的方法有:ide

// 節點node進入隊列,採用CAS的方式,返回其前驅
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列爲空,先初始化
            if (compareAndSetHead(new Node()))//設置頭結點
                tail = head;
        } else {// 隊列不爲空
            node.prev = t;// 插入節點至隊列尾部
            if (compareAndSetTail(t, node)) {//CAS修改隊尾爲node,之因此CAS是由於可能有多個線程爭相入隊
                t.next = node;
                return t;
            }
        }
    }
}
// 將當前線程以mode的方式(EXCLUSIVE或者SHARED)構成新節點併入隊,返回這個新節點
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 更快的入隊方式,若是失敗再採用較慢的標準入隊方式enq
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 把node設置爲新的頭,老的頭出隊
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

資源獲取與釋放相關

資源獲取分爲EXCLUSIVESHARED兩種模式,對應acquirereleaseacquireSharedreleaseShared

首先是EXCLUSIVE資源獲取:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這裏tryAcquire須要繼承類本身實現(成功true,失敗false),若是tryAcquire成功則直接返回,不然addWaiter將當前線程以獨佔節點的方式置於同步隊列尾部等待。acquireQueued使得該節點等待獲取資源,一直獲取到資源才返回,整個等待過程當中若是有中斷是不響應的,可是獲取資源後會用selfInterrupt補上。

// 節點得到資源才能返回不然一直自旋,中斷該線程不會實時響應,可是若是被中斷過會返回true,不然返回false
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {// node前驅是頭結點,那麼即可以嘗試去獲取資源了
                setHead(node);// 獲取成功,能夠把node設爲頭結點,也就是說頭結點是獨佔資源的惟一擁有者
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 走到這裏說明獲取失敗,檢查是否應該阻塞和中斷
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//若是失敗了,就把waitStatus置爲CANCELLED表示取消了
    }
}

// 獲取資源失敗後,當前節點是否應該阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)// 前驅pred得到資源後會通知當前節點node,因此能夠放心的阻塞了(waitStatus會在下面內部類解釋)
        return true;
    if (ws > 0) {// 前驅取消了資源獲取,那麼當前節點就要找到前面最近一個正在等待的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;//此處 pred.waitStatus < 0,亦即pred 還在等待嘗試獲取資源
    } else {// 前驅正在等待,則設置其狀態爲SIGNAL,讓他獲取資源後通知本節點,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        // 可是本節點不能立刻阻塞,由於設置不必定能成功,須要下次再次檢查
    }
    return false;
}

// 阻塞本線程。被喚醒後要返回本線程是否被中斷過。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

而後是EXCLUSIVE資源釋放:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

與上面相對應,這裏tryRelease也須要繼承類本身實現(成功true,失敗false),若是釋放成功,則調用unparkSuccessor喚醒後繼節點返回true,不然返回false。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)// 可能須要釋放通知信號,把狀態置零,容許失敗
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;// 找到後繼節點
    if (s == null || s.waitStatus > 0) {// 若是後繼節點爲空或者已經取消
        s = null;// 確保該節點的釋放
        for (Node t = tail; t != null && t != node; t = t.prev)// 從隊尾開始找到須要通知的最近的後繼節點
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)// 若是需喚醒的後繼節點存在則喚醒之
        LockSupport.unpark(s.thread);
}

再看SHARED資源獲取:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

這裏tryAcquireShared也須要本身實現(負值說明失敗,非負值表示獲取成功後剩下的可用資源數),若是獲取失敗就調用doAcquireShared進入同步隊列等待。

// 等待獲取共享資源時不響應中斷,可是獲取資源成功後會用selfInterrupt補上
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);// 入隊尾
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {// 處於隊列第二個位置,能夠嘗試獲取資源
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 獲取成功
                    setHeadAndPropagate(node, r);// 將本身設爲隊列頭,並喚醒可能獲取資源的後面幾個節點
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 同acquireQueued的分析
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 舊的頭
    setHead(node); // 設置新的頭
    // 若是還有資源,則喚醒下一個,採用保守策略,多喚醒幾回即便沒獲取到資源也無所謂,儘可能作到不漏掉資源
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {        
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

最後SHARED資源釋放:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

這裏tryReleaseShared依然要本身實現(若是能夠容許下一個節點得到資源則返回true,不然false),若是釋放成功則調用doReleaseShared喚醒後繼節點。須要注意的是tryReleaseShared因爲可能多個線程併發操做因此通常須要CAS而tryRelease不須要。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {// 須要喚醒
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//設置WaitStatus失敗
                    continue;            
                unparkSuccessor(h);// 必定要設置成功才喚醒
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;// CAS設置失敗則繼續循環
        }
        if (h == head)// 頭變了,不須要繼續喚醒
            break;
    }
}

此外,資源獲取除了一直等待的方式以外還有對應的限制等待時間的方法如tryAcquiretryAcquireNanos,沒必要多言,釋放就只有一直等而沒有限制等待時間的了。也有響應中斷與不響應的對應,如acquireInterruptiblyacquire,差異不大,沒必要多言。

2、內部類

Node

等待隊列的節點類,等待隊列是CLH(Craig,Landin,Hagersten)鎖隊列的一種變體,CLH鎖一般用來做爲自旋鎖。

每一個節點主要維護了下面一些狀態

  • 對應的線程thread
  • 等待狀態waitStatus 含,0:初始狀態;CANCELLED 1:被取消;SIGNAL -1:當前線程釋放資源或取消後須要喚醒後繼節點;CONDITION -2:條件等待;PROPAGATE -3:下一個acquireShared操做應該被無條件傳播。實際使用中,通常只關注正負,非負數就意味着節點不須要釋放信號
  • 資源獲取模式有SHARED(默認)和EXCLUSIVE兩個
  • 同步隊列中的前驅後繼節點prevnext
  • 做爲同步隊列節點時,nextWaiter有:EXCLUSIVESHARED標識當前節點是獨佔模式仍是共享模式;與ConditionObject搭配使用做爲條件等待隊列節點時,nextWaiter保存後繼節點。因此實際上這個Node類是被複用了,既用於同步隊列,也用於條件等待隊列

ConditionObject

這個類實現了Condition接口,主要用來完成常見的條件等待、喚醒等操做。一個ConditionObject 包含一個等待隊列,由firstWaiterlastWaiter決定。當前線程調用Condition.await()方法時,會被構形成爲節點,而後置於條件等待隊列隊尾。
咱們看最經常使用的條件等待方法

// 條件等待,響應中斷
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();// 響應中斷
        Node node = addConditionWaiter();// 當前線程加入條件等待隊列
        int savedState = fullyRelease(node);// 釋放資源,並得到本節點須要的資源數,以便再次獲取
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {// 若是當前節點不是在同步隊列,也就是還在等待隊列等待着條件發生
            LockSupport.park(this);// 就會一直阻塞
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//加入同步隊列等待獲取資源,成功後要檢查中斷響應
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 若是最後一個條件等待節點是取消的狀態
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();// 清理整個鏈路的無效節點
            t = lastWaiter;
        }
        //以條件等待的方式將當前線程封裝成節點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)//條件等待隊列爲空就初始化
            firstWaiter = node;
        else// 隊列不空,插入隊尾
            t.nextWaiter = node;
        lastWaiter = node;
        return node;// 返回新插入的節點
    }

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 本節點須要的資源數
            if (release(savedState)) {// 釋放掉這麼多資源
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

而後是信號方法:

public final void signal() {
        if (!isHeldExclusively())// 要使用該方法必須先是獨佔線程
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);//找到第一個條件等待節點,併發出信號
    }
    
    // 去掉條件等待隊列的節點,直到趕上沒取消的或者空節點
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 節點已被取消
            return false;
        Node p = enq(node);// 條件等待隊列的第一個節點被加入同步隊列的隊尾
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);// 喚醒節點對應線程
        return true;
    }

3、已有應用分析

下面用兩個例子來看看AQS的具體使用場景,分別是使用獨佔模式的ReentrantLock和共享模式的CountDownLatch
通常使用AQS的類,都會用一個內部類Sync來繼承AQS,並實現那幾個protected的方法。

ReentrantLock

ReentrantLockFairSyncNonfairSync兩個類來實現公平鎖和非公平鎖,咱們看非公平鎖,主要幾個方法是

  • lock(),使得NonfairSync調用compareAndSetState把state從0設爲1並用setExclusiveOwnerThread把當前線程設爲獨佔線程(亦即首次得到鎖),若是失敗則使用acquire(1)調用nonfairTryAcquire。整體流程就是若是state爲0,那麼就是本線程首次得到鎖,把state置爲1,不然若是當前線程是獨佔線程則將state+1(這也是鎖可重入的關鍵),若是都不是就進入acquireQueued流程等待得到鎖了了
  • unlock(),調用AQS的release(1)方法,其實是調用了Sync的tryRelease(1)方法,若是state-1爲0,那麼返回true,不然返回false。也就是說,重入鎖必須釋放夠重入次數纔算真正釋放成功,可是unlock()方法自己不會管這個最終結果,只管釋放
  • tryLock(),與lock()區別是不等待,當即返回,只有喚醒時就是獨佔線程才能返回true,實現方法是nonfairTryAcquire
  • newCondition()直接返回了了AQS的內部類ConditionObject
  • isLocked() 若是state爲0則表示未加鎖返回false,不然返回true

CountDownLatch

CountDownLatch 主要幾個方法是

  • CountDownLatch(int count),構造方法,設置 AQS 的 state 爲 count
  • await(),調用 AQS 的 acquireSharedInterruptibly(int arg) 方法,而後調用本身覆蓋的tryAcquireShared(int acquires)來得到state的值是否爲0,若是是0就結束等待直接返回了,若是不是0就調用 AQS 的 doAcquireSharedInterruptibly(int arg)方法,該方法會循環等待,直到state爲0才返回或者被中斷。
  • countDown(),調用 AQS 的 releaseShared(int arg) 方法,其實是調用了本身覆蓋的 tryReleaseShared(int releases) 方法,把 state 減了1,若是此時state爲0,則調用 AQS 的doReleaseShared()方法

分析

整體而言,AQS提供了一個模板方法模式,將得到鎖釋放鎖一些必要的流程操做都規定好了,咱們只須要填充一些具體的得到與釋放方法

  • getState(),setState(int newState),compareAndSetState(int expect,int update):是資源相關操做,保證原子性
  • tryAcquire(int arg):嘗試獨佔獲取資源。成功返回true,失敗返回false。
  • tryRelease(int arg):嘗試獨佔釋放資源。成功返回true,失敗返回false。
  • tryAcquireShared(int arg):嘗試共享獲取資源。負數表示失敗,非負數表示成功表明剩餘可用資源
  • tryReleaseShared(int arg):嘗試共享釋放資源。若是釋放後能夠喚醒後續等待結點返回true,不然返回false。
  • isHeldExclusively():表明當前線程是否獨佔資源,只有用到Condition之時才須要去實現它。

自定義同步器時,通常都是本身寫一個 static class Sync extends AbstractQueuedSynchronizer 靜態內部類來實現具體的方法。

閱讀原文:MageekChiu

相關文章
相關標籤/搜索