深刻淺出AQS之條件隊列

相比於獨佔鎖跟共享鎖,AbstractQueuedSynchronizer中的條件隊列可能被關注的並非不少,但它在阻塞隊列的實現裏起着相當重要的做用,同時若是想全面瞭解AQS,條件隊列也是必需要學習的。node

原文地址:http://www.jianshu.com/p/3f8b...segmentfault

這篇文章會涉及到AQS中獨佔鎖跟共享鎖的一些知識,若是你已經對這兩塊內容很瞭解了,那就直接往下看。不然在讀本文以前仍是建議讀者先去看看我以前寫的兩篇文章溫習一下。
深刻淺出AQS之獨佔鎖模式
深刻淺出AQS之共享鎖模式併發

1、使用場景介紹

區別於前面兩篇文章,可能以前不少人都沒有太在乎AQS中的這塊內容,因此這篇文章咱們先來看下條件隊列的使用場景:源碼分析

//首先建立一個可重入鎖,它本質是獨佔鎖
private final ReentrantLock takeLock = new ReentrantLock();
//建立該鎖上的條件隊列
private final Condition notEmpty = takeLock.newCondition();
//使用過程
public E take() throws InterruptedException {
        //首先進行加鎖
        takeLock.lockInterruptibly();
        try {
            //若是隊列是空的,則進行等待
            notEmpty.await();
            //取元素的操做...
            
            //若是有剩餘,則喚醒等待元素的線程
            notEmpty.signal();
        } finally {
            //釋放鎖
            takeLock.unlock();
        }
        //取完元素之後喚醒等待放入元素的線程
    }

上面的代碼片斷截取自LinkedBlockingQueue,是Java經常使用的阻塞隊列之一。
從上面的代碼能夠看出,條件隊列是創建在鎖基礎上的,並且必須是獨佔鎖(緣由後面會經過源碼分析)。學習

2、執行過程概述

等待條件的過程:ui

  1. 在操做條件隊列以前首先須要成功獲取獨佔鎖,否則直接在獲取獨佔鎖的時候已經被掛起了。
  2. 成功獲取獨佔鎖之後,若是當前條件還不知足,則在當前鎖的條件隊列上掛起,與此同時釋放掉當前獲取的鎖資源。這裏能夠考慮一下若是不釋放鎖資源會發生什麼?
  3. 若是被喚醒,則檢查是否能夠獲取獨佔鎖,不然繼續掛起。

條件知足後的喚醒過程(以喚醒一個節點爲例,也能夠喚醒多個):this

  1. 把當前等待隊列中的第一個有效節點(若是被取消就無效了)加入同步隊列等待被前置節點喚醒,若是此時前置節點被取消,則直接喚醒該節點讓它從新在同步隊列裏適當的嘗試獲取鎖或者掛起。

注:說到這裏必需要解釋一個知識點,整個AQS分爲兩個隊列,一個同步隊列,一個條件隊列。只有同步隊列中的節點才能獲取鎖。前面兩篇獨佔鎖共享鎖文章中提到的加入隊列就是同步隊列。條件隊列中所謂的喚醒是把節點從條件隊列移到同步隊列,讓節點有機會去獲取鎖。線程

2、源碼深刻分析

下面的代碼稍微複雜一點,由於它考慮了中斷的處理狀況。我因爲想跟文章開頭的代碼片斷保持一致,因此選取了該方法進行說明。若是隻想看核心邏輯的話,那推薦讀者看看awaitUninterruptibly()方法的源碼。指針

//條件隊列入口,參考上面的代碼片斷
        public final void await() throws InterruptedException {
            //若是當前線程被中斷則直接拋出異常
            if (Thread.interrupted())
                throw new InterruptedException();
            //把當前節點加入條件隊列
            Node node = addConditionWaiter();
            //釋放掉已經獲取的獨佔鎖資源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //若是不在同步隊列中則不斷掛起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //中斷處理,另外一種跳出循環的方式
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //走到這裏說明節點已經條件知足被加入到了同步隊列中或者中斷了
            //這個方法很熟悉吧?就跟獨佔鎖調用一樣的獲取鎖方法,從這裏能夠看出條件隊列只能用於獨佔鎖
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //走到這裏說明已經成功獲取到了獨佔鎖,接下來就作些收尾工做
            //刪除條件隊列中被取消的節點
            if (node.nextWaiter != null) 
                unlinkCancelledWaiters();
            //根據不一樣模式處理中斷
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

流程比較複雜,一步一步來分析,首先看下加入條件隊列的代碼:code

//注:1.與同步隊列不一樣,條件隊列頭尾指針是firstWaiter跟lastWaiter
        //注:2.條件隊列是在獲取鎖以後,也就是臨界區進行操做,所以不少地方不用考慮併發
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            //若是最後一個節點被取消,則刪除隊列中被取消的節點
            //至於爲啥是最後一個節點後面會分析
            if (t != null && t.waitStatus != Node.CONDITION) {
                //刪除全部被取消的節點
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //建立一個類型爲CONDITION的節點並加入隊列,因爲在臨界區,因此這裏不用併發控制
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        //刪除取消節點的邏輯雖然長,但比較簡單,就不單獨說了,就是鏈表刪除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

把節點加入到條件隊列中之後,接下來要作的就是釋放鎖資源:

//入參就是新建立的節點,即當前節點
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //這裏這個取值要注意,獲取當前的state並釋放,這從另外一個角度說明必須是獨佔鎖
            //能夠考慮下這個邏輯放在共享鎖下面會發生什麼?
            int savedState = getState();
            //跟獨佔鎖釋放鎖資源同樣,不贅述
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                //若是這裏釋放失敗,則拋出異常
                throw new IllegalMonitorStateException();
            }
        } finally {
            //若是釋放鎖失敗,則把節點取消,由這裏就能看出來上面添加節點的邏輯中只須要判斷最後一個節點是否被取消就能夠了
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

走到這一步,節點也加入條件隊列中了,鎖資源也釋放了,接下來就該掛起了(先忽略中斷處理,單看掛起邏輯):

//若是不在同步隊列就繼續掛起(signal操做會把節點加入同步隊列)
     while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //中斷處理後面再分析
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }
    //判斷節點是否在同步隊列中
    final boolean isOnSyncQueue(Node node) {
        //快速判斷1:節點狀態或者節點沒有前置節點
        //注:同步隊列是有頭節點的,而條件隊列沒有
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //快速判斷2:next字段只有同步隊列纔會使用,條件隊列中使用的是nextWaiter字段
        if (node.next != null) 
            return true;
        //上面若是沒法判斷則進入複雜判斷
        return findNodeFromTail(node);
    }

    //注意這裏用的是tail,這是由於條件隊列中的節點是被加入到同步隊列尾部,這樣查找更快
    //從同步隊列尾節點開始向前查找當前節點,若是找到則說明在,不然不在
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

若是被喚醒且已經被轉移到了同步隊列,則會執行與獨佔鎖同樣的方法acquireQueued()進行同步隊列獨佔獲取。
最後咱們來梳理一下里面的中斷邏輯以及收尾工做的代碼:

while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //這裏被喚醒多是正常的signal操做也多是中斷
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }

     //這裏的判斷邏輯是:
     //1.若是如今不是中斷的,即正常被signal喚醒則返回0
     //2.若是節點由中斷加入同步隊列則返回THROW_IE,由signal加入同步隊列則返回REINTERRUPT
     private int checkInterruptWhileWaiting(Node node) {
           return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
     }

     //修改節點狀態並加入同步隊列
     //該方法返回true表示節點由中斷加入同步隊列,返回false表示由signal加入同步隊列
     final boolean transferAfterCancelledWait(Node node) {
        //這裏設置節點狀態爲0,若是成功則加入同步隊列
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //與獨佔鎖一樣的加入隊列邏輯,不贅述
            enq(node);
            return true;
        }
        //若是上面設置失敗,說明節點已經被signal喚醒,因爲signal操做會將節點加入同步隊列,咱們只需自旋等待便可
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
     }

在把喚醒後的中斷判斷作好之後,看await()中最後一段邏輯:

//在處理中斷以前首先要作的是從同步隊列中成功獲取鎖資源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
//因爲當前節點多是因爲中斷修改了節點狀態,因此若是有後繼節點則執行刪除已取消節點的操做
//若是沒有後繼節點,根據上面的分析在後繼節點加入的時候會進行刪除
if (node.nextWaiter != null) 
      unlinkCancelledWaiters();
if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);

//根據中斷時機選擇拋出異常或者設置線程中斷狀態
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
      if (interruptMode == THROW_IE)
           throw new InterruptedException();
      else if (interruptMode == REINTERRUPT)
           //實現代碼爲:Thread.currentThread().interrupt();
           selfInterrupt();
}

至此條件隊列await操做所有分析完畢。signal()方法相對容易一些,一塊兒看源碼分析下:

//條件隊列喚醒入口
   public final void signal() {
       //若是不是獨佔鎖則拋出異常,再次說明條件隊列只適用於獨佔鎖
       if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
       //若是條件隊列不爲空,則進行喚醒操做
       Node first = firstWaiter;
       if (first != null)
            doSignal(first);
   }

   //該方法就是把一個有效節點從條件隊列中刪除並加入同步隊列
   //若是失敗則會查找條件隊列上等待的下一個節點直到隊列爲空
   private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&(first = firstWaiter) != null);
   }

    //將節點加入同步隊列
    final boolean transferForSignal(Node node) {
        //修改節點狀態,這裏若是修改失敗只有一種可能就是該節點被取消,具體看上面await過程分析
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //該方法很熟悉了,跟獨佔鎖入隊方法同樣,不贅述
        Node p = enq(node);
        //注:這裏的p節點是當前節點的前置節點
        int ws = p.waitStatus;
        //若是前置節點被取消或者修改狀態失敗則直接喚醒當前節點
        //此時當前節點已經處於同步隊列中,喚醒會進行鎖獲取或者正確的掛起操做
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

3、總結

相比於獨佔鎖跟共享鎖,條件隊列多是最不受關注的了,但因爲它是阻塞隊列實現的關鍵組件,仍是有必要了解一下其中的原理。其實我認爲關鍵點有兩條,第一是條件隊列是創建在某個具體的鎖上面的,第二是條件隊列跟同步隊列是兩個隊列,前者依賴條件喚醒後者依賴鎖釋放喚醒,瞭解了這兩點之後搞清楚條件隊列就不是什麼難事了。


至此,Java同步器AQS中三大鎖模式就都分析完了。雖然已經盡力思考,儘可能寫的清楚,但鑑於水平有限,若是有紕漏的地方,歡迎廣大讀者指正。明天就是國慶長假了,我本身也計劃出國玩一趟,散散心。提早祝廣大朋友國慶快樂。

相關文章
相關標籤/搜索