Java併發指南8:AQS中的公平鎖與非公平鎖,Condtion

一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)html

轉自https://www.javadoop.com/post...java

文章比較長,信息量比較大,建議在 pc 上閱讀。文章標題是爲了呼應前文,其實能夠單獨成文的,主要是但願讀者看文章能系統看。node

本文關注如下幾點內容:linux

  1. 深刻理解 ReentrantLock 公平鎖和非公平鎖的區別
  2. 深刻分析 AbstractQueuedSynchronizer 中的 ConditionObject
  3. 深刻理解 java 線程中斷和 InterruptedException 異常

基本上本文把以上幾點都說清楚了,我假設讀者看過上一篇文章中對 AbstractQueuedSynchronizer 的介紹 ,固然若是你已經熟悉 AQS 中的獨佔鎖了,那也能夠直接看這篇。各小節之間基本上沒什麼關係,你們能夠只關注本身感興趣的部分。git

公平鎖和非公平鎖

ReentrantLock 默認採用非公平鎖,除非你在構造方法中傳入參數 true 。面試

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平鎖的 lock 方法:數據庫

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 1\. 和非公平鎖相比,這裏多了一個判斷:是否有線程在等待
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

非公平鎖的 lock 方法:編程

static final class NonfairSync extends Sync {
    final void lock() {
        // 2\. 和公平鎖相比,這裏會直接先進行一次CAS,成功就返回了
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    // AbstractQueuedSynchronizer.acquire(int arg)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

總結:公平鎖和非公平鎖只有兩處不一樣:

  1. 非公平鎖在調用 lock 後,首先就會調用 CAS 進行一次搶鎖,若是這個時候恰巧鎖沒有被佔用,那麼直接就獲取到鎖返回了。
  2. 非公平鎖在 CAS 失敗後,和公平鎖同樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,若是發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS 搶鎖,可是公平鎖會判斷等待隊列是否有線程處於等待狀態,若是有則不去搶鎖,乖乖排到後面。

公平鎖和非公平鎖就這兩點區別,若是這兩次 CAS 都不成功,那麼後面非公平鎖和公平鎖是同樣的,都要進入到阻塞隊列等待喚醒。

相對來講,非公平鎖會有更好的性能,由於它的吞吐量比較大。固然,非公平鎖讓獲取鎖的時間變得更加不肯定,可能會致使在阻塞隊列中的線程長期處於飢餓狀態。

Condition

Tips: 這裏重申一下,要看懂這個,必需要先看懂上一篇關於 AbstractQueuedSynchronizer 的介紹,或者你已經有相關的知識了,不然這節確定是看不懂的。

咱們先來看看 Condition 的使用場景,Condition 常常能夠用在生產者-消費者的場景中,請看 Doug Lea 給出的這個例子:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依賴於 lock 來產生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 隊列已滿,等待,直到 not full 才能繼續生產
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消費
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

ArrayBlockingQueue 採用這種方式實現了生產者-消費者,因此請只把這個例子當作學習例子,實際生產中能夠直接使用 ArrayBlockingQueue)

咱們經常使用 obj.wait(),obj.notify() 或 obj.notifyAll() 來實現類似的功能,可是,它們是基於對象的監視器鎖的。須要深刻了解這幾個方法的讀者,能夠參考個人另外一篇文章《深刻分析 java 8 編程語言規範:Threads and Locks》。而這裏說的 Condition 是基於 ReentrantLock 實現的,而 ReentrantLock 是依賴於 AbstractQueuedSynchronizer 實現的。

在往下看以前,讀者內心要有一個總體的概念。condition 是依賴於 ReentrantLock 的,無論是調用 await 進入等待仍是 signal 喚醒,都必須獲取到鎖才能進行操做。

每一個 ReentrantLock 實例能夠經過調用屢次 newCondition 產生多個 ConditionObject 的實例:

final ConditionObject newCondition() {
    return new ConditionObject();
}

咱們首先來看下咱們關注的 Condition 的實現類 AbstractQueuedSynchronizer 類中的 ConditionObject

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 條件隊列的第一個節點
          // 不要管這裏的關鍵字 transient,是不參與序列化的意思
        private transient Node firstWaiter;
        // 條件隊列的最後一個節點
        private transient Node lastWaiter;
        ......

在上一篇介紹 AQS 的時候,咱們有一個阻塞隊列,用於保存等待獲取鎖的線程的隊列。這裏咱們引入另外一個概念,叫條件隊列(condition queue),我畫了一張簡單的圖用來講明這個。

這裏的阻塞隊列若是叫作同步隊列(sync queue)其實比較貼切,不過爲了和前篇呼應,我就繼續使用阻塞隊列了。記住這裏的兩個概念, 阻塞隊列條件隊列

轉存失敗從新上傳取消condition-2

這裏,咱們簡單回顧下 Node 的屬性:

volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

prev 和 next 用於實現阻塞隊列的雙向鏈表,nextWaiter 用於實現條件隊列的單向鏈表

基本上,把這張圖看懂,你也就知道 condition 的處理流程了。因此,我先簡單解釋下這圖,而後再具體地解釋代碼實現。

  1. 咱們知道一個 ReentrantLock 實例能夠經過屢次調用 newCondition() 來產生多個 Condition 實例,這裏對應 condition1 和 condition2。注意,ConditionObject 只有兩個屬性 firstWaiter 和 lastWaiter;
  2. 每一個 condition 有一個關聯的條件隊列,如線程 1 調用 condition1.await() 方法便可將當前線程 1 包裝成 Node 後加入到條件隊列中,而後阻塞在這裏,不繼續往下執行,條件隊列是一個單向鏈表;
  3. 調用 condition1.signal() 會將condition1 對應的條件隊列的 firstWaiter 移到阻塞隊列的隊尾,等待獲取鎖,獲取鎖後 await 方法返回,繼續往下執行。

我這裏說的 一、二、3 是最簡單的流程,沒有考慮中斷、signalAll、還有帶有超時參數的 await 方法等,不過把這裏弄懂是這節的主要目的。

同時,從圖中也能夠很直觀地看出,哪些操做是線程安全的,哪些操做是線程不安全的。

這個圖看懂後,下面的代碼分析就簡單了。

接下來,咱們一步步按照流程來走代碼分析,咱們先來看看 wait 方法:

// 首先,這個方法是可被中斷的,不可被中斷的是另外一個方法 awaitUninterruptibly()
// 這個方法會阻塞,直到調用 signal 方法(指 signal() 和 signalAll(),下同),或被中斷
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到 condition 的條件隊列中
    Node node = addConditionWaiter();
    // 釋放鎖,返回值是釋放鎖以前的 state 值
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 這裏退出循環有兩種狀況,以後再仔細分析
    // 1\. isOnSyncQueue(node) 返回 true,即當前 node 已經轉移到阻塞隊列了
    // 2\. checkInterruptWhileWaiting(node) != 0 會到 break,而後退出循環,表明的是線程中斷
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被喚醒後,將進入阻塞隊列,等待獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

其實,我大致上也把整個 await 過程說得十之八九了,下面咱們分步把上面的幾個點用源碼說清楚。

1. 將節點加入到條件隊列

addConditionWaiter() 是將當前節點加入到條件隊列,看圖咱們知道,這種條件隊列內的操做是線程安全的。

// 將當前線程對應的節點入隊,插入隊尾
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若是條件隊列的最後一個節點取消了,將其清除出去
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 這個方法會遍歷整個條件隊列,而後會將已取消的全部節點清除出隊列
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是隊列爲空
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

在addWaiter 方法中,有一個 unlinkCancelledWaiters() 方法,該方法用於清除隊列中已經取消等待的節點。

當 await 的時候若是發生了取消操做(這點以後會說),或者是在節點入隊的時候,發現最後一個節點是被取消的,會調用一次這個方法。

// 等待隊列是一個單向鏈表,遍歷鏈表將已經取消等待的節點清除出去
// 純屬鏈表操做,很好理解,看不懂多看幾遍就能夠了
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 若是節點的狀態不是 Node.CONDITION 的話,這個節點就是被取消的
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

2. 徹底釋放獨佔鎖

回到 wait 方法,節點入隊了之後,會調用 int savedState = fullyRelease(node); 方法釋放鎖,注意,這裏是徹底釋放獨佔鎖,由於 ReentrantLock 是能夠重入的。

// 首先,咱們要先觀察到返回值 savedState 表明 release 以前的 state 值
// 對於最簡單的操做:先 lock.lock(),而後 condition1.await()。
//         那麼 state 通過這個方法由 1 變爲 0,鎖釋放,此方法返回 1
//         相應的,若是 lock 重入了 n 次,savedState == n
// 若是這個方法失敗,會將節點設置爲"取消"狀態,並拋出異常 IllegalMonitorStateException
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 這裏使用了當前的 state 做爲 release 的參數,也就是徹底釋放掉鎖,將 state 置爲 0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3. 等待進入阻塞隊列

釋放掉鎖之後,接下來是這段,這邊會自旋,若是發現本身還沒到阻塞隊列,那麼掛起,等待被轉移到阻塞隊列。

int interruptMode = 0;
while (!isOnSyncQueue(node)) {
    // 線程掛起
    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

isOnSyncQueue(Node node) 用於判斷節點是否已經轉移到阻塞隊列了:

// 在節點入條件隊列的時候,初始化時設置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的時候須要將節點從條件隊列移到阻塞隊列,
// 這個方法就是判斷 node 是否已經移動到阻塞隊列了
final boolean isOnSyncQueue(Node node) {
    // 移動過去的時候,node 的 waitStatus 會置爲 0,這個以後在說 signal 方法的時候會說到
    // 若是 waitStatus 仍是 Node.CONDITION,也就是 -2,那確定就是還在條件隊列中
    // 若是 node 的前驅 prev 指向仍是 null,說明確定沒有在 阻塞隊列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 若是 node 已經有後繼節點 next 的時候,那確定是在阻塞隊列了
    if (node.next != null) 
        return true;

    // 這個方法從阻塞隊列的隊尾開始從後往前遍歷找,若是找到相等的,說明在阻塞隊列,不然就是不在阻塞隊列

    // 能夠經過判斷 node.prev() != null 來推斷出 node 在阻塞隊列嗎?答案是:不能。
    // 這個能夠看上篇 AQS 的入隊方法,首先設置的是 node.prev 指向 tail,
    // 而後是 CAS 操做將本身設置爲新的 tail,但是此次的 CAS 是可能失敗的。

    // 調用這個方法的時候,每每咱們須要的就在隊尾的部分,因此通常都不須要徹底遍歷整個隊列的
    return findNodeFromTail(node);
}

// 從同步隊列的隊尾往前遍歷,若是找到,返回 true
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

回到前面的循環,isOnSyncQueue(node) 返回 false 的話,那麼進到 LockSupport.park(this); 這裏線程掛起。

4. signal 喚醒線程,轉移到阻塞隊列

爲了你們理解,這裏咱們先看喚醒操做,由於剛剛到 LockSupport.park(this); 把線程掛起了,等待喚醒。

喚醒操做一般由另外一個線程來操做,就像生產者-消費者模式中,若是線程由於等待消費而掛起,那麼當生產者生產了一個東西后,會調用 signal 喚醒正在等待的線程來消費。

// 喚醒等待了最久的線程
// 其實就是,將這個線程對應的 node 從條件隊列轉移到阻塞隊列
public final void signal() {
    // 調用 signal 方法的線程必須持有當前的獨佔鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 從條件隊列隊頭日後遍歷,找出第一個須要轉移的 node
// 由於前面咱們說過,有些線程會取消排隊,可是還在隊列中
private void doSignal(Node first) {
    do {
          // 將 firstWaiter 指向 first 節點後面的第一個
        // 若是將隊頭移除後,後面沒有節點在等待了,那麼須要將 lastWaiter 置爲 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 由於 first 立刻要被移到阻塞隊列了,和條件隊列的連接關係在這裏斷掉
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
      // 這裏 while 循環,若是 first 轉移不成功,那麼選擇 first 後面的第一個節點進行轉移,依此類推
}

// 將節點從條件隊列轉移到阻塞隊列
// true 表明成功轉移
// false 表明在 signal 以前,節點已經取消了
final boolean transferForSignal(Node node) {

    // CAS 若是失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION,說明節點已經取消,
    // 既然已經取消,也就不須要轉移了,方法返回,轉移後面一個節點
    // 不然,將 waitStatus 置爲 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): 自旋進入阻塞隊列的隊尾
    // 注意,這裏的返回值 p 是 node 在阻塞隊列的前驅節點
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 說明 node 在阻塞隊列中的前驅節點取消了等待鎖,直接喚醒 node 對應的線程。喚醒以後會怎麼樣,後面再解釋
    // 若是 ws <= 0, 那麼 compareAndSetWaitStatus 將會被調用,上篇介紹的時候說過,節點入隊後,須要把前驅節點的狀態設爲 Node.SIGNAL(-1)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 若是前驅節點取消或者 CAS 失敗,會進到這裏喚醒線程,以後的操做看下一節
        LockSupport.unpark(node.thread);
    return true;
}

正常狀況下,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 這句中,ws <= 0,並且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 會返回 true,因此通常也不會進去 if 語句塊中喚醒 node 對應的線程。而後這個方法返回 true,也就意味着 signal 方法結束了,節點進入了阻塞隊列。

假設發生了阻塞隊列中的前驅節點取消等待,或者 CAS 失敗,只要喚醒線程,讓其進到下一步便可。

5. 喚醒後檢查中斷狀態

上一步 signal 以後,咱們的線程由條件隊列轉移到了阻塞隊列,以後就準備獲取鎖了。只要從新獲取到鎖了之後,繼續往下執行。

等線程從掛起中恢復過來,繼續往下看

int interruptMode = 0;
while (!isOnSyncQueue(node)) {
    // 線程掛起
    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

先解釋下 interruptMode。interruptMode 能夠取值爲 REINTERRUPT(1),THROW_IE(-1),0

  • REINTERRUPT: 表明 await 返回的時候,須要從新設置中斷狀態
  • THROW_IE: 表明 await 返回的時候,須要拋出 InterruptedException 異常
  • 0 :說明在 await 期間,沒有發生中斷

有如下三種狀況會讓 LockSupport.park(this); 這句返回繼續往下執行:

  1. 常規路勁。signal -> 轉移節點到阻塞隊列 -> 獲取了鎖(unpark)
  2. 線程中斷。在 park 的時候,另一個線程對這個線程進行了中斷
  3. signal 的時候咱們說過,轉移之後的前驅節點取消了,或者對前驅節點的CAS操做失敗了
  4. 假喚醒。這個也是存在的,和 Object.wait() 相似,都有這個問題

線程喚醒後第一步是調用 checkInterruptWhileWaiting(node) 這個方法,此方法用於判斷是否在線程掛起期間發生了中斷,若是發生了中斷,是 signal 調用以前中斷的,仍是 signal 以後發生的中斷。

// 1\. 若是在 signal 以前已經中斷,返回 THROW_IE
// 2\. 若是是 signal 以後中斷,返回 REINTERRUPT
// 3\. 沒有發生中斷,返回 0
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
Thread.interrupted():若是當前線程已經處於中斷狀態,那麼該方法返回 true,同時將中斷狀態重置爲 false,因此,纔有後續的  從新中斷(REINTERRUPT) 的使用。

看看怎麼判斷是 signal 以前仍是以後發生的中斷:

// 只有線程處於中斷狀態,纔會調用此方法
// 若是須要的話,將這個已經取消等待的節點轉移到阻塞隊列
// 返回 true:若是此線程在 signal 以前被取消,
final boolean transferAfterCancelledWait(Node node) {
    // 用 CAS 將節點狀態設置爲 0 
    // 若是這步 CAS 成功,說明是 signal 方法以前發生的中斷,由於若是 signal 先發生的話,signal 中會將 waitStatus 設置爲 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 將節點放入阻塞隊列
        // 這裏咱們看到,即便中斷了,依然會轉移到阻塞隊列
        enq(node);
        return true;
    }

    // 到這裏是由於 CAS 失敗,確定是由於 signal 方法已經將 waitStatus 設置爲了 0
    // signal 方法會將節點轉移到阻塞隊列,可是可能還沒完成,這邊自旋等待其完成
    // 固然,這種事情仍是比較少的吧:signal 調用以後,沒完成轉移以前,發生了中斷
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
這裏再說一遍,即便發生了中斷,節點依然會轉移到阻塞隊列。

到這裏,你們應該都知道這個 while 循環怎麼退出了吧。要麼中斷,要麼轉移成功。

6. 獲取獨佔鎖

while 循環出來之後,下面是這段代碼:

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;

因爲 while 出來後,咱們肯定節點已經進入了阻塞隊列,準備獲取鎖。

這裏的 acquireQueued(node, savedState) 的第一個參數 node 以前已經通過 enq(node) 進入了隊列,參數 savedState 是以前釋放鎖前的 state,這個方法返回的時候,表明當前線程獲取了鎖,並且 state == savedState了。

注意,前面咱們說過,無論有沒有發生中斷,都會進入到阻塞隊列,而 acquireQueued(node, savedState) 的返回值就是表明線程是否被中斷。若是返回 true,說明被中斷了,並且 interruptMode != THROW_IE,說明在 signal 以前就發生中斷了,這裏將 interruptMode 設置爲 REINTERRUPT,用於待會從新中斷。

繼續往下:

if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

本着一絲不苟的精神,這邊說說 node.nextWaiter != null 怎麼知足。我前面也說了 signal 的時候會將節點轉移到阻塞隊列,有一步是 node.nextWaiter = null,將斷開節點和條件隊列的聯繫。

但是,在判斷髮生中斷的狀況下,是 signal 以前仍是以後發生的? 這部分的時候,我也介紹了,若是 signal 以前就中斷了,也須要將節點進行轉移到阻塞隊列,這部分轉移的時候,是沒有設置 node.nextWaiter = null 的。

以前咱們說過,若是有節點取消,也會調用 unlinkCancelledWaiters 這個方法,就是這裏了。

7. 處理中斷狀態

到這裏,咱們終於能夠好好說下這個 interruptMode 幹嗎用了。

  • 0:什麼都不作。
  • THROW_IE:await 方法拋出 InterruptedException 異常
  • REINTERRUPT:從新中斷當前線程
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
爲何這麼處理?這部分的知識在本文的最後一節

* 帶超時機制的 await

通過前面的 7 步,整個 ConditionObject 類基本上都分析完了,接下來簡單分析下帶超時機制的 await 方法。

public final long awaitNanos(long nanosTimeout) 
                  throws InterruptedException
public final boolean awaitUntil(Date deadline)
                throws InterruptedException
public final boolean await(long time, TimeUnit unit)
                throws InterruptedException

這三個方法都差很少,咱們就挑一個出來看看吧:

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    // 等待這麼多納秒
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 當前時間 + 等待時長 = 過時時間
    final long deadline = System.nanoTime() + nanosTimeout;
    // 用於返回 await 是否超時
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 時間到啦
        if (nanosTimeout <= 0L) {
            // 這裏由於要 break 取消等待了。取消等待的話必定要調用 transferAfterCancelledWait(node) 這個方法
            // 若是這個方法返回 true,在這個方法內,將節點轉移到阻塞隊列成功
            // 返回 false 的話,說明 signal 已經發生,signal 方法將節點轉移了。也就是說沒有超時嘛
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // spinForTimeoutThreshold 的值是 1000 納秒,也就是 1 毫秒
        // 也就是說,若是不到 1 毫秒了,那就不要選擇 parkNanos 了,自旋的性能反而更好
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 獲得剩餘時間
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

超時的思路仍是很簡單的,不帶超時參數的 await 是 park,而後等待別人喚醒。而如今就是調用 parkNanos 方法來休眠指定的時間,醒來後判斷是否 signal 調用了,調用了就是沒有超時,不然就是超時了。超時的話,本身來進行轉移到阻塞隊列,而後搶鎖。

* 不拋出 InterruptedException 的 await

關於 Condition 最後一小節了。

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

很簡單,我就不廢話了。

AbstractQueuedSynchronizer 獨佔鎖的取消排隊

這篇文章說的是 AbstractQueuedSynchronizer,只不過好像 Condition 說太多了,趕忙把思路拉回來。

接下來,我想說說怎麼取消對鎖的競爭?

上篇文章提到過,最重要的方法是這個,咱們要在這裏面找答案:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先,到這個方法的時候,節點必定是入隊成功的。

我把 parkAndCheckInterrupt() 代碼貼過來:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

這兩段代碼聯繫起來看,是否是就清楚了。

若是咱們要取消一個線程的排隊,咱們須要在另一個線程中對其進行中斷。好比某線程調用 lock() 老久不返回,我想中斷它。一旦對其進行中斷,此線程會從 LockSupport.park(this); 中喚醒,而後 Thread.interrupted();返回 true。

咱們發現一個問題,即便是中斷喚醒了這個線程,也就只是設置了 interrupted = true 而後繼續下一次循環。並且,因爲 Thread.interrupted(); 會清除中斷狀態,第二次進 parkAndCheckInterrupt 的時候,返回會是 false。

因此,咱們要看到,在這個方法中,interrupted 只是用來記錄是否發生了中斷,而後用於方法返回值,其餘沒有作任何相關事情。

因此,咱們看外層方法怎麼處理 acquireQueued 返回 false 的狀況。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

因此說,lock() 方法處理中斷的方法就是,你中斷歸中斷,我搶鎖仍是照樣搶鎖,幾乎不要緊,只是我搶到鎖了之後,設置線程的中斷狀態而已,也不拋出任何異常出來。調用者獲取鎖後,能夠去檢查是否發生過中斷,也能夠不理會。

    • *

來條分割線。有沒有被騙的感受,我說了一大堆,但是和取消沒有任何關係啊。

咱們來看 ReentrantLock 的另外一個 lock 方法:

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

方法上多了個 throws InterruptedException ,通過前面那麼多知識的鋪墊,這裏我就再也不囉裏囉嗦了。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

繼續往裏:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 就是這裏了,一旦異常,立刻結束這個方法,拋出異常。
                // 這裏再也不只是標記這個方法的返回值表明中斷狀態
                // 而是直接拋出異常,並且外層也不捕獲,一直往外拋到 lockInterruptibly
                throw new InterruptedException();
        }
    } finally {
        // 若是經過 InterruptedException 異常出去,那麼 failed 就是 true 了
        if (failed)
            cancelAcquire(node);
    }
}

既然到這裏了,順便說說 cancelAcquire 這個方法吧:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

到這裏,我想我應該把取消排隊這件事說清楚了吧。

再說 java 線程中斷和 InterruptedException 異常

在以前的文章中,咱們接觸了大量的中斷,這邊算是個總結吧。若是你徹底熟悉中斷了,沒有必要再看這節,本節爲新手而寫。

線程中斷

首先,咱們要明白,中斷不是相似 linux 裏面的命令 kill -9 pid,不是說咱們中斷某個線程,這個線程就中止運行了。中斷表明線程狀態,每一個線程都關聯了一箇中斷狀態,是一個 true 或 false 的 boolean 值,初始值爲 false。

關於中斷狀態,咱們須要重點關注如下幾個方法:

// Thread 類中的實例方法,持有線程實例引用便可檢測線程中斷狀態
public boolean isInterrupted() {}

// Thread 中的靜態方法,檢測調用這個方法的線程是否已經中斷
// 注意:這個方法返回中斷狀態的同時,會將此線程的中斷狀態重置爲 false
// 因此,若是咱們連續調用兩次這個方法的話,第二次的返回值確定就是 false 了
public static boolean interrupted() {}

// Thread 類中的實例方法,用於設置一個線程的中斷狀態爲 true
public void interrupt() {}

咱們說中斷一個線程,其實就是設置了線程的 interrupted status 爲 true,至於說被中斷的線程怎麼處理這個狀態,那是那個線程本身的事。如如下代碼:

while (!Thread.interrupted()) {
   doWork();
   System.out.println("我作完一件事了,準備作下一件,若是沒有其餘線程中斷個人話");
}

固然,中斷除了是線程狀態外,還有其餘含義,不然也不須要專門搞一個這個概念出來了。

若是線程處於如下三種狀況,那麼當線程被中斷的時候,能自動感知到:

  1. 來自 Object 類的 wait()、wait(long)、wait(long, int),

    來自 Thread 類的 join()、join(long)、join(long, int)、sleep(long)、sleep(long, int)

    這幾個方法的相同之處是,方法上都有: throws InterruptedException

    若是線程阻塞在這些方法上(咱們知道,這些方法會讓當前線程阻塞),這個時候若是其餘線程對這個線程進行了中斷,那麼這個線程會從這些方法中當即返回,拋出 InterruptedException 異常,同時重置中斷狀態爲 false。

  2. 實現了 InterruptibleChannel 接口的類中的一些 I/O 阻塞操做,如 DatagramChannel 中的 connect 方法和 receive 方法等

    若是線程阻塞在這裏,中斷線程會致使這些方法拋出 ClosedByInterruptException 並重置中斷狀態。
  3. Selector 中的 select 方法,這個有機會咱們在講 NIO 的時候說

    一旦中斷,方法當即返回

對於以上 3 種狀況是最特殊的,由於他們能自動感知到中斷(這裏說自動,固然也是基於底層實現),而且在作出相應的操做後都會重置中斷狀態爲 false

那是否是隻有以上 3 種方法能自動感知到中斷呢?不是的,若是線程阻塞在 LockSupport.park(Object obj) 方法,也叫掛起,這個時候的中斷也會致使線程喚醒,可是喚醒後不會重置中斷狀態,因此喚醒後去檢測中斷狀態將是 true。

InterruptedException 概述

它是一個特殊的異常,不是說 JVM 對其有特殊的處理,而是它的使用場景比較特殊。一般,咱們能夠看到,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法,Thread 中的 sleep() 方法等等,這些方法都帶有 throws InterruptedException,咱們一般稱這些方法爲阻塞方法(blocking method)。

阻塞方法一個很明顯的特徵是,它們須要花費比較長的時間(不是絕對的,只是說明時間不可控),還有它們的方法結束返回每每依賴於外部條件,如 wait 方法依賴於其餘線程的 notify,lock 方法依賴於其餘線程的 unlock等等。

當咱們看到方法上帶有 throws InterruptedException 時,咱們就要知道,這個方法應該是阻塞方法,咱們若是但願它能早點返回的話,咱們每每能夠經過中斷來實現。

除了幾個特殊類(如 Object,Thread等)外,感知中斷並提早返回是經過輪詢中斷狀態來實現的。咱們本身須要寫可中斷的方法的時候,就是經過在合適的時機(一般在循環的開始處)去判斷線程的中斷狀態,而後作相應的操做(一般是方法直接返回或者拋出異常)。固然,咱們也要看到,若是咱們一次循環花的時間比較長的話,那麼就須要比較長的時間才能注意到線程中斷了。

處理中斷

一旦中斷髮生,咱們接收到了這個信息,而後怎麼去處理中斷呢?本小節將簡單分析這個問題。

咱們常常會這麼寫代碼:

try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    // ignore
}
// go on

當 sleep 結束繼續往下執行的時候,咱們每每都不知道這塊代碼是真的 sleep 了 10 秒,仍是隻休眠了 1 秒就被中斷了。這個代碼的問題在於,咱們將這個異常信息吞掉了。(對於 sleep 方法,我相信大部分狀況下,咱們都不在乎是不是中斷了,這裏是舉例)

AQS 的作法很值得咱們借鑑,咱們知道 ReentrantLock 有兩種 lock 方法:

public void lock() {
    sync.lock();
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

前面咱們提到過,lock() 方法不響應中斷。若是 thread1 調用了 lock() 方法,過了好久還沒搶到鎖,這個時候 thread2 對其進行了中斷,thread1 是不響應這個請求的,它會繼續搶鎖,固然它不會把「被中斷」這個信息扔掉。咱們能夠看如下代碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 咱們看到,這裏也沒作任何特殊處理,就是記錄下來中斷狀態。
        // 這樣,若是外層方法須要去檢測的時候,至少咱們沒有把這個信息丟了
        selfInterrupt();// Thread.currentThread().interrupt();
}

而對於 lockInterruptibly() 方法,由於其方法上面有 throws InterruptedException ,這個信號告訴咱們,若是咱們要取消線程搶鎖,直接中斷這個線程便可,它會當即返回,拋出 InterruptedException 異常。

在併發包中,有很是多的這種處理中斷的例子,提供兩個方法,分別爲響應中斷和不響應中斷,對於不響應中斷的方法,記錄中斷而不是丟失這個信息。如 Condition 中的兩個方法就是這樣的:

void await() throws InterruptedException;
void awaitUninterruptibly();

一般,若是方法會拋出 InterruptedException 異常,每每方法體的第一句就是:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
     ...... 
}

熟練使用中斷,對於咱們寫出優雅的代碼是有幫助的,也有助於咱們分析別人的源碼。

參考: https://www.ibm.com/developerworks/library/j-jtp05236/index.html

翻譯:https://www.ibm.com/developerworks/cn/java/j-jtp05236.html

更多內容請關注微信公衆號【Java技術江湖】

一位阿里 Java 工程師的技術小站。做者黃小斜,專一 Java 相關技術:SSM、SpringBoot、MySQL、分佈式、中間件、集羣、Linux、網絡、多線程,偶爾講點Docker、ELK,同時也分享技術乾貨和學習經驗,致力於Java全棧開發!(關注公衆號後回覆」Java「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源)

相關文章
相關標籤/搜索