Java併發編程系列21 | Condition-Lock的等待通知

Java併發編程系列21 | Condition-Lock的等待通知

收錄於話題
#進階架構師 | 併發編程專題
12個

點擊上方「java進階架構師」,選擇右上角「置頂公衆號」
20大進階架構專題每日送達
Java併發編程系列21 | Condition-Lock的等待通知
Java併發編程系列21 | Condition-Lock的等待通知
咱們知道 synchronized 鎖經過 Object 類的 wait()和 notify()方法實現線程間的等待通知機制,而比 synchronized 更靈活 Lock 鎖一樣也有實現等待通知機制的方式,那就是條件 Condition。本文將從如下幾個方面介紹 Condition:java

  • 如何使用 Condition
  • 源碼分析
  • Condition 的應用場景

    1. Condition 的使用


1.1 Condition 類提供的方法
等待方法:node

// 當前線程進入等待狀態,若是其餘線程調用 condition 的 signal 或者 signalAll 方法而且當前線程獲取 Lock 從 await 方法返回,若是在等待狀態中被中斷會拋出被中斷異常
void await() throws InterruptedException

// 當前線程進入等待狀態直到被通知,中斷或者超時
long awaitNanos(long nanosTimeout)

// 同第二個方法,支持自定義時間單位
boolean await(long time, TimeUnit unit)throws InterruptedException

// 當前線程進入等待狀態直到被通知,中斷或者到了某個時間
boolean awaitUntil(Date deadline) throws InterruptedException

喚醒方法:面試

// 喚醒一個等待在 condition 上的線程,將該線程從等待隊列中轉移到同步隊列中,若是在同步隊列中可以競爭到 Lock 則能夠從等待方法中返回
void signal()

// 與 1 的區別在於可以喚醒全部等待在 condition 上的線程
void signalAll()

1.2 使用舉例
啓動 waiter 和 signaler 兩個線程。
waiter 線程獲取到鎖,檢查 flag=false 不知足條件,執行 condition.await()方法將線程阻塞等待並釋放鎖。
signaler 線程獲取到鎖以後更改條件,將 flag 變爲 true,執行 condition.signalAll()通知喚醒等待線程,釋放鎖。
waiter 線程被喚醒獲取到鎖,自旋檢查 flag=true 知足條件,繼續執行。編程

public class ConditionTest {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "當前條件不知足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知條件知足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

輸出結果:
Thread-0當前條件不知足等待
Thread-0接收到通知,條件知足markdown

2. Condition 與 wait/notify


Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,是屬於 Java 底層級別的。而 Condition 是語言級別的,具備更高的可控制性和擴展性。具體表現以下:
wait/notify 方式是響應中斷的,當線程處於 Object.wait()的等待狀態中,線程中斷會拋出中斷異常;Condition 有響應中斷和不響應中斷模式能夠選擇。
wait/notify 方式一個 synchronized 鎖只有一個等待隊列;一個 Lock 鎖能夠根據不一樣的條件,new 多個 Condition 對象,每一個對象包含一個等待隊列。架構

須要注意的是,Condition 同 wait/notify 同樣,在等待與喚醒方法使用以前必須獲取到該鎖。併發

3. 源碼分析


Tips:須要在理解 AQS 及 ReentrantLock 基礎上閱讀本文源碼,給出這兩篇的連接:
【原創】14|AQS 源碼分析
【原創】15|重入鎖 ReentrantLock
3.1 條件隊列
首先看 Condition 對象的建立:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();ide

public Condition newCondition() {
return sync.newCondition();
}微服務

final ConditionObject newCondition() {
return new ConditionObject();
}
建立的 Condition 對象其實就是 ConditionObject 對象,ConditionObject 是 AbstractQueuedSynchronizer(AQS)的內部類,實現了 Condition 接口。
每一個 ConditionObject 對象都有一個條件等待隊列,用於保存在該 Condition 對象上等待的線程。條件等待隊列是一個單向鏈表,結點用的 AQS 的 Node 類,每一個結點包含線程、next 結點、結點狀態。ConditionObject 經過持有頭尾指針類管理條件隊列。
Java併發編程系列21 | Condition-Lock的等待通知高併發

注意區分 AQS 的同步隊列和 Condition 的條件隊列。
線程搶鎖失敗時進入 AQS 同步隊列,AQS 同步隊列中的線程都是等待着隨時準備搶鎖的。
線程由於沒有知足某一條件而調用 condition.await()方法以後進入 Condition 條件隊列,Condition 條件隊列中的線程只能等着,沒有獲取鎖的機會。
當條件知足後調用 condition.signal()線程被喚醒,那麼線程就從 Condition 條件隊列移除,進入 AQS 同步隊列,被賦予搶鎖繼續執行的機會。

條件隊列源碼:

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter;// 頭結點
    private transient Node lastWaiter;// 尾結點

    /**
     * 入隊操做
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;

        // 若是尾結點取消等待了,將其清除出去,並檢查整個條件隊列將已取消的全部結點清除
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();// 這個方法會遍歷整個條件隊列,而後會將已取消的全部結點清除出隊列
            t = lastWaiter;
        }

        // 將當前線程構形成結點,加入隊尾
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;

        lastWaiter = node;// 維護尾結點指針
        return node;
    }

    /**
     * 遍歷整個條件隊列,清除已取消等待的結點
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;// 用於保存前一個結點
        while (t != null) {
            Node next = t.nextWaiter;

            if (t.waitStatus != Node.CONDITION) {
                // t結點狀態不是Node.CONDITION,說明已經取消等待,刪除
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;// 下次循環中t結點的前一個結點
            t = next;
        }
    }
}

static final class Node {
    volatile Thread thread;// 每個節點對應一個線程
    Node nextWaiter;// next結點
    volatile int waitStatus;// 結點狀態
    static final int CONDITION = -2;// 結點狀態:當前節點進入等待隊列中
 ...
}

3.2 await()
當調用 condition.await()方法後會使得線程進入到條件隊列,此時線程將被阻塞。當調用 condition.signal()方法後,線程從條件隊列進入 AQS 同步隊列排隊等鎖。線程在 AQS 中發生的事情這裏就不介紹了,不明白的能夠看下之前 AQS 的文章【原創】14|AQS 源碼分析。
await()方法源碼:

/**
 * 當前線程被阻塞,並加入條件隊列
 * 線程在AQS同步隊列中被喚醒後嘗試獲取鎖
 */
public final void await() throws InterruptedException {
    // 響應打斷
    if (Thread.interrupted())
        throw new InterruptedException();

    // 將當前線程構形成結點,加入條件隊列隊尾,上文詳細分析了該方法
    Node node = addConditionWaiter();

    // 釋放鎖,線程阻塞前必須將鎖釋放,下文詳解fullyRelease()方法
    int savedState = fullyRelease(node);
    int interruptMode = 0;

    /*
     * 1.isOnSyncQueue()檢查node是否在AQS同步隊列中,不在同步隊列中返回false,下文詳解isOnSyncQueue()方法
     * 2.若是node不在AQS同步隊列中,將當前線程阻塞
     * 3.當其餘代碼調用signal()方法,線程進入AQS同步隊列後被喚醒,繼續從這裏阻塞的地方開始執行
     * 4.注意這裏while循環的自旋,線程被喚醒之後還要再檢查一下node是否在AQS同步隊列中
     */
    while (!isOnSyncQueue(node)) { // 檢查node是否在AQS同步隊列中
        LockSupport.park(this);    // 阻塞,線程被喚醒後從這裏開始執行
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    /*
     * 到這裏,是當前線程在AQS同步隊列中被喚醒了,嘗試獲取鎖
     * acquireQueued()方法搶鎖,搶不到鎖就在同步隊列中阻塞
     * acquireQueued()方法是AQS文章中詳細重點講解過的這裏不詳細分析了
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

fullyRelease()方法:

/**
 * 將node線程的鎖所有釋放
 * 「所有」是指屢次重入的狀況,這裏一次所有釋放
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();// 鎖狀態
        if (release(savedState)) {// 釋放鎖
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
isOnSyncQueue()方法:
/**
 * 檢查node是否在AQS同步隊列中,在同步隊列中返回true
 */
final boolean isOnSyncQueue(Node node) {
    // 狀態爲Node.CONDITION條件等待狀態,確定是在條件隊列中,而不在同步隊列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 若是node已經有後繼節點next,那確定是在同步隊列了
    if (node.next != null)
        return true;
    // 遍歷同步隊列,查看是否有與node相等的結點
    return findNodeFromTail(node);
}

/**
 * 從同步隊列的隊尾開始從後往前遍歷找,若是找到相等的,說明在同步隊列,不然就是不在同步隊列
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

3.3 signal()
調用 condition.signal()方法後,線程從 Condition 條件隊列移除,進入 AQS 同步隊列排隊等鎖。

注意:正常狀況下 signal 只是將線程從 Condition 條件隊列轉移到 AQS 同步隊列,並無喚醒線程。線程的喚醒時機是 AQS 中線程的前驅節點釋放鎖以後。

public final void signal() {
    // 驗證當前線程持有鎖才能調用該方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * 從條件隊列隊頭日後遍歷,找出第一個須要轉移的結點node,將node從條件隊列轉移到AQS同步隊列
 * 爲何須要遍歷找?由於前有些線程會取消等待,可是可能還在條件隊列中
 */
private void doSignal(Node first) {
    do {
        // 將first中條件隊列中移除,將first的next結點做爲頭結點賦值給firstWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;

    /*
     * transferForSignal()將first結點加入AQS同步隊列
     * 若是first結點加入同步隊列失敗,是由於first結點取消了Node.CONDITION狀態,緣由在下面transferForSignal()的講解中說明
     * 若是first結點加入同步隊列失敗,那麼選擇first後面的第一個結點進行轉移,依此類推
     */
    } while (!transferForSignal(first) &&    // 將first結點加入AQS同步隊列
             (first = firstWaiter) != null); // first結點加入同步隊列失敗,選擇first後面的結點進行轉移
}

/**
 * 將結點轉移到同步隊列
 * @return true-表明成功轉移;false-表明在signal以前,節點已經取消等待了
 */
final boolean transferForSignal(Node node) {
    /*
     * CAS設置結點狀態
     * CAS失敗說明此node的waitStatus已不是Node.CONDITION,說明節點已經取消。既然已經取消,也就不須要轉移了,方法返回,轉移後面一個節點
     * CAS失敗爲何不是其餘線程搶先操做了呢?由於這裏還持有lock獨佔鎖,只有當前線程能夠訪問。
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);// 自旋進入同步隊列的隊尾
    int ws = p.waitStatus;

    // 正常狀況下不會走這裏,這裏是前驅節點取消或者 CAS 失敗的狀況
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

static final class Node {
    volatile Thread thread;// 每個結點對應一個線程
    Node nextWaiter;// next結點

    volatile int waitStatus;// 結點狀態
    static final int CONDITION = -2;// 結點狀態:當前結點進入等待隊列中
}

3.4 源碼過程總結
ReentrantLock lock = new ReentrantLock();建立 lock 鎖,對應生成 AQS 同步隊列,一個 ReentrantLock 鎖對應一個 AQS 同步隊列。
Condition condition = lock.newCondition();建立 condition,對應生成 condition 條件隊列。
線程 A 調用condition.await();,線程 A 阻塞並加入 condition 同步隊列。
線程 B 調用condition.signal();,線程 A 阻塞從 condition1 同步隊列轉移到 AQS 同步隊列的隊尾。
當 AQS 隊列中線程 A 的前驅節點線程執行完並釋放鎖時,將線程 A 喚醒。
線程 A 被喚醒以後搶鎖,執行邏輯代碼。

4. 應用


Condition 實現的生產者消費者問題。

class BoundedBuffer {
    final ReentrantLock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生產
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 隊列已滿,等待,直到 not full 才能繼續生產
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消費
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 隊列爲空,等待,直到隊列 not empty,才能繼續消費
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

生產者線程調用 put()方法向隊列中添加對象,當隊列滿時,生產者線程就阻塞等待。
消費者線程調用 take()方法取出隊列中的對象,取出對象後隊列能夠添加對象了,通知被阻塞的生產者線程。
生產者線程被喚醒後,從阻塞的位置開始執行,繼續向隊列中添加對象。
一樣,消費者取出隊列中對象時,發現隊列爲空了也會阻塞等待,生產者線程添加對象以後會通知消費者線程。

總結


Object 的 wait 和 notify/notify 是與 synchronized 配合完成線程間的等待/通知機制,而 Condition 與 Lock 配合完成等待通知機制。
Condition 比 wait 和 notify 具備更高的可控制性和擴展性,一個 Lock 鎖能夠有多個 Condition 條件,此外 Condition 還有響應中斷和不響應中斷模式能夠選擇。Condition 的使用與 wait/notify 同樣,在等待與喚醒方法使用以前必須獲取到鎖。
Condition 的實現原理:每一個 condition 都有一個條件隊列,調用 condition.await()方法將線程阻塞後線程就進入了條件隊列,調用 condition.sigal()方法後線程從 condition 條件隊列轉移到 AQS 同步隊列等鎖,該線程的前一節點釋放鎖以後會喚醒該線程搶鎖執行。
Condition 多用於實現的生產者消費者問題。

併發系列文章彙總


【原創】01|開篇獲獎感言
【原創】02|併發編程三大核心問題
【原創】03|重排序-可見性和有序性問題根源
【原創】04|Java 內存模型詳解
【原創】05|深刻理解 volatile
【原創】06|你不知道的 final
【原創】07|synchronized 原理
【原創】08|synchronized 鎖優化
【原創】09|基礎乾貨
【原創】10|線程狀態
【原創】11|線程調度
【原創】12|揭祕 CAS
【原創】13|LockSupport
【原創】14|AQS 源碼分析
【原創】15|重入鎖 ReentrantLock
【原創】16|公平鎖與非公平鎖
【原創】17|讀寫鎖八講(上)
【原創】18|讀寫鎖八講(下)
【原創】19|JDK8 新增鎖 StampedLock
【原創】20|StampedLock 源碼解析
———— e n d ————
微服務、高併發、JVM調優、面試專欄等20大進階架構師專題請關注公衆號【Java進階架構師】後在菜單欄查看。
回覆【架構】領取架構師視頻一套。
Java併發編程系列21 | Condition-Lock的等待通知

原創歷來不開讚揚是由於我以爲你的「在看」,就是給我最好的讚揚^_^

相關文章
相關標籤/搜索