阻塞隊列之ArrayBlockingQueue(ABQ)JDK1-8實現原理詳解

以前有一篇關於關於java對阻塞隊列提供了七種實現,若是有興趣,能夠去看一下 寫得還算很全: www.jianshu.com/p/9e4eca735…java

今天我就想專門去研究下兩個比較基礎的ArrayBlockingQueue的源碼實現以及實現原理。算法

關於阻塞隊列的實現原理:數組

若是隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?若是讓你來設計阻塞隊列你會如何設計,如何讓生產者和消費者進行高效率的通訊呢?在jdk中通常是使用通知模式,我更想理解成觀察者模式。就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看JDK源碼發現ArrayBlockingQueue使用了Condition來實現。bash

先來看看構造函數有哪些?函數

ArrayBlockingQueue

public ArrayBlockingQueue(int capacity) { 
  this(capacity, false);//默認構造非公平鎖的阻塞隊列 
} 

public ArrayBlockingQueue(int capacity, boolean fair) { 
&emsp;&emsp;if (capacity <= 0)  
&emsp;&emsp;&emsp;&emsp;throw new IllegalArgumentException(); 
&emsp;&emsp;this.items = new Object[capacity]; 
&emsp;&emsp;lock = new ReentrantLock(fair);//初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖 
&emsp;&emsp;notEmpty = lock.newCondition;//初始化非空等待隊列
&emsp;&emsp;notFull = lock.newCondition;//初始化非滿等待隊列 
} 
public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
&emsp;&emsp;this(capacity, fair); 
&emsp;&emsp;final ReentrantLock lock = this.lock; 
&emsp;&emsp;lock.lock();//注意在這個地方須要得到鎖,由於指定加入一個集合 ,必須等加入集合完成後,才能繼續操做
&emsp;&emsp;try { 
&emsp;&emsp;&emsp;&emsp;int i = 0; 
&emsp;&emsp;&emsp;&emsp;try { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;for (E e : c) { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;checkNotNull(e); 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;item[i++] = e;//將集合添加進數組構成的隊列中 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;} 
&emsp;&emsp;&emsp;&emsp;} catch (ArrayIndexOutOfBoundsException ex) { 
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;throw new IllegalArgumentException(); 
&emsp;&emsp;&emsp;&emsp;} 
&emsp;&emsp;&emsp;&emsp;count = i;//隊列中的實際數據數量 
&emsp;&emsp;&emsp;&emsp;putIndex = (i == capacity) ? 0 : i; 
&emsp;&emsp;} finally { 
&emsp;&emsp;&emsp;&emsp;lock.unlock(); 
&emsp;&emsp;} 
}
複製代碼

在看一些基礎屬性:ui

final Object[] items;  //AQS實現是使用對象數組存放數據

    int takeIndex;        //出隊列對應的索引

    int putIndex;    //下一個入隊列的索引

    int count;    //隊列總長度

    final ReentrantLock lock;     //可重入鎖,用於入隊列和出隊列時加鎖。

    private final Condition notEmpty; //這裏是隊列爲null的時候,不能執行出隊列操做
//這時候須要阻塞,若是有一個入隊列操做,完成後,隊列不爲null了,須要通知喚醒出隊列線程。
//這個設計是針對第四種或第三種處理,這個能夠理解爲消費者。

      //跟上面相反,當入隊列爲滿時,若是是第四種或第三種處理,則先阻塞入隊列操做,
//而後若是有一個出隊列操做完成後,就喚醒出隊列線程,也就是該condition。
    private final Condition notFull;

總結下: 

 notEmpty  能夠理解爲消費者線程。只有隊列裏有元素,才喚醒,執行出隊列。

notEmpty  能夠理解爲生產者線程。只有隊列裏元素不是滿的,才喚醒,執行入隊列。
複製代碼

而後咱們知道,對於阻塞隊列,當入隊列時,隊列滿,等別有四種處理,java在ArrayBlockingQueue類中,針對每種處理分別提供給咱們四種方法對應不一樣的處理:this

下面分別針對幾種入隊列隊列滿的處理分析下:spa

1.add(e) 拋異常線程

public boolean add(E e) {
        return super.add(e);
    }

public boolean add(E e) {
//看這裏,調用quque中的offer來判斷隊列是否滿,offer在接口中,未實現,因此仍是在     
//子類ArrayBlockingQueue中重寫的。咱們來看offer 
      if (offer(e))   
                 ArrayBlockingQueue
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

public boolean offer(E e) {
//判斷,ABQ是不能加入null的
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();  //當須要判斷隊列長度時而且加入元素時,須要加鎖。
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);   //看下面分析
                return true;
            }
        } finally {    //釋放鎖
            lock.unlock();
        }
    }

 private void enqueue(E x) {    
        final Object[] items = this.items;  
        items[putIndex] = x;   //把x加入數組的下一個索引的位置
        if (++putIndex == items.length)   //若是隊列滿,則putIndex置空。
            putIndex = 0;
        count++;
        notEmpty.signal();    //喚醒消費者線程消費隊列裏的元素。   
    }
複製代碼

入隊列的第一種第二種處理咱們已經看完了,下面看看第三種處理。設計

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();     //請求相應中斷的鎖,這裏是調用可重入鎖重寫的acquireInterruptibly(1),方法
        try {
            //若是隊列滿,則自旋。
            while (count == items.length) {
                //區別: 若是超過指定時間,返回false
                if (nanos <= 0)
                    return false;
   //使當前線程等待直到發出信號或中斷,或指定的等待時間過去。
                nanos = notFull.awaitNanos(nanos);  //此線程還持有該鎖的時間
            }
            enqueue(e);  //若是隊列不滿,則執行入隊列操做。
            return true;
        } finally {
            lock.unlock();
        }
    }
複製代碼

第四種入隊列滿處理的邏輯 put,上面有看,應該懂了

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();   //隊列滿則阻塞生產者線程,也能夠理解爲入隊列線程
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
複製代碼

而後根據出隊列的四種處理,咱們一樣有如下的四種處理:

第一種處理:

//AbstractQueue#remove,這也是一個模板方法,定義刪除隊列元素的算法骨架,隊列中元素時返回具體元素,元素爲空時拋出異常,具體實現poll由子類實現, 
public E remove() { 
&emsp;&emsp;E x = poll();//poll方法由Queue接口定義 
&emsp;&emsp;if (x != null) 
&emsp;&emsp;&emsp;&emsp;return x; 
&emsp;&emsp;else 
&emsp;&emsp;&emsp;&emsp;throw new NoSuchElementException(); 
}

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();   
        try {
            return (count == 0) ? null : dequeue();  //dequeue  出隊列操做
        } finally {
            lock.unlock();
        }
    }

private E dequeue() {
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)   //這裏是迭代器相關的處理。
            itrs.elementDequeued();
        notFull.signal();     //出隊列後,喚醒入隊列線程
        return x;
    }
複製代碼

第二種處理 poll()//隊列不爲空時返回隊首值並移除;隊列爲空時返回null。非阻塞當即返回。見上面

第三種處理: poll(time, unit)//設定等待的時間,若是在指定時間內隊列還未孔則返回null,不爲空則返回隊首值

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {  //若是隊列尾null,自旋
                if (nanos <= 0)   //判斷時間是否超時,ture返回null
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();  //出隊列
        } finally {
            lock.unlock();
        }
    }
複製代碼

第四種處理 take(e)//隊列不爲空返回隊首值並移除;當隊列爲空時會阻塞等待,一直等到隊列不爲空時再返回隊首值。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)   //若是隊列爲nul,自旋阻塞出隊列線程。直到不爲null
                notEmpty.await();
            return dequeue();   //出隊列
        } finally {
            lock.unlock();
        }
    }
複製代碼

總結:

從以上分析,能夠看出出隊列和入隊列操做方法的第四種處理主要是經過條件的通知機制來完成可阻塞式的插入數據和獲取數據。在理解ArrayBlockingQueue後再去理解的LinkedBlockingQueue就很容易了。

待續。。。

相關文章
相關標籤/搜索