Java併發編程筆記之ArrayBlockingQueue源碼分析

JDK 中基於數組的阻塞隊列 ArrayBlockingQueue 原理剖析,ArrayBlockingQueue 內部如何基於一把獨佔鎖以及對應的兩個條件變量實現出入隊操做的線程安全?數組

首先咱們先大概的瀏覽一下ArrayBlockingQueue 的內部構造,以下類圖:緩存

如類圖所示,能夠看到ArrayBlockingQueue 內部有個數組items 用來存放隊列元素,putIndex變量標示入隊元素的下標,takeIndex是出隊的下標,count是用來統計隊列元素個數,安全

從定義能夠知道,這些屬性並無使用valatile修飾,這是由於訪問這些變量的使用都是在鎖塊內被用。而加鎖了,就足以保證了鎖塊內變量的內存可見性。cookie

 

另外還有個獨佔鎖lock 用來保證出隊入隊操做的原子性,這保證了同時只有一個線程能夠進行入隊出隊操做,另外notEmpty,notFull條件變量用來進行出隊入隊的同步。併發

因爲ArrayBlockingQueue 是有界隊列,因此構造函數必須傳入隊列大小的參數。app

接下來咱們進入ArrayBlockingQueue的源碼看,以下:框架

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
}
   public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

如上面代碼所示,默認狀況下使用的是ReentrantLock提供的非公平獨佔鎖進行入隊出隊操做的加鎖。異步

 

接下來主要看看ArrayBlockingQueue的主要的幾個操做的源碼,以下:async

  1.offer 操做,向隊列尾部插入一個元素,若是隊列有空閒容量,則插入成功後返回true,若是隊列已滿則丟棄當前元素,而後返回false,函數

若是e元素爲null,則拋出 NullPointerException 異常,另外該方法是不阻塞的。源碼以下:

  public boolean offer(E e) {
        //(1)e爲null,則拋出NullPointerException異常
        checkNotNull(e);
        //(2)獲取獨佔鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //(3)若是隊列滿則返回false
            if (count == items.length)
                return false;
            else {
                //(4)否者插入元素
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

代碼(2)獲取獨佔鎖,當前線程獲取到該鎖後,其餘入隊和出隊操做的線程都會被阻塞掛起後放入lock鎖的AQS阻塞隊列。

代碼(3)若是隊列滿則直接返回false,不然調用enqueue方法後返回true,enqueue的源碼以下:

  private void enqueue(E x) {
        //(6)元素入隊
        final Object[] items = this.items;
        items[putIndex] = x;
        //(7)計算下一個元素應該存放的下標
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //(8)
        notEmpty.signal();
    }

能夠看到上面代碼首先把當前元素放入items數組,而後計算下一個元素應該存放的下標,而後遞增元素個數計數器,最後激活 notEmpty 的條件隊列中由於調用 poll 或者 take 操做而被阻塞的的一個線程。

這裏因爲在操做共享變量,好比count前加了鎖,因此不存在內存不可見問題,加過鎖後獲取的共享變量都是從主存獲取的,而不是在CPU緩存獲取寄存器裏面的值。

代碼(5)釋放鎖,釋放鎖後會把修改的共享變量值,好比Count的值刷新回主內存中,這樣其餘線程經過加鎖再次讀取這些共享變量後就能夠看到最新的值。

 

  2.put操做,向隊列尾部插入一個元素,若是隊列有空閒則插入後直接返回true,若是隊列已滿則阻塞當前線程直到隊列有空閒插入成功後返回true,

若是在阻塞的時候被其餘線程設置了中斷標誌,則被阻塞線程會拋出InterruptedException 異常而返回,另外若是 e 元素爲 null 則拋出 NullPointerException 異常。源碼以下:

public void put(E e) throws InterruptedException {
    //(1)
    checkNotNull(e);
    final ReentrantLock lock = this.lock;

    //(2)獲取鎖(可被中斷)
    lock.lockInterruptibly();
    try {

        //(3)若是隊列滿,則把當前線程放入notFull管理的條件隊列
        while (count == items.length)
            notFull.await();

        //(4)插入元素
        enqueue(e);
    } finally {
        //(5)
        lock.unlock();
    }
}

代碼(2)在獲取鎖的過程當中當前線程被其它線程中斷了,則當前線程會拋出 InterruptedException 異常而退出。

代碼(3)判斷若是當前隊列滿了,則把當前線程阻塞掛起後放入到 notFull 的條件隊列,注意這裏是使用了 while 而不是 if。爲何須要while呢?

這是由於考慮到當前線程被虛假喚醒的問題,也就是其它線程沒有調用 notFull 的 singal 方法時候,notFull.await() 在某種狀況下會自動返回。

若是使用if語句簡單判斷一下,那麼虛假喚醒後會執行代碼(4),元素入隊,而且遞增計數器,而這時候隊列已是滿了的,致使隊列元素個數大於了隊列設置的容量,致使程序出錯。

而使用使用 while 循環假如 notFull.await() 被虛假喚醒了,那麼循環在檢查一下當前隊列是不是滿的,若是是則再次進行等待。

代碼(4)若是隊列不滿則插入當前元素。

 

  3.poll操做,從隊列頭部獲取並移除一個元素,若是隊列爲空則返回 null,該方法是不阻塞的。源碼以下:

public E poll() {
    //(1)獲取鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //(2)當前隊列爲空則返回null,否者調用dequeue()獲取
        return (count == 0) ? null : dequeue();
    } finally {
        //(3)釋放鎖
        lock.unlock();
    }
}

代碼(1)獲取獨佔鎖

代碼(2)若是隊列爲空則返回 null,否者調用 dequeue() 方法,dequeue 源碼以下:

private E dequeue() {
    final Object[] items = this.items;

    //(4)獲取元素值
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    //(5)數組中值值爲null;
    items[takeIndex] = null;

    //(6)隊頭指針計算,隊列元素個數減一
   if (++takeIndex == items.length)
            takeIndex = 0;
    count--;

    //(7)發送信號激活notFull條件隊列裏面的一個線程
    notFull.signal();
    return x;
}

如上代碼,能夠看到首先獲取當前隊頭元素保存到局部變量,而後重置隊頭元素爲null,並從新設置隊頭下標,元素計數器遞減,最後發送信號激活notFull 的條件隊列裏面一個由於調用 put 或者 offer 而被阻塞的線程。

 

  4.take 操做,獲取當前隊列頭部元素,並從隊列裏面移除,若是隊列爲空則阻塞調用線程。若是隊列爲空則阻塞當前線程知道隊列不爲空,而後返回元素,

若是若是在阻塞的時候被其它線程設置了中斷標誌,則被阻塞線程會拋出 InterruptedException 異常而返回。源碼以下:

public E take() throws InterruptedException {
    //(1)獲取鎖
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {

        //(2)隊列爲空,則等待,直到隊列有元素
        while (count == 0)
            notEmpty.await();
        //(3)獲取隊頭元素
        return dequeue();
    } finally {
        //(4) 釋放鎖
        lock.unlock();
    }
}

能夠看到take操做的代碼也比較簡單,與poll相比,只是步驟(2)若是隊列爲空,則把當前線程掛起後放入到notEmpty的條件隊列,等其餘線程調用notEmpty.signal() 方法後在返回,

須要注意的是這裏也是使用 while 循環進行檢測並等待而不是使用 if。之因此這樣作,道理都是同樣。這裏就不在解釋了。

 

  5.peek 操做獲取隊列頭部元素可是不從隊列裏面移除,若是隊列爲空則返回 null,該方法是不阻塞的。源碼以下:

public E peek() {
    //(1)獲取鎖
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //(2)
        return itemAt(takeIndex);
    } finally {
       //(3)
        lock.unlock();
    }
}

 @SuppressWarnings("unchecked")
final E itemAt(int i) {
        return (E) items[i];
}

peek的實現更加簡單,首先獲取獨佔鎖,而後從數組items 中獲取當前隊頭下標的值並返回,在返回以前釋放了獲取的鎖。

 

  6. size 操做,獲取當前隊列元素個數。源碼以下:

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

size 操做是簡單的,獲取鎖後直接返回 count,並在返回前釋放鎖。也許你會有疑問這裏有沒有修改Count的值,只是簡單的獲取下,爲什麼要加鎖呢?

答案很簡單,若是count聲明爲volatile,這裏就不須要加鎖了,由於由於 volatile 類型變量保證了內存的可見性,而 ArrayBlockingQueue 的設計中 count 並無聲明爲 volatile,

這是由於count的操做都是在獲取鎖後進行的,而獲取鎖的語義之一就是獲取鎖後訪問的變量都是從主內存獲取的,這就保證了變量的內存可見性。

 

最後用一張圖來加深對ArrayBlockingQueue的理解,以下圖:

 

總結:ArrayBlockingQueue 經過使用全局獨佔鎖實現同時只能有一個線程進行入隊或者出隊操做,這個鎖的粒度比較大,有點相似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操做的結果是精確的,由於計算前加了全局鎖。

 

2、Logback 框架中異步日誌打印中 ArrayBlockingQueue 的使用

在高併發而且響應時間要求比較小的系統中同步打日誌已經知足不了需求了,這是由於打日誌自己是須要同步寫磁盤的,會形成 響應時間 增長,以下圖同步日誌打印模型爲:

異步模型是業務線程把要打印的日誌任務寫入一個隊列後直接返回,而後使用一個線程專門負責從隊列中獲取日誌任務寫入磁盤,其模型具體以下圖:

 

 

 

如圖可知其實 logback 的異步日誌模型是一個多生產者單消費者模型,經過使用隊列把同步日誌打印轉換爲了異步,業務線程調用異步 appender 只須要把日誌任務放入日誌隊列,日誌線程則負責使用同步的 appender 進行具體的日誌打印到磁盤。

 

接下來看看異步日誌打印具體實現,要把同步日誌打印改成異步須要修改 logback 的 xml 配置文件以下:

<appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
        <file>project.log</file>
        <encoding>UTF-8</encoding>
        <append>true</append>

        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- daily rollover -->
            <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
            <!-- keep 7 days' worth of history -->
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>
                <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
                ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n]]>
            </pattern>
        </layout>
    </appender>

    <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender">
        <discardingThreshold>0</discardingThreshold>
        <queueSize>1024</queueSize>
        <neverBlock>true</neverBlock>
        <appender-ref ref="PROJECT" />
    </appender>
     <logger name="PROJECT_LOGGER" additivity="false">
        <level value="WARN" />
        <appender-ref ref="asyncProject" />
    </logger>

可知 AsyncAppender 是實現異步日誌的關鍵,下節主要講這個的內部實現。

 

異步原理實現

  首先從 AsyncAppender 的類圖結構來從全局瞭解下 AsyncAppender 中組件構成,類圖以下所示:

從上面的類圖咱們能夠知道以下兩點:

  1.如上圖可知 AsyncAppender 繼承自 AsyncAppenderBase,其中後者具體實現了異步日誌模型的主要功能,前者只是重寫了其中的一些方法。另外從上類圖可知 logback 中的異步日誌隊列是一個阻塞隊列, 後面會知道實際上是一個有界阻塞隊列 ArrayBlockingQueue, 其中 queueSize 是有界隊列的元素個數默認爲 256。

  2.worker 是個線程,也就是異步日誌打印模型中的單消費者線程,aai 是一個 appender 的裝飾器裏面存放同步日誌的 appender, 其中 appenderCount 記錄 aai 裏面附加的同步 appender 的個數;neverBlock 是當日志隊列滿了的時候是否阻塞打日誌的線程的一個開關;discardingThreshold 是一個閾值,當日志隊列裏面空閒個數小於該值時候新來的某些級別的日誌會被直接丟棄,下面會具體講到。

 

首先咱們來看下什麼時候建立的日誌隊列以及什麼時候啓動的消費線程,這須要看下 AsyncAppenderBase 的 start 方法,該方法是在解析完畢配置 AsyncAppenderBase 的 xml 的節點元素後被調用,源碼以下:

public void start() {
    ...
    //(1)日誌隊列爲有界阻塞隊列
    blockingQueue = new ArrayBlockingQueue<E>(queueSize);
    //(2)若是沒設置discardingThreshold則設置爲隊列大小的1/5
    if (discardingThreshold == UNDEFINED)
      discardingThreshold = queueSize / 5;
    //(3)設置消費線程爲守護線程,並設置日誌名稱
    worker.setDaemon(true);
    worker.setName("AsyncAppender-Worker-" + worker.getName());
    //(4)設置啓動消費線程
    super.start();
    worker.start();
}

從上代碼可知以下兩點:

  1. logback 使用的隊列是有界隊列 ArrayBlockingQueue,之因此使用有界隊列是考慮到內存溢出問題,在高併發下寫日誌的 qps 會很高若是設置爲無界隊列隊列自己會佔用很大內存,極可能會形成 內存溢出。

  2.這裏消費日誌隊列的 worker 線程被設置爲了守護線程,意味着當主線程運行結束而且當前沒有用戶線程時候該 worker 線程會隨着 JVM 的退出而終止,而無論日誌隊列裏面是否還有日誌任務未被處理。另外這裏設置了線程的名稱是個很好的習慣,由於這在查找問題的時候頗有幫助,根據線程名字就能夠定位到是哪一個線程。

 

既然是有界隊列那麼確定須要考慮若是隊列滿了,該如何處置,是丟棄老的日誌任務,仍是阻塞日誌打印線程直到隊列有空餘元素那?

要回答這個問題,咱們須要看看具體進行日誌打印的AsyncAppenderBase 的 append 方法,源碼以下:

protected void append(E eventObject) {
   //(5)調用AsyncAppender重寫的isDiscardable
   if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
       return;
   }
   ...
   //(6)放入日誌任務到隊列
   put(eventObject);
}

private boolean isQueueBelowDiscardingThreshold() {
   return (blockingQueue.remainingCapacity() < discardingThreshold);
}

其中 (5) 調用了 AsyncAppender 重寫的 isDiscardable,源碼以下:

    //(7)
    protected boolean isDiscardable(ILoggingEvent event) {
        Level level = event.getLevel();
        return level.toInt() <= Level.INFO_INT;
    }

結合 代碼(5)和代碼(7) 可知若是當前日誌的級別小於 INFO_INT 級別而且當前隊列的剩餘容量小於 discardingThreshold 時候會直接丟棄這些日誌任務。

 

接下來看具體步驟 (6) 的 put 方法,源碼以下:

 private void put(E eventObject) {
        //(8)
        if (neverBlock) {
            blockingQueue.offer(eventObject);
        } else {
            try {//(9)
                blockingQueue.put(eventObject);
            } catch (InterruptedException e) {
                // Interruption of current thread when in doAppend method should not be consumed
                // by AsyncAppender
                Thread.currentThread().interrupt();
            }
        }
 }

可知若是 neverBlock 設置爲了 false(默認爲 false)則會調用阻塞隊列的 put 方法,而 put 是阻塞的,也就是說若是當前隊列滿了,若是在企圖調用 put 方法向隊列放入一個元素則調用線程會被阻塞直到隊列有空餘空間。這裏有必要提下其中第 (9) 步當日志隊列滿了的時候 put 方法會調用 await() 方法阻塞當前線程,若是其它線程中斷了該線程,那麼該線程會拋出 InterruptedException 異常,那麼當前的日誌任務就會被丟棄了。若是 neverBlock 設置爲了 true 則會調用阻塞隊列的 offer 方法,而該方法是非阻塞的,若是當前隊列滿了,則會直接返回,也就是丟棄當前日誌任務。

 

最後看下 addAppender 方法內是什麼的,源碼以下:

 public void addAppender(Appender<E> newAppender) {
        if (appenderCount == 0) {
            appenderCount++;
            ...
            aai.addAppender(newAppender);
        } else {
            addWarn("One and only one appender may be attached to AsyncAppender.");
            addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
        }
 }

如上代碼可知一個異步 appender 只能綁定一個同步 appender, 這個 appender 會被放到 AppenderAttachableImpl 的 appenderList 列表裏面。

 

到這裏咱們已經分析完了日誌生產線程放入日誌任務到日誌隊列的實現,下面一塊兒來看下消費線程是如何從隊列裏面消費日誌任務並寫入磁盤的,因爲消費線程是一個線程,那就從 worker 的 run 方法看起,源碼以下所示:

class Worker extends Thread {

        public void run() {

            AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
            AppenderAttachableImpl<E> aai = parent.aai;

            //(10)一直循環直到該線程被中斷
            while (parent.isStarted()) {
                try {//(11)從阻塞隊列獲取元素
                    E e = parent.blockingQueue.take();
                    aai.appendLoopOnAppenders(e);
                } catch (InterruptedException ie) {
                    break;
                }
            }

            //(12)到這裏說明該線程被中斷,則吧隊列裏面的剩餘日誌任務
            //刷新到磁盤
            for (E e : parent.blockingQueue) {
                aai.appendLoopOnAppenders(e);
                parent.blockingQueue.remove(e);
            }
            ...
..        }
}

其中(11)從日誌隊列使用 take 方法獲取一個日誌任務,若是當前隊列爲空則當前線程會阻塞到 take 方法直到隊列不爲空才返回,獲取到日誌任務後會調用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,該方法會循環調用經過 addAppender 注入的同步日誌 appener 具體實現日誌打印到磁盤的任務。

相關文章
相關標籤/搜索