併發條件隊列之 Condition 精講

1. 條件隊列的意義

       Condition將Object監控器方法( wait , notify和notifyAll )分解爲不一樣的對象,從而經過與任意Lock實現結合使用,從而使每一個對象具備多個等待集。 Lock替換了synchronized方法和語句的使用,而Condition替換了Object監視器方法的使用。java

       條件(也稱爲條件隊列或條件變量)爲一個線程暫停執行(「等待」)直到另外一線程通知某些狀態條件如今可能爲真提供了一種方法。 因爲對該共享狀態信息的訪問發生在不一樣的線程中,所以必須對其進行保護,所以某種形式的鎖與該條件相關聯。 等待條件提供的關鍵屬性是它自動釋放關聯的鎖並掛起當前線程,就像Object.wait同樣。node

       Condition實例從本質上綁定到鎖。 要獲取特定Lock實例的Condition實例,請使用其newCondition()方法數據結構

2. 條件隊列原理

2.1 條件隊列結構

       條件隊列是一個單向鏈表,在該鏈表中咱們使用nextWaiter屬性來串聯鏈表。可是,就像在同步隊列中不會使用nextWaiter屬性來串聯鏈表同樣,在條件隊列是中,也並不會用到prev, next屬性,它們的值都爲null。併發

隊列的信息包含如下幾個部分:ide

  1. private transient Node firstWaiter;// 隊列的頭部結點
  2. private transient Node lastWaiter;// 隊列的尾部節點

隊列中節點的信息包含如下幾個部分:源碼分析

  1. 當前節點的線程 thread
  2. 當前節點的狀態 waitStatus
  3. 當前節點的下一個節點指針 nextWaiter

結構圖:優化

在這裏插入圖片描述

注意:ui

       在條件隊列中,咱們只須要關注一個值便可那就是CONDITION。它表示線程處於正常的等待狀態,而只要waitStatus不是CONDITION,咱們就認爲線程再也不等待了,此時就要從條件隊列中出隊。this

2.2 入隊原理

       每建立一個Condtion對象就會對應一個Condtion隊列,每個調用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中線程

3. 條件隊列與同步隊列

       通常狀況下,等待鎖的同步隊列和條件隊列條件隊列是相互獨立的,彼此之間並無任何關係。可是,當咱們調用某個條件隊列的signal方法時,會將某個或全部等待在這個條件隊列中的線程喚醒,被喚醒的線程和普通線程同樣須要去爭鎖,若是沒有搶到,則一樣要被加到等待鎖的同步隊列中去,此時節點就從條件隊列中被轉移到同步隊列中

1. 條件隊列轉向同步隊列圖

圖片: https://uploader.shimo.im/f/HQ1slBQjxKVpHB0t.png
                            注意圖中標紅色的線

       可是,這裏尤爲要注意的是,node是被一個一個轉移過去的,哪怕咱們調用的是signalAll()方法也是一個一個轉移過去的,而不是將整個條件隊列接在同步隊列的末尾。
       同時要注意的是,咱們在同步隊列中只使用prev、next來串聯鏈表,而不使用nextWaiter;咱們在條件隊列中只使用nextWaiter來串聯鏈表,而不使用prev、next.事實上,它們就是兩個使用了一樣的Node數據結構的徹底獨立的兩種鏈表。
       所以,將節點從條件隊列中轉移到同步隊列中時,咱們須要斷開原來的連接(nextWaiter),創建新的連接(prev, next),這某種程度上也是須要將節點一個一個地轉移過去的緣由之一。

2. 條件隊列與同步隊列的區別

       同步隊列是等待鎖的隊列,當一個線程被包裝成Node加到該隊列中時,必然是沒有獲取到鎖;當處於該隊列中的節點獲取到了鎖,它將從該隊列中移除(事實上移除操做是將獲取到鎖的節點設爲新的dummy head,並將thread屬性置爲null)。

       條件隊列是等待在特定條件下的隊列,由於調用await方法時,必然是已經得到了lock鎖,因此在進入條件隊列前線程必然是已經獲取了鎖;在被包裝成Node扔進條件隊列中後,線程將釋放鎖,而後掛起;當處於該隊列中的線程被signal方法喚醒後,因爲隊列中的節點在以前掛起的時候已經釋放了鎖,因此必須先去再次的競爭鎖,所以,該節點會被添加到同步隊列中。所以,條件隊列在出隊時,線程並不持有鎖。

3. 條件隊列與同步隊列鎖關係

條件隊列:入隊時已經持有了鎖 -> 在隊列中釋放鎖 -> 離開隊列時沒有鎖 -> 轉移到同步隊列

同步隊列:入隊時沒有鎖 -> 在隊列中爭鎖 -> 離開隊列時得到了鎖

4. 實戰用法

       例如,假設咱們有一個有界緩衝區,它支持put和take方法。 若是嘗試在空緩衝區上進行take ,則線程將阻塞,直到有可用項爲止。 若是嘗試在完整的緩衝區上進行put ,則線程將阻塞,直到有可用空間爲止。 咱們但願繼續等待put線程,並在單獨的等待集中take線程,以便咱們可使用僅當緩衝區中的項目或空間可用時才通知單個線程的優化。 這可使用兩個Condition實例來實現一個典型的生產者-消費者模型。這裏在同一個lock鎖上,建立了兩個條件隊列fullCondition, notFullCondition。當隊列已滿,沒有存儲空間時,put方法在notFull條件上等待,直到隊列不是滿的;當隊列空了,沒有數據可讀時,take方法在notEmpty條件上等待,直到隊列不爲空,而notEmpty.signal()和notFull.signal()則用來喚醒等待在這個條件上的線程。

public class BoundedQueue {
  /**
   * 生產者容器
   */
  private LinkedList<Object> buffer;
  /**
   * 容器最大值是多少
   */
  private int maxSize;
  /**
   * 鎖
   */
  private Lock lock;
  /**
   * 滿了
   */
  private Condition fullCondition;
  /**
   * 不滿
   */
  private Condition notFullCondition;
  BoundedQueue(int maxSize) {
    this.maxSize = maxSize;
    buffer = new LinkedList<Object>();
    lock = new ReentrantLock();
    fullCondition = lock.newCondition();
    notFullCondition = lock.newCondition();
  }
  /**
   * 生產者
   *
   * @param obj
   * @throws InterruptedException
   */
  public void put(Object obj) throws InterruptedException {
    //獲取鎖
    lock.lock();
    try {
      while (maxSize == buffer.size()) {
        System.out.println(Thread.currentThread().getName() + "此時隊列滿了,添加的線程進入等待狀態");
        // 隊列滿了,添加的線程進入等待狀態
        notFullCondition.await();
      }
      buffer.add(obj);
      //通知
      fullCondition.signal();
    } finally {
      lock.unlock();
    }
  }
  /**
   * 消費者
   *
   * @return
   * @throws InterruptedException
   */
  public Object take() throws InterruptedException {
    Object obj;
    lock.lock();
    try {
      while (buffer.size() == 0) {
        System.out.println(Thread.currentThread().getName() + "此時隊列空了線程進入等待狀態");
        // 隊列空了線程進入等待狀態
        fullCondition.await();
      }
      obj = buffer.poll();
      //通知
      notFullCondition.signal();
    } finally {
      lock.unlock();
    }
    return obj;
  }
  public static void main(String[] args) {
    // 初始化最大能放2個元素的隊列
    BoundedQueue boundedQueue = new BoundedQueue(2);
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.put("元素");
          System.out.println(Thread.currentThread().getName() + "生產了元素");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("線程" + i);
      thread.start();
    }
    for (int i = 0; i < 3; i++) {
      Thread thread = new Thread(() -> {
        try {
          boundedQueue.take();
          System.out.println(Thread.currentThread().getName() + "消費了元素");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
      thread.setName("線程" + i);
      thread.start();
    }
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

輸出結果:
圖片: https://uploader.shimo.im/f/ZbYdtEOSIvSGoInd.png

5. 源碼分析

Condition接口中的方法

1. await()

       實現可中斷條件等待,其實咱們以上案例是利用ReentrantLock來實現的生產者消費者案例,進去看源碼發現其實實現該方法的是 AbstractQueuedSynchronizer 中ConditionObject實現的
       將節點添加進同步隊列中,並要麼當即喚醒線程,要麼等待前驅節點釋放鎖後將本身喚醒,不管怎樣,被喚醒的線程要從哪裏恢復執行呢?調用了await方法的地方

中斷模式interruptMode這個變量記錄中斷事件,該變量有三個值:

  1. 0 : 表明整個過程當中一直沒有中斷髮生。
  2. THROW_IE : 表示退出await()方法時須要拋出InterruptedException,這種模式對應於中斷髮生在signal以前
  3. REINTERRUPT : 表示退出await()方法時只須要再自我中斷如下,這種模式對應於中斷髮生在signal以後。
    public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加節點到條件隊列中
    Node node = addConditionWaiter();
     // 釋放當前線程所佔用的鎖,保存當前的鎖狀態
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 若是當前隊列不在同步隊列中,說明剛剛被await, 尚未人調用signal方法,
    // 則直接將當前線程掛起
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 線程掛起的地方
         // 線程將在這裏被掛起,中止運行
        // 能執行到這裏說明要麼是signal方法被調用了,要麼是線程被中斷了
        // 因此檢查下線程被喚醒的緣由,若是是由於中斷被喚醒,則跳出while循環
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 線程將在同步隊列中利用進行acquireQueued方法進行「阻塞式」爭鎖,
    // 搶到鎖就返回,搶不到鎖就繼續被掛起。所以,當await()方法返回時,
    // 必然是保證了當前線程已經持有了lock鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }

    addConditionWaiter() 方法是封裝一個節點將該節點放入條件隊列中

    private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    // 若是尾節點被cancel了,則先遍歷整個鏈表,清除全部被cancel的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 將當前線程包裝成Node扔進條件隊列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 若是當前節點爲空值那麼新建立的node節點就是第一個等待節點
    if (t == null)
        firstWaiter = node;
    // 若是當前節點不爲空值那麼新建立的node節點就加入到當前節點的尾部節點的下一個
    else
        t.nextWaiter = node;
    lastWaiter = node; // 尾部節點指向當前節點
    return node; // 返回新加入的節點
    }

    注意:

  4. 節點加入條件隊列時waitStatus的值爲Node.CONDTION。
  5. 若是入隊時發現尾節點已經取消等待了,那麼咱們就不該該接在它的後面,此時須要調用unlinkCancelledWaiters來清除那些已經取消等待的線程(條件隊列從頭部進行遍歷的,同步隊列是從尾部開始遍歷的)
    private void unlinkCancelledWaiters() {
    // 獲取隊列的頭節點
    Node t = firstWaiter;
    Node trail = null;
    // 當前節點不爲空 
    while (t != null) {
       // 獲取下一個節點
        Node next = t.nextWaiter;
        // 若是當前節點不是條件節點
        if (t.waitStatus != Node.CONDITION) {
            // 在隊列中取消當前節點
            t.nextWaiter = null;
            if (trail == null)
                // 隊列的頭節點是當前節點的下一個節點
                firstWaiter = next;
            else
                // trail的 nextWaiter 指向當前節點t的下一個節點
                // 由於此時t節點已經被取消了
                trail.nextWaiter = next;
                // 若是t節點的下一個節點爲空那麼lastWaiter指向trail
            if (next == null)
                lastWaiter = trail;
        }
        else
            // 若是是條件節點 trail 指向當前節點
            trail = t;
        // 循環賦值遍歷
        t = next;
    }
    }

    fullyRelease(node) 方法釋放當前線程所佔用的鎖

    final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 若是釋放成功
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // 節點的狀態被設置成取消狀態,從同步隊列中移除
            node.waitStatus = Node.CANCELLED;
    }
    }
    public final boolean release(int arg) {
    // 嘗試獲取鎖,若是獲取成功,喚醒後續線程
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
    }

    線程喚醒後利用checkInterruptWhileWaiting方法檢測中斷模式

  6. 狀況一中斷髮生時,線程尚未被喚醒過

       這裏假設已經發生過中斷,則Thread.interrupted()方法必然返回true,接下來就是用transferAfterCancelledWait進一步判斷是否發生了signal。

// 檢查是否有中斷,若是在發出信號以前被中斷,則返回THROW_IE;
 // 在發出信號以後,則返回REINTERRUPT;若是沒有被中斷,則返回0。
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

       只要一個節點的waitStatus仍是Node.CONDITION,那就說明它尚未被signal過。
       因爲如今咱們分析狀況1,則當前節點的waitStatus必然是Node.CONDITION,則會成功執行compareAndSetWaitStatus(node, Node.CONDITION, 0),將該節點的狀態設置成0,而後調用enq(node)方法將當前節點添加進同步隊列中,而後返回true。

注意: 咱們此時並無斷開node的nextWaiter,因此最後必定不要忘記將這個連接斷開。
       再回到transferAfterCancelledWait調用處,可知,因爲transferAfterCancelledWait將返回true,如今checkInterruptWhileWaiting將返回THROW_IE,這表示咱們在離開await方法時應當要拋出THROW_IE異常。

// ....
   while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 線程掛起的地方
         // 線程將在這裏被掛起,中止運行
        // 能執行到這裏說明要麼是signal方法被調用了,要麼是線程被中斷了
        // 因此檢查下線程被喚醒的緣由,若是是由於中斷被喚醒,則跳出while循環
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
   // 線程將在同步隊列中利用進行acquireQueued方法進行「阻塞式」爭鎖,
   // 搶到鎖就返回,搶不到鎖就繼續被掛起。所以,當await()方法返回時,
   // 必然是保證了當前線程已經持有了lock鎖

   // 咱們這裏假設它獲取到了鎖了,因爲咱們這時
   // 的interruptMode = THROW_IE,則會跳過if語句。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 遍歷鏈表了,把鏈表中全部沒有在等待的節點都拿出去,因此這裏調用
    // unlinkCancelledWaiters方法,該方法咱們在前面await()第一部分的分析
    // 的時候已經講過了,它就是簡單的遍歷鏈表,找到全部waitStatus
    // 不爲CONDITION的節點,並把它們從隊列中移除
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 這裏咱們的interruptMode=THROW_IE,說明發生了中斷,
    // 則將調用reportInterruptAfterWait
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
 }
// 在interruptMode=THROW_IE時,就是簡單的拋出了一個InterruptedException
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

       interruptMode如今爲THROW_IE,則咱們將執行break,跳出while循環。接下來咱們將執行acquireQueued(node, savedState)進行爭鎖,注意,這裏傳入的須要獲取鎖的重入數量是savedState,即以前釋放了多少,這裏就須要再次獲取多少
狀況一總結:

  1. 線程由於中斷,從掛起的地方被喚醒
  2. 隨後,咱們經過transferAfterCancelledWait確認了線程的waitStatus值爲Node.CONDITION,說明並無signal發生過
  3. 而後咱們修改線程的waitStatus爲0,並經過enq(node)方法將其添加到同步隊列中
  4. 接下來線程將在同步隊列中以阻塞的方式獲取,若是獲取不到鎖,將會被再次掛起
  5. 線程在同步隊列中獲取到鎖後,將調用unlinkCancelledWaiters方法將本身從條件隊列中移除,該方法還會順便移除其餘取消等待的鎖
  6. 最後咱們經過reportInterruptAfterWait拋出了InterruptedException

所以:

       由此能夠看出,一個調用了await方法掛起的線程在被中斷後不會當即拋出InterruptedException,而是會被添加到同步隊列中去爭鎖,若是爭不到,仍是會被掛起;

       只有爭到了鎖以後,該線程才得以從同步隊列和條件隊列中移除,最後拋出InterruptedException。

       因此說,一個調用了await方法的線程,即便被中斷了,它依舊仍是會被阻塞住,直到它獲取到鎖以後才能返回,並在返回時拋出InterruptedException。中斷對它意義更多的是體如今將它從條件隊列中移除,加入到同步隊列中去爭鎖,從這個層面上看,中斷和signal的效果其實很像,所不一樣的是,在await()方法返回後,若是是由於中斷被喚醒,則await()方法須要拋出InterruptedException異常,表示是它是被非正常喚醒的(正常喚醒是指被signal喚醒)

  1. 狀況二中斷髮生時,線程已經被喚醒過包含如下兩種狀況
    a. 被喚醒時,已經發生了中斷,但此時線程已經被signal過了
    final boolean transferAfterCancelledWait(Node node) {
    // 線程A執行到這裏,CAS操做將會失敗
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
        enq(node);
        return true;
    }
    // 因爲中斷髮生前,線程已經被signal了,則這裏只須要等待線程成功進入同步便可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
    }

           因爲transferAfterCancelledWait返回了false,則checkInterruptWhileWaiting方法將返回REINTERRUPT,這說明咱們在退出該方法時只須要再次中斷由於signal後條件隊列加入到了同步隊列中因此node.nextWaiter爲空了,因此直接走到了reportInterruptAfterWait(interruptMode)方法

if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 這裏咱們的interruptMode=THROW_IE,說明發生了中斷,
    // 則將調用reportInterruptAfterWait
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
 }
// 在interruptMode=THROW_IE時,就是簡單的拋出了一個InterruptedException
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
   // 這裏並無拋出中斷異常,而只是將當前線程再中斷一次。
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

狀況二中的第一種狀況總結:

  1. 線程從掛起的地方被喚醒,此時既發生過中斷,又發生過signal
  2. 隨後,咱們經過transferAfterCancelledWait確認了線程的waitStatus值已經不爲Node.CONDITION,說明signal發生於中斷以前
  3. 而後,咱們經過自旋的方式,等待signal方法執行完成,確保當前節點已經被成功添加到同步隊列中
  4. 接下來線程將在同步隊列中以阻塞的方式獲取鎖,若是獲取不到,將會被再次掛起
  5. 最後咱們經過reportInterruptAfterWait將當前線程再次中斷,可是不會拋出InterruptedException

    b. 被喚醒時,並無發生中斷,可是在搶鎖的過程當中發生了中斷

       此狀況就是已經被喚醒了那麼isOnSyncQueue(node)返回true,在同步隊列中了就,退出了while循環。
       退出while循環後接下來仍是利用acquireQueued爭鎖,由於前面沒有發生中斷,則interruptMode=0,這時,若是在爭鎖的過程當中發生了中斷,則acquireQueued將返回true,則此時interruptMode將變爲REINTERRUPT。
       接下是判斷node.nextWaiter != null,因爲在調用signal方法時已經將節點移出了隊列,全部這個條件也不成立。
       最後就是彙報中斷狀態了,此時interruptMode的值爲REINTERRUPT,說明線程在被signal後又發生了中斷,這個中斷髮生在搶鎖的過程當中,這個中斷來的太晚了,所以咱們只是再次自我中斷一下。

狀況二中的第二種狀況總結:

  1. 線程被signal方法喚醒,此時並無發生過中斷
  2. 由於沒有發生過中斷,咱們將從checkInterruptWhileWaiting處返回,此時interruptMode=0
  3. 接下來咱們回到while循環中,由於signal方法保證了將節點添加到同步隊列中,此時while循環條件不成立,循環退出
  4. 接下來線程將在同步隊列中以阻塞的方式獲取,若是獲取不到鎖,將會被再次掛起
  5. 線程獲取到鎖返回後,咱們檢測到在獲取鎖的過程當中發生過中斷,而且此時interruptMode=0,這時,咱們將interruptMode修改成REINTERRUPT
  6. 最後咱們經過reportInterruptAfterWait將當前線程再次中斷,可是不會拋出InterruptedException

3.狀況三一直沒發生中斷
       直接正常返回

await方法總結

  1. 進入await()時必須是已經持有了鎖
  2. 離開await()時一樣必須是已經持有了鎖
  3. 調用await()會使得當前線程被封裝成Node扔進條件隊列,而後釋放所持有的鎖
  4. 釋放鎖後,當前線程將在條件隊列中被掛起,等待signal或者中斷
  5. 線程被喚醒後會將會離開條件隊列進入同步隊列中進行搶鎖
  6. 若在線程搶到鎖以前發生過中斷,則根據中斷髮生在signal以前仍是以後記錄中斷模式
  7. 線程在搶到鎖後進行善後工做(離開條件隊列,處理中斷異常)
  8. 線程已經持有了鎖,從await()方法返回
    在這裏插入圖片描述
           在這一過程當中咱們尤爲要關注中斷,如前面所說,中斷和signal所起到的做用都是將線程從條件隊列中移除,加入到同步隊列中去爭鎖,所不一樣的是,signal方法被認爲是正常喚醒線程,中斷方法被認爲是非正常喚醒線程,若是中斷髮生在signal以前,則咱們在最終返回時,應當拋出InterruptedException;若是中斷髮生在signal以後,咱們就認爲線程自己已經被正常喚醒了,這個中斷來的太晚了,咱們直接忽略它,並在await()返回時再自我中斷一下,這種作法至關於將中斷推遲至await()返回時再發生。

2. awaitUninterruptibly

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
       //  發生了中斷後線程依舊留在了條件隊列中,將會再次被掛起
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

       因而可知,awaitUninterruptibly()全程忽略中斷,即便是當前線程由於中斷被喚醒,該方法也只是簡單的記錄中斷狀態,而後再次被掛起(由於並無並無任何操做將它添加到同步隊列中)要使當前線程離開條件隊列去爭鎖,則必須是發生了signal事件。
       最後,當線程在獲取鎖的過程當中發生了中斷,該方法也是不響應,只是在最終獲取到鎖返回時,再自我中斷一下。能夠看出,該方法和「中斷髮生於signal以後的」REINTERRUPT模式的await()方法很像

方法總結

  1. 中斷雖然會喚醒線程,可是不會致使線程離開條件隊列,若是線程只是由於中斷而被喚醒,則他將再次被掛起
  2. 只有signal方法會使得線程離開條件隊列
  3. 調用該方法時或者調用過程當中若是發生了中斷,僅僅會在該方法結束時再自我中斷如下,不會拋出InterruptedException

    3. awaitNanos

       該方法幾乎和await()方法同樣,只是多了超時時間的處理該方法的主要設計思想是,若是設定的超時時間還沒到,咱們就將線程掛起;超過等待的時間了,咱們就將線程從條件隊列轉移到同步對列中。

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

4. await(long time, TimeUnit unit)

       在awaitNanos(long nanosTimeout)的基礎上多了對於超時時間的時間單位的設置,可是在內部實現上仍是會把時間轉成納秒去執行。
       能夠看出,這兩個方法主要的差異就體如今返回值上面,awaitNanos(long nanosTimeout)的返回值是剩餘的超時時間,若是該值大於0,說明超時時間還沒到,則說明該返回是由signal行爲致使的,而await(long time, TimeUnit unit)的返回值就是transferAfterCancelledWait(node)的值,咱們知道,若是調用該方法時,node尚未被signal過則返回true,node已經被signal過了,則返回false。所以當await(long time, TimeUnit unit)方法返回true,則說明在超時時間到以前就已經發生過signal了,該方法的返回是由signal方法致使的而不是超時時間。

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

5. awaitUntil

       awaitUntil(Date deadline)方法與上面的幾種帶超時的方法也基本相似,所不一樣的是它的超時時間是一個絕對的時間

public final boolean awaitUntil(Date deadline)
        throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6. signal

只喚醒一個節點

public final void signal() {
 // getExclusiveOwnerThread() == Thread.currentThread(); 當前線
 // 程是否是獨佔線程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取第一個阻塞線程節點
    Node first = firstWaiter;
    // 條件隊列是否爲空
    if (first != null)
        doSignal(first);
}
// 遍歷整個條件隊列,找到第一個沒有被cancelled的節點,並將它添加到條件隊列的末尾
// 若是條件隊列裏面已經沒有節點了,則將條件隊列清空
private void doSignal(Node first) {
    do {
        // 將firstWaiter指向條件隊列隊頭的下一個節點
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 將條件隊列原來的隊頭從條件隊列中斷開,則此時該節點成爲一個孤立的節點
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

方法總結:
       調用signal()方法會從當前條件隊列中取出第一個沒有被cancel的節點添加到sync隊列的末尾。

7. signalAll

喚醒全部的節點

public final void signalAll() {
 // getExclusiveOwnerThread() == Thread.currentThread(); 當前線
 // 程是否是獨佔線程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 獲取第一個阻塞線程節點
    Node first = firstWaiter;
   // 條件隊列是否爲空
    if (first != null)
        doSignalAll(first);
}
// 移除並轉移全部節點
private void doSignalAll(Node first) {
    // 清空隊列中全部數據
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
// 將條件隊列中的節點一個一個的遍歷到同步隊列中
final boolean transferForSignal(Node node) {
  // 若是該節點在調用signal方法前已經被取消了,則直接跳過這個節點
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
 // 利用enq方法將該節點添加至同步隊列的尾部   
    Node p = enq(node); 
    // 返回的是前驅節點,將其設置SIGNAL以後,纔會掛起
    // 當前節點
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

       在transferForSignal方法中,咱們先使用CAS操做將當前節點的waitStatus狀態由CONDTION設爲0,若是修改不成功,則說明該節點已經被CANCEL了則咱們直接返回操做下一個節點;若是修改爲功,則說明咱們已經將該節點從等待的條件隊列中成功「喚醒」了,但此時該節點對應的線程並無真正被喚醒,它還要和其餘普通線程同樣去爭鎖,所以它將被添加到同步隊列的末尾等待獲取鎖 。
方法總結:

  1. 將條件隊列清空(只是令lastWaiter = firstWaiter = null,隊列中的節點和鏈接關係仍然還存在)
  2. 將條件隊列中的頭節點取出,使之成爲孤立節點(nextWaiter,prev,next屬性都爲null)
  3. 若是該節點處於被Cancelled了的狀態,則直接跳過該節點(因爲是孤立節點,則會被GC回收)
  4. 若是該節點處於正常狀態,則經過enq方法將它添加到同步隊列的末尾
  5. 判斷是否須要將該節點喚醒(包括設置該節點的前驅節點的狀態爲SIGNAL),若有必要,直接喚醒該節點
  6. 重複2-5,直到整個條件隊列中的節點都被處理完

6. 總結

       以上即是Condition的分析,下一篇文章將是併發容器類的分析,若有錯誤之處,幫忙指出及時更正,謝謝, 若是喜歡謝謝點贊加收藏加轉發(轉發註明出處謝謝!!!)

相關文章
相關標籤/搜索