死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

問題

(1)條件鎖是什麼?java

(2)條件鎖適用於什麼場景?node

(3)條件鎖的await()是在其它線程signal()的時候喚醒的嗎?git

簡介

條件鎖,是指在獲取鎖以後發現當前業務場景本身沒法處理,而須要等待某個條件的出現才能夠繼續處理時使用的一種鎖。源碼分析

好比,在阻塞隊列中,當隊列中沒有元素的時候是沒法彈出一個元素的,這時候就須要阻塞在條件notEmpty上,等待其它線程往裏面放入一個元素後,喚醒這個條件notEmpty,當前線程才能夠繼續去作「彈出一個元素」的行爲。學習

注意,這裏的條件,必須是在獲取鎖以後去等待,對應到ReentrantLock的條件鎖,就是獲取鎖以後才能調用condition.await()方法。ui

在java中,條件鎖的實現都在AQS的ConditionObject類中,ConditionObject實現了Condition接口,下面咱們經過一個例子來進入到條件鎖的學習中。this

使用示例

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 聲明一個重入鎖
        ReentrantLock lock = new ReentrantLock();
        // 聲明一個條件鎖
        Condition condition = lock.newCondition();

        new Thread(()->{
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待條件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 這裏睡1000ms是爲了讓上面的線程先獲取到鎖
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 這裏睡2000ms表明這個線程執行業務須要的時間
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知條件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}
複製代碼

上面的代碼很簡單,一個線程等待條件,另外一個線程通知條件已成立,後面的數字表明代碼實際運行的順序,若是你能把這個順序看懂基本條件鎖掌握得差很少了。spa

源碼分析

ConditionObject的主要屬性

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}
複製代碼

能夠看到條件鎖中也維護了一個隊列,爲了和AQS的隊列區分,我這裏稱爲條件隊列,firstWaiter是隊列的頭節點,lastWaiter是隊列的尾節點,它們是幹什麼的呢?接着看。線程

lock.newCondition()方法

新建一個條件鎖。指針

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}
// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}
// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }
複製代碼

新建一個條件鎖最後就是調用的AQS中的ConditionObject類來實例化條件鎖。

condition.await()方法

condition.await()方法,代表如今要等待條件的出現。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 若是線程中斷了,拋出異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加節點到Condition的隊列中,並返回該節點
    Node node = addConditionWaiter();
    // 徹底釋放當前線程獲取的鎖
    // 由於鎖是可重入的,因此這裏要把獲取的鎖所有釋放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步隊列中
    while (!isOnSyncQueue(node)) {
        // 阻塞當前線程
        LockSupport.park(this);
        
        // 上面部分是調用await()時釋放本身佔有的鎖,並阻塞本身等待條件的出現
        // *************************分界線************************* //
        // 下面部分是條件已經出現,嘗試去獲取鎖
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 嘗試獲取鎖,注意第二個參數,這是上一章分析過的方法
    // 若是沒獲取到會再次阻塞(這個方法這裏就不貼出來了,有興趣的翻翻上一章的內容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的節點
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 線程中斷相關
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若是條件隊列的尾節點已取消,從頭節點開始清除全部已取消的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 從新獲取尾節點
        t = lastWaiter;
    }
    // 新建一個節點,它的等待狀態是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是尾節點爲空,則把新節點賦值給頭節點(至關於初始化隊列)
    // 不然把新節點賦值給尾節點的nextWaiter指針
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾節點指向新節點
    lastWaiter = node;
    // 返回新節點
    return node;
}
// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取狀態變量的值,重複獲取鎖,這個值會一直累加
        // 因此這個值也表明着獲取鎖的次數
        int savedState = getState();
        // 一次性釋放全部得到的鎖
        if (release(savedState)) {
            failed = false;
            // 返回獲取鎖的次數
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 若是等待狀態是CONDITION,或者前一個指針爲空,返回false
    // 說明尚未移到AQS的隊列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 若是next指針有值,說明已經移到AQS的隊列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 從AQS的尾節點開始往前尋找看是否能夠找到當前節點,找到了也說明已經在AQS的隊列中了
    return findNodeFromTail(node);
}
複製代碼

這裏有幾個難理解的點:

(1)Condition的隊列和AQS的隊列不徹底同樣;

AQS的隊列頭節點是不存在任何值的,是一個虛節點;

Condition的隊列頭節點是存儲着實實在在的元素值的,是真實節點。
複製代碼

(2)各類等待狀態(waitStatus)的變化;

首先,在條件隊列中,新建節點的初始等待狀態是CONDITION(-2);

其次,移到AQS的隊列中時等待狀態會更改成0(AQS隊列節點的初始等待狀態爲0);

而後,在AQS的隊列中若是須要阻塞,會把它上一個節點的等待狀態設置爲SIGNAL(-1);

最後,無論在Condition隊列仍是AQS隊列中,已取消的節點的等待狀態都會設置爲CANCELLED(1);

另外,後面咱們在共享鎖的時候還會講到另一種等待狀態叫PROPAGATE(-3)。
複製代碼

(3)類似的名稱;

AQS中下一個節點是next,上一個節點是prev;

Condition中下一個節點是nextWaiter,沒有上一個節點。
複製代碼

若是弄明白了這幾個點,看懂上面的代碼仍是輕鬆加愉快的,若是沒弄明白,彤哥這裏指出來了,但願您回頭再看看上面的代碼。

下面總結一下await()方法的大體流程:

(1)新建一個節點加入到條件隊列中去;

(2)徹底釋放當前線程佔有的鎖;

(3)阻塞當前線程,並等待條件的出現;

(4)條件已出現(此時節點已經移到AQS的隊列中),嘗試獲取鎖;

也就是說await()方法內部實際上是先釋放鎖->等待條件->再次獲取鎖的過程。

condition.signal()方法

condition.signal()方法通知條件已經出現。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 若是不是當前線程佔有着鎖,調用這個方法拋出異常
    // 說明signal()也要在獲取鎖以後執行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 條件隊列的頭節點
    Node first = firstWaiter;
    // 若是有等待條件的節點,則通知它條件已成立
    if (first != null)
        doSignal(first);
}
// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到條件隊列的頭節點日後一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 至關於把頭節點從隊列中出隊
        first.nextWaiter = null;
        // 轉移節點到AQS隊列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把節點的狀態更改成0,也就是說即將移到AQS隊列中
    // 若是失敗了,說明節點已經被改爲取消狀態了
    // 返回false,經過上面的循環可知會尋找下一個可用節點
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 調用AQS的入隊方法把節點移到AQS的隊列中
    // 注意,這裏enq()的返回值是node的上一個節點,也就是舊尾節點
    Node p = enq(node);
    // 上一個節點的等待狀態
    int ws = p.waitStatus;
    // 若是上一個節點已取消了,或者更新狀態爲SIGNAL失敗(也是說明上一個節點已經取消了)
    // 則直接喚醒當前節點對應的線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 若是更新上一個節點的等待狀態爲SIGNAL成功了
    // 則返回true,這時上面的循環不成立了,退出循環,也就是隻通知了一個節點
    // 此時當前節點仍是阻塞狀態
    // 也就是說調用signal()的時候並不會真正喚醒一個節點
    // 只是把節點從條件隊列移到AQS隊列中
    return true;
}
複製代碼

signal()方法的大體流程爲:

(1)從條件隊列的頭節點開始尋找一個非取消狀態的節點;

(2)把它從條件隊列移到AQS隊列;

(3)且只移動一個節點;

注意,這裏調用signal()方法後並不會真正喚醒一個節點,那麼,喚醒一個節點是在啥時候呢?

還記得開頭例子嗎?倒回去再好好看看,signal()方法後,最終會執行lock.unlock()方法,此時纔會真正喚醒一個節點,喚醒的這個節點若是曾經是條件節點的話又會繼續執行await()方法「分界線」下面的代碼。

結束了,仔細體會下^^

若是非要用一個圖來表示的話,我想下面這個圖能夠大體表示一下(這裏是用時序圖畫的,可是實際並不能算做一個真正的時序圖哈,瞭解就好):

ReentrantLock

總結

(1)重入鎖是指可重複獲取的鎖,即一個線程獲取鎖以後再嘗試獲取鎖時會自動獲取鎖;

(2)在ReentrantLock中重入鎖是經過不斷累加state變量的值實現的;

(3)ReentrantLock的釋放要跟獲取匹配,即獲取了幾回也要釋放幾回;

(4)ReentrantLock默認是非公平模式,由於非公平模式效率更高;

(5)條件鎖是指爲了等待某個條件出現而使用的一種鎖;

(6)條件鎖比較經典的使用場景就是隊列爲空時阻塞在條件notEmpty上;

(7)ReentrantLock中的條件鎖是經過AQS的ConditionObject內部類實現的;

(8)await()和signal()方法都必須在獲取鎖以後釋放鎖以前使用;

(9)await()方法會新建一個節點放到條件隊列中,接着徹底釋放鎖,而後阻塞當前線程並等待條件的出現;

(10)signal()方法會尋找條件隊列中第一個可用節點移到AQS隊列中;

(11)在調用signal()方法的線程調用unlock()方法才真正喚醒阻塞在條件上的節點(此時節點已經在AQS隊列中);

(12)以後該節點會再次嘗試獲取鎖,後面的邏輯與lock()的邏輯基本一致了。

彩蛋

爲何java有自帶的關鍵字synchronized了還須要實現一個ReentrantLock呢?

首先,它們都是可重入鎖;

其次,它們都默認是非公平模式;

而後,...,呃,咱們下一章繼續深刻探討 ReentrantLock VS synchronized。

推薦閱讀

  1. 死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

  2. 死磕 java同步系列之AQS起篇

  3. 死磕 java同步系列之本身動手寫一個鎖Lock

  4. 死磕 java魔法類之Unsafe解析

  5. 死磕 java同步系列之JMM(Java Memory Model)

  6. 死磕 java同步系列之volatile解析

  7. 死磕 java同步系列之synchronized解析


歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode
相關文章
相關標籤/搜索