java併發編程之併發容器和框架(三)

Java中的阻塞隊列

1 什麼是阻塞隊列

阻塞隊列(BlockingQueue)是一種支持兩個附加操做的隊列.java

  • 當隊列滿時,隊列會阻塞存儲元素的線程,直到隊列有可用空間
  • 在隊列爲空時,獲取元素的線程會等待隊列變爲非空

阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏存儲元素的線程,消費者是從隊列裏獲取元素的線程.阻塞隊列就是生產者存儲元素、消費者獲取元素的容器.node

在阻塞隊列不可用時,這兩個附加操做提供了4種處理方式
這裏寫圖片描述數組

  • 拋出異常
    • 當隊列滿時,若是再往隊列裏插入元素,會拋出IllegalStateException(「Queuefull」)異常
    • 當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常
  • 返回特殊值
    • 當往隊列插入元素時,會返回元素是否插入成功,成功則返回true
    • 如果移除方法,則是從隊列裏取出一個元素,若沒有則返回null
  • 一直阻塞
    • 當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到隊列有可用空間或響應中斷退出
    • 當隊列空時,若消費者線程從隊列裏take元素,隊列會阻塞住消費者線程,直到隊列非空
  • 超時退出
    • 當阻塞隊列滿時,若生產者線程往隊列裏插入元素,隊列會阻塞生產者線程
      一段時間,若超過指定的時間,生產者線程就會退出

注意 如果無界阻塞隊列,隊列不會出現滿的狀況,因此使用put或offer方法永遠不會被阻塞,使用offer方法時,永遠返回true緩存

BlockingQueue 不接受 null 元素。試圖 add、put 或 offer null 元素時,會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值(沒法經過編譯)。 安全

BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般表現並不高效,只能有計劃地偶爾使用,好比在取消排隊信息時。markdown

BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內置鎖或其餘形式的併發控制來自動達到它們的目的。然而,大量的Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。併發

BlockingQueue 實質上不支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。 ui

2 Java裏的阻塞隊列

至JDK8,Java提供了7個阻塞隊列
·ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
·LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
·PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
·DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。
·LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
·LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。this

2.1 ArrayBlockingQueue

是一個用數組實現的有界阻塞隊列.此隊列按FIFO的原則對元素進行排序spa

這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和消費者獲取的元素.一旦建立了這樣的緩存區,就不能再增長其容量.試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞.

此類支持對等待的生產者和消費者線程進行排序的可選的公平策略.默認狀況下,不保證這種機制.然而,經過將公平性設置爲 true 而構造的隊列容許按 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」.

所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞的線程先訪問隊列.
非公平性是對先等待的線程是非公平的,當隊列有可用空間時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列.
爲保證公平性,一般會下降吞吐量.咱們可使用如下代碼建立一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,代碼以下:

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

3 阻塞隊列的實現原理

若隊列爲空,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?讓咱們看看JDK是如何實現的。
使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。經過查看源碼發現ArrayBlockingQueue使用了Condition來實現,代碼以下。

private final Condition notFull;
    private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        // 省略其餘代碼
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

     private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

當往隊列裏插入一個元素時,若是隊列不可用,那麼阻塞生產者主要經過
LockSupport.park(this)來實現。

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

繼續進入源碼,發現調用setBlocker先保存一下將要阻塞的線程,而後調用unsafe.park阻塞當前線程。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

unsafe.park是個native方法,代碼以下。

public native void park(boolean isAbsolute, long time);

park這個方法會阻塞當前線程,只有如下4種狀況中的一種發生時,該方法纔會返回。

  • 與park對應的unpark執行或已經執行時。「已經執行」是指unpark先執行,而後再執行park的狀況。
  • 線程被中斷時。
  • 等待完time參數指定的毫秒數時。
  • 異常現象發生時,這個異常現象沒有任何緣由。
相關文章
相關標籤/搜索