AQS:JAVA經典之鎖實現算法(二)-Condition

零:序言

使用過ReentrantLock的盆友應該也知道Condition的存在。先講解下它存在的意義:就是仿照實現Object類的wait signal signallAll等函數功能的。java

這裏引伸一個面試常問到的問題:wait會釋放鎖,sleep不會。node

  • Condition的一般使用場景是這樣的: 生產者消費者模型,假設生產者只有在生產隊列爲空時才進行生產,則代碼相似以下:
Condition emptyCondition = ReentrantLock.newCondition();
Runnable consumer = new Runnable() {
  public void run() {
    if(queue.isEmpty()) {
      emptyCondition.signal();  // emptyObj.notify();
    } else {
      consumer.consume();
    }
  }
}
Runnable provider = new Runnable() {
  public void run() {
    emptyCondition.wait();  // emptyObj.wait();
    providerInstance.produce();
  }
}
複製代碼

因此咱們能夠知道Condition設計的意義了。下面咱們來說解下其實現原理。面試

一:實現概況

還記得在AQS:JAVA經典之鎖實現算法(一)提到的鎖實現的Sync Queue嗎? Condition的實現是相似的原理: 每一個AQS裏有x(視你newCondition幾回)個Condition Queue,它的結點類也是AQS內部類NodeNode裏有一個nextWaiter,指向下一個在同一Condition Queue裏的Node。 結構以下圖: 算法

Condition Queue.png

  • 首先明確下是,condition.wait必定是在成功lock的線程裏調用纔有效,否則不符合邏輯,同時也會拋出IlleagleMornitorException
  • 獲取鎖的線程處於Sync Queue的隊首,當調用condition.wait時,該線程會釋放鎖(即將AQSstate置爲0),同時喚醒後繼結點,後繼結點在acquire的循環裏會成功獲取鎖,而後將本身所在結點置爲隊首,而後開始本身線程本身的業務代碼。 這個過程看下圖:
    wait狀態圖_1

wait狀態圖_2

  • 當waiter_1收到相應conditionsignal後,在Condition Queue中的Node會從Condition Queue中出隊,進入Sync Queue隊列,開始它的鎖競爭的過程。 過程看下圖:

signal狀態圖_1

signal狀態圖_2

因此,這裏能夠看出來,即便是被signal了,被signal的線程也不是直接就開始跑,而是再次進入Sync Queue開始競爭鎖而已。這裏的這個邏輯,跟Object.wait Object.signal也是徹底同樣的。bash

二:代碼實現原理

咱們先看一段運用到condition的代碼案例: 假設生成者在生產隊列queue爲空時emptyCondition.signal才進行生產操做ide

ReentrantLock locker = new ReentrantLock();
Condition emptyCondition = locker.newCondition();

Runnable consumer = new Runnable() {
  public void run() {
    locker.lock();
    if (queue.isEmpty()) {
      emptyCondition.signal();
    } else {
      ...
    }
    locker.unlock();
  }
};

Runnable producer = new Runnable() {
  public void run() {
    locker.lock();
    emptyCondition.wait();
    // 開始生產
    ...
    locker.unlock();
  }
}
複製代碼

咱們從消費者一步一步走,擬定以下這樣一套線程切換邏輯:函數

  • producer#lock
  • consumer#lock
  • producer#await
  • consumer#signal
  • consumer#unlock
  • producer#unlock

(先從Sync Queue Condition Queue圖解講一遍,而後對應圖解,對着代碼擼一遍)ui


  • producer#lock

生產者直接獲取鎖成功,入隊Sync Queue,位隊首 this

producer#lock後queue狀態

consumer#lock

消費者競爭鎖失敗,進入Sync Queue等待獲取鎖 spa

consumer#lock後queue狀態

  • producer#await

生產者進入等待,釋放鎖,出Sync Queue,進入Condition Queue,等待emptyCondition來喚醒。

producer#wait後Queue狀態

  • consumer#signal

消費者喚起生產者,生產者consumernodeCondition Queue轉移到Sync Queue開始競爭鎖。

consumer#signal後Queue狀態

  • consumer.unlock

consumer釋放鎖後,consumernodeSync Queue出隊,釋放state,喚醒後繼結點provider#nodeprovider搶佔到鎖。

consumer#unlock後Queue狀態

  • provider#unlock

這裏就沒有啥好說的了。

固然,我爲了講解過程,像在鎖被第一次成功獲取的時候,邏輯上雖然並非直接進入Sync Queue我也給講解成直接進入Sync Queue了,這是爲了縮減邊邊角角的小邏輯,講清楚主線邏輯。你們看明白主邏輯,而後再本身去擼一遍,就融會貫通了。

三:代碼擼一把

  • provider.lock

final void lock() {
            // 這就直接獲取鎖成功了,沒有else的邏輯了
            if (compareAndSetState(0, 1))
                // 這個方法是AQS類用來設置擁有鎖的線程實例
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
複製代碼
  • consumer#lock

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            // consumer.lock就要走這裏了,由於上面的compareAndSetState
            // 返回false
            else
                acquire(1);
        }
複製代碼
protected final boolean compareAndSetState(int expect, int update) {
        // 樓下這個是CAS原理進行值修改,CAS就對比樂觀鎖來,
        // 這裏想要修改this這個對象的state字段,若是state是expect
        // 則修改至update,返回true;不然false。咱們知道provider.lock
        // 已經將state 改成非0值了,因此這裏確定失敗啦
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
複製代碼
  • provider#await

先簡單看下Condition類對象結構

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}
...
複製代碼

一個Condition對象就是一條鏈隊,頭尾結點在Condition的內部字段指定firstWaiter lastWaiter

await方法

public final void await() throws InterruptedException {
            // 由於await是響應中斷的等待,這裏就是檢驗下,
            // 一般而言,凡是throws InterruptedException的,
            // 開頭基本都是這句
            if (Thread.interrupted())
                throw new InterruptedException();
            // 這裏是向condition queue中插入一個node,並返回之,
            // 插入了這個node,就表明當前線程在condition queue
            // 中開始等待了
            Node node = addConditionWaiter();
            // 這個是AQS釋放鎖方法,加個fully,就是用來將屢次
            // 獲取鎖一次性都釋放掉,而後將鎖獲取次數返回,
            // 留着後面signal後成功獲取鎖的時候,還要加鎖一樣的
            // 次數。
            // !!!同時注意,這裏喚醒了後繼結點!後集結點就繼續開始
            // 競爭鎖,就是在acquire那個自旋方法裏,記得嗎
            // 不記得去看看文章(一)
            int savedState = fullyRelease(node);
            // 記錄當前線程中斷的標記
            int interruptMode = 0;
            // 判斷當前的node是否已經轉移到sync queue裏了。
            // 轉移了,說明這個node已經開始競爭鎖了,不用再等待
            // 喚醒了,沒轉,繼續自旋
            while (!isOnSyncQueue(node)) {
                // 這裏把當前線程給掛起了
                LockSupport.park(this);
                // 這裏的方法checkxxx就是用來檢查waiting自旋期間,線程有沒有
                // interrupt掉。由於await方法是響應線程中斷的。
                // 若interrupt了,則在checkxxx方法裏,會將node轉移到
                // sync Queue中,去競爭,不要擔憂,由於同時
                // 會設置interruptMode,在最後會根據其值拋Interrupted
                // 異常。。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                // 那何時就結束上面的自旋呢?一個是當前的線程被
                // signal了,那node就被transfer到sync queue了,while
                // 就不知足了。再一個就是線程中斷了,在while循環體裏給break掉了
            }
            // 跳出來後,緊接着去競爭鎖,知道成功爲止。&& 後面這個THROW_IE,標識
            // 要拋出異常,不是的話,就是REINTERRPUT,表明保證線程的中斷標記不被
            // 重置便可。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 這兒是在condition queue裏有多個waiter的時候才起做用,主要用來將
            // CANCEL的結點從鏈隊中剔除掉
            // 具體你們本身看吧。如今忽略這
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 這兒就是處理interruptMode中斷標記字段的邏輯
            // 在reportxxx中,interruptMode爲THROW_IE,則拋出
            // 異常,不是,則保證線程的中斷field不被重置爲「未中斷」便可
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
複製代碼
  • consumer#signal

consumer在調用emptyCondition.signal的時候,會影響到emptyConditioncondition queue中的等待線程,這裏 具體指上面的provider#await方法。

public final void signal() {
            // 先判斷下,lock鎖是否是在調用signal方法的當前線程手裏
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 取到condition queue裏的第一個waiter node,這裏也就是
            // consumer,由於它第一個await進入condition queue了
            Node first = firstWaiter;
            // 這裏去進行了具體的signal操做,具體會作先把waiter node的waitStatus
            // 從CONDITION狀態改成入Sync Queue的正常狀態值0
            // 而後修改Sync Queue 的Head Tail等,讓其入隊成功
            // 最後再從其前驅結點的狀態值上確保當前結點可以被喚起便可。
            // 這裏是由於這個waitStatus值對後繼結點的行爲是有影響的,像SIGNAL指
            // 的是在結點釋放後,要去喚醒後繼結點
            // 
            if (first != null)
                doSignal(first);
        }
複製代碼
  • consumer#unlock

unlock具體調用的 AQSrelease()方法

public void unlock() {
        sync.release(1);
    }

    // AQS.release
    public final boolean release(int arg) {
        // tryRelease,這裏由NonFairSync實現,具體就是經過
        // CAS去修改state值,並判斷是否成功釋放鎖
        if (tryRelease(arg)) {
            // 成功釋放了,則在waitStatus 不是初始狀態時,去喚醒後繼,
            // 這個 != 0 來作判斷的緣由,就要綜合全部狀況,
            // 像FailSync NonFairSync \ Exclusive \ Share
            // 等全部狀況來看這裏的waitSTatus都會處於什麼狀態。
            // 全擼一遍的話,會發現這裏的 != 0可以涵蓋以上全部狀況。
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼
  • provider#unlock

這裏就同理上面了。

總結

整體來看兩個 queue的轉換仍是挺清楚的。只要記住,無論什麼狀況(中斷與否),都是要從condition queue轉移到sync queue的。具體你們仍是要本身去想一種線程切換場景,去走走看。 行文匆匆, 歡迎指正。

相關文章
相關標籤/搜索