讓人抓頭的Java併發(三) 強大的AQS!

前兩篇文章介紹了多線程和線程池的一些概念,在這一篇終於要介紹JUC中很是重要的一個基礎組件AQS了!node

AQS簡介

AQS是隊列同步器AbstractQueuedSynchronizer的簡稱,是用來構建鎖和其餘隊列同步組件(ReentrantLock、CountDownLatch、Semaphore等)的基礎框架。它使用一個volatile修飾的int類型成員變量表示同步狀態,使用一個靜態內部類Node構成的隊列(雙向鏈表)實現獲取同步資源線程的排隊工做。AQS中使用了模板方法模式,子類經過繼承AQS並重寫它的部分方法來管理同步狀態,通常都將子類做爲自定義同步組件的靜態內部類。編程

AQS的實現分析

同步狀態

/**
 * 同步狀態
 */
private volatile int state;
/**
 * 返回同步狀態的當前值
 */
protected final int getState() {
    return state;
}
/**
 * 設置同步狀態的值
 */
protected final void setState(int newState) {
    state = newState;
}
/**
 * 利用CAS操做更新當前的status值
 */
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼

status字段由volatile修飾保證了其可見性,利用CAS操做保證了修改status操做的原子性。所以在AQS中它是線程安全的(並不意味這volatile修飾的變量是線程安全的,只對單個變量讀/寫具備原子性,因此這裏更新操做須要由CAS保證原子性)。status的值能夠被用來控制這個同步器是屬於獨佔模式(小於等於1)仍是共享模式(大於1)。安全

獨佔模式:同一時刻最多隻有一個線程獲取到同步狀態;
共享模式:同一時刻會有多個線程獲取到同步狀態;bash

AQS中提供了一些模板方法控制同步狀態,同步組件須要使用AQS提供的模板方法來實現同步,重寫的關鍵方法有:多線程

方法 說明
tryAcquire(int arg) 獨佔式獲取同步狀態
tryAcquireShared(int arg) 共享式獲取同步狀態
tryRelease(int arg) 獨佔式釋放同步狀態
tryReleaseShared(int arg) 共享式釋放同步狀態

同步隊列

AQS依賴同步隊列來完成同步狀態的管理,當前線程獲取同步狀態失敗時會被構形成一個Node節點並被加入同步隊列中(經過CAS保證線程安全),同時會阻塞當前線程。當同步狀態釋放時,會把隊列首節點中的線程喚醒從新嘗試獲取同步狀態;併發

private transient volatile Node head;
private transient volatile Node tail;
/** CAS設置頭節點.只被enq()方法調用.後續不少節點入隊出隊操做都是使用enq()方法*/
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/** CAS設置尾節點.只被enq()方法調用.*/
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
    
static final class Node {
    /** 標記表示節點正在共享模式中等待 */
    static final Node SHARED = new Node();
    /** 標記表示節點正在獨佔模式中等待 */
    static final Node EXCLUSIVE = null;
    /**
    * 在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待,
    * 節點進入該狀態將不會變化
    */
    static final int CANCELLED =  1;
    /** 
    * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消
    * 會通知後繼節點,使後繼節點的線程得以運行
    */
    static final int SIGNAL    = -1;
    /** 
    * 節點在等待隊列中,當其餘線程對condition調用了signal()後,
    * 該節點會從等待隊列轉移到同步隊列中
    */
    static final int CONDITION = -2;
    /** 表示下一次共享式同步狀態獲取將會被傳播 */
    static final int PROPAGATE = -3;
    /** 等待狀態*/
    volatile int waitStatus;
    /** 前驅節點*/
    volatile Node prev;
    /** 後繼節點*/
    volatile Node next;
    /** 獲取同步狀態的線程*/
    volatile Thread thread;
    /** 等待隊列Condition中的後繼節點,若是式共享模式則是SHARED常量*/
    Node nextWaiter;
    
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}
複製代碼

同步狀態的獲取流程(獨佔模式)

關於獲取同步狀態的源碼部分這裏簡單分析下。一圖勝千言,給你們一個流程圖讓你們瞭解下這個大體的過程。 框架

源碼分析

acquire(int arg)方法用來獲取同步狀態工具

/**
 * 以獨佔模式獲取同步狀態,忽略中斷.經過至少調用tryAcquire實現,成功返回.
 * 不然線程排隊,可能重複阻塞和解除阻塞,調用tryAcquire直到成功。
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

其中tryAcquire()方法對於不一樣的同步組件有不一樣的實現方式,例如ReentrantLock中的 公平鎖和非公平鎖的實現方式就略有不一樣。本篇不詳細分析,主要功能是獲取同步狀態,成功返回true,不然返回false。源碼分析

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 嘗試enq方法的快速路徑.失敗後備份到完整enq方法
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 若是隊列爲空,初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
複製代碼

addWaiter(Node.EXCLUSIVE)方法用於構造一個獨佔模式的Node節點並嘗試使用CAS操做將其加入到等待隊列做爲尾節點;若是加入失敗則進入enq()方法經過在死循環中CAS來設置爲尾節點直到成功。ui

/**
* 對於已經在同步隊列中的線程,以獨佔不間斷模式獲取。由條件等待方法使用以及獲取。
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            // 若是前驅節點是頭節點則嘗試獲取同步狀態
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 若是前驅節點不是頭節點則阻塞當前線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

在acquireQueued()方法中,當前線程經過死循環嘗試獲取同步狀態,並且只有當前驅節點是頭節點的時候才能嘗試獲取同步狀態。頭節點的線程釋放同步狀態以後會喚醒其後繼節點,這裏在獲取同步狀態以前檢查前驅是否爲頭節點是爲了防止過早的通知(等待線程因爲中斷被喚醒)

同步狀態的釋放流程(獨佔模式)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 若是隊列中頭節點不爲null而且該節點不是處於初始狀態(waitStatus爲0)
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 若是狀態爲負,請嘗試以預期信號清
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 若是後繼節點爲null或者waitStatus>0(爲1,從同步隊列中取消了)
     * 則繼續向後尋找節點
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

該方法經過調用tryRelease(arg)釋放同步狀態,具體根據不一樣組件實現,以後會喚醒它的後繼節點使後繼節點嘗試獲取同步狀態。

Condition簡介

Condition接口定義了等待/通知兩種類型的方法,調用這些方法的前提是須要獲取到Condition對象關聯的鎖(相似於調用Object的wait、notify須要先獲取Synchronized鎖)。ConditionObject是AQS中的一個內部類,實現了Condition接口,有兩個Node類型的成員變量firstWaiter、lastWaiter,實際上就是個雙向隊列。

等待

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 加入等待隊列 
    Node node = addConditionWaiter();
    // 釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 進入等待狀態
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒,從新嘗試進入同步隊列獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 若是被中斷則拋出異常
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
複製代碼

調用await()方法會使當前線程加入等待隊列並釋放同步狀態,喚醒同步隊列中的後繼節點,而後當前線程進入等待狀態。當處於等待隊列的該節點被喚醒(signal())後從新嘗試獲取同步狀態,若是是因爲中斷被喚醒則拋出異常。

通知

public final void signal() {
    // 判斷當前線程是否獲取了鎖 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // 喚醒等待隊列中的第一個節點
    if (first != null)
        doSignal(first);
}
複製代碼

調用該方法必需要先獲取同步狀態,該方法會將等待隊列中的第一個節點移動到同步隊列而後喚醒它,被喚醒的節點會從新嘗試獲取同步狀態

總結

上面我簡單分析了JUC中的重要工具AQS,其實對於AQS的同步狀態的獲取和釋放就至關於進入和退出Synchronized同步。AQS的等待隊列至關於Synchronized的鎖等待池。AQS中的Condition至關於Synchronized的對象等待池。Condition的await()和signal()至關於Object的wait()和notify()。







參考:《Java併發編程的藝術》

相關文章
相關標籤/搜索