AQS 原理剖析

AQSAbstractQueuedSynchronizer類稱做隊列同步器,是構建其餘同步器的一個重要的基礎框架,同步器自身是沒有實現任何同步接口。它是經過控制一個int類型的state變量來表示同步狀態,使用一個內置的FIFO(先進先出)隊列來構建工做隊列操做。java

同步器定義有兩種資源共享方式:Exclusive(獨佔式)和Share(共享式)的獲取同步狀態。node

獨佔式:一個時間點只能執行一個線程。 共享式:一個時間點可多個線程同時執行。設計模式

使用方式

同步器的設計採用模板模式,要實現一個同步組件得先繼承AbstractQueuedSynchronizer類,經過調用同步器提供的方法和重寫同步器的方法來實現。安全

調用同步器中的方法就是調用前面提到的經過state變量值的操做來表示同步操做,state是被volatile修飾來保證線程可見性。併發

方法名 描述
getState() 獲取當前線程同步狀態值。
setState(int newState) 設置當前同步狀態值。
compareAndSetState(int expect, int update) 經過CAS設置state的值。

爲了不被重寫,以上方法都被final修飾了。app

實現同步組件,須要本身根據本身定製化的需求進行處理,因此須要本身重寫同步器提供的方法,要重寫的方法主要是獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態。框架

tryAcquire(int arg) 獨佔式獲取同步狀態,返回值爲boolean類型,獲取成返回true,獲取失敗返回falseelasticsearch

tryRelease(int arg) 獨佔式釋放同步狀態,返回值爲boolean類型,釋放成返回true,釋放失敗返回falseoop

tryAcquireShared(int arg) 共享式獲取同步狀態,返回值爲int類型,獲取成功返回大於 0 的值。ui

tryReleaseShared(int arg) 共享式釋放同步狀態,返回值爲boolean類型,釋放成返回true,釋放失敗返回false

isHeldExclusively() 獨佔模式下是否被當前前程獨佔,返回值爲boolean類型,已被當前線程所獨佔返回true,反之爲false

同步器隊列

一個同步器裏面擁有一個同步隊列多個等待隊列

同步隊列

AbstractQueuedSynchronizer類中,有一個內部類Node,經過該類構造一個內部的同步隊列,這是一個FIFO 雙向隊列。 當前運行線程回去同步狀態時,若是獲取失敗,則將當前線程信息建立一個Node追加到同步隊列尾部,而後阻塞當前線程,直到隊列的上一個節點的同步狀態釋放,再喚醒當前線程嘗試從新獲取同步狀態。這個從新獲取同步狀態操做的節點,必定要是同步隊列中第一節點。

Node 源碼以下:

static final class Node {
    // 共享模式下等待的標記
    static final Node SHARED = new Node();
    // 獨佔模式下等待的標記
    static final Node EXCLUSIVE = null;

    /*
    * 等待狀態常量值,如下四個常量都是
    */
    static final int CANCELLED =  1;
    
    static final int SIGNAL    = -1;
    
    static final int CONDITION = -2;
    
    static final int PROPAGATE = -3;

    // 等待狀態
    volatile int waitStatus;

    // 當前節點的前驅節點
    volatile Node prev;

    // 當前節點的後繼節點
    volatile Node next;

    // 獲取同步狀態的線程(引用)
    volatile Thread thread;

    // 等待隊列中的後繼節點
    Node nextWaiter;

    // 是否共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 獲取前驅節點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

經過以上代碼,能夠看到節點中保存了節點模式、等待狀態、線程引用、前驅和後繼節點,構造節點。

同步隊列中被阻塞的線程的等待狀態包含有四個常量值:CANCELLED、SIGNAL、CONDITION、PROPAGATE ,它們對應的被阻塞緣由以下:

  • CANCELLED 同步隊列中當前節點的線程等待超時或被中斷,須要從同步隊列中取消等待。
  • SIGNAL 當前節點釋放同步狀態或被取消後,通知後繼節點的線程運行。
  • CONDITION 當前節點在 Condition 上等待,當其餘線程對 Condition 調用了 signal() 方法後,該節點將添加到同步隊列中。
  • PROPAGATE 該狀態存在共享模式的首節點中,當前節點喚醒後將傳播喚醒其餘節點。

同步器中持有同步隊列的首節點和尾節點的引用,在AbstractQueuedSynchronizer中分別對應headtail字段。

因此同步隊列的基本結構如圖:

等待隊列

AbstractQueuedSynchronizer類中包含一個內部類ConditionObject,該類實現了Condition的接口。一個Condition對象包含一個等待隊列,同時Condition對象能夠實現等待/通知功能。

Condition持有等待隊列中的首節點(firstWaiter)和尾節點(lastWaiter),以下圖代碼所示:

若是當前線程調用Condition.await()時,會將當前線程信息構建一個 Node 節點,由於Condition持有等待隊列中的首尾節點,因此將當前等待隊列中的尾節點的nextWaiter指向當前線程構建的節點,同時更新lastWaiter的引用節點。

上述過程當中的節點、隊列的操做,是獲取到鎖的線程來調用Condition.await()的,因此整個執行過程在沒有基於 CAS 的狀況下,也是線程安全的。

經過以上的描述,能夠知道一個同步器中同步隊列、等待隊列構成的示意圖:

當調用Condition.await()時,同步隊列中的首節點,也就是當前線程所建立的節點,會加入到等待隊列中的尾部,釋放當前線程的同步狀態而且喚醒同步隊列的後繼節點,當前線程也就進入等待狀態,這個前後順序不能顛倒。這個過程至關於同步隊列的首節點的線程構造新的節點加入到等待隊列的尾部。

當調用Condition.signal()方法時,會先將等待隊列中首節點轉移到同步隊列尾部,而後喚醒該同步隊列中的線程,該線程從Condition.await()中自旋退出,接着在在同步器的acquireQueued()中自旋獲取同步狀態。

當調用Condition.wait()方法,同步隊列首節點轉移到等待隊列方法:

public final void await() throws InterruptedException {
    // 若是線程已中斷,則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加節點到等待隊列中
    Node node = addConditionWaiter();
    // 修改 state 來達到釋放同步狀態,避免死鎖
    int savedState = fullyRelease(node);
    
    int interruptMode = 0;
    // 判斷當前節點是否在同步隊列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 繼續獲取同步狀態競爭
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清除已取消的節點
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 被中斷時的處理
        reportInterruptAfterWait(interruptMode);
}

上面addc方法是向等待隊列中添加一個新的節點。

private Node addConditionWaiter() {
    // 獲取等待隊列中尾節點
    Node t = lastWaiter;
    // 若是最後一個節點已取消,則清除取消節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 利用當前線程信息建立等待隊列的節點
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    
    if (t == null) // 若是最後尾節點爲空,當前節點則爲等待隊列的首節點
        firstWaiter = node;
    else // 不然將當前尾節點的下一個節點指向當前線程信息所構造的節點
        t.nextWaiter = node;
    lastWaiter = node;  // 更新 Condition 的尾節點引用
    return node;
}

當調用Condition.signal()方法,等待隊列首節點轉移到同步隊列方法:

public final void signal() {
    // 是否被當前線程所獨佔
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取等待隊列中首節點
    Node first = firstWaiter;
    if (first != null)
        // 轉移到同步隊列,而後喚醒該節點
        doSignal(first);
}

轉移同步隊列首節點到同步隊列,並喚醒該節點方法doSignal()

private void doSignal(Node first) {
    do {
      if ( (firstWaiter = first.nextWaiter) == null)
          lastWaiter = null;
          // 去除首節點
          first.nextWaiter = null;
    } while (!transferForSignal(first) && // 從等待隊列中轉移到同步隊列
              (first = firstWaiter) != null);
}

轉移等待隊列到同步隊列方法transferForSignal(Node node)

final boolean transferForSignal(Node node) {
    // 驗證節點是否被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 轉移節點至同步隊列
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

等待隊列中的頭結點線程安全移動到同步隊列方法enq(final Node node)

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 同步隊列中若是爲空,則初始化同步器
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不然新節點的前驅節點爲當前同步隊列的尾節點
            node.prev = t;
            // 設置當前新節點爲同步隊列的尾節點,並更新先前同步隊列的尾節點的後繼節點指向當前新節點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

獨佔式同步狀態

獨佔式同步狀態獲取和釋放是線程安全的操做,一個時間點確保只有一個線程獲取到同步狀態。

獨佔式同步狀態獲取

acquire(int arg)方法是獲取獨佔式同步狀態的方法,當線程獲取同步失敗時,會加入到同步隊列中。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代碼中,當執行tryAcquire(int arg)方法獲取同步狀態失敗時,接着經過addWaiter(Node.EXCLUSIVE)構造當前線程信息的節點,隨後將新構造的節點經過acquireQueued(final Node node, int arg)方法加入到同步隊列中,節點在同步隊列中自旋等待獲取同步狀態。

tryAcquire(int arg)是自定義同步器實現的,實現該方法須要保證線程安全獲取同步狀態,前面講到AQS提供的compareAndSetState(int expect, int update)方法經過CAS設置state值來保證線程安全。

上面獲取獨佔式同步狀態時,主要分析acquireQueued(final Node node, int arg)方法,節點加入隊列並自旋等待。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 是否中斷標識
        boolean interrupted = false;
        for (;;) {
            // 獲取當前節點的前驅節點
            final Node p = node.predecessor();
            // 若是前驅節點是首節點,而且當前節點獲取到同步狀態
            if (p == head && tryAcquire(arg)) {
                // 將當前節點設置爲首節點
                setHead(node);
                // 將原首節點(即當前節點的前驅節點)引用清空,利於 GC 回收
                p.next = null;
                // 成功獲取到同步狀態標誌
                failed = false;
                return interrupted;
            }
            // 判斷前驅節點是否超時或取消,以及當前線程是否被中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 若是被中斷,則節點出隊
        if (failed)
            cancelAcquire(node);
    }
}

在首節點釋放同步狀態後,同時喚醒後繼節點。後繼節點經過自旋的方式(這裏利用死循環方式)也會檢查本身的前驅節點是否爲首節點,若是是前驅節點則會嘗試獲取同步狀態。獲取成功則返回,不然判斷是否被中斷或者繼續自旋上述獲取同步狀態操做。

獨佔式同步狀態釋放

release(int arg)方法是釋放同步狀態,當釋放同步狀態後會喚醒後繼節點。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease(int arg)方法一樣也是自定義同步器實現。當首節點不爲空且處於等待狀態時,那麼調用unparkSuccessor(Node node)方法喚醒後繼節點。

private void unparkSuccessor(Node node) {
    // CAS 設置等待狀態爲初始狀態
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 若是當前釋放同步狀態的節點不存在後繼節點或後繼節點超時/被中斷
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾節點中開始尋找等待狀態的節點做爲新首節點,這裏已排除當前節點(t != node)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

釋放同步狀態的整個過程就是:釋放同步狀態,喚醒後繼節點。這個後繼節點必須知足,非空、非當前節點、等待狀態小於或等於 0 ,即SIGNALCONDITIONPROPAGATE和初始化狀態。

獨佔式資源共享方式除了上面的同步狀態獲取,還有獨佔式超時獲取使用的方法是doAcquireNanos(int arg, long nanosTimeout)獨佔式可中斷獲取使用的方法是acquireInterruptibly(int arg)

共享式同步狀態

共享式同步狀態同一時間點能夠有多個線程獲取到同步狀態。

共享式同步狀態獲取

acquireShared(int arg)方法是共享式同步狀態獲取的方法。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        // 獲取同步狀態失敗後調用的方法
        doAcquireShared(arg);
}

tryAcquireShared(int arg)方法是自定義同步器實現的,返回大於或等於 0 時,表示獲取成功。若是小於 0 時,獲取同步狀態失敗後會調用doAcquireShared(int arg)方法進行再次嘗試獲取。

private void doAcquireShared(int arg) {、
    // 構造一個當前線程信息的新節點
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋式獲取同步狀態
        for (;;) {
            final Node p = node.predecessor();
            // 判斷新節點的前驅節點是否爲首節點
            if (p == head) {
                // 再次嘗試獲取同步狀態
                int r = tryAcquireShared(arg);
                // 獲取成功後退出自旋
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面代碼中,當獲取同步狀態失敗後,則建立一個共享模式類型的節點,而後自旋式獲取同步狀態,若是前驅節點爲首節點時則嘗試再次獲取同步狀態,獲取同步狀態成功後退出當前自旋。

共享式釋放同步狀態

releaseShared(int arg)方法來釋放共享式同步狀態。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 同步狀態釋放成功後,喚醒後面等待狀態的節點
        doReleaseShared();
        return true;
    }
    return false;
}

上面tryReleaseShared(int arg)釋放同步狀態方法必須保證線程安全,由於它多個線程獲取到同步狀態時會引起併發操做,能夠經過循環操做和 CAS 來確保安前行。

doReleaseShared()方法喚醒後續等待狀態的節點。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 驗證後繼節點的線程處於等待狀態
            if (ws == Node.SIGNAL) {
                // 再次檢查後繼節點的線程是否處於等待狀態
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;   
                // 喚醒後繼節點,這時每喚醒一次就更新一次首節點
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

共享同步狀態釋放後,自旋式依次喚醒隊列中節點。

總結

從 AQS 中能夠借鑑它利用循環和 CAS 來確保併發的安全性的思路,同時它採用模板設計模式定義一個處理邏輯,將具體的特定處理邏輯交由子類自定義實現。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 以及 Tomncat 的 LimitLatch 都有用其做爲同步器。

推薦閱讀

《Java 線程基礎,從這篇開始》

《你必須會的 JDK 動態代理和 CGLIB 動態代理》

《synchronized 原理知多少》

《你必須會的 JDK 動態代理和 CGLIB 動態代理》

《Netty 中粘包/拆包處理》

《ElasticSearch之映射經常使用操做》

相關文章
相關標籤/搜索