Java併發編程之鎖機制之Condition接口

前言

在前面的文章中,我曾提到過,整個Lock接口下實現的鎖機制中AQS(AbstractQueuedSynchronizer,下文都稱之爲AQS)Condition纔是真正的實現者。也就說Condition在整個同步組件的基礎框架中也起着很是重要的做用,既然它如此重要與犀利,那麼如今咱們就一塊兒去了解其內部的實際原理與具體邏輯。node

在閱讀該文章以前,我由衷的建議先閱讀《Java併發編程之鎖機制之AQS》《Java併發編程之鎖機制之LockSupport工具》這兩篇文章。由於整個Condtion的內部機制與邏輯都離不開以上兩篇文章提到的知識點。編程

Condition接口方法介紹

在正式介紹Condtion以前,咱們能夠先了解其中聲明的方法。具體方法聲明,以下表所示:數組

condition方法.png

從該表中,咱們能夠看出其內部定義了等待(以await開頭系列方法)通知(以singal開頭的系列方法)兩種類型的方法,相似於Object對象的wait()notify()/NotifyAll()方法來對線程的阻塞與喚醒。bash

ConditionObject介紹

在實際使用中,Condition接口實現類是AQS中的內部類ConditionObject。在其內部維護了一個FIFO(first in first out)的隊列(這裏咱們稱之爲等待隊列,你也能夠叫作阻塞隊列,看每一個人的理解),經過與AQS中的同步隊列配合使用,來控制獲取共享資源的線程。併發

等待隊列

等待隊列是ConditionObjec中內部的一個FIFO(first in first out)的隊列,在隊列中的每一個節點都包含了一個線程引用,且該線程就是在ConditionObject對象上阻塞的線程。須要注意的是,在等待隊列中的節點是複用了AQSNode類的定義。換句話說,在AQS中維護的同步隊列與ConditionObjec中維護的等待隊列中的節點類型都是AQS.Node類型。(關於AbstractQueuedSynchronizer.Node類的介紹,你們能夠參看《Java併發編程之鎖機制之AQS》文章中的描述)。框架

在ConditionObject類中也分別定義了firstWaiterlastWaiter兩個指針,分別指向等待隊列中頭部與尾部。當實際線程調用其以await開頭的系列方法後。會將該線程構造爲Node節點。添加等待隊列中的尾部。關於等待隊列的基本結構以下圖所示:工具

condition內部結構.png

對於等待隊列中節點添加的方式也很簡單,將上一尾節點的nextWaiter指向新添加的節點,同時使lastWaiter指向新添加的節點。post

同步隊列與等待隊列的對應關係

上文提到了整個Lock鎖機制須要AQS中的同步隊列ConditionObject的等待隊列配合使用,其對應關係以下圖所示: ui

同步隊列與等待隊列的關係.png

在Lock鎖機制下,能夠擁有一個同步隊列和多個等待隊列,與咱們傳統的Object監視器模型上,一個對象擁有一個同步隊列和等待隊列不一樣。lock中的鎖能夠伴有多個條件。this

Condition的基本使用

爲了你們可以更好的理解同步隊列與等待隊列的關係。下面經過一個有界隊列BoundedBuffer來了解Condition的使用方式,該類是一個特殊的隊列,當隊列爲空時,隊列的獲取操做將會阻塞當前"拿"線程,直到隊列中有新增的元素,當隊列已滿時,隊列的放入操做將會阻塞"放入"線程,直到隊列中出現空位。具體代碼以下所示:

class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    
    //依次爲,放入的角標、拿的角標、數組中放入的對象總數
    int putptr, takeptr, count;

    /**
     * 添加一個元素
     * (1)若是當前數組已滿,則把當前"放入"線程,加入到"放入"等待隊列中,並阻塞當前線程
     * (2)若是當前數組未滿,則將x元素放入數組中,喚醒"拿"線程中的等待線程。
     */
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)//若是已滿,則阻塞當前"放入"線程
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();//喚醒"拿"線程
        } finally {
            lock.unlock();
        }
    }

    /**
     * 拿一個元素
     * (1)若是當前數組已空,則把當前"拿"線程,加入到"拿"等待隊列中,並阻塞當前線程
     * (2)若是當前數組不爲空,則把喚醒"放入"等待隊列中的線程。
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)//若是爲空,則阻塞當前"拿"線程
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();//喚醒"放入"線程
            return x;
        } finally {
            lock.unlock();
        }
    }
}
複製代碼

從代碼中咱們能夠看出,在該類中咱們建立了兩個等待隊列notFullnotEmpty。這兩個等待隊列的做用分別是,當請數組已滿時,notFull用於存儲阻塞的"放入"線程,notEmpty用於存儲阻塞的"拿"線程。須要注意的是獲取一個Condition必須經過Lock的newCondition()方法。關於ReentrantLock,在後續的文章中,咱們會進行介紹。

阻塞實現 await()

在瞭解了ConditionObject的內部基本結構和與AQS中內部的同步隊列的對應關係後,如今咱們來看看其阻塞實現。調用ConditionObject的await()方法(或者以await開頭的方法),會使當前線程進入等待隊列,並釋放同步狀態,須要注意的是當該方法返回時,當前線程必定獲取了同步狀態(具體緣由是當經過signal()等系列方法,線程纔會從await()方法返回,而喚醒該線程後會加入同步隊列)。這裏咱們以awati()方法爲例,具體代碼以下所示:

public final void await() throws InterruptedException {
			//若是當前線程已經中斷,直接拋出異常  
            if (Thread.interrupted())
                throw new InterruptedException();
            //(1)將當前線程加入等待隊列
            Node node = addConditionWaiter();
            //(2)釋放同步狀態(也就是釋放鎖),同時將線程節點從同步隊列中移除,並喚醒同步隊列中的下一節點
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //(3)判斷當前線程節點是否還在同步隊列中,若是不在則阻塞線程
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //(4)當線程被喚醒後,從新在同步隊列中與其餘線程競爭獲取同步狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
複製代碼

從代碼總體來看,整個方法分爲如下四個步驟:

  • (1)經過 addConditionWaiter()方法將線程節點加入到等待隊列中。
  • (2)經過fullyRelease(Node node)方法釋放同步狀態(也就是釋放鎖),同時將線程節點從同步隊列中移除,並喚醒同步隊列中的下一節點
  • (3)經過isOnSyncQueue(Node node)方法判斷當前線程節點是否在同步隊列中,若是不在,則經過LockSupport.park(this);阻塞當前線程。
  • (4)當線程被喚醒後,調用acquireQueued(node, savedState)方法,從新在同步隊列中與其餘線程競爭獲取同步狀態

由於每一個步驟涉及到的邏輯都稍微有一點複雜,這裏爲了方便你們理解,分別對以上四個步驟涉及到的方法分別進行介紹。

addConditionWaiter()方法

該方法主要將同步隊列中的須要阻塞的線程節點加入到等待隊列中,關於addConditionWaiter()方法具體代碼以下所示:

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // (1)若是當前尾節點中中對應的線程已經中斷,
            //則移除等待隊列中全部的已經中斷或已經釋放同步狀態的線程節點
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
		    //(2)構建等待隊列中的節點
            Node node = new Node(Node.CONDITION);
			
			//(3)將該線程節點添加到隊列中,同時構建firstWaiter與lastWaiter的指向
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
複製代碼

該方法的邏輯也比較簡單,分爲如下三個步驟:

  • (1)獲取等待隊列中的尾節點,若是當前尾節點已經中斷,那麼則經過unlinkCancelledWaiters()方法移除等待隊列中全部的已經中斷已經釋放同步狀態(也就是釋放鎖)的線程節點
  • (2)構建等待隊列中的節點,注意,是經過New的形式,那麼就說明與同步隊列中的線程節點不是同一個。(對Node狀態枚舉不清楚的小夥伴,能夠參看Java併發編程之鎖機制之AQS文章下的Node狀態枚舉介紹)。
  • (3)將該線程節點添加到等待隊列中去,同時構建firstWaiter與lastWaiter的指向,能夠看出等待隊列老是以FIFO(first in first out )的形式添加線程節點。
unlinkCancelledWaiters()方法

由於在addConditionWaiter()方法的步驟(1)中,調用了unlinkCancelledWaiters移除了全部的已經中斷的線程節點,那咱們看一個該方法的實現。以下所示:

private void unlinkCancelledWaiters() {
			//獲取等待隊列中的頭節點
            Node t = firstWaiter;
            Node trail = null;
            //遍歷等待隊列,將已經中斷的線程節點從等待隊列中移除。
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)//從新定義lastWaiter的指向
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
複製代碼

該方法具體流程以下圖所示:

condition.png

fullyRelease(Node node)

在將阻塞線程將入到等待隊列後,會將該線程節點從同步隊列中移除,釋放同步狀態(也就是釋放鎖),並喚醒同步隊列中的下一節點。具體代碼以下所示:

final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
複製代碼

release(int arg)方法會釋放當前線程的同步狀態, 並喚醒同步隊列中的下一線程節點,使其嘗試獲取同步狀態,由於該方法已經在Java併發編程之鎖機制之AQS文章下的unparkSuccessorNode node)方法的下分析過了,因此這裏就再也不進行分析了。但願你們參考上面提到的文章進行理解。

isOnSyncQueue(Node node)

該方法主要用於判斷當前線程節點是否在同步隊列中。具體代碼以下所示:

final boolean isOnSyncQueue(Node node) {
        //判斷當前節點 waitStatus ==Node.CONDITION或者當前節點上一節點爲空,則不在同步隊列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //若是當前節點擁有下一個節點,則在同步隊列中。
        if (node.next != null) // If has successor, it must be on queue
            return true;
	    //若是以上條件都不知足,則遍歷同步隊列。檢查是否在同步隊列中。
        return findNodeFromTail(node);
    }
複製代碼

若是你還記得AQS中的同步隊列,那麼你應該知道同步隊列中的Node節點纔會使用其內部的prenext字段,那麼在同步隊列中由於只使用了nextWaiter字段,因此咱們就能很簡單的經過這兩個字段是否爲==null,來判斷是否在同步隊列中。固然也有可能有一種特殊狀況。有可能須要阻塞的線程節點尚未加入到同步隊列中,那麼這個時候咱們須要遍歷同步隊列來判斷是否在該線程節點是否在線程中。具體代碼以下所示:

private boolean findNodeFromTail(Node node) {
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }
複製代碼

這裏之因此使用同步隊列tail(尾節點)來遍歷,若是node.netx!=null,那麼就說明當前線程已經在同步隊列中。那麼咱們須要處理的狀況確定是針對node.next==null的狀況。因此須要從尾節點開始遍歷。

acquireQueued(final Node node, int arg)

當線程被喚醒後(具體緣由是當經過signal()等系列方法,線程纔會從await()方法返回)會調用該方法將該線程節點加入到同步隊列中。該方法我在《Java併發編程之鎖機制之AQS》中具體描述過了。這裏就不在進行過多的解析。

阻塞流程

在理解了整個阻塞的流程後,如今咱們來概括總結一下,整個阻塞的流程。具體流程以下圖所示:

阻塞流程.png

  • (1)將該線程節點從同步隊列中移除,並釋放其同步狀態。
  • (2)構造新的阻塞節點,加入到等待隊列中。

喚醒實現 signal()

當須要喚醒線程時,會調用ConditionObject中的singal開頭的系列方法,該系列方法會喚醒等待隊列中的首個線程節點,在喚醒該節點以前,會先講該節點移動到同步隊列中。這裏咱們以singal()方法爲例進行講解,具體代碼以下:

public final void signal() {
		    //(1)判斷當前線程是否獲取到了同步狀態(也就是鎖)
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            //(2)獲取等待隊列中的首節點,而後將其移動到同步隊列,而後再喚醒該線程節點
            if (first != null)
                doSignal(first);
        }
複製代碼

該方法主要邏輯分爲如下兩個步驟:

  • (1)經過isHeldExclusively()方法,判斷當前線程是否獲取到了同步狀態(也就是鎖)。
  • (2)經過doSignal(Node first)方法,獲取等待隊列中的首節點,而後將其移動到同步隊列,而後再喚醒該線程節點。

下面咱們會分別對上面涉及到的兩個方法進行描述。

isHeldExclusively()方法

isHeldExclusively()方法是AQS中的方法,默認交給其子類實現,主要用於判斷當前調用singal()方法的線程,是否在同步隊列中,且已經獲取了同步狀態。具體代碼以下所示:

protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
複製代碼

doSignal(Node first)方法

那咱們繼續跟蹤doSignal(Node first)方法,具體方法以下:

private void doSignal(Node first) {
            do {
                //(1)將等待隊列中的首節點從等待隊列中移除,並從新制定firstWaiter的指向
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
            //(2)將等待隊列中的首節點,加入同步隊列中,並從新喚醒該節點
                     (first = firstWaiter) != null);
        }
複製代碼

該方法也很簡單,分爲兩個步驟:

  • (1)將等待隊列中的首節點從等待隊列中移除,並設置firstWaiter的指向爲首節點的下一個節點。 爲了方便你們理解該步驟所描述的邏輯,這裏畫了具體的圖,具體狀況以下圖所示:
    移除首節點.png
  • (2)經過 transferForSignal(Node node)方法,將等待隊列中的首節點,加入到同步隊列中去,而後從新喚醒該線程節點。
transferForSignal(Node node)方法

由於步驟(2)中transferForSignal(Node node)方法較爲複雜,因此會對該方法進行詳細的講解。具體代碼以下所示:

final boolean transferForSignal(Node node) {
       
        //(1)將該線程節點的狀態設置爲初始狀態,若是失敗則表示當前線程已經中斷了
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        //(2)將該節點放入同步隊列中,
        Node p = enq(node);
        int ws = p.waitStatus;
        //(3)獲取當前節點的狀態並判斷,嘗試將該線程節點狀態設置爲Singal,若是失敗則喚醒線程
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
複製代碼

該方法分爲三個步驟:

  • (1)將該線程節點的狀態設置爲初始狀態,若是失敗則表示當前線程已經中斷了,直接返回。
  • (2)經過enq(Node node)方法,將該線程節點放入同步隊列中。
  • (3)當將該線程節點放入同步隊列後,獲取當前節點的狀態並判斷,若是該節點的waitStatus>0或者經過compareAndSetWaitStatus(ws, Node.SIGNAL)將該節點的狀態設置爲Singal,若是失敗則經過LockSupport.unpark(node.thread)喚醒線程。

上述步驟中,着重講enq(Node node)方法,關於LockSupport.unPark(Thread thread)方法的理解,你們能夠閱讀《Java併發編程之鎖機制之LockSupport工具》。下面咱們就來分析enq(Node node)方法。具體代碼以下所示:

private Node enq(Node node) {
        for (;;) {
            //(1)獲取同步隊列的尾節點
            Node oldTail = tail;
            //(2)若是尾節點不爲空,則將該線程節點加入到同步隊列中
            if (oldTail != null) {
	            //將當前節點的prev指向尾節點
                U.putObject(node, Node.PREV, oldTail);
                //將同步隊列中的tail指針,指向當前節點
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
	            //(3)若是當前同步隊列爲空,則構造同步隊列
                initializeSyncQueue();
            }
        }
    }
複製代碼

觀察該方法,咱們發現該方法經過死循環(固然你也能夠叫作自旋)的方式來添加該節點到同步隊列中去。該方法分爲如下步驟:

  • (1)獲取同步隊列的尾節點
  • (2)若是尾節點不爲空,則將該線程節點加入到同步隊列中
  • (3)若是當前同步隊列爲空,則經過initializeSyncQueue();構造同步隊列。

這裏對Node enq(Node node)中的步驟(2)補充一個知識點。咱們來看一下調用U.putObject(node, Node.PREV, oldTail);語句,內部是如何將當前的節點的prev指向尾節點的。在AQS(AbstractQueuedSynchronizer)中的Node類中有以下靜態變量和語句。這裏我省略了一下不重要的代碼。具體代碼以下所示:

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
//省略部分代碼
static final long PREV;
    static {
         try {
		    //省略部分代碼
            PREV = U.objectFieldOffset
               (Node.class.getDeclaredField("prev"));
              } catch (ReflectiveOperationException e) {
               throw new Error(e);
            }
        }
    }
複製代碼

其中Node.class.getDeclaredField("prev")語句很好理解,就是獲取Node類中pre字段,若是有則返回相應Field字段,反之拋出NoSuchFieldException異常。關於Unfase中的objectFieldOffset(Field f)方法,我曾經在《Java併發編程之鎖機制之LockSupport工具》描述過相似的狀況。這裏我簡單的再解釋一遍。該方法用於獲取某個字段相對 Java對象的「起始地址」的偏移量,也就是說每一個字段在類對應的內存中存儲是有「角標」的,那麼也就是說咱們如今的PREV靜態變量就表明着Node中prev字段在內存中的「角標」。

當獲取到"角標"後,咱們再經過U.putObject(node, Node.PREV, oldTail);該方法第一個參數是操做對象,第二個參數是操做的內存「角標」,第三個參數是指望值。那麼最後,也就完成了將當前節點的prev字段指向同步隊列的尾節點。

當理解了該知識點後,剩下的將同步隊列中的tail指針,指向當前節點若是當前同步隊列爲空,則構造同步隊列這兩個操做就很是好理解了。因爲篇幅的限制,在這裏我就不在進行描述了。但願讀者朋友們,能閱讀源代碼,觸類旁通。關於這兩個方法的代碼以下所示:

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long HEAD;
    private static final long TAIL;
    static {
        try {
            STATE = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            HEAD = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            TAIL = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
        Class<?> ensureLoaded = LockSupport.class;
    }

    private final void initializeSyncQueue() {
        Node h;
        if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
            tail = h;
    }

    private final boolean compareAndSetTail(Node expect, Node update) {
        return U.compareAndSwapObject(this, TAIL, expect, update);
    }
複製代碼

喚醒流程

在理解了喚醒的具體邏輯後,如今來總結一下,喚醒的具體流程。具體以下圖所示:

喚醒流程.png

  • 將等待隊列中的節點線程,移動到同步隊列中。
  • 當移動到同步隊列中後。喚醒該線程。是該線程參與同步狀態的競爭。

總體流程其實不算太複雜,你們只須要注意,當咱們將等待隊列中的線程節點加入到同步隊列以後,纔會喚醒線程

最後

該文章參考如下圖書,站在巨人的肩膀上。能夠看得更遠。

  • 《Java併發編程的藝術》

推薦閱讀

相關文章
相關標籤/搜索