ArrayBlockingQueue源碼閱讀(1.8)

ArrayBlockingQueue源碼閱讀


一、ArrayBlockingQueue類結構

  public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一種實現,要了解它就必須清楚BlockingQueue的相關知識;java

二、BlockingQueue接口介紹

  在併發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列,一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue接口!,BlockingQueue的類繼承關係以下:
ArrayBlockingQueue源碼閱讀(1.8)算法

BlockingQueue接口重要方法以下:數組


  • offer(anObject): 表示若是可能的話, 將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納, 則返回true, 不然返回false.(本方法不阻塞當前執行方法的線程)。
  • offer(E o, long timeout, TimeUnit unit), 能夠設定等待的時間,若是在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。
  • put(anObject): 把anObject加到BlockingQueue裏, 若是BlockQueue沒有空間, 則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續。
  • poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內,隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間超時尚未數據可取,返回失敗,若是不指定超時時間,在沒有數據時當即返回失敗。
  • take(): 取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入。
  • drainTo(): 一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。

三、源碼分析

3.一、類屬性查看

/** The queued items */ 以數組做爲數據結構
final Object[] items;

/** items index for next take, poll, peek or remove */ 隊列中下一個將被取出值的下標
int takeIndex;

/** items index for next put, offer, or add */  隊列中下一個將被放入值的下標
int putIndex;

/** Number of elements in the queue */ 數組元素數量
int count;

/*
 * Concurrency control uses the classic two-condition algorithm  使用雙條件算法
 * found in any textbook. 
 */

/** Main lock guarding all access */ 使用重入鎖(獨佔鎖)
final ReentrantLock lock;
/** Condition for waiting takes */ take時候用於等待的條件
private final Condition notEmpty;
/** Condition for waiting puts */ put時候用於等待的條件
private final Condition notFull;

    transient Itrs itrs = null;

3.二、構造函數分析

/**數據結構

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and default access policy.
  • @param capacity the capacity of this queue
  • @throws IllegalArgumentException if {@code capacity < 1}
    */併發

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);  //調用public ArrayBlockingQueue(int capacity, boolean fair)構造方法,默認使用非公平鎖
    }

    /**app

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity and the specified access policy.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order; //若是傳入的值爲true即公平鎖,則須要維護一個有序隊列,保證先進先出的原則
  • if {@code false} the access order is unspecified.
  • @throws IllegalArgumentException if {@code capacity < 1}
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity]; //建立指定容量的數組
        lock = new ReentrantLock(fair); //默認使用非公平鎖
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    /**less

  • Creates an {@code ArrayBlockingQueue} with the given (fixed)
  • capacity, the specified access policy and initially containing the
  • elements of the given collection,
  • added in traversal order of the collection's iterator.
  • @param capacity the capacity of this queue
  • @param fair if {@code true} then queue accesses for threads blocked
  • on insertion or removal, are processed in FIFO order;
  • if {@code false} the access order is unspecified.
  • @param c the collection of elements to initially contain 使用指定集合初始化隊列
  • @throws IllegalArgumentException if {@code capacity} is less than
  • {@code c.size()}, or less than 1.
  • @throws NullPointerException if the specified collection or any
  • of its elements are null
    */
    //這個構造函數的核心就是c.size()與capacity的大小關係對比了
    //若是c.size()>capacity那就會報錯,因此在初始化的時候要注意ide

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair); //先建立指定容量的數組,以便集合中的元素存放
       //這種寫法咱們很常見,使用final表示引用不能改變,但又避免了直接使用成員變量
        final ReentrantLock lock = this.lock;
                //對隊列直接修改操做,須要先獲取獨佔鎖
        lock.lock(); // Lock only for visibility, not mutual exclusion   
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e; //下標從0開始存放
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i; //將數組元素個數返回給全局變量
            putIndex = (i == capacity) ? 0 : i; ////初始化入隊索引
        } finally {
            lock.unlock(); //解鎖
        }
    }

    3.三、入隊列方法

    • add(E e) 方法,源碼以下:
      //調用父類AbstractQueue的方法
      //在隊列末尾(數組)插入指定的元素,前提是隊列有空餘空間,且指定元素不爲空
      public boolean add(E e) {
      return super.add(e);
      }

      //父類AbstractQueue的方法函數

      public boolean add(E e) {
      if (offer(e)) //調用offer方法添加元素,不阻塞當前線程(等待)
          return true;
      else
          throw new IllegalStateException("Queue full");
      }
    • offer(E e)方法,源碼以下:
      //若是隊列能夠容納則當即返回成功,不然失敗,不阻塞當前線程,是官方推薦使用的方法源碼分析

      public boolean offer(E e) {
      checkNotNull(e); //檢查元素是否爲空
      final ReentrantLock lock = this.lock;
      lock.lock();//獲取獨佔鎖
      try {
          if (count == items.length) //當前數組已滿,當即返回失敗
              return false;
          else {
              enqueue(e);
              return true; 
          }
      } finally {
          lock.unlock(); //解鎖
      }
      }

      //插入元素

      private void enqueue(E x) {
      // assert lock.getHoldCount() == 1;  當前線程調用lock()的次數
      // assert items[putIndex] == null; 當前位置沒值
      final Object[] items = this.items;
      items[putIndex] = x; //在指定的位置插入元素
      if (++putIndex == items.length)
          putIndex = 0;
      count++;  //更新數組元素個數
      notEmpty.signal();//通知被take方法讀取元素阻塞等待的線程(前提是該線程持有鎖)
      }

      put(E e)方法,若是BlockQueue沒有空間, 則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續,其源碼以下:

      public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly(); //申請鎖的過程可被外界打斷(中斷響應,不會無限制等待)
      try {
          while (count == items.length)
              notFull.await(); //隊列已經滿了,則使用put的條件等待(此方法與object.wait()方法不一樣,不釋放鎖),之道有空閒空間繼續執行下一步enqueue(e);
          enqueue(e);
      } finally {
          lock.unlock(); //解鎖
      }
      }

      //offer(E,long,TimeUnit)會在等待一段時間後返回,可是等待的過程當中是能夠響應中斷的

      public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {
      
      checkNotNull(e);
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly(); //響應中斷
      try {
          while (count == items.length) {
              if (nanos <= 0)
                  return false;
              nanos = notFull.awaitNanos(nanos);
          }
          enqueue(e);
          return true;
      } finally {
          lock.unlock();
      }
      }

3.四、出隊列方法

3.4.一、 E poll方法
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue(); //數組不爲空,調用dequeue取出takeIndex下標位置上的元素,而且會喚醒等待notFull條件的線程
        } finally {
            lock.unlock();
        }
    }
3.4.2 E take() 方法
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //隊列爲空,則阻塞當前隊列
            return dequeue(); //隊列有元素,則調用dequeue
        } finally {
            lock.unlock();
        }
    }

E dequeue()閱讀:

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex]; //獲取takeIndex下標處的元素
        items[takeIndex] = null; //將當前位置的元素設置爲null
                //這裏能夠看出這個數組是個環形數組,其實取元素,老是從隊列頭部開始,即items[0]
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--; //更新數組元素個數
        if (itrs != null)
            itrs.elementDequeued(); //修改迭代器參數
        notFull.signal(); //通知阻塞在putIndex操做的線程
        return x;
    }
3.4.三、 drainTo一次性返回多個(默認Integer.MAX_VALUE)元素(放到集合中)
public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this) //存放返回元素的集合不能使當前隊列
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count); //要取出的元素不能大於數組元素個數
            int take = takeIndex;
            int i = 0;
            try {
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x); //將takerInex下標處的元素放入集合中
                    items[take] = null; //對應位置的元素設置爲null
                    if (++take == items.length)
                        take = 0; // 環形數組
                    i++;
                }
                return n; //返回獲取到的元素個數
            } finally {
                // Restore invariants even if c.add() threw
                                //更新狀態,即便操做失敗也要還原
                if (i > 0) {//
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0)
                            itrs.queueIsEmpty();
                        else if (i > take)
                            itrs.takeIndexWrapped();
                    }
                    for (; i > 0 && lock.hasWaiters(notFull); i--)
                        notFull.signal(); //喚醒阻塞在此條件(putIndex)的線程
                }
            }
        } finally {
            lock.unlock();
        }
    }
相關文章
相關標籤/搜索