美團後臺篇中的ReentrantLock

以前的文章中,簡單的介紹了ReentrantLock鎖。那麼這裏我就要進行裏面的方法以及屬性介紹啦(此文章基於裏面的非公平鎖進行說明)!!!java

ReentrantLock

ReentrantLock 特性概覽

ReentrantLock 意思爲可重入鎖,指的是一個線程可以對一個臨界資源重複加鎖。這裏就對ReentrantLock 跟經常使用的 Synchronized 進行比較。node

ReentrantLock Synchronized
鎖實現機制 依賴AQS 監視器模式
靈活性 支持響應中斷、超時、嘗試獲取鎖 不靈活
釋放形式 必須顯示調用unlock()進行解鎖 自動釋放監視器
鎖類型 必須顯示調用unlock()進行解鎖 自動釋放監視器
條件隊列 可關聯多個條件隊列 關聯一個條件隊列
可重入性 可重入 可重入

ReentrantLock 與 AQS 的關聯

final void lock() {
          if (compareAndSetState(0, 1)) // 設置同步狀態
              setExclusiveOwnerThread(Thread.currentThread());//當前線程設置爲獨佔線程。
          else
              acquire(1);// 設置失敗,進入acquire 方法進行後續處理。
      }
複製代碼

上面的代碼就是非公平鎖加鎖的方法。主要是作了兩點:編程

  • 若經過 CAS 設置變量 State(同步狀態)成功,也就是獲取鎖成功,則將當前 線程設置爲獨佔線程。
  • 若經過 CAS 設置變量 State(同步狀態)失敗,也就是獲取鎖失敗,則進入 Acquire 方法進行後續處理。

若是設置同步狀態失敗,則會進入到對應的acquire()方法中去進行加鎖處理。而acquire()不管是非公平鎖或公平鎖,最後調用的都是父類中的方法。c#

AQS(AbstractQueuedSynchronizer)

先經過下面的架構圖來總體瞭解一下 AQS 框架: 安全

  • 圖中有顏色的爲 Method,無顏色的爲 Attribution
  • 當有自定義同步器接入時,只需重寫第一層所須要的部分方法便可,不須要關 注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操做時,先通過第 一層的 API 進入 AQS 內部方法,而後通過第二層進行鎖的獲取,接着對於獲 取鎖失敗的流程,進入第三層和第四層的等待隊列處理,而這些處理方式均依 賴於第五層的基礎數據提供層。

AQS原理概覽:

若是被請求的共享資源空閒,那麼就將當前請求資源的線程設置爲有效的工做線程,將共享資源設置爲鎖定狀態;若是共享資源被佔用,就須要必定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是 CLH 隊列的變體實現的,將暫時獲取不到鎖的線程加入到隊列中。數據結構

CLH:Craig、Landin and Hagersten 隊列,是單向鏈表,AQS 中的隊列是CLH變體的虛擬雙向隊列(FIFO),AQS 是經過將每條請求共享資源的線程封裝成一個節點來實現鎖的分配。多線程

AQS 使用一個 Volatileint 類型的成員變量來表示同步狀態,經過內置的 FIFO 隊列來完成資源獲取的排隊工做,經過 CAS 完成對 State 值的修改。

AQS數據結構:

AQS中最基本的數據結構是-節點。內含方法以下:架構

解釋一下幾個方法和屬性值的含義:

方法和屬性值 含義
waitStatus 當前節點在隊列中的狀態
thread 表示處於該節點的線程
prev 前驅指針
predecessor 返回前驅節點,沒有的話拋出 NPE
nextWaiter 指向下一個處於 CONDITION 狀態的節點(因爲本篇文章不講述 ConditionQueue 隊列,這個指針很少介紹)
next 後繼指針

線程兩種鎖的模式:併發

模式 含義
SHARED 表示線程以共享的模式等待鎖
EXCLUSIVE 表示線程正在以獨佔的方式等待鎖

waitStatus 有下面幾個枚舉值:框架

枚舉 含義
0 當一個 Node 被初始化的時候的默認值
CANCELLED 爲 1,表示線程獲取鎖的請求已經取消了
CONDITION 爲 -2,表示節點在等待隊列中,節點線程等待喚醒
PROPAGATE 爲 -3,當前線程處在 SHARED 狀況下,該字段纔會使用
SIGNAL 爲 -1,表示線程已經準備好了,就等資源釋放了

AQS中的同步狀態:

AQS 中維護了一個名爲 state 的字段,意爲同步狀態,是由 Volatile 修飾的,用於展現當前臨界 資源的獲鎖狀況。

/** * The synchronization state. */
  private volatile int state;
複製代碼

獨佔模式狀況下:

共享模式狀況下:

 AQS 重要方法與 ReentrantLock 的關聯

下面列舉了自定義同步器須要實現如下方法,通常來講,自定義同步器要麼是獨佔方式,要麼是共享方式,它們也只需實現tryAcquire-tryReleasetryAcquireShared-tryReleaseShared 中 的 一 種 便可。AQS 也支持自定義同步器同時實現獨佔和共享兩種方式,如 ReentrantReadWriteLockReentrantLock是獨佔鎖,因此實現了tryAcquire-tryRelease

方法名 描述
protected boolean isHeldExclusively() 該線程是否正在獨佔資源。只有用到 Condition 才須要去實現它。
protected boolean tryAcquire(int arg) 獨佔方式。arg 爲獲取鎖的次數,嘗試獲取資源,成功則返回 True,失敗則返回 False。
protected boolean tryRelease(int arg) 獨佔方式。arg 爲釋放鎖的次數,嘗試釋放資源,成功則返回 True,失敗則返回 False。
protected int tryAcquireShared(int arg) 共享方式。arg爲獲取鎖的次數,嘗試獲取資源。負數表示失敗;0 表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
protected boolean tryReleaseShared(int arg) 共享方式。arg 爲釋放鎖的次數,嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回 True,不然返回 False。

下圖舉例說明 非公平鎖與AQS之間方法的關聯之處:

加鎖和解鎖的交互流程:

加鎖:

  1. 經過 ReentrantLock 的加鎖方法 Lock 進行加鎖操做。
  2. 會調用到內部類 Sync 的 Lock 方法,因爲 Sync#lock 是抽象方法,根據 ReentrantLock 初始化選擇的公平鎖和非公平鎖,執行相關內部類的 Lock 方 法,本質上都會執行 AQS 的 Acquire 方法。
  3. AQS 的 Acquire 方法會執行 tryAcquire 方法,可是因爲 tryAcquire 須要自 定義同步器實現,所以執行了 ReentrantLock 中的 tryAcquire 方法,因爲 ReentrantLock 是經過公平鎖和非公平鎖內部類實現的 tryAcquire 方法,所以會根據鎖類型不一樣,執行不一樣的 tryAcquire。
  4. tryAcquire 是獲取鎖邏輯,獲取失敗後,會執行框架 AQS 的後續邏輯,跟 ReentrantLock 自定義同步器無關。

解鎖:

  1. 經過 ReentrantLock 的解鎖方法 Unlock 進行解鎖。
  2. Unlock 會調用內部類 Sync 的 Release 方法,該方法繼承於 AQS。
  3. Release 中會調用 tryRelease 方法,tryRelease 須要自定義同步器實現, tryRelease 只在 ReentrantLock 中的 Sync 實現,所以能夠看出,釋放鎖的 過程,並不區分是否爲公平鎖。
  4. 釋放成功後,全部處理由 AQS 框架完成,與自定義同步器無關。

從上面的描述,大概能夠總結出 ReentrantLock 加鎖解鎖時 API 層核心方法的映射關係:

經過ReentrantLock理解 AQS

從上面的簡單分析,咱們知道若是當前線程沒有獲取到鎖的話,則會進入到等待隊列中去,咱們接下來看看線程是什麼時候以及怎樣被加入進等待隊列中的。

線程加入等待隊列

當執行 Acquire(1) 時,會經過 tryAcquire 獲取鎖。在這種狀況下,若是獲取鎖 失敗,就會調用 addWaiter 加入到等待隊列中去。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
複製代碼

主要的流程以下:

  1. 經過當前的線程和鎖模式新建一個節點。
  2. Pred 指針指向尾節點 Tail。
  3. 將 New 中 Node 的 Prev 指針指向 Pred。
  4. 經過 compareAndSetTail 方法,完成尾節點的設置。這個方法主要是對 tailOffset 和 Expect 進行比較,若是 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,那麼設置 Tail 的值爲 Update 的值(利用的是CAS)。
  5. 若是 Pred 指針是 Null(說明等待隊列中沒有元素),或者當前 Pred 指針和 Tail 指向的位置不一樣(說明被別的線程已經修改),就須要看一下 Enq 的方法。
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

若是沒有被初始化,須要進行初始化一個頭結點出來。但請注意,初始化的頭 結點並非當前線程節點,而是調用了無參構造函數的節點。若是經歷了初始化或 者併發致使隊列中有元素,則與以前的方法相同。其實,addWaiter 就是一個在雙 端鏈表添加尾節點的操做,須要注意的是,雙端鏈表的頭結點是一個無參構造函數 的頭結點。

總結下線程獲取鎖的步驟:

  1. 當沒有線程獲取到鎖時,線程 1 獲取鎖成功。
  2. 線程 2 申請鎖,可是鎖被線程 1 佔有。
  3. 若是再有線程要獲取鎖,依次在隊列中日後排隊便可。

回到上邊的代碼,hasQueuedPredecessors 是公平鎖加鎖時判斷等待隊列中 是否存在有效節點的方法。若是返回 False,說明當前線程能夠爭取共享資源;若是 返回 True,說明隊列中存在有效節點,當前線程必須加入到等待隊列中。

public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
複製代碼

看到這裏,咱們理解一下 h != t && ((s = h.next) == null || s.thread != Thread. currentThread());

爲何要判斷的頭結點的下一個節點?第一個節點儲存的數據是 什麼?

其實在雙向鏈表中,第一個節點爲虛節點,其實並不存儲任何信息,只是佔位。真 正的第一個有數據的節點,是在第二個節點開始的。當 h != t 時:若是 (s =h.next) == null,等待隊列正在有線程進行初始化,但只是進行到了 Tail 指 向 Head,沒有將Head 指向 Tail,此時隊列中有元素,須要返回 True(這塊 具體見下邊代碼分析)。 若是 (s = h.next) != null,說明此時隊列中至少有一 個有效節點。若是此時 s.thread == Thread.currentThread(),說明等待隊 列的第一個有效節點中的線程與當前線程相同,那麼當前線程是能夠獲取資源 的;若是 s.thread != Thread.currentThread(),說明等待隊列的第一個有效 節點線程與當前線程不一樣,當前線程必須加入進等待隊列。

1 if (t == null) { // Must initialize
 2               if (compareAndSetHead(new Node()))
 3                   tail = head;
 4           } else {
 5               node.prev = t;
 6               if (compareAndSetTail(t, node)) {
 7                   t.next = node;
 8                   return t;
 9               }
 10           }
複製代碼

節點入隊不是原子操做,因此會出現短暫的 head != tail,此時 Tail 指向最後 一個節點,並且 Tail 指向 Head。若是 Head 沒有指向 Tail(可見 五、六、7 行), 這種狀況下也須要將相關線程加入隊列中。因此這塊代碼是爲了解決極端狀況下的 併發問題。

等待隊列中線程出隊列時機

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

上文解釋了 addWaiter 方法,這個方法其實就是把對應的線程以 Node 的數據 結構形式加入到雙端隊列裏,返回的是一個包含該線程的 Node。而這個Node 會做爲參數,進入到 acquireQueued方法中。acquireQueued 方法能夠對排隊中的線 程進行「獲鎖」操做。 總的來講,一個線程獲取鎖失敗了,被放入等待隊列,acquireQueued 會把放 入隊列中的線程不斷去獲取鎖,直到獲取成功或者再也不須要獲取(中斷)。 下面咱們從「什麼時候出隊列?」和「如何出隊列?」兩個方向來分析一下 acquireQueued源碼:

final boolean acquireQueued(final Node node, int arg) {
        // 標記是否成功拿到資源
        boolean failed = true;
        try {
             // 標記等待過程當中是否中斷過
            boolean interrupted = false;
            // 開始自旋,要麼獲取鎖,要麼中斷
            for (;;) {
                // 獲取當前節點的前驅節點
                final Node p = node.predecessor();
                // 若是 p 是頭結點,說明當前節點在真實數據隊列的首部,就嘗試獲取鎖(別忘了頭結點是虛節點)
                if (p == head && tryAcquire(arg)) {
                    // 獲取鎖成功,頭指針移動到當前 node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 說明 p 爲頭節點且當前沒有獲取到鎖(多是非公平鎖被搶佔了)或者是 p不爲頭結點,這個時候就要判斷當前 node 是否要被阻塞(被阻塞條件:前驅節點的waitStatus 爲 -1),防止無限循環浪費資源。具體兩個方法下面細細分析
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

注:setHead 方法是把當前節點置爲虛節點,但並無修改 waitStatus,由於 它是一直須要用的數據。

private void setHead(Node node) {
         head = node;
         node.thread = null;
         node.prev = null; 
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer
// 靠前驅節點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取頭結點的節點狀態
        int ws = pred.waitStatus;
        // 說明頭結點處於喚醒狀態
        if (ws == Node.SIGNAL)
            return true; 
        // 經過枚舉值咱們知道 waitStatus>0 是取消狀態
        if (ws > 0) {
            do {
            // 循環向前查找取消節點,把取消節點從隊列中剔除
             node.prev = pred = pred.prev;
             } while (pred.waitStatus > 0);
            pred.next = node;
         } else {
        // 設置前任節點等待狀態爲 SIGNAL
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
         }
        return false; 
    
}
複製代碼

parkAndCheckInterrupt 主要用於掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態。

private final boolean parkAndCheckInterrupt() {
         LockSupport.park(this);
         return Thread.interrupted();
}
複製代碼

上述方法的流程圖以下:

從上圖能夠看出,跳出當前循環的條件是當「前置節點是頭結點,且當前線程獲 取鎖成功」。爲了防止因死循環致使 CPU 資源被浪費,咱們會判斷前置節點的狀態 來決定是否要將當前線程掛起,具體掛起流程用流程圖表示以下(shouldParkAfterFailedAcquire 流程):

從隊列中釋放節點的疑慮打消了,那麼又有新問題了:

  1. shouldParkAfterFailedAcquire中取消節點是怎麼生成的呢?何時會把一個節點的 waitStatus 設置爲-1 ?
  2. 是在什麼時間釋放節點通知到被掛起的線程呢?

### CANCELLED 狀態節點生成

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

經過 cancelAcquire 方法,將 Node 的狀態標記爲 CANCELLED。接下來, 咱們逐行來分析這個方法的原理:

private void cancelAcquire(Node node) {
        // 將無效節點過濾
        if (node == null)
            return;
        // 設置該節點不關聯任何線程,也就是虛節點
        node.thread = null;
        Node pred = node.prev;
        // 經過前驅節點,跳過取消狀態的 node
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 獲取過濾後的前驅節點的後繼節點
        Node predNext = pred.next;
        // 把當前 node 的狀態設置爲 CANCELLED
        node.waitStatus = Node.CANCELLED;
        // 若是當前節點是尾節點,將從後往前的第一個非取消狀態的節點設置爲尾節點
        // 更新失敗的話,則進入 else,若是更新成功,將 tail 的後繼節點設置爲 null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // 若是當前節點不是 head 的後繼節點,
            // 1: 判斷當前節點前驅節點的是否爲 SIGNAL,
            // 2: 若是不是,則把前驅節點設置爲 SINGAL 看是否成功
            // 若是 1 和 2 中有一個爲 true,再判斷當前節點的線程是否爲 null
            // 若是上述條件都知足,把當前節點的前驅節點的後繼指針指向當前節點的後繼節點
            int ws;
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 若是當前節點是 head 的後繼節點,或者上述條件不知足,那就喚醒當前節點的後繼節點
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
複製代碼

當前流程:

  1. 獲取當前節點的前驅節點,若是前驅節點的狀態是 CANCELLED,那就一直 往前遍歷,找到第一個 waitStatus <= 0 的節點,將找到的 Pred 節點和當前 Node 關聯,將當前 Node 設置爲 CANCELLED。

根據當前節點的位置,考慮如下三種狀況:

1. 當前節點是尾節點。
2. 當前節點是 Head 的後繼節點。
3. 當前節點不是 Head 的後繼節點,也不是尾節點。
複製代碼

根據上述第二條,咱們來分析每一種狀況的流程。

當前節點是尾節點:

當前節點是 Head 的後繼節點:
當前節點不是 Head 的後繼節點,也不是尾節點:

經過上面的流程,咱們對於 CANCELLED 節點狀態的產生和變化已經有了大體 的瞭解,可是爲何全部的變化都是對 Next 指針進行了操做,而沒有對 Prev 指針 進行操做呢?什麼狀況下會對 Prev 指針進行操做?

  1. 執行 cancelAcquire 的時候,當前節點的前置節點可能已經從隊列中出去了 (已經執行過 Try 代碼塊中的 shouldParkAfterFailedAcquire 方法了),若是此時修改 Prev 指針,有可能會致使 Prev 指向另外一個已經移除隊列的 Node, 所以這塊變化 Prev 指針不安全。 shouldParkAfterFailedAcquire 方法中, 會執行下面的代碼,其實就是在處理Prev 指針。shouldParkAfterFailedAcquire 是獲取鎖失敗的狀況下才會執行,進入該方法後,說明共享資源已 被獲取,當前節點以前的節點都不會出現變化,所以這個時候變動 Prev 指針 比較安全。
do {
 node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
複製代碼

如何解鎖

因爲 ReentrantLock 在解鎖的時候,並不區分公平鎖和非公平鎖,因此咱們直接看解鎖的源碼:

public void unlock() {
    sync.release(1);
}
複製代碼
public final boolean release(int arg) {
        if (tryRelease(arg)) {
             Node h = head;
        // 頭結點不爲空而且頭結點的 waitStatus 不是初始化節點狀況,解除線程掛起狀態
        if (h != null && h.waitStatus != 0)
             unparkSuccessor(h);
            return true;
         }
        return false; 
}
複製代碼
// 方法返回當前鎖是否是沒有被線程持有
protected final boolean tryRelease(int releases) {
    // 減小可重入次數
    int c = getState() - releases;
    // 當前線程不是持有鎖的線程,拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 若是持有線程所有釋放,將當前獨佔鎖全部線程設置爲 null,並更新 state
    if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null);
     }
    setState(c);
    return free;
}
複製代碼

這裏的判斷條件爲何是 h != null && h.waitStatus != 0h == null 則說明Head 還沒初始化。初始狀況下,head == null,第一個節點入隊,Head 會被初始化一個虛擬節點。因此說,這裏若是還沒來得及入隊,就會出 現 head == null 的狀況。 h != null && waitStatus == 0 代表後繼節點對應的線程仍在運行中,不須要喚醒。 h != null && waitStatus < 0 代表後繼節點可能被阻塞了,須要喚醒。

private void unparkSuccessor(Node node) {
    // 獲取頭結點 waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取當前節點的下一個節點
     Node s = node.next;
    // 若是下個節點是 null 或者下個節點被 cancelled,就找到隊列最開始的非cancelled 的節點
    if (s == null || s.waitStatus > 0) {
         s = null;
    // 就從尾部節點開始找,到隊首,找到隊列第一個 waitStatus<0 的節點。
    for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
             }
    // 若是當前節點的下個節點不爲空,並且狀態 <=0,就把當前節點 unpark
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製代碼

爲何要從後往前找第一個非 Cancelled 的節點呢?緣由以下。 以前的 addWaiter 方法:

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
    if (pred != null) {
         node.prev = pred;
    if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
    return node;
}
複製代碼

從這裏能夠看到節點入隊並非原子操做,也就是說,node.prev = pred;compareAndSetTail(pred, node) 這兩個地方能夠看做Tail 入隊的原子操做, 可是此時 pred.next = node; 還沒執行,若是這個時候執行了 unparkSuccessor 方法,就沒辦法從前日後找了,因此須要從後往前找。還有一點緣由,在產生 CANCELLED 狀態節點的時候,先斷開的是 Next 指針,Prev 指針並未斷開,所以 也是必需要從後往前遍歷纔可以遍歷徹底部的 Node。 綜 上 所 述, 如 果 是 從 前 往 後 找, 由 於 極 端 情 況 下 入 隊 的 非 原 子 操 做 和 CANCELLED 節點產生過程當中斷開 Next 指針的操做,可能會致使沒法遍歷所 有的節點。因此,喚醒對應的線程後,對應的線程就會繼續往下執行。繼續執行acquireQueued 方法之後,中斷如何處理?

中斷恢復後的執行流程

喚醒後,會執行 return Thread.interrupted();,這個函數返回的是當前執行線程的中斷狀態,並清除。

private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);
    return Thread.interrupted();
}
複製代碼

再 回 到 acquireQueued 代 碼, 當 parkAndCheckInterrupt 返 回True 或者 False 的時候,interrupted 的值不一樣,但都會執行下次循環。若是這個時候獲取鎖成功,就會把當前 interrupted返回。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                     setHead(node);
                     p.next = null; // help GC
                     failed = false;
                    return interrupted;
                 }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
             }
         } finally {
            if (failed)
              cancelAcquire(node);
         }
}
複製代碼

若是 acquireQueued 爲 True,就會執行 selfInterrupt 方法。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
複製代碼

該方法實際上是爲了中斷線程。但爲何獲取了鎖之後還要中斷線程呢?這部分屬 於 Java 提供的協做式中斷知識內容,感興趣同窗能夠查閱一下。這裏簡單介紹一下:

  1. 當中斷線程被喚醒時,並不知道被喚醒的緣由,多是當前線程在等待中被中斷,也多是釋放了鎖之後被喚醒。所以咱們經過 Thread.interrupted() 方法檢查中斷標記(該方法返回了當前線程的中斷狀態,並將當前線程的中斷 標識設置爲 False),並記錄下來,若是發現該線程被中斷過,就再中斷一次。
  2. 線程在等待資源的過程當中被喚醒,喚醒後仍是會不斷地去嘗試獲取鎖,直到搶到鎖爲止。也就是說,在整個流程中,並不響應中斷,只是記錄中斷記錄。 最後搶到鎖返回了,那麼若是被中斷過的話,就須要補充一次中斷。 這裏的處理方式主要是運用線程池中基本運做單元 Worder 中的 runWorker, 經過 Thread.interrupted() 進行額外的判斷處理,感興趣的同窗能夠看下 ThreadPoolExecutor 源碼。

小結

Q:某個線程獲取鎖失敗的後續流程是什麼呢?

A:存在某種排隊等候機制,線程繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。

Q:既然說到了排隊等候機制,那麼就必定會有某種隊列造成,這樣的隊列是什麼數據結構呢?

A:是 CLH 變體的 FIFO 雙端隊列。

Q:處於排隊等候機制中的線程,何時能夠有機會獲取鎖呢?

A:能夠詳細看下上面的 ==> 等待隊列中線程出隊列時機

Q:若是處於排隊等候機制中的線程一直沒法獲取鎖,須要一直等待麼?仍是有別的策略來解決這一問?

A:線程所在節點的狀態會變成取消狀態,取消狀態的節點會從隊列中釋放,具體可看上文的 ==>CANCELLED狀態節點生成

Q:Lock 函數經過 Acquire 方法進行加鎖,可是具體是如何加鎖的呢?

A:AQS 的 Acquire 會調用 tryAcquire 方法,tryAcquire 由各個自定義同步器實現,經過 tryAcquire 完成加鎖過程。

AQS 應用

ReentrantLock 的可重入應用

ReentrantLock 的可重入性是 AQS很好的應用之一,在瞭解完上述知識點之後,咱們得知ReentrantLock實現可重入的方法。在 ReentrantLock 裏面,無論是公平鎖仍是非公平鎖,都有一段邏輯。

公平鎖:

if (c == 0) {
    if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
     }
}else if (current == getExclusiveOwnerThread()) {
     int nextc = c + acquires;
     if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
     setState(nextc);
     return true; 
}
複製代碼

非公平鎖:

if (c == 0) {
    if (compareAndSetState(0, acquires)){
        setExclusiveOwnerThread(current);
        return true;
     }
}else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true; 
}
複製代碼

從上面這兩段均可以看到,有一個同步狀態 State 來控制總體可重入的狀況。StateVolatile修飾的,用於保證必定的可見性和有序性。

接下來看 State 這個字段主要的過程:

  1. State 初始化的時候爲 0,表示沒有任何線程持有鎖。
  2. 當有線程持有該鎖時,值就會在原來的基礎上 +1,同一個線程屢次得到鎖是,就會屢次 +1,這裏就是可重入的概念。
  3. 解鎖也是對這個字段 -1,一直到 0,此線程對鎖釋放。

JUC中的應用場景

除了上邊 ReentrantLock 的可重入性的應用,AQS 做爲併發編程的框架,爲不少其餘同步工具提供了良好的解決方案。下面列出了 JUC 中的幾種同步工具,大致介紹一下 AQS 的應用場景:

同步工具 同步工具與AQS的關聯
ReentrantLock 使用 AQS 保存鎖重複持有的次數。當一個線程獲取鎖時,ReentrantLock記錄當前得到鎖的線程標識,用於檢測是否重複獲取,以及錯誤線程試圖解鎖操做時異常狀況的處理。
Semaphore 使用 AQS 同步狀態來保存信號量的當前計數。tryRelease 會增長計數,acquireShared 會減小計數。
CountDownLatch 使用 AQS 同步狀態來表示計數。計數爲 0 時,全部的 Acquire 操做(CountDownLatch 的 await 方法)才能夠經過。
ReentrantReadWriteLock 使用 AQS 同步狀態中的 16 位保存寫鎖持有的次數,剩下的 16 位用於保存讀鎖的持有次數。
ThreadPoolExecutor Worker 利用 AQS 同步狀態實現對獨佔線程變量的設置(tryAcquire 和tryRelease)。

總結

咱們平常開發中使用併發的場景太多,可是對併發內部的基本框架原理了解的人卻很少。並且多線程狀況下,尋找問題所在也是一個很頭大的問題。只有夯實基礎,才能走的更遠。

參考文章: 美團後臺篇

相關文章
相關標籤/搜索