AbstractQueuedSynchronizer 原理分析

AQS 簡介

什麼是AQS

AQS ,AbstractQueuedSynchronizer ,即隊列同步器。它是構建鎖或者其餘同步組件的基礎框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等),J.U.C 併發包的做者(Doug Lea)指望它可以成爲實現大部分同步需求的基礎。java

它是 J.U.C 併發包中的核心基礎組件。node

AQS 優點

AQS 解決了在實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO 同步隊列。設計模式

基於 AQS 來構建同步器能夠帶來不少好處。它不只可以極大地減小實現工做,並且也沒必要處理在多個位置上發生的競爭問題。安全

在基於 AQS 構建的同步器中,只能在一個時刻發生阻塞,從而下降上下文切換的開銷,提升了吞吐量。同時在設計 AQS 時充分考慮了可伸縮性,所以 J.U.C 中,全部基於 AQS 構建的同步器都可以得到這個優點。多線程

同步狀態

AQS 的主要使用方式是繼承,子類經過繼承同步器,並實現它的抽象方法來管理同步狀態。併發

AQS 使用一個 int 類型的成員變量 state 來表示同步狀態:app

當 state > 0 時,表示已經獲取了鎖。
當 state = 0 時,表示釋放了鎖。
複製代碼

它提供了三個方法,來對同步狀態 state 進行操做,而且 AQS 能夠確保對 state 的操做是安全的:框架

#getState()
#setState(int newState)
#compareAndSetState(int expect, int update)
複製代碼

同步隊列

AQS 經過內置的 FIFO 同步隊列來完成資源獲取線程的排隊工做:工具

若是當前線程獲取同步狀態失敗(鎖)時,AQS 則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程 當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。oop

主要內置方法

AQS 主要提供了以下方法:

  1. #getState():返回同步狀態的當前值
  2. #setState(int newState):設置當前同步狀態。
  3. #compareAndSetState(int expect, int update):使用 CAS 設置當前狀態,該方法可以保證狀態設置的原子性。
  4. 【可重寫】#tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其餘線程須要等待該線程釋放同步狀態才能獲取同步狀態。
  5. 【可重寫】#tryRelease(int arg):獨佔式釋放同步狀態。
  6. 【可重寫】#tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於 0 ,則表示獲取成功;不然,獲取失敗。
  7. 【可重寫】#tryReleaseShared(int arg):共享式釋放同步狀態。
  8. 【可重寫】#isHeldExclusively():當前同步器是否在獨佔式模式下被線程佔用,通常該方法表示是否被當前線程所獨佔。
  9. acquire(int arg):獨佔式獲取同步狀態。若是當前線程獲取同步狀態成功,則由該方法返回;不然,將會進入同步隊列等待。該方法將會調用可重寫的 #tryAcquire(int arg) 方法;不響應中斷
  10. #acquireInterruptibly(int arg):與 #acquire(int arg) 相同,可是該方法響應中斷。當前線程爲獲取到同步狀態而進入到同步隊列中,若是當前線程被中斷,則該方法會拋出InterruptedException 異常並返回。
  11. #tryAcquireNanos(int arg, long nanos):超時獲取同步狀態。若是當前線程在 nanos 時間內沒有獲取到同步狀態,那麼將會返回 false ,已經獲取則返回 true 。
  12. #acquireShared(int arg):共享式獲取同步狀態,若是當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨佔式的主要區別是在同一時刻能夠有多個線程獲取到同步狀態;
  13. #acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷。
  14. #tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增長超時限制。
  15. #release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒。
  16. #releaseShared(int arg):共享式釋放同步狀態。

從上面的方法看下來,基本上能夠分紅 3 類:

獨佔式獲取與釋放同步狀態
共享式獲取與釋放同步狀態
查詢同步隊列中的等待線程狀況
複製代碼

CLH 同步隊列

CLH簡介

CLH 同步隊列是一個 FIFO 雙向隊列,AQS 依賴它來完成同步狀態的管理:

當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程

當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。

Node

Node 是 AbstractQueuedSynchronizer 的內部靜態類。

static final class Node {

    // 共享
    static final Node SHARED = new Node();
    // 獨佔
    static final Node EXCLUSIVE = null;

    /** * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態 */
    static final int CANCELLED =  1;
    /** * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 */
    static final int SIGNAL    = -1;
    /** * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */
    static final int CONDITION = -2;
    /** * 表示下一次共享式同步狀態獲取,將會無條件地傳播下去 */
    static final int PROPAGATE = -3;

    /** 等待狀態 */
    volatile int waitStatus;

    /** 前驅節點,當節點添加到同步隊列時被設置(尾部添加) */
    volatile Node prev;

    /** 後繼節點 */
    volatile Node next;

    /** 等待隊列中的後續節點。若是當前節點是共享的,那麼字段將是一個 SHARED 常量,也就是說節點類型(獨佔和共享)和等待隊列中的後續節點共用同一個字段 */
    Node nextWaiter;
    
    /** 獲取同步狀態的線程 */
    volatile Thread thread;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
    
}
複製代碼
  1. waitStatus 字段

等待狀態,用來控制線程的阻塞和喚醒,而且能夠避免沒必要要的調用LockSupport的 #park(...) 和 #unpark(...) 方法。。目前有 4 種:CANCELLED SIGNAL CONDITION PROPAGATE 。實際上,有第 5 種,INITAL ,值爲 0 ,初始狀態。每一個等待狀態表明的含義,它不只僅指的是 Node 本身的線程的等待狀態,也能夠是下一個節點的線程的等待狀態

  1. CLH 同步隊列

head 和 tail 字段,是 AbstractQueuedSynchronizer 的字段,分別指向同步隊列的頭和尾。再配合上 prev 和 next 字段,快速定位到同步隊列的頭尾。

prev 和 next 字段,分別指向 Node 節點的前一個和後一個 Node 節點,從而實現鏈式雙向隊列。

  1. thread 字段,Node 節點對應的線程 Thread 。

  2. nextWaiter 字段,Node 節點獲取同步狀態的模型( Mode )。#tryAcquire(int args) 和 #tryAcquireShared(int args) 方法,分別是獨佔式和共享式獲取同步狀態。在獲取失敗時,它們都會調用 #addWaiter(Node mode) 方法入隊。而 nextWaiter 就是用來表示是哪一種模式:

    SHARED 靜態 + 不可變字段,枚舉共享模式。
     EXCLUSIVE 靜態 + 不可變字段,枚舉獨佔模式。
     #isShared() 方法,判斷是否爲共享式獲取同步狀態。
    複製代碼
  3. #predecessor() 方法,得到 Node 節點的前一個 Node 節點。在方法的內部,Node p = prev 的本地拷貝,是爲了不併髮狀況下,prev 判斷完 == null 時,剛好被修改,從而保證線程安全。

  4. 構造方法有 3 個,分別是:

    #Node() 方法:用於 SHARED 的建立。
     
     #Node(Thread thread, Node mode) 方法:用於 #addWaiter(Node mode) 方法。
     從 mode 方法參數中,咱們也能夠看出它表明獲取同步狀態的模式。
     
     #Node(Thread thread, int waitStatus) 方法,用於 #addConditionWaiter() 方法。
    複製代碼

入列

CLH 隊列入列很簡單: tail 指向新節點。 新節點的 prev 指向當前最後的節點。 當前最後一個節點的 next 指向當前節點。

可是,實際上,入隊邏輯實現的 #addWaiter(Node) 方法,須要考慮併發的狀況。它經過 CAS 的方式,來保證正確的添加 Node 。代碼以下:

private Node addWaiter(Node mode) {
    // 新建節點
    Node node = new Node(Thread.currentThread(), mode);
    // 記錄原尾節點
    Node pred = tail;
    // 快速嘗試,添加新節點爲尾節點
    //當原尾節點非空,才執行快速嘗試的邏輯. 在下面的 #enq(Node node) 方法中,咱們會看到,首節點未初始化的時,head 和 tail 都爲空。
    if (pred != null) {
        // 設置新 Node 節點的尾節點爲原尾節點
        node.prev = pred;
        // CAS 設置新的尾節點
        if (compareAndSetTail(pred, node)) {
            // 成功,原尾節點的下一個節點爲新節點
            pred.next = node;
            return node;
        }
    }
    // 失敗,屢次嘗試,直到成功
    enq(node);
    return node;
}
複製代碼
  1. 建立新節點 node 。在建立的構造方法,mode 方法參數,傳遞獲取同步狀態的模式。
  2. 記錄原尾節點 tail 。
  3. 快速嘗試,添加新節點爲尾節點。
  4. enq添加失敗,屢次嘗試,直到成功添加。

調用 #enq(Node node) 方法,屢次嘗試,直到成功添加

private Node enq(final Node node) {
     // 屢次嘗試,直到成功爲止
     for (;;) {
         // 記錄原尾節點
         Node t = tail;
         // 原尾節點不存在,建立首尾節點都爲 new Node()
         if (t == null) {
             if (compareAndSetHead(new Node()))
                 tail = head;
         // 原尾節點存在,添加新節點爲尾節點
         } else {
             //設置爲尾節點
             node.prev = t;
             // CAS 設置新的尾節點
             if (compareAndSetTail(t, node)) {
                 // 成功,原尾節點的下一個節點爲新節點
                 t.next = node;
                 return t;
             }
         }
     }
 }
複製代碼
  1. 「死」循環,屢次嘗試,直到成功添加爲止
  2. 記錄原尾節點 t 。和 #addWaiter(Node node) 方法的相同。
  3. 原尾節點存在,添加新節點爲尾節點。和 #addWaiter(Node node) 方法的相同。
  4. 原尾節點不存在,則首節點也不存在了,建立首尾節點都爲 new Node() 。
  5. #compareAndSetHead(Node update) 方法,使用 Unsafe 來 CAS 設置尾節點 head 爲新節點。

出列

CLH 同步隊列遵循 FIFO,首節點的線程釋放同步狀態後,將會喚醒它的下一個節點(Node.next)。然後繼節點將會在獲取同步狀態成功時,將本身設置爲首節點( head )。

這個過程很是簡單,head 執行該節點並斷開原首節點的 next 和當前節點的 prev 便可。注意,在這個過程是不須要使用 CAS 來保證的,由於只有一個線程,可以成功獲取到同步狀態。

setHead(Node node) 方法,實現上述的出列邏輯。代碼以下:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
複製代碼

AQS:同步狀態的獲取與釋放

AQS 的設計模式採用的模板方法模式,子類經過繼承的方式,實現它的抽象方法來管理同步狀態。

對於子類而言,它並無太多的活要作,AQS 已經提供了大量的模板方法來實現同步,主要是分爲三類:

獨佔式獲取和釋放同步狀態
共享式獲取和釋放同步狀態
查詢同步隊列中的等待線程狀況。
複製代碼

自定義子類使用 AQS 提供的模板方法,就能夠實現本身的同步語義。

獨佔式

獨佔式,同一時刻,僅有一個線程持有同步狀態。

獨佔式同步狀態獲取

acquire(int arg)

acquire(int arg) 方法,爲 AQS 提供的模板方法。該方法爲獨佔式獲取同步狀態,可是該方法對中斷不敏感。也就是說,因爲線程獲取同步狀態失敗而加入到 CLH 同步隊列中,後續對該線程進行中斷操做時,線程不會從 CLH 同步隊列中移除。代碼以下:

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }
複製代碼

調用 #tryAcquire(int arg) 方法,去嘗試獲取同步狀態,獲取成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。

若tryAcquire獲取成功,則acquire(int arg) 方法直接返回,不用線程阻塞

若 tryAcquire 獲取失敗調用 addWaiter(Node mode) 方法,將當前線程加入到 CLH 同步隊列尾部,而且, mode 方法參數爲 Node.EXCLUSIVE ,表示獨佔模式。而後調用 boolean #acquireQueued(Node node, int arg) 方法,自旋直到得到同步狀態成功。

另外,該 acquireQueued 方法的返回值類型爲 boolean ,當返回 true 時,表示在這個過程當中,發生過線程中斷。可是呢,這個方法又會清理線程中斷的標識,因此在種狀況下,須要調用 #selfInterrupt() 方法,恢復線程中斷的標識,代碼以下:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
複製代碼

tryAcquire(int arg)

tryAcquire(int arg)方法,須要自定義同步組件本身實現,該方法必需要保證線程安全的獲取同步狀態。AQS裏代碼以下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

直接拋出 UnsupportedOperationException 異常。

acquireQueued

boolean #acquireQueued(Node node, int arg) 方法,爲一個自旋的過程,也就是說,當前線程(Node)進入同步隊列後,就會進入一個自旋的過程,每一個節點都會自省地觀察,當條件知足,獲取到同步狀態後,就能夠從這個自旋過程當中退出,不然會一直執行下去。

流程圖以下:

代碼以下:

final boolean acquireQueued(final Node node, int arg) {
     // 記錄是否獲取同步狀態成功
     boolean failed = true;
     try {
         // 記錄過程當中,是否發生線程中斷
         boolean interrupted = false;
         /* * 自旋過程,其實就是一個死循環而已 */
         for (;;) {
             // 當前線程的前驅節點
             final Node p = node.predecessor();
             // 當前線程的前驅節點是頭結點,且同步狀態成功
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             // 獲取失敗,線程等待--具體後面介紹
             if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         // 獲取同步狀態發生異常,取消獲取。
         if (failed)
             cancelAcquire(node);
     }
 }
複製代碼
  1. failed 變量,記錄是否獲取同步狀態成功。
  2. interrupted 變量,記錄獲取過程當中,是否發生線程中斷。
  3. 調用 Node#predecessor() 方法,得到當前線程的前一個節點 p 。
  4. p == head 代碼塊,若知足,則表示當前線程的前一個節點爲頭節點,由於 head 是最後一個得到同步狀態成功的節點,此時調用 #tryAcquire(int arg) 方法,嘗試得到同步狀態
  5. 當前節點( 線程 )獲取同步狀態成功:
    • 設置當前節點( 線程 )爲新的 head 。
    • 設置老的頭節點 p 再也不指向下一個節點,讓它自身更快的被 GC 。
    • 標記 failed = false ,表示獲取同步狀態成功。
    • 返回記錄獲取過程當中,是否發生線程中斷。
  6. 調用 #shouldParkAfterFailedAcquire(Node pre, Node node) 方法,判斷獲取失敗後,是否當前線程須要阻塞等待。
  7. 調用 #cancelAcquire(Node node) 方法,取消獲取同步狀態。

shouldParkAfterFailedAcquire

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 得到前一個節點的等待狀態
     int ws = pred.waitStatus;
     if (ws == Node.SIGNAL) // Node.SIGNAL
         /* * This node has already set status asking a release * to signal it, so it can safely park. */
         return true;
     if (ws > 0) { // Node.CANCEL
         /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else { // 0 或者 Node.PROPAGATE
         /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     }
     return false;
 }
複製代碼
  1. pred 和 node 方法參數,傳入時,要求前者必須是後者的前一個節點。
  2. 得到前一個節點( pre )的等待狀態。下面會根據這個狀態有三種狀況的處理。
    • 等待狀態爲 Node.SIGNAL 時,表示 pred 的下一個節點 node 的線程須要阻塞等待。
    • 在 pred 的線程釋放同步狀態時,會對 node 的線程進行喚醒通知。因此返回 true ,代表當前線程能夠被 park,安全的阻塞等待。
    • 等待狀態爲 0 或者 Node.PROPAGATE 時,經過 CAS 設置,將狀態修改成 Node.SIGNAL ,即下一次從新執行 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法時,知足條件。可是,對於本次執行,返回 false 。
    • 另外,等待狀態不會爲 Node.CONDITION ,由於它用在 ConditonObject 中。
    • 等待狀態爲 NODE.CANCELLED 時,則代表該線程的前一個節點已經等待超時或者被中斷了,則須要從 CLH 隊列中將該前一個節點刪除掉,循環回溯,直到前一個節點狀態 <= 0 。 對於本次執行,返回 false ,須要下一次再從新執行 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法,看看知足哪一個條件。

整個過程以下圖:

cancelAcquire

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
複製代碼
  1. 若傳入參數 node 爲空。

  2. 將節點的等待線程置空。

  3. 得到 node 節點的前一個節點 pred 。

  4. 得到 pred 的下一個節點 predNext 。predNext 從表面上看,和 node 是等價的。 可是實際上,存在多線程併發的狀況,因此咱們調用 #compareAndSetNext(...) 方法,使用 CAS 的方式,設置 pred 的下一個節點。 若是設置失敗,說明當前線程和其它線程競爭失敗,不須要作其它邏輯,由於 pred 的下一個節點已經被其它線程設置成功。

  5. 設置 node 節點的爲取消的等待狀態 Node.CANCELLED 。 這裏可使用直接寫,而不是 CAS 。 在這個操做以後,其它 Node 節點能夠忽略 node 。 Before, we are free of interference from other threads. 如何理解。

  6. 下面開始開始修改 pred 的新的下一個節點,一共分紅三種狀況。

    • 若是 node 是尾節點,調用 #compareAndSetTail(...) 方法,CAS 設置 pred 爲新的尾節點。若上述操做成功,調用 #compareAndSetNext(...) 方法,CAS 設置 pred 的下一個節點爲空( null )。
    • pred 非首節點。pred 的等待狀態爲 Node.SIGNAL ,或者可被 CAS 爲 Node.SIGNAL 。pred 的線程非空。若 node 的 下一個節點 next 的等待狀態非 Node.CANCELLED ,則調用 #compareAndSetNext(...) 方法,CAS 設置 pred 的下一個節點爲 next 。
    • 若是 pred 爲首節點,調用 #unparkSuccessor(Node node) 方法,喚醒 node 的下一個節點的線程等待。爲何此處須要喚醒呢?由於,pred 爲首節點,node 的下一個節點的阻塞等待,須要 node 釋放同步狀態時進行喚醒。可是,node 取消獲取同步狀態,則不會再出現 node 釋放同步狀態時進行喚醒 node 的下一個節點。所以,須要此處進行喚醒。

獨佔式獲取響應中斷

AQS 提供了acquire(int arg) 方法,以供獨佔式獲取同步狀態,可是該方法對中斷不響應,對線程進行中斷操做後,該線程會依然位於CLH同步隊列中,等待着獲取同步狀態。

爲了響應中斷,AQS 提供了 #acquireInterruptibly(int arg) 方法。該方法在等待獲取同步狀態時,若是當前線程被中斷了,會馬上響應中斷,並拋出 InterruptedException 異常。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製代碼
  1. 首先,校驗該線程是否已經中斷了,若是是,則拋出InterruptedException 異常。
  2. 而後,調用 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,若是獲取成功,則直接返回。
  3. 最後,調用 #doAcquireInterruptibly(int arg) 方法,自旋直到得到同步狀態成功,或線程中斷拋出 InterruptedException 異常。

doAcquireInterruptibly

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // <1>
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

它與 #acquire(int arg) 方法僅有兩個差異:

  1. 方法聲明拋出 InterruptedException 異常。

  2. 在中斷方法處再也不是使用 interrupted 標誌,而是直接拋出 InterruptedException 異常。

獨佔式超時獲取

AQS 除了提供上面兩個方法外,還提供了一個加強版的方法 #tryAcquireNanos(int arg, long nanos) 。該方法爲 #acquireInterruptibly(int arg) 方法的進一步加強,它除了響應中斷外,還有超時控制。即若是當前線程沒有在指定時間內獲取同步狀態,則會返回 false ,不然返回 true 。

流程圖以下:

代碼以下:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
複製代碼
  1. 首先,校驗該線程是否已經中斷了,若是是,則拋出InterruptedException 異常。
  2. 而後,調用 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,若是獲取成功,則直接返回。
  3. 最後,調用 #tryAcquireNanos(int arg) 方法,自旋直到得到同步狀態成功,或線程中斷拋出 InterruptedException 異常,或超過指定時間返回獲取同步狀態失敗。

tryAcquireNanos

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // nanosTimeout <= 0
    if (nanosTimeout <= 0L)
        return false;
    // 超時時間
    final long deadline = System.nanoTime() + nanosTimeout;
    // 新增 Node 節點
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 獲取同步狀態成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            /* * 獲取失敗,作超時、中斷判斷 */
            // 從新計算須要休眠的時間
            nanosTimeout = deadline - System.nanoTime();
            // 已經超時,返回false
            if (nanosTimeout <= 0L)
                return false;
            // 若是沒有超時,則等待nanosTimeout納秒
            // 注:該線程會直接從LockSupport.parkNanos中返回,
            // LockSupport 爲 J.U.C 提供的一個阻塞和喚醒的工具類,後面作詳細介紹
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 線程是否已經中斷了
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

由於是在 #doAcquireInterruptibly(int arg) 方法的基礎上,作了超時控制的加強,因此相同部分,咱們直接跳過。

  1. 若是超時時間小於 0 ,直接返回 false ,已經超時。
  2. 計算最終超時時間 deadline 。
  3. 從新計算剩餘可獲取同步狀態的時間 nanosTimeout 。
  4. 若是剩餘時間小於 0 ,直接返回 false ,已經超時。
  5. 若是剩餘時間大於 spinForTimeoutThreshold ,則調用 LockSupport#parkNanos(Object blocker, long nanos) 方法,休眠 nanosTimeout 納秒。不然,就不須要休眠了,直接進入快速自旋的過程。緣由在於,spinForTimeoutThreshold 已經很是小了,很是短的時間等待沒法作到十分精確,若是這時再次進行超時等待,相反會讓 nanosTimeout 的超時從總體上面表現得不是那麼精確。因此,在超時很是短的場景中,AQS 會進行無條件的快速自旋。
  6. 若線程已經中斷了,拋出 InterruptedException 異常。

獨佔式同步狀態釋放

當線程獲取同步狀態後,執行完相應邏輯後,就須要釋放同步狀態。AQS 提供了#release(int arg)方法,釋放同步狀態。代碼以下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製代碼
  1. 調用 #tryRelease(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。

  2. tryRelease(int arg) 方法,須要自定義同步組件本身實現,該方法必需要保證線程安全的釋放同步狀態。代碼以下:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

直接拋出 UnsupportedOperationException 異常。 3. 得到當前的 head ,避免併發問題。

  1. 頭結點不爲空,而且頭結點狀態不爲 0 ( INITAL 未初始化)。爲何會出現 0 的狀況呢?

  2. 調用 #unparkSuccessor(Node node) 方法,喚醒下一個節點的線程等待。

總結

  1. 在 AQS 中維護着一個 FIFO 的同步隊列。

  2. 當線程獲取同步狀態失敗後,則會加入到這個 CLH 同步隊列的對尾,並一直保持着自旋。

  3. 在 CLH 同步隊列中的線程在自旋時,會判斷其前驅節點是否爲首節點,若是爲首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步隊列。

  4. 當線程執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨佔式的最主要區別在於,同一時刻:

獨佔式只能有一個線程獲取同步狀態。
共享式能夠有多個線程獲取同步狀態。
複製代碼

例如,讀操做能夠有多個線程同時進行,而寫操做同一時刻只能有一個線程進行寫操做,其餘操做都會被阻塞。例子爲 ReentrantReadWriteLock 。

共享式同步狀態獲取

acquireShared(int arg) 方法,對標 #acquire(int arg) 方法。

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
 }
複製代碼

調用 #tryAcquireShared(int arg) 方法,嘗試獲取同步狀態,獲取成功則設置鎖狀態並返回大於等於 0 ,不然獲取失敗,返回小於 0 。

若獲取成功,直接返回,不用線程阻塞,獲取失敗則自旋直到得到同步狀態成功。

tryAcquireShared(int arg) 方法

須要自定義同步組件本身實現,該方法必需要保證線程安全的獲取同步狀態。代碼以下:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

直接拋出 UnsupportedOperationException 異常。

doAcquireShared

private void doAcquireShared(int arg) {
     // 共享式節點
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             // 前驅節點
             final Node p = node.predecessor();
             // 若是其前驅節點,獲取同步狀態
             if (p == head) {
                 // 嘗試獲取同步
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     if (interrupted)
                         selfInterrupt();
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) &&
                     parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }
複製代碼

由於和 #acquireQueued(int arg) 方法的基礎上,因此相同部分,直接跳過。

  1. 調用 #addWaiter(Node mode) 方法,將當前線程加入到 CLH 同步隊列尾部。而且, mode 方法參數爲 Node.SHARED ,表示共享模式。

  2. 調用 #tryAcquireShared(int arg) 方法,嘗試得到同步狀態。

  3. 調用 #setHeadAndPropagate(Node node, int propagate) 方法,設置新的首節點,並根據條件,喚醒下一個節點。這裏和獨佔式同步狀態獲取很大的不一樣:經過這樣的方式,不斷喚醒下一個共享式同步狀態, 從而實現同步狀態被多個線程的共享獲取。

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
複製代碼
  1. 記錄原來的首節點 h 。
  2. 調用 #setHead(Node node) 方法,設置 node 爲新的首節點。
  3. propagate > 0 代碼塊,說明同步狀態還能被其餘線程獲取。
  4. 判斷原來的或者新的首節點,等待狀態爲 Node.PROPAGATE 或者 Node.SIGNAL 時,能夠繼續向下喚醒。
  5. 調用 Node#isShared() 方法,判斷下一個節點爲共享式獲取同步狀態。
  6. 調用 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。

共享式獲取響應中斷

acquireSharedInterruptibly(int arg) 方法

代碼以下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

共享式超時獲取

tryAcquireSharedNanos(int arg, long nanosTimeout) 方法

代碼以下:

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

共享式同步狀態釋放

當線程獲取同步狀態後,執行完相應邏輯後,就須要釋放同步狀態。AQS 提供了#releaseShared(int arg)方法,釋放同步狀態。代碼以下:

public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true;
     }
     return false;
 }
複製代碼

調用 #tryReleaseShared(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設置鎖狀態並返回 true ,不然獲取失敗,返回 false 。調用 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。

tryReleaseShared(int arg) 方法

須要自定義同步組件本身實現,該方法必需要保證線程安全的釋放同步狀態。代碼以下:

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
複製代碼

直接拋出 UnsupportedOperationException 異常。

doReleaseShared

private void doReleaseShared() {
     /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */
     for (;;) {
         Node h = head;
         if (h != null && h != tail) {
             int ws = h.waitStatus;
             if (ws == Node.SIGNAL) {
                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                     continue;            // loop to recheck cases
                 unparkSuccessor(h);
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                 continue;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break;
     }
 }

複製代碼

AQS:阻塞和喚醒線程

parkAndCheckInterrupt

在線程獲取同步狀態時,若是獲取失敗,則加入 CLH 同步隊列,經過經過自旋的方式不斷獲取同步狀態,可是在自旋的過程當中,則須要判斷當前線程是否須要阻塞,其主要方法在acquireQueued(int arg) ,代碼以下:

// ... 省略前面無關代碼

if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

// ... 省略前面無關代碼
複製代碼

經過這段代碼咱們能夠看到,在獲取同步狀態失敗後,線程並非立馬進行阻塞,須要檢查該線程的狀態,檢查狀態的方法爲 #shouldParkAfterFailedAcquire(Node pred, Node node)方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞。

若是 #shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回 true ,則調用parkAndCheckInterrupt() 方法,阻塞當前線程。代碼以下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
複製代碼

開始,調用 LockSupport#park(Object blocker) 方法,將當前線程掛起,此時就進入阻塞等待喚醒的狀態。

而後,在線程被喚醒時,調用 Thread#interrupted()方法,返回當前線程是否被打斷,並清理打斷狀態。

public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
    private native boolean isInterrupted(boolean ClearInterrupted);
複製代碼

因此,實際上,線程被喚醒有兩種狀況:

第一種,當前節點(線程)的前序節點釋放同步狀態時,喚醒了該線程 。
第二種,當前線程被打斷致使喚醒。
複製代碼

unparkSuccessor

當線程釋放同步狀態後,則須要喚醒該線程的後繼節點。代碼以下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 喚醒後繼節點
        return true;
    }
    return false;
}
複製代碼

調用 unparkSuccessor(Node node) 方法,喚醒後繼節點:

private void unparkSuccessor(Node node) {
    //當前節點狀態
    int ws = node.waitStatus;
    //當前狀態 < 0 則設置爲 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //當前節點的後繼節點
    Node s = node.next;
    //後繼節點爲null或者其狀態 > 0 (超時或者被中斷了)
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從tail節點來找可用節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //喚醒後繼節點
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼
  1. 可能會存在當前線程的後繼節點爲 null,例如:超時、被中斷的狀況。若是遇到這種狀況了,則須要跳過該節點。

  2. 可是,爲什麼是從 tail 尾節點開始,而不是從 node.next 開始呢?緣由在於,取消的 node.next.next 指向的是 node.next 本身。若是順序遍歷下去,會致使死循環。因此此時,只能採用 tail 回溯的辦法,找到第一個( 不是最新找到的,而是最前序的 )可用的線程。

  3. 可是,爲何取消的 node.next.next 指向的是 node.next 本身呢?在 #cancelAcquire(Node node) 的末尾,node.next = node; 代碼塊,取消的 node 節點,將其 next 指向了本身。 最後,調用 LockSupport的unpark(Thread thread) 方法,喚醒該線程。

LockSupport

LockSupport 是用來建立鎖和其餘同步類的基本線程阻塞原語。

每一個使用 LockSupport 的線程都會與一個許可與之關聯:

若是該許可可用,而且可在進程中使用,則調用 #park(...) 將會當即返回,不然可能阻塞。
若是許可尚不可用,則能夠調用 #unpark(...) 使其可用。
可是,注意許可不可重入,也就是說只能調用一次 park(...) 方法,不然會一直阻塞。
LockSupport 定義了一系列以 park 開頭的方法來阻塞當前線程,unpark(Thread thread) 方法來喚醒一個被阻塞的線程。
複製代碼

以下圖所示:

park(Object blocker)

方法的blocker參數,主要是用來標識當前線程在等待的對象,該對象主要用於問題排查和系統監控。

park 方法和 unpark(Thread thread) 方法,都是成對出現的。同時 unpark(Thread thread) 方法,必需要在 park 方法執行以後執行。固然,並非說沒有調用 unpark(Thread thread) 方法的線程就會一直阻塞

park 有一個方法,它是帶了時間戳的 #parkNanos(long nanos) 方法:爲了線程調度禁用當前線程,最多等待指定的等待時間,除非許可可用。

public static void park() {
    UNSAFE.park(false, 0L);
}
複製代碼

unpark

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
複製代碼

實現原理

從上面能夠看出,其內部的實現都是經過 sun.misc.Unsafe 來實現的,其定義以下:

// UNSAFE.java
public native void park(boolean var1, long var2);
public native void unpark(Object var1);
複製代碼
相關文章
相關標籤/搜索