死磕java concurrent包系列(四)基於AQS的條件隊列完全理解ArrayBlockingQueue

阻塞隊列概覽

上篇文章咱們分析了AQS中的同步隊列和條件隊列,而ArrayBlockingQueue和LinkedBlockingQueue正是基於AQS實現的,若是對AQS和ReentrantLock的條件隊列不熟悉的話,建議去看https://juejin.im/post/5c053e546fb9a049fc034924,它與咱們平時接觸的LinkedList和ArrayList相比,最大的特色就是:java

  • 阻塞添加 當阻塞隊列的元素已經滿的時,隊列會阻塞加入元素的線程(讓線程睡一會),等隊列不滿時再從新喚醒它執行入隊操做
  • 阻塞移出 阻塞移出是在隊列元素爲空的時候,刪除隊列元素的線程會被阻塞,直到隊列不爲空再執行刪除操做 咱們先看一下代碼,BlockingQueue繼承自Queue接口:
public interface BlockingQueue<E> extends Queue<E> {


    boolean add(E e); 

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 

    void put(E e) throws InterruptedException; 

    E take() throws InterruptedException; 

    E poll(long timeout, TimeUnit unit) throws InterruptedException; 

    boolean remove(Object o); 
}

    //除了上述方法還有繼承自Queue接口的方法 
    //獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException 
    E element(); 

    //獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。 
    E peek(); 

    //獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。 
    E poll();

複製代碼

總結一下:node

  • 插入方法
    • add(E e) :添加到隊列,成功則返回true,失敗則拋異常
    • offer(E e):成功返回true,若是隊列滿則返回false
    • put(E e):將元素添加到隊列尾部,若是隊列滿則一直阻塞直到隊列有空位爲止
  • 刪除方法
    • remove(E e) :刪除指定元素,成功則返回true,失敗則返回false
    • poll(E e):獲取並移出隊列的頭元素,若隊列爲空,則返回null
    • take(E e):獲取並移出隊列頭元素,若沒有元素,則一直阻塞
  • 查詢方法
    • element():獲取但不移除此隊列的頭元素,沒有則跑異常NoSuchElementException
    • peek():獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。

這就是阻塞隊列基本的增刪查方法,接下來咱們看一下如何使用它。 #ArrayBlockingQueue阻塞隊列的使用方法 再次回到上一篇文章的場景,基於生產者-消費者,生產者產生烤雞,消費者消費烤雞,若是使用ArrayBlockingQueue來實現,會比直接經過condition隊列實現簡單一些:spring

package com.springsingleton.demo.Chicken;

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueTest {

  //定義吃雞隊列,隊列大小是1
  private ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);

  @SuppressWarnings("unchecked")
  private void product() {
    Chicken chicken = new Chicken();
    try {
      arrayBlockingQueue.put(chicken);
      System.out.println(Thread.currentThread().getName()+" has produced a Chicken");
    }catch (InterruptedException e){
      System.out.println(e.getMessage());
    }
  }

  private void consume(){
    try {
      //每次消費前先睡一秒鐘
      Thread.sleep(1000);
      arrayBlockingQueue.take();
      System.out.println(Thread.currentThread().getName()+" has eaten a Chicken");
    }catch (InterruptedException e){
      System.out.println(e.getMessage());
    }
  }

  public static void main(String args[]){
    ArrayBlockingQueueTest arrayBlockingQueueTest = new ArrayBlockingQueueTest();
    new Thread( ()->{
      while (true){
        Thread.currentThread().setName("生產者一號");
        arrayBlockingQueueTest.product();
      }
    }
    ).start();
    new Thread( ()->{
      while (true){
        Thread.currentThread().setName("生產者二號");
        arrayBlockingQueueTest.product();
      }
    }
    ).start();
    new Thread( ()->{
      while (true){
        Thread.currentThread().setName("吃雞者一號");
        arrayBlockingQueueTest.consume();
      }
    }
    ).start();
    new Thread( ()->{
      while (true){
        Thread.currentThread().setName("吃雞者二號");
        arrayBlockingQueueTest.consume();
      }
    }
    ).start();
  }
}

複製代碼

輸出以下: 數組

QQ20181210-212319.gif
咱們稍微瞥一眼它的構造方法:

//默認非公平阻塞隊列
ArrayBlockingQueue queue = new ArrayBlockingQueue(666);
//公平阻塞隊列
ArrayBlockingQueue queue1 = new ArrayBlockingQueue(666,true);

//構造方法源碼
public ArrayBlockingQueue(int capacity) {
     this(capacity, false);
 }

public ArrayBlockingQueue(int capacity, boolean fair) {
     if (capacity <= 0)
         throw new IllegalArgumentException();
     this.items = new Object[capacity];
     lock = new ReentrantLock(fair);
     notEmpty = lock.newCondition();
     notFull =  lock.newCondition();
 }
複製代碼

經過構造方法發現:它的內部經過一個ReentrantLock和兩個條件隊列構成,既然是ReentrantLock,那麼就有公平和非公平之分了,不懂ReetrantLock的去看上一篇文章:juejin.im/post/5c021b… ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問隊列,被阻塞的線程能夠按照阻塞的前後順序訪問隊列,即先阻塞的線程先訪問隊列。而非公平隊列,當隊列可用時,阻塞的線程將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的前後順序。安全

ArrayBlockingQueue源碼分析

ArrayBlockingQueue的內部是經過一個可重入鎖ReentrantLock和兩個Condition條件對象來實現阻塞,這裏先看看其內部成員變量bash

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存儲數據的數組 */
    final Object[] items;

    /**獲取數據的索引,主要用於take,poll,peek,remove方法 */
    int takeIndex;

    /**添加數據的索引,主要用於 put, offer, or add 方法*/
    int putIndex;

    /** 隊列元素的個數 */
    int count;


    /** 控制並不是訪問的鎖 */
    final ReentrantLock lock;

    /**notEmpty條件對象,用於通知take方法隊列已有元素,可執行獲取操做 */
    private final Condition notEmpty;

    /**notFull條件對象,用於通知put方法隊列未滿,可執行添加操做 */
    private final Condition notFull;


}

複製代碼

ArrayBlockingQueue內部確實是經過數組對象items來存儲全部的數據,ArrayBlockingQueue經過一個ReentrantLock來同時控制添加線程與移除線程的併發訪問,這點與LinkedBlockingQueue區別很大(稍後會分析)。 notEmpty條件隊列則是用於存放等待或喚醒調用take方法的線程,告訴他們隊列已有元素,能夠執行獲取操做。 同理notFull條件對象是用於等待或喚醒調用put方法的線程,告訴它們,隊列未滿,能夠執行添加元素的操做。 takeIndex表明的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引,putIndex則表明下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。圖示以下併發

image.png

ArrayBlockingQueue的阻塞添加

咱們先來看看非阻塞的狀況,也就是以前總結過得add和offer方法,都是非阻塞的添加到隊列,只是一個失敗返回fase,另外一個會拋異常:源碼分析

//add方法實現,內部間接調用了offer(e)
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//offer方法
public boolean offer(E e) {
     //非空校驗
     checkNotNull(e);
     final ReentrantLock lock = this.lock;
     //加鎖
     lock.lock();
     try {
         //判斷隊列是否滿
         if (count == items.length)
             return false;
         else {
              //入隊
             enqueue(e);
             return true;
         }
     } finally {
         lock.unlock();
     }
 }

//入隊操做
private void enqueue(E x) {
    //獲取當前存放數據的數組
    final Object[] items = this.items;
    //經過putIndex索引對數組進行賦值
    items[putIndex] = x;
    //索引自增,若是已經是最後一個位置,從新設置 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;//隊列中元素數量加1
    //喚醒調用take()方法的線程,執行元素獲取操做。
    notEmpty.signal();
}
複製代碼

源碼很簡單:其中須要注意的是enqueue(E x)方法,這個方法內部經過putIndex索引直接將元素添加到數組items中,這裏可能會疑惑的是當putIndex索引大小等於數組長度時,須要將putIndex從新設置爲0,這是由於當前隊列執行元素獲取時老是從隊列頭部獲取,而添加元素從中從隊列尾部獲取因此當隊列索引(從0開始)與數組長度相等時,下次咱們就須要從數組頭部開始添加了,以下圖演示 :post

  • 假設隊列總共長度length爲5,putindex指向的是最後一個空的array:下標爲4 ui

  • 此時元素1被移出:takeindex指向元素2

    image.png

  • 此時元素5被加入隊列:下標爲4的putindex自增後剛好等於隊列長度5,那麼下一次只能從隊列頭開始添加元素:

    image.png

接下來咱們看看阻塞添加方法put:

//put方法,阻塞時可中斷
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//該方法可中斷
      try {
          //當隊列元素個數與數組長度相等時,沒法添加元素
          while (count == items.length)
              //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒
              notFull.await();
          enqueue(e);//若是隊列沒有滿直接添加
      } finally {
          lock.unlock();
      }
  }

複製代碼

put方法是一個阻塞的方法,若是隊列元素已滿,那麼當前線程將會被notFull條件隊列掛起加到條件隊列中,直到隊列有元素被移出纔會喚醒執行添加操做。但若是隊列沒有滿,那麼就直接調用enqueue(e)方法將元素加入到數組隊列中。

總結

三個添加方法即put,offer,add,其中offer,add在正常狀況下都是無阻塞的添加,而put方法是阻塞添加。這就是阻塞隊列的添加過程。說白了就是當隊列滿時經過條件對象Condtion來阻塞當前調用put方法的線程,直到線程又再次被喚醒執行。 爲了方便理解,總得來講put方法的執行存在如下兩種狀況:

  • 隊列已滿,那麼新到來的put線程將添加到notFull的條件隊列中等待
  • 有移除線程執行移除操做,移除成功同時喚醒put線程,以下圖所示 假設隊列全滿時:
    image.png

接下來有5個線程經過put方法阻塞入隊,他們所有被阻塞,而線程被包裝爲Node隊列存在條件隊列中:

image.png

此時元素1被移出了,那麼會調用notfull.signal方法,喚醒條件隊列的WaitNode,waitNode喚醒後,會調用enqueue()方法入隊:

image.png

ArrayBlockingQueue的阻塞移出

一樣的,咱們先看非阻塞的移出,poll和remove。 其中:poll(),獲取並刪除隊列頭元素,隊列沒有數據就返回null,內部經過dequeue()方法刪除頭元素

public E poll() {
      final ReentrantLock lock = this.lock;
       lock.lock();
       try {
           //判斷隊列是否爲null
           return (count == 0) ? null : dequeue();
       } finally {
           lock.unlock();
       }
    }
 //移除隊列頭元素並返回
 private E dequeue() {
     //拿到當前數組的數據
     final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      //獲取要刪除的對象
      E x = (E) items[takeIndex];
      將數組中takeIndex索引位置設置爲null
      items[takeIndex] = null;
      //takeIndex索引加1並判斷是否與數組長度相等,
      //若是相等說明已到盡頭,恢復爲0
      if (++takeIndex == items.length)
          takeIndex = 0;
      count--;//隊列個數減1
      if (itrs != null)
          //同時更新迭代器中的元素數據
          itrs.elementDequeued();
      //移出了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操做
      notFull.signal();
      return x;
    }

複製代碼

總結就是加鎖以後獲取要刪除的對象(注意,這裏的lock和添加時候的lock是同一個lock,意味着同一時間只能添加或者刪除,不能併發執行),以後將數組的takeindex進行處理,並在有空位以後喚醒添加隊列的線程執行添加操做,接下來看remove方法:

public boolean remove(Object o) {
    if (o == null) return false;
    //獲取數組數據
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();//加鎖
    try {
        //若是此時隊列不爲null,這裏是爲了防止併發狀況
        if (count > 0) {
            //獲取下一個要添加元素時的索引
            final int putIndex = this.putIndex;
            //獲取當前要被刪除元素的索引
            int i = takeIndex;
            //執行循環查找要刪除的元素
            do {
                //找到要刪除的元素
                if (o.equals(items[i])) {
                    removeAt(i);//執行刪除
                    return true;//刪除成功返回true
                }
                //當前刪除索引執行加1後判斷是否與數組長度相等
                //若爲true,說明索引已到數組盡頭,將i設置爲0
                if (++i == items.length)
                    i = 0; 
            } while (i != putIndex);//繼承查找
        }
        return false;
    } finally {
        lock.unlock();
    }
}

//根據索引刪除元素,其實是把刪除索引以後的元素往前移動一個位置
void removeAt(final int removeIndex) {

     final Object[] items = this.items;
      //先判斷要刪除的元素是否爲當前隊列頭元素
      if (removeIndex == takeIndex) {
          //若是是就簡單了:直接刪除
          items[takeIndex] = null;
          if (++takeIndex == items.length)
              takeIndex = 0;
          count--;//隊列元素減1
          if (itrs != null)
              itrs.elementDequeued();//更新迭代器中的數據
      } else {
      //若是要刪除的元素不在隊列頭部,
      //那麼只需循環迭代把刪除元素後面的全部元素往前移動一個位置
          //獲取下一個要被添加的元素的索引,做爲循環判斷結束條件
          final int putIndex = this.putIndex;
          //執行循環
          for (int i = removeIndex;;) {
              //獲取要刪除節點索引的下一個索引
              int next = i + 1;
              //判斷是否已爲數組長度,若是是從數組頭部(索引爲0)開始找
              if (next == items.length)
                  next = 0;
               //若是查找的索引不等於要添加元素的索引,說明元素能夠再移動
              if (next != putIndex) {
                  items[i] = items[next];//把後一個元素前移覆蓋要刪除的元
                  i = next;
              } else {
              //在removeIndex索引以後的元素都往前移動完畢後清空最後一個元素
                  items[i] = null;
                  this.putIndex = i;
                  break;//結束循環
              }
          }
          count--;//隊列元素減1
          if (itrs != null)
              itrs.removedAt(removeIndex);//更新迭代器數據
      }
      notFull.signal();//喚醒添加線程
    }

複製代碼

remove(Object o)方法的刪除過程相對複雜些,由於該方法並非直接從隊列頭部刪除元素,而是刪除指定的位置。 首先線程先獲取鎖,再一步判斷隊列count>0,這點是保證併發狀況下刪除操做安全執行。接着獲取下一個要添加源的索引putIndex以及takeIndex索引 ,做爲後續循環的結束判斷,由於只要putIndex與takeIndex不相等就說明隊列沒有結束。而後經過while循環找到要刪除的元素索引,執行removeAt(i)方法刪除,在removeAt(i)方法中實際上作了兩件事:

  • 一是若是刪除的元素正好在隊列頭,那麼就不須要對後面的數組作任何操做,直接刪除,並喚醒添加線程便可
  • 二是若是要刪除的元素並非隊列頭元素,刪除以後須要將數組從新reformat同樣:從要刪除元素的索引removeIndex以後的元素都往前移動一個位置,那麼要刪除的元素就被removeIndex以後的元素替換,從而也就完成了刪除操做。

接着看take()方法,是一個阻塞方法,直接獲取隊列頭元素並刪除。

//從隊列頭部刪除,隊列沒有元素就阻塞,可中斷
 public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//中斷
      try {
          //若是隊列沒有元素
          while (count == 0)
              //執行阻塞操做
              notEmpty.await();
          return dequeue();//若是隊列有元素執行刪除操做
      } finally {
          lock.unlock();
      }
    }
複製代碼

take方法其實很簡單,有就刪除沒有就阻塞,注意這個阻塞是能夠中斷的,若是隊列沒有數據那麼就加入notEmpty條件隊列等待(有數據就直接取走,dequeue方法以前分析過了),若是有新的put線程添加了數據,那麼put操做將會喚醒take線程,執行take操做。圖示以下 假設隊列全空時:

image.png
這個時候有五個線程調用take方法拿元素:
image.png
這個時候有有一個元素666被put進隊列:
image.png

總結

ArrayBlockingQueue內部經過一把鎖ReentrantLock和兩個AQS條件隊列實現了阻塞的入隊和刪除:

  • 元素滿時,阻塞put線程,封裝爲node節點在notFull條件隊列中,此時若是有線程移出元素,在移出後會喚醒notFull條件隊列,讓條件隊列中的put線程繼續嘗試進行put
  • 元素空時,阻塞take線程,封裝爲node節點在notEmpty條件隊列中,此時若是有線程加入元素,在移出後會喚醒notEmpty條件隊列,讓條件隊列中的take線程繼續嘗試進行take
相關文章
相關標籤/搜索