併發編程(八)—— Java 併發隊列 BlockingQueue 實現之 ArrayBlockingQueue 源碼分析

開篇先介紹下 BlockingQueue 這個接口的規則,後面再看其實現。html

阻塞隊列概要

阻塞隊列與咱們日常接觸的普通隊列(LinkedList或ArrayList等)的最大不一樣點,在於阻塞隊列的阻塞添加和阻塞刪除方法。java

阻塞添加
所謂的阻塞添加是指當阻塞隊列元素已滿時,隊列會阻塞加入元素的線程,直隊列元素不滿時才從新喚醒線程執行元素加入操做。數組

阻塞刪除
阻塞刪除是指在隊列元素爲空時,刪除隊列元素的線程將被阻塞,直到隊列不爲空再執行刪除操做(通常都會返回被刪除的元素)。緩存

 

因爲Java中的阻塞隊列接口BlockingQueue繼承自Queue接口,所以先來看看阻塞隊列接口爲咱們提供的主要方法app

 1 public interface BlockingQueue<E> extends Queue<E> {  2 
 3     //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量)  4     //在成功時返回 true,若是此隊列已滿,則拋IllegalStateException。 
 5     boolean add(E e);  6 
 7     //將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量)  8     // 將指定的元素插入此隊列的尾部,若是該隊列已滿,  9     //則在到達指定的等待時間以前等待可用的空間,該方法可中斷 
10     boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; 11 
12     //將指定的元素插入此隊列的尾部,若是該隊列已滿,則一直等到(阻塞)。 
13     void put(E e) throws InterruptedException; 14 
15     //獲取並移除此隊列的頭部,若是沒有元素則等待(阻塞), 16     //直到有元素將喚醒等待線程執行該操做 
17     E take() throws InterruptedException; 18 
19     //獲取並移除此隊列的頭部,在指定的等待時間前一直等到獲取元素, //超過期間方法將結束
20     E poll(long timeout, TimeUnit unit) throws InterruptedException; 21 
22     //今後隊列中移除指定元素的單個實例(若是存在)。 
23     boolean remove(Object o); 24 }

這裏咱們把上述操做進行分類ide

插入方法:this

  add(E e) : 添加成功返回true,失敗拋IllegalStateException異常
  offer(E e) : 成功返回 true,若是此隊列已滿,則返回 false。
  put(E e) :將元素插入此隊列的尾部,若是該隊列已滿,則一直阻塞
刪除方法:spa

  remove(Object o) :移除指定元素,成功返回true,失敗返回false
  poll() : 獲取並移除此隊列的頭元素,若隊列爲空,則返回 null
  take():獲取並移除此隊列頭元素,若沒有元素則一直阻塞。線程

阻塞隊列的對元素的增刪查操做主要就是上述的三類方法,一般狀況下咱們都是經過這3類方法操做阻塞隊列,瞭解完阻塞隊列的基本方法後,下面咱們將分析阻塞隊列中的兩個實現類ArrayBlockingQueue和LinkedBlockingQueue的簡單使用和實現原理,其中實現原理是這篇文章重點分析的內容。code

ArrayBlockingQueue

在看源碼以前,經過查詢API發現對ArrayBlockingQueue特色的簡單介紹:

一、一個由數組支持的有界隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
二、新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素
三、這是一個簡單的「有界緩存區」,一旦建立,就不能在增長其容量
四、在向已滿隊列中添加元素會致使操做阻塞,從空隊列中提取元素也將致使阻塞
五、此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序的。然而經過將公平性(fairness)設置爲true,而構造的隊列容許按照FIFO順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。

簡單的來講,ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法爲添加和刪除的阻塞方法,下面咱們經過ArrayBlockingQueue隊列實現一個生產者消費者的案例,經過該案例簡單瞭解其使用方式

使用示例

Consumer 消費者和 Producer 生產者,經過ArrayBlockingQueue 隊列獲取和添加元素,其中消費者調用了take()方法獲取元素當隊列沒有元素就阻塞,生產者調用put()方法添加元素,當隊列滿時就阻塞,經過這種方式便實現生產者消費者模式。比直接使用等待喚醒機制或者Condition條件隊列來得更加簡單。

 1 package com.zejian.concurrencys.Queue;  2 import java.util.concurrent.ArrayBlockingQueue;  3 import java.util.concurrent.TimeUnit;  4 
 5 /**
 6  * Created by chenhao on 2018/01/07  7  */
 8 public class ArrayBlockingQueueDemo {  9     private final static ArrayBlockingQueue<Apple> queue= new ArrayBlockingQueue<>(1); 10     public static void main(String[] args){ 11         new Thread(new Producer(queue)).start(); 12         new Thread(new Producer(queue)).start(); 13         new Thread(new Consumer(queue)).start(); 14         new Thread(new Consumer(queue)).start(); 15  } 16 } 17 
18  class Apple { 19     public Apple(){ 20  } 21  } 22 
23 /**
24  * 生產者線程 25  */
26 class Producer implements Runnable{ 27     private final ArrayBlockingQueue<Apple> mAbq; 28     Producer(ArrayBlockingQueue<Apple> arrayBlockingQueue){ 29         this.mAbq = arrayBlockingQueue; 30  } 31 
32  @Override 33     public void run() { 34         while (true) { 35  Produce(); 36  } 37  } 38 
39     private void Produce(){ 40         try { 41             Apple apple = new Apple(); 42  mAbq.put(apple); 43             System.out.println("生產:"+apple); 44         } catch (InterruptedException e) { 45  e.printStackTrace(); 46  } 47  } 48 } 49 
50 /**
51  * 消費者線程 52  */
53 class Consumer implements Runnable{ 54 
55     private ArrayBlockingQueue<Apple> mAbq; 56     Consumer(ArrayBlockingQueue<Apple> arrayBlockingQueue){ 57         this.mAbq = arrayBlockingQueue; 58  } 59 
60  @Override 61     public void run() { 62         while (true){ 63             try { 64                 TimeUnit.MILLISECONDS.sleep(1000); 65  comsume(); 66             } catch (InterruptedException e) { 67  e.printStackTrace(); 68  } 69  } 70  } 71 
72     private void comsume() throws InterruptedException { 73         Apple apple = mAbq.take(); 74         System.out.println("消費Apple="+apple); 75  } 76 }

輸出:

1 生產:com.zejian.concurrencys.Queue.Apple@109967f 2 消費Apple=com.zejian.concurrencys.Queue.Apple@109967f 3 生產:com.zejian.concurrencys.Queue.Apple@269a77 4 生產:com.zejian.concurrencys.Queue.Apple@1ce746e 5 消費Apple=com.zejian.concurrencys.Queue.Apple@269a77 6 消費Apple=com.zejian.concurrencys.Queue.Apple@1ce746e 7 ........

源碼剖析

ArrayBlockingQueue內部的阻塞隊列是經過重入鎖ReenterLock和Condition條件隊列實現的,因此ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問隊列,被阻塞的線程能夠按照阻塞的前後順序訪問隊列,即先阻塞的線程先訪問隊列。而非公平隊列,當隊列可用時,阻塞的線程將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的前後順序。建立公平與非公平阻塞隊列代碼以下:

 1 //默認非公平阻塞隊列
 2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2);  3 //公平阻塞隊列
 4 ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true);  5 
 6 //構造方法源碼
 7 public ArrayBlockingQueue(int capacity) {  8      this(capacity, false);  9  } 10 
11 public ArrayBlockingQueue(int capacity, boolean fair) { 12      if (capacity <= 0) 13          throw new IllegalArgumentException(); 14      this.items = new Object[capacity]; 15      lock = new ReentrantLock(fair); 16      notEmpty = lock.newCondition(); 17      notFull = lock.newCondition(); 18  }

ArrayBlockingQueue的內部是經過一個可重入鎖ReentrantLock和兩個Condition條件對象來實現阻塞,這裏先看看其內部成員變量

 1 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
 2         implements BlockingQueue<E>, java.io.Serializable {  3 
 4     /** 存儲數據的數組 */
 5     final Object[] items;  6 
 7     /**獲取數據的索引,主要用於take,poll,peek,remove方法 */
 8     int takeIndex;  9 
10     /**添加數據的索引,主要用於 put, offer, or add 方法*/
11     int putIndex; 12 
13     /** 隊列元素的個數 */
14     int count; 15 
16 
17     /** 控制並不是訪問的鎖 */
18     final ReentrantLock lock; 19 
20     /**notEmpty條件對象,用於通知take方法隊列已有元素,可執行獲取操做 */
21     private final Condition notEmpty; 22 
23     /**notFull條件對象,用於通知put方法隊列未滿,可執行添加操做 */
24     private final Condition notFull; 25 
26     /**
27  迭代器 28      */
29     transient Itrs itrs = null; 30 
31 }

從成員變量可看出,ArrayBlockingQueue內部確實是經過數組對象items來存儲全部的數據,值得注意的是ArrayBlockingQueue經過一個ReentrantLock來同時控制添加線程與移除線程的並不是訪問,這點與LinkedBlockingQueue區別很大(稍後會分析)。而對於notEmpty條件對象則是用於存放等待或喚醒調用take方法的線程,告訴他們隊列已有元素,能夠執行獲取操做。同理notFull條件對象是用於等待或喚醒調用put方法的線程,告訴它們,隊列未滿,能夠執行添加元素的操做。takeIndex表明的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引,putIndex則表明下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。圖示以下

添加

 1 //add方法實現,間接調用了offer(e)
 2 public boolean add(E e) {  3         if (offer(e))  4             return true;  5         else
 6             throw new IllegalStateException("Queue full");  7  }  8 
 9 //offer方法
10 public boolean offer(E e) { 11      checkNotNull(e);//檢查元素是否爲null
12      final ReentrantLock lock = this.lock; 13      lock.lock();//加鎖
14      try { 15          if (count == items.length)//判斷隊列是否滿
16              return false; 17          else { 18              enqueue(e);//添加元素到隊列
19              return true; 20  } 21      } finally { 22  lock.unlock(); 23  } 24  } 25 
26 //入隊操做
27 private void enqueue(E x) { 28     //獲取當前數組
29     final Object[] items = this.items; 30     //經過putIndex索引對數組進行賦值
31     items[putIndex] = x; 32     //索引自增,若是已經是最後一個位置,從新設置 putIndex = 0;
33     if (++putIndex == items.length) 34         putIndex = 0; 35     count++;//隊列中元素數量加1 36     //喚醒調用take()方法的線程,執行元素獲取操做。
37  notEmpty.signal(); 38 }

這裏的add方法和offer方法實現比較簡單,其中須要注意的是enqueue(E x)方法,當putIndex索引大小等於數組長度時,須要將putIndex從新設置爲0,由於後面講到的取值也是從數組中第一個開始依次日後面取,取了以後會將原位置的值設置爲null,方便循環put操做,這裏要注意並非每次都是取數組中的第一個值,takeIndex也會增長。由於作了添加操做,數組中確定不會空,則 notEmpty條件會喚醒take()方法取值。

ok~,接着看put方法,它是一個阻塞添加的方法:

 1 //put方法,阻塞時可中斷
 2  public void put(E e) throws InterruptedException {  3  checkNotNull(e);  4       final ReentrantLock lock = this.lock;  5       lock.lockInterruptibly();//該方法可中斷
 6       try {  7           //當隊列元素個數與數組長度相等時,沒法添加元素
 8           while (count == items.length)  9               //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒
10  notFull.await(); 11           enqueue(e);//若是隊列沒有滿直接添加。。
12       } finally { 13  lock.unlock(); 14  } 15   }

put方法是一個阻塞的方法,若是隊列元素已滿,那麼當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空檔纔會喚醒執行添加操做。但若是隊列沒有滿,那麼就直接調用enqueue(e)方法將元素加入到數組隊列中。到此咱們對三個添加方法即put,offer,add都分析完畢,其中offer,add在正常狀況下都是無阻塞的添加,而put方法是阻塞添加。

(獲取)刪除

關於刪除先看poll方法,該方法獲取並移除此隊列的頭元素,若隊列爲空,則返回 null。

 1 public E poll() {  2   final ReentrantLock lock = this.lock;  3  lock.lock();  4    try {  5        //判斷隊列是否爲null,不爲null執行dequeue()方法,不然返回null
 6        return (count == 0) ? null : dequeue();  7    } finally {  8  lock.unlock();  9  } 10 } 11  //刪除隊列頭元素並返回
12  private E dequeue() { 13      //拿到當前數組的數據
14      final Object[] items = this.items; 15       @SuppressWarnings("unchecked") 16       //獲取要刪除的對象
17       E x = (E) items[takeIndex]; 18  將數組中takeIndex索引位置設置爲null 19       items[takeIndex] = null; 20       //takeIndex索引加1並判斷是否與數組長度相等, 21       //若是相等說明已到盡頭,恢復爲0
22       if (++takeIndex == items.length) 23           takeIndex = 0; 24       count--;//隊列個數減1
25       if (itrs != null) 26           itrs.elementDequeued();//同時更新迭代器中的元素數據 27       //刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操做
28  notFull.signal(); 29       return x; 30  }

接着看take()方法,是一個阻塞方法,獲取隊列頭元素並刪除。

 1 //從隊列頭部刪除,隊列沒有元素就阻塞,可中斷
 2  public E take() throws InterruptedException {  3     final ReentrantLock lock = this.lock;  4       lock.lockInterruptibly();//中斷
 5       try {  6           //若是隊列沒有元素
 7           while (count == 0)  8               //執行阻塞操做
 9  notEmpty.await(); 10           return dequeue();//若是隊列有元素執行刪除操做
11       } finally { 12  lock.unlock(); 13  } 14  }

take和poll的區別是,隊列爲空時,poll返回null,take則被掛起阻塞,直到有元素添加進來,take線程被喚醒,而後獲取第一個元素並刪除。

 

peek方法很是簡單,直接返回當前隊列的頭元素但不刪除任何元素。

 1 public E peek() {  2       final ReentrantLock lock = this.lock;  3  lock.lock();  4       try {  5        //直接返回當前隊列的頭元素,但不刪除
 6           return itemAt(takeIndex); // null when queue is empty
 7       } finally {  8  lock.unlock();  9  } 10  } 11 
12 final E itemAt(int i) { 13       return (E) items[i]; 14   }

ok~,到此對於ArrayBlockingQueue的主要方法就分析完了。

原文出處:https://www.cnblogs.com/java-chen-hao/p/10234149.html

相關文章
相關標籤/搜索