多線程學習筆記四之Condition實現分析

簡介

  在使用內置鎖synchronized時,經過調用java.lang.Objec中定義的監視器方法,主要有wait()、wait(long timeout)、notify()和notifyAll()方法,能夠實現等待/通知模式。Codition接口中也定義了相似的監視器方法,與顯示鎖Lock配合使用也能夠實現等待/通知模式。
  當線程須要利用Condition對象進行等待時,須要提早獲取到Condition對象關聯的顯示鎖Lock對象,使用案例以下:node

Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //等待
    public void coditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        }finally {
            lock.unlock();
        }
    }

    //通知
    public void coditionSignal() throws InterruptedException {
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

  Condition接口由同步器AbstractQueuedSynchronizer內部類ConditionObject提供實現,而顯示鎖Lock對象實現時內部類Sync會繼承AQS,從而把Condition對象與Lock對象關聯起來。併發

等待隊列

  在上一篇博客中介紹到爲了處理多個線程競爭同一把鎖,同步器AQS中維護了一個先入先出的雙向同步隊列,讓競爭失敗的線程進入同步隊列等待。一樣,AQS在實現Condition接口也維護了一個先入先出的單向等待隊列,當一個與Lock對象關聯的Condition對象調用await方法,得到鎖的線程就要釋放鎖,並推出同步隊列head頭節點,進入condition等待隊列。condition隊列規定了頭節點firstWaiter和尾節點lastWaiter。less

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

AQS中構建等待隊列複用了內部類Node結點類源碼分析

static final class Node {
        //等待狀態
        volatile int waitStatus;

        //前驅結點
        volatile Node prev;
    
        //後繼節點
        volatile Node next;
        
        //等待獲取鎖的線程
        volatile Thread thread;
        
        //condition隊列的後繼節點
        Node nextWaiter;      
    }

nextWaiter

  從上圖能夠發現,Condition等待隊列是一個先入先出的單向鏈表,從鏈表尾部加入元素,頭部移出鏈表。使用nextWaiter指向下一個等待節點,構成鏈表的基本元素是節點Node,複用了AQS中的Node類,nextWaiter並不僅僅在Condition鏈表指向下一個等待節點。這是Node類定義nextWaiter的註釋:ui

Link to next node waiting on condition, or the special value SHARED. Because condition queues are accessed only when holding in exclusive mode, we just need a simple linked queue to hold nodes while they are waiting on conditions. They are then transferred to the queue to re-acquire. And because conditions can only be exclusive,we save a field by using special value to indicate sharedmode.this

大意是隻有獨佔鎖纔會關聯Condition隊列,經過nextWaiter變量在構成同步隊列節點標識同步鎖是獨佔鎖仍是共享鎖,從如下方法能夠看出AQS使用nextWaiter來表示鎖:線程

/** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    //判斷是不是共享鎖
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //構建同步隊列節點,nextWaiter標識同步鎖是獨佔鎖仍是共享鎖
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //構建等待隊列節點,nextWaiter指向單向鏈表下一個節點
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }

從以上分析能夠看出:AQS複用了Node類來構建同步隊列和等待隊列,Node用來構建同步隊列節點,nextWaiter標識同步鎖是獨佔鎖仍是共享鎖;Node用來構建等待隊列節點,nextWaiter指向單向鏈表下一個節點。剛開始看這一部分時,對我形成了很大的困擾,因此特意寫出來。code

源碼分析

await()

  await實現等待考慮到了中斷,若當前線程等待期間發生中斷,拋出InterruptedException異常。線程在等待期間會被阻塞,直到發生中斷或者Condition對象調用signal方法。基本流程:首先將node加入condition隊列,而後釋放鎖,掛起當前線程等待喚醒,喚醒後線程從新進入同步隊列並調用acquireQueued獲取鎖。流程圖以下:
對象

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //將當前線程加入Condition等待隊列          
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //判斷當前線程是否在同步隊列中
        while (!isOnSyncQueue(node)) {
            //阻塞當前線程
            LockSupport.park(this);
            //在阻塞的過程當中發生中斷
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //被其餘線程喚醒,退出Condition等待隊列加入同步隊列
        //獲取鎖
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
  • addConditionWaiter()
      以當前線程構成節點Node加入等待隊列,由於加入Condition等待隊列在釋放鎖以前,因此不須要考慮併發的狀況,就不須要像加入同步隊列採用循環加CAS的機制。
private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //若是尾節點lastWaiter等待狀態是CANCELLED,將隊列全部CANCELLED節點清除
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //以當前線程構成節點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        //尾節點爲空,等待隊列爲空,進行初始化,當前節點是等待隊列的頭節點
        if (t == null)
            firstWaiter = node;
        //不然添加到等待隊列的尾部,當前節點是等待隊列新的lastWaiter
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }


    //unlinkCancelledWaiters方法遍歷CONDITION隊列,刪除狀態爲CANCELLED的節點。
    private void unlinkCancelledWaiters() {
        //首節點
        Node t = firstWaiter;
        //保存遍歷節點前驅節點的引用
        Node trail = null;
        //單向鏈表從前日後遍歷
        while (t != null) {
            //下一個節點
            Node next = t.nextWaiter;
            //節點t的waitStatus爲CANCELLED
            if (t.waitStatus != Node.CONDITION) {
                 t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                     trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
  • fullyRelease(Node node)
      徹底釋放鎖,釋放成功則返回,失敗則將當前節點(在Condition隊列)的狀態設置成CANCELLED表示當前節點失效
final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //獲取同步狀態
            int savedState = getState();
            //若是是重入鎖,要屢次釋放
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
  • isOnSyncQueue(Node node)
      判斷node節點是否被signal方法從condition隊列轉移到同步隊列
final boolean isOnSyncQueue(Node node) {
        //轉移到同步隊列,CONDITION狀態會被清除
        //同步隊列prev表示前驅結點,不爲null
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //同步隊列next表示後繼節點,不爲null
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        //遍歷同步隊列,一個一個找
        return findNodeFromTail(node);
    }
  • checkInterruptWhileWaiting(Node node)
      檢查當前線程在等待狀態時中斷狀態,返回REINTERRUPT標誌位,退出等待狀態時調用selfInterrupt方法產生中斷;返回THROW_IE標誌位,線程退出等待狀態時會拋出InterruptedException異常。
//表示從等待狀態退出時會從新產生一箇中斷,但不會拋出異常
        private static final int REINTERRUPT =  1;
        //從等待狀態退出時拋出InterruptedException異常
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
  • reportInterruptAfterWait(int interruptMode)
      根據interruptMode對應的標誌位響應中斷
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        //產生異常
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        //產生中斷
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

signal()

  檢查當前線程是否佔據獨佔鎖,喚醒等待在當前Condition對象等待最久的線程(等待隊列的頭節點)

public final void signal() {
        //檢查當前線程是否佔據獨佔鎖,若是不是沒有權限喚醒等待線程,拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }
  • transferForSignal(Node node)
      將當前線程從Condition等待隊列轉移到同步隊列中,看到這裏應該明白爲何await方法以節點是否在同步隊列(isOnSyncQueue(node))作爲循環條件了。
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //若是CAS設置失敗,說明節點在signal以前被取消了,返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        //CAS設置成功,入隊
        //插入節點的前驅節點
        Node p = enq(node);
        //前驅節點的等待狀態
        int ws = p.waitStatus;
        //若是p等待狀態爲CANECLLED或對p進行CAS設置失敗,則喚醒線程,讓node中線程進入acquireQueued方法。不然
        //因爲前驅節點等待狀態爲signal,由同步器喚醒線程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

signalAll()

  將等待隊列全部節點依次轉移到同步隊列末尾。

public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            //first節點從condition隊列移出
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            //first節點加入同步隊列
            transferForSignal(first);
            //更新first節點指向
            first = next;
        } while (first != null);
    }

總結

  以上是對AQS中內部類ConditionObject對Condition接口實現的簡單分析。

相關文章
相關標籤/搜索