【Java併發編程】—–「J.U.C」:ArrayBlockingQueue

前言

本文的主要詳細分析ArrayBlockingQueue的實現原理,因爲該併發集合其底層是使用了java.util.ReentrantLock和java.util.Condition來完成併發控制的,咱們能夠經過JDK的源代碼更好的學習這些併發控制類的使用,同時該類也是全部併發集合中最簡單的一個,分析該類的源碼也是爲以後分析其餘併發集合作好基礎。java

1.Queue接口和BlockingQueue接口回顧

1.1 Queue接口回顧

在Queue接口中,除了繼承Collection接口中定義的方法外,它還分別額外地定義插入、刪除、查詢這3個操做,其中每個操做都以兩種不一樣的形式存在,每一種形式都對應着一個方法。數組

方法說明:安全

操做 拋出異常 返回特殊值
Insert add(e) offer(e)
Remove remove() poll()
Examine element() peek()
  1. add方法在將一個元素插入到隊列的尾部時,若是出現隊列已經滿了,那麼就會拋出IllegalStateException,而使用offer方法時,若是隊列滿了,則添加失敗,返回false,但並不會引起異常。
  2. remove方法是獲取隊列的頭部元素而且刪除,若是當隊列爲空時,那麼就會拋出NoSuchElementException。而poll在隊列爲空時,則返回一個null。
  3. element方法是從隊列中獲取到隊列的第一個元素,但不會刪除,可是若是隊列爲空時,那麼它就會拋出NoSuchElementException。peek方法與之相似,只是不會拋出異常,而是返回false。

後面咱們在分析ArrayBlockingQueue的方法時,主要也是圍繞着這幾個方法來進行分析。併發

1.2 BlockingQueue接口回顧

BlockingQueue是JDK1.5出現的接口,它在原來的Queue接口基礎上提供了更多的額外功能:當獲取隊列中的頭部元素時,若是隊列爲空,那麼它將會使執行線程處於等待狀態;當添加一個元素到隊列的尾部時,若是隊列已經滿了,那麼它一樣會使執行的線程處於等待狀態。工具

前面咱們在說Queue接口時提到過,它針對於相同的操做提供了2種不一樣的形式,而BlockingQueue更誇張,針對於相同的操做提供了4種不一樣的形式。性能

該四種形式分別爲:學習

  • 拋出異常
  • 返回一個特殊值(多是null或者是false,取決於具體的操做)
  • 阻塞當前執行直到其能夠繼續
  • 當線程被掛起後,等待最大的時間,若是一旦超時,即便該操做依舊沒法繼續執行,線程也不會再繼續等待下去。

對應的方法說明:this

操做 拋出異常 返回特殊值 阻塞 超時
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()

BlockingQueue雖然比起Queue在操做上提供了更多的支持,可是它在使用的使用也應該以下的幾點:spa

  1. BlockingQueue中是不容許添加null的,該接受在聲明的時候就要求全部的實現類在接收到一個null的時候,都應該拋出NullPointerException。
  1. BlockingQueue是線程安全的,所以它的全部和隊列相關的方法都具備原子性。可是對於那麼從Collection接口中繼承而來的批量操做方法,好比addAll(Collection e)等方法,BlockingQueue的實現一般沒有保證其具備原子性,所以咱們在使用的BlockingQueue,應該儘量地不去使用這些方法。
  2. BlockingQueue主要應用於生產者與消費者的模型中,其元素的添加和獲取都是極具規律性的。可是對於remove(Object o)這樣的方法,雖然BlockingQueue能夠保證元素正確的刪除,可是這樣的操做會很是響應性能,所以咱們在沒有特殊的狀況下,也應該避免使用這類方法。

2. ArrayBlockingQueue深刻分析

有了上面的鋪墊,下面咱們就能夠真正開始分析ArrayBlockingQueue了。在分析以前,首先讓咱們看看API對其的描述。
注意:這裏使用的JDK版本爲1.7,不一樣的JDK版本在實現上存在不一樣線程

ArrayBlockingQueueAPI說明.png

首先讓咱們看下ArrayBlockingQueue的核心組成:

/** 底層維護隊列元素的數組 */
    final Object[] items;

    /**  當讀取元素時數組的下標(這裏稱爲讀下標) */
    int takeIndex;

    /** 添加元素時數組的下標 (這裏稱爲寫小標)*/
    int putIndex;

    /** 隊列中的元素個數 */
    int count;

    /**用於併發控制的工具類**/
    final ReentrantLock lock;

    /** 控制take操做時是否讓線程等待 */
    private final Condition notEmpty;

    /** 控制put操做時是否讓線程等待 */
    private final Condition notFull;

take方法分析(369-379行):

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        /*
            嘗試獲取鎖,若是此時鎖被其餘線程鎖佔用,那麼當前線程就處於Waiting的狀態。           
            注意:當方法是支持線程中斷響應的若是其餘線程此時中斷當前線程,
            那麼當前線程就會拋出InterruptedException 
         */
        lock.lockInterruptibly();
        try {
            /*
          若是此時隊列中的元素個數爲0,那麼就讓當前線程wait,而且釋放鎖。
          注意:這裏使用了while進行重複檢查,是爲了防止當前線程可能因爲  其餘未知的緣由被喚醒。
          (一般這種狀況被稱爲"spurious wakeup")
            */    
        while (count == 0)
                notEmpty.await();
            //若是隊列不爲空,則從隊列的頭部取元素
            return extract();
        } finally {
             //完成鎖的釋放
            lock.unlock();
        }
    }

extract方法分析(163-171):

/*
      根據takeIndex來獲取當前的元素,而後通知其餘等待的線程。
      Call only when holding lock.(只有當前線程已經持有了鎖以後,它才能調用該方法)
     */
    private E extract() {
        final Object[] items = this.items;

        //根據takeIndex獲取元素,由於元素是一個Object類型的數組,所以它經過cast方法將其轉換成泛型。
        E x = this.<E>cast(items[takeIndex]);

        //將當前位置的元素設置爲null
        items[takeIndex] = null;

        //而且將takeIndex++,注意:這裏由於已經使用了鎖,所以inc方法中沒有使用到原子操做
        takeIndex = inc(takeIndex);
      
        //將隊列中的總的元素減1
        --count;
        //喚醒其餘等待的線程
        notFull.signal();
        return x;
    }

put方法分析(318-239)

public void put(E e) throws InterruptedException {
        //首先檢查元素是否爲空,不然拋出NullPointerException
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //進行鎖的搶佔
        lock.lockInterruptibly();
        try {
            /*當隊列的長度等於數組的長度,此時說明隊列已經滿了,這裏一樣
              使用了while來方式當前線程被"僞喚醒"。*/
            while (count == items.length)
                //則讓當前線程處於等待狀態
                notFull.await();
            //一旦獲取到鎖而且隊列還未滿時,則執行insert操做。
            insert(e);
        } finally {
            //完成鎖的釋放
            lock.unlock();
        }
    }

      //檢查元素是否爲空
     private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
      }

     //該方法的邏輯很是簡單
    private void insert(E x) {
        //將當前元素設置到putIndex位置   
        items[putIndex] = x;
        //讓putIndex++
        putIndex = inc(putIndex);
        //將隊列的大小加1
        ++count;
        //喚醒其餘正在處於等待狀態的線程
        notEmpty.signal();
    }

注:ArrayBlockingQueue實際上是一個循環隊列
咱們使用一個圖來簡單說明一下:

黃色表示數組中有元素

1-1.png

 

當再一次執行put的時候,其結果爲:

1-2.png

此時放入的元素會從頭開始置,咱們經過其incr方法更加清晰的看出其底層的操做:

/**
     * Circularly increment i.
     */
    final int inc(int i) {
        //當takeIndex的值等於數組的長度時,就會從新置爲0,這個一個循環遞增的過程
        return (++i == items.length) ? 0 : i;
    }

至此,ArrayBlockingQueue的核心部分就分析完了,其他的隊列操做基本上都是換湯不換藥的,此處再也不一一列舉。

做者:碼農一枚 連接:https://www.jianshu.com/p/9a652250e0d1 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。

相關文章
相關標籤/搜索