Java顯式鎖學習總結之六:Condition源碼分析

概述

先來回顧一下java中的等待/通知機制java

咱們有時會遇到這樣的場景:線程A執行到某個點的時候,由於某個條件condition不知足,須要線程A暫停;等到線程B修改了條件condition,使condition知足了線程A的要求時,A再繼續執行。node

自旋實現的等待通知

最簡單的實現方法就是將condition設爲一個volatile的變量,當A線程檢測到條件不知足時就自旋,相似下面:安全

public class Test {
    private static volatile int condition = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!(condition == 1)) {
                    // 條件不知足,自旋
                }
                System.out.println("a executed");
            }
        });

        A.start();
        Thread.sleep(2000);
        condition = 1;
    }

}

這種方式的問題在於自旋很是耗費CPU資源,固然若是在自旋的代碼塊里加入Thread.sleep(time)將會減輕CPU資源的消耗,可是若是time設的太大,A線程就不能及時響應condition的變化,若是設的過小,依然會形成CPU的消耗。less

Object提供的等待通知

所以,java在Object類裏提供了wait()和notify()方法,使用方法以下:ide

class Test1 {
    private static volatile int condition = 0;
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    while (!(condition == 1)) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    System.out.println("a executed by notify");
                }
            }
        });
        A.start();
        Thread.sleep(2000);
        condition = 1;
        synchronized (lock) {
            lock.notify();
        }
    }
}

經過代碼能夠看出,在使用一個對象的wait()、notify()方法前必需要獲取這個對象的鎖。源碼分析

當線程A調用了lock對象的wait()方法後,線程A將釋放持有的lock對象的鎖,而後將本身掛起,直到有其餘線程調用notify()/notifyAll()方法或被中斷。能夠看到在lock.wait()前面檢測condition條件的時候使用了一個while循環而不是if,那是由於當有其餘線程把condition修改成知足A線程的要求並調用notify()後,A線程會從新等待獲取鎖,獲取到鎖後才從lock.wait()方法返回,而在A線程等待鎖的過程當中,condition是有可能再次變化的。ui

由於wait()、notify()是和synchronized配合使用的,所以若是使用了顯示鎖Lock,就不能用了。因此顯示鎖要提供本身的等待/通知機制,Condition應運而生。this

顯示鎖提供的等待通知

咱們用Condition實現上面的例子:spa

class Test2 {
    private static volatile int condition = 0;
    private static Lock lock = new ReentrantLock();
    private static Condition lockCondition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    while (!(condition == 1)) {
                        lockCondition.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
                System.out.println("a executed by condition");
            }
        });
        A.start();
        Thread.sleep(2000);
        condition = 1;
        lock.lock();
        try {
            lockCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

能夠看到經過 lock.newCondition() 能夠得到到 lock 對應的一個Condition對象lockCondition ,lockCondition的await()、signal()方法分別對應以前的Object的wait()和notify()方法。總體上和Object的等待通知是相似的。線程

應用舉例

上面咱們看到了Condition實現的等待通知和Object的等待通知是很是相似的,而Condition提供的等待通知功能更強大,最重要的一點是,一個lock對象能夠經過屢次調用 lock.newCondition() 獲取多個Condition對象,也就是說,在一個lock對象上,能夠有多個等待隊列,而Object的等待通知在一個Object上,只能有一個等待隊列。用下面的例子說明,下面的代碼實現了一個阻塞隊列,當隊列已滿時,add操做被阻塞有其餘線程經過remove方法刪除元素;當隊列已空時,remove操做被阻塞直到有其餘線程經過add方法添加元素。

public class BoundedQueue1<T> {
    public List<T> q; //這個列表用來存隊列的元素
    private int maxSize; //隊列的最大長度
    private Lock lock = new ReentrantLock();
    private Condition addConditoin = lock.newCondition();
    private Condition removeConditoin = lock.newCondition();

    public BoundedQueue1(int size) {
        q = new ArrayList<>(size);
        maxSize = size;
    }

    public void add(T e) {
        lock.lock();
        try {
            while (q.size() == maxSize) {
                addConditoin.await();
            }
            q.add(e);
            removeConditoin.signal(); //執行了添加操做後喚醒因隊列空被阻塞的刪除操做
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public T remove() {
        lock.lock();
        try {
            while (q.size() == 0) {
                removeConditoin.await();
            }
            T e = q.remove(0);
            addConditoin.signal(); //執行刪除操做後喚醒因隊列滿而被阻塞的添加操做
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

}

 

源碼分析

下面來分析Condition源碼

概述

以前咱們介紹AQS的時候說過,AQS的同步排隊用了一個隱式的雙向隊列,同步隊列的每一個節點是一個AbstractQueuedSynchronizer.Node實例。

Node的主要字段有:

  1. waitStatus:等待狀態,全部的狀態見下面的表格。
  2. prev:前驅節點
  3. next:後繼節點
  4. thread:當前節點表明的線程
  5. nextWaiter:Node既能夠做爲同步隊列節點使用,也能夠做爲Condition的等待隊列節點使用(將會在後面講Condition時講到)。在做爲同步隊列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED標識當前節點是獨佔模式仍是共享模式;在做爲等待隊列節點使用時,nextWaiter保存後繼節點。
狀態 含義
CANCELLED 1 當前節點由於超時或中斷被取消同步狀態獲取,該節點進入該狀態不會再變化
SIGNAL -1 標識後繼的節點處於阻塞狀態,當前節點在釋放同步狀態或被取消時,須要通知後繼節點繼續運行。每一個節點在阻塞前,須要標記其前驅節點的狀態爲SIGNAL。
CONDITION -2 標識當前節點是做爲等待隊列節點使用的。
PROPAGATE -3  
0 0 初始狀態

Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每個節點也是一個AbstractQueuedSynchronizer.Node實例。

每一個Condition對象中保存了firstWaiter和lastWaiter做爲隊列首節點和尾節點,每一個節點使用Node.nextWaiter保存下一個節點的引用,所以等待隊列是一個單向隊列。

每當一個線程調用Condition.await()方法,那麼該線程會釋放鎖,構形成一個Node節點加入到等待隊列的隊尾。

等待

Condition.await()方法的源碼以下:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter(); //構造一個新的等待隊列Node加入到隊尾
            int savedState = fullyRelease(node); //釋放當前線程的獨佔鎖,無論重入幾回,都把state釋放爲0
            int interruptMode = 0;
//若是當前節點沒有在同步隊列上,即尚未被signal,則將當前線程阻塞
while (!isOnSyncQueue(node)) { LockSupport.park(this);
//後面的藍色代碼都是和中斷相關的,主要是區分兩種中斷:是在被signal前中斷仍是在被signal後中斷,若是是被signal前就被中斷則拋出 InterruptedException,不然執行 Thread.currentThread().interrupt();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中斷則直接退出自旋 break; }
//退出了上面自旋說明當前節點已經在同步隊列上,可是當前節點不必定在同步隊列隊首。acquireQueued將阻塞直到當前節點成爲隊首,即當前線程得到了鎖。而後await()方法就能夠退出了,讓線程繼續執行await()後的代碼。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }


final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue(Node node) {
//若是當前節點狀態是CONDITION或node.prev是null,則證實當前節點在等待隊列上而不是同步隊列上。之因此能夠用node.prev來判斷,是由於一個節點若是要加入同步隊列,在加入前就會設置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//若是node.next不爲null,則必定在同步隊列上,由於node.next是在節點加入同步隊列後設置的
if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是否是當前節點。 } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

通知

Condition.signal() 方法的源碼以下:

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException(); //若是同步狀態不是被當前線程獨佔,直接拋出異常。從這裏也能看出來,Condition只能配合獨佔類同步組件使用。
            Node first = firstWaiter;
            if (first != null)
                doSignal(first); //通知等待隊列隊首的節點。
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&   //transferForSignal方法嘗試喚醒當前節點,若是喚醒失敗,則繼續嘗試喚醒當前節點的後繼節點。
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        //若是當前節點狀態爲CONDITION,則將狀態改成0準備加入同步隊列;若是當前狀態不爲CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的後繼節點
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);  //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點
        int ws = p.waitStatus;
//若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,線程被喚醒後會執行acquireQueued方法,該方法會從新嘗試將節點的先驅狀態設爲SIGNAL並再次park線程;若是當前設置前驅節點狀態爲SIGNAL成功,那麼就不須要立刻喚醒線程了,當它的前驅節點成爲同步隊列的首節點且釋放同步狀態後,會自動喚醒它。
//其實筆者認爲這裏不加這個判斷條件應該也是能夠的。只是對於CAS修改前驅節點狀態爲SIGNAL成功這種狀況來講,若是不加這個判斷條件,提早喚醒了線程,等進入acquireQueued方法了節點發現本身的前驅不是首節點,還要再阻塞,等到其前驅節點成爲首節點並釋放鎖時再喚醒一次;而若是加了這個條件,線程被喚醒的時候它的前驅節點確定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

Condition等待通知的本質

總的來講,Condition的本質就是等待隊列和同步隊列的交互:

當一個持有鎖的線程調用Condition.await()時,它會執行如下步驟:

  1. 構造一個新的等待隊列節點加入到等待隊列隊尾
  2. 釋放鎖,也就是將它的同步隊列節點從同步隊列隊首移除
  3. 自旋,直到它在等待隊列上的節點移動到了同步隊列(經過其餘線程調用signal())或被中斷
  4. 阻塞當前節點,直到它獲取到了鎖,也就是它在同步隊列上的節點排隊排到了隊首。

當一個持有鎖的線程調用Condition.signal()時,它會執行如下操做:

從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操做;若是節點CANCELLED,就嘗試喚醒下一個節點;若是再CANCELLED則繼續迭代。

對每一個節點執行喚醒操做時,首先將節點加入同步隊列,此時await()操做的步驟3的解鎖條件就已經開啓了。而後分兩種狀況討論:

  1. 若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,此時await()方法就會完成步驟3,進入步驟4.
  2. 若是成功把先驅節點的狀態設置爲了SIGNAL,那麼就不當即喚醒了。等到先驅節點成爲同步隊列首節點並釋放了同步狀態後,會自動喚醒當前節點對應線程的,這時候await()的步驟3才執行完成,並且有很大機率快速完成步驟4.

總結  

若是知道Object的等待通知機制,Condition的使用是比較容易掌握的,由於和Object等待通知的使用基本一致。

對Condition的源碼理解,主要就是理解等待隊列,等待隊列能夠類比同步隊列,並且等待隊列比同步隊列要簡單,由於等待隊列是單向隊列,同步隊列是雙向隊列。

如下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不一樣意見:

之因此同步隊列要設計成雙向的,是由於在同步隊列中,節點喚醒是接力式的,由每個節點喚醒它的下一個節點,若是是由next指針獲取下一個節點,是有可能獲取失敗的,由於虛擬隊列每添加一個節點,是先用CAS把tail設置爲新節點,而後才修改原tail的next指針到新節點的。所以用next向後遍歷是不安全的,可是若是在設置新節點爲tail前,爲新節點設置prev,則能夠保證從tail往前遍歷是安全的。所以要安全的獲取一個節點Node的下一個節點,先要看next是否是null,若是是null,還要從tail往前遍歷看看能不能遍歷到Node。

而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,所以每次要執行喚醒操做的時候,直接喚醒等待隊列的首節點就好了。等待隊列的實現中不須要遍歷隊列,所以也不須要prev指針。

相關文章
相關標籤/搜索