死磕java concurrent包系列(三)基於ReentrantLock理解AQS的條件隊列

基於Codition分析AQS的條件隊列

前言

上一篇咱們講了AQS中的同步隊列隊列,如今咱們研究一下條件隊列。java

在java中最多見的加鎖方式就是synchorinzed和Reentrantlock,咱們都說Reentrantlock比synchorinzed更加靈活,其實就靈活在Reentrantlock中的條件隊列的用法上。node

Condition接口

它是在java1.5中引入的一個接口,主要是爲了替代object類中的wait、notify方法,以一種更靈活的方式解決線程之間的通訊問題:spring

public interface Condition {

 //使當前線程進入等待狀態直到被通知(signal)
 void await() throws InterruptedException;

 //當前線程進入等待狀態,直到被喚醒,該方法不響應中斷要求
 void awaitUninterruptibly();

 //調用該方法,當前線程進入等待狀態,直到被喚醒或被中斷或超時
 //其中nanosTimeout指的等待超時時間,單位納秒
 long awaitNanos(long nanosTimeout) throws InterruptedException;

  //同awaitNanos,但能夠指明時間單位
  boolean await(long time, TimeUnit unit) throws InterruptedException;

 //調用該方法當前線程進入等待狀態,直到被喚醒、中斷或到達某個時
 //間期限(deadline),若是沒到指定時間就被喚醒,返回true,其餘狀況返回false
  boolean awaitUntil(Date deadline) throws InterruptedException;

 //喚醒一個等待在Condition上的線程,該線程從等待方法返回前必須
 //獲取與Condition相關聯的鎖,功能與notify()相同
  void signal();

 //喚醒全部等待在Condition上的線程,該線程從等待方法返回前必須
 //獲取與Condition相關聯的鎖,功能與notifyAll()相同
  void signalAll();
}
複製代碼

最重要的是await方法使線程進入等待狀態,再經過signal方法喚醒。接下來咱們結合實際例子分析。bash

Condition能夠解決什麼問題

假設有一個生產者-消費者的場景:數據結構

一、生產者有兩個線程產生烤雞;消費者有兩個線程消費烤雞併發

二、四個線程一塊兒執行,但同時只能有一個生產者線程生成烤雞,一個消費者線程消費烤雞。ui

三、只有產生了烤雞,才能通知消費線程去消費,不然只能等着;this

四、只有消費了烤雞,才能通知生產者線程去生產,不然只能等着spa

因而乎,咱們使用ReentrantLock控制併發,並使用它生成兩組Condition對象,productCondition和consumeCondition:前者控制生產者線程,後者控制消費者線程。當isHaveChicken爲true時,表明烤雞生成完畢,生產線程必須進入等待狀態同時喚醒消費線程進行消費,消費線程消費完畢後將flag設置爲false,表明烤雞消費完成,進入等待狀態,同時喚醒生產線程生產烤雞。。。。。。線程

package com.springsingleton.demo.Chicken;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ChikenStore {

  ReentrantLock reentrantLock = new ReentrantLock();

  Condition productCondition = reentrantLock.newCondition();

  Condition consumeCondition = reentrantLock.newCondition();

  private int count = 0;

  private volatile boolean isHaveChicken = false;

  //生產
  public void ProductChicken() {
    reentrantLock.lock();
    while (isHaveChicken) {
      try {
        System.out.println("有烤雞了" + Thread.currentThread().getName() + "不生產了");
        productCondition.await();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count++;
    System.out.println(Thread.currentThread().getName() + "產生了第" + count + "個烤雞,趕忙開始賣");
    isHaveChicken = true;
    consumeCondition.signal();
    reentrantLock.unlock();
  }

  public void SellChicken() {
    reentrantLock.lock();
    while (!isHaveChicken) {
      try {
        System.out.println("沒有烤雞了" + Thread.currentThread().getName() + "不賣了");
        consumeCondition.await();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count--;
    isHaveChicken = false;
    System.out.println(Thread.currentThread().getName() + "賣掉了第" + count + 1 + "個烤雞,趕忙開始生產");
    productCondition.signal();
    reentrantLock.unlock();
  }

  public static void main(String[] args) {
    ChikenStore chikenStore = new ChikenStore();
    new Thread(() -> {
      Thread.currentThread().setName("生產者1號");
      while (true) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("生產者2號");
      for (; ; ) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消費者1號");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消費者2號");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();

  }
}

複製代碼

輸出:

生產者1號產生了第1個烤雞,趕忙開始賣
有烤雞了生產者1號不生產了
有烤雞了生產者2號不生產了
消費者1號賣掉了第01個烤雞,趕忙開始生產
沒有烤雞了消費者1號不賣了
生產者1號產生了第1個烤雞,趕忙開始賣
有烤雞了生產者1號不生產了
消費者1號賣掉了第01個烤雞,趕忙開始生產
沒有烤雞了消費者1號不賣了
沒有烤雞了消費者2號不賣了
生產者2號產生了第1個烤雞,趕忙開始賣
有烤雞了生產者2號不生產了
消費者1號賣掉了第01個烤雞,趕忙開始生產
沒有烤雞了消費者1號不賣了
生產者1號產生了第1個烤雞,趕忙開始賣
有烤雞了生產者1號不生產了
消費者2號賣掉了第01個烤雞,趕忙開始生產
沒有烤雞了消費者2號不賣了
複製代碼

若是用synchorinzed的話:

package com.springsingleton.demo.Chicken;

public class ChickenStoreSync {

  private int count = 0;

  private volatile boolean isHaveChicken = false;

  public synchronized void ProductChicken() {
    while (isHaveChicken) {
      try {
        System.out.println("有烤雞了" + Thread.currentThread().getName() + "不生產了");
        this.wait();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count++;
    System.out.println(Thread.currentThread().getName() + "產生了第" + count + "個烤雞,趕忙開始賣");
    isHaveChicken = true;
    notifyAll();
  }

  public synchronized void SellChicken() {
    while (!isHaveChicken) {
      try {
        System.out.println("沒有烤雞了" + Thread.currentThread().getName() + "不賣了");
        this.wait();
      } catch (Exception e) {
        System.out.println("error" + e.getMessage());
      }
    }
    count--;
    isHaveChicken = false;
    System.out.println(Thread.currentThread().getName() + "賣掉了第" + count + 1 + "個烤雞,趕忙開始生產");
    notifyAll();
  }

  public static void main(String[] args) {
    ChickenStoreSync chikenStore = new ChickenStoreSync();
    new Thread(() -> {
      Thread.currentThread().setName("生產者1號");
      while (true) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("生產者2號");
      for (; ; ) {
        chikenStore.ProductChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消費者1號");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();
    new Thread(() -> {
      Thread.currentThread().setName("消費者2號");
      while (true) {
        chikenStore.SellChicken();
      }
    }).start();

  }
}
複製代碼

如上代碼,在調用notify()或者 notifyAll()方法時,因爲synchronized等待隊列中同時存在生產者線程和消費者線程,因此咱們並不能保證被喚醒的究竟是消費者線程仍是生產者線程,而Codition則能夠避免這種狀況。

AQS中Condition的實現原理

Condition的具體實現類是AQS的內部類ConditionObject,以前咱們分析過AQS中存在兩種隊列,一種是同步隊列,一種是條件隊列,而條件隊列是基於Condition實現的。注意在使用Condition前必須得到鎖(由於condition通常是由lock構造出來的,它依賴於lock),同時在Condition的條件隊列上的也有一個Node節點,其結點的waitStatus的值爲CONDITION。在實現類ConditionObject中有兩個結點分別是firstWaiter和lastWaiter,firstWaiter表明等待隊列第一個等待結點,lastWaiter表明等待隊列最後一個等待結點

public class ConditionObject implements Condition, java.io.Serializable {
    //等待隊列第一個等待結點
    private transient Node firstWaiter;
    //等待隊列最後一個等待結點
    private transient Node lastWaiter;
    //省略.......
}
複製代碼

每一個Condition都對應着一個條件隊列;一個鎖上能夠建立多個Condition對象,那麼也就存在多個條件隊列。條件隊列一樣是一個FIFO的隊列,在隊列中每個節點都包含了一個線程的引用,而該線程就是Condition對象上等待的線程。

當一個線程調用了await()相關的方法,那麼該線程將會釋放鎖,並構建一個Node節點封裝當前線程的相關信息加入到條件隊列中進行等待,直到被喚醒、中斷、超時才從隊列中移出。Condition中的等待隊列模型以下


正如圖所示,Node節點的數據結構,和同步隊列的node相比,Condtion中等待隊列的是一個單向的,並且使用的變量是nextWaiter而不是next,這點咱們在前面分析結點Node的數據結構時講過。firstWaiter指向條件隊列的頭結點,lastWaiter指向條件隊列的尾結點,條件隊列中結點的狀態只有兩種即CANCELLED和CONDITION,前者表示線程已結束須要從等待隊列中移除,後者表示條件結點等待被喚醒。

每一個Codition對象對於一個條件隊列,也就是說AQS中只能存在一個同步隊列,但可擁有多個條件隊列(以前烤雞的例子就有兩個new出來的condition的隊列)。下面從代碼層面看看被調用await()方法(其餘await()實現原理相似)的線程是如何加入等待隊列的,而又是如何從等待隊列中被喚醒的。

public final void await() throws InterruptedException {
      //判斷線程是否被中斷
      if (Thread.interrupted())
          throw new InterruptedException();
      //建立新結點加入等待隊列並返回
      Node node = addConditionWaiter();
      //釋放當前線程鎖即釋放同步狀態
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //判斷結點是否同步隊列(SyncQueue)中,便是否被喚醒
      while (!isOnSyncQueue(node)) {
          //掛起線程
          LockSupport.park(this);
          //判斷是否被中斷喚醒,若是是退出循環。
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      //被喚醒後 自旋操做爭取得到鎖,同時判斷線程是否被中斷
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
       // clean up if cancelled
      if (node.nextWaiter != null) 
          //清理等待隊列中不爲CONDITION狀態的結點
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }
複製代碼

再看看addConditionWaiter方法,添加到等待隊列:

private Node addConditionWaiter() {
    Node t = lastWaiter;
      // 判斷是否爲結束狀態的結點並移除
      if (t != null && t.waitStatus != Node.CONDITION) {
          unlinkCancelledWaiters();
          t = lastWaiter;
      }
      //建立新結點狀態爲CONDITION
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      //加入等待隊列
      if (t == null)
          firstWaiter = node;
      else
          t.nextWaiter = node;
      lastWaiter = node;
      return node;
}
複製代碼

await()方法主要作了3件事:

一是調用addConditionWaiter()方法將當前線程封裝成node結點加入等待隊列。

二是調用fullyRelease(node)方法釋放同步狀態並喚醒後繼結點的線程。

三是調用isOnSyncQueue(node)方法判斷結點是否在同步隊列中,這裏是個while循環,若是同步隊列中沒有該結點就直接掛起該線程,須要明白的是若是線程被喚醒後就調用acquireQueued(node, savedState)執行自旋操做爭取鎖,即當前線程結點從等待隊列轉移到同步隊列並開始努力獲取鎖。

接下來看看Singnal

public final void signal() {
     //判斷是否持有獨佔鎖,若是不是拋出異常
   if (!isHeldExclusively())
          throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      //喚醒等待隊列第一個結點的線程
      if (first != null)
          doSignal(first);
 }

複製代碼

這裏signal()方法作了兩件事:

一是判斷當前線程是否持有獨佔鎖,沒有就拋異常。

二是喚醒等待隊列的第一個結點,即執行doSignal(first)

private void doSignal(Node first) {
     do {
             //移除條件等待隊列中的第一個結點,
             //若是後繼結點爲null,那麼說明沒有其餘結點了,因此將尾結點也設置爲null
            if ( (firstWaiter = first.nextWaiter) == null)
                 lastWaiter = null;
             first.nextWaiter = null;
          //若是被通知節點沒有進入到同步隊列而且條件等待隊列還有不爲空的節點,則繼續循環通知後續結點
         } while (!transferForSignal(first) &&
                  (first = firstWaiter) != null);
        }

//transferForSignal方法
final boolean transferForSignal(Node node) {
    //嘗試設置喚醒結點的waitStatus爲0,即初始化狀態
    //若是compareAndSetWaitStatus返回false,說明當期結點node的waitStatus已不爲
    //CONDITION狀態,那麼只能是結束狀態了,因此返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
         return false;
    }
    //加入同步隊列並返回前驅結點p
    Node p = enq(node);
    int ws = p.waitStatus;
    //判斷前驅結點是否爲結束結點(CANCELLED=1)或者在設置
    //前驅節點狀態爲Node.SIGNAL狀態失敗時,喚醒被通知節點表明的線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
        //喚醒node結點的線程
        LockSupport.unpark(node.thread);
        return true;
    }
}

複製代碼

doSignal(first)方法中作了2件事:

一是從條件隊列移除被喚醒的節點,而後從新維護條件條件隊列的firstWaiter和lastWaiter的指向。

二是將從條件隊列移除的結點加入同步隊列(在transferForSignal()方法中完成的),若是進入到同步隊列失敗而且條件隊列還有不爲空的節點,則繼續循環喚醒後續其餘結點的線程。

總結:

signal()被調用後,先判斷當前線程是否獲取鎖,若是有,那麼喚醒當前Condition對象中條件隊列的第一個結點的線程,並從條件隊列中移除該結點,移動到同步隊列中,若是加入同步隊列失敗(此時只有可能線程被取消),那麼繼續循環喚醒條件隊列中的其餘結點的線程,若是成功加入同步隊列,那麼若是其前驅結點是否已結束或者設置前驅節點狀態爲Node.SIGNAL狀態失敗,則經過LockSupport.unpark()喚醒被通知節點表明的線程,到此signal()任務完成,注意被喚醒後的線程,將從前面的await()方法中的while循環中退出,由於此時該線程的結點已在同步隊列中,那麼while (!isOnSyncQueue(node))將不在符合循環條件,進而調用AQS的acquireQueued()方法加入獲取同步狀態的競爭中,這就是等待喚醒機制的整個流程實現原理,流程以下圖(注意不管是同步隊列仍是條件隊列使用的Node數據結構都是同一個,不過是使用的內部變量不一樣罷了)

相關文章
相關標籤/搜索