Java中的阻塞隊列_BlockingQueue

Java中的阻塞隊列_BlockingQueue數組

1. 什麼是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。緩存

阻塞隊列提供了四種處理方法:數據結構

輸入圖片說明

  • 拋出異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。
  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。

2. Java裏的阻塞隊列

一個 BlockingQueue 多是有界的,若是在插入的時候,發現隊列滿了,那麼 put 操做將會阻塞。一般,在這裏咱們說的無界隊列也不是說真正的無界,而是它的容量是 Integer.MAX_VALUE(21億多)。多線程

JDK7提供了7個阻塞隊列。分別是:less

  1. ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  2. LinkedBlockingQueue :底層基於單向鏈表實現的阻塞隊列,能夠當作無界隊列也能夠當作有界隊列來使用。
  3. PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列,PriorityBlockingQueue 只能指定初始的隊列大小,後面插入元素的時候,若是空間不夠的話會自動擴容。
  4. DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  5. SynchronousQueue:一個不存儲元素的阻塞隊列,SynchronousQueue 的隊列實際上是虛的,其不提供任何空間(一個都沒有)來存儲元素。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。。
  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列,所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。

3. 阻塞隊列的簡介

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。咱們可使用如下代碼建立一個公平的阻塞隊列:ui

ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

訪問者的公平性是使用可重入鎖實現的,代碼以下:this

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

LinkedBlockingQueue

底層基於單向鏈表實現的阻塞隊列,能夠當作無界隊列也能夠當作有界隊列來使用。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。線程

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

也能夠設置隊列的長度設計

/**
 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *         than zero
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界隊列,內部數據結構使用數組實現。默認狀況下元素採起天然順序排列,也能夠經過比較器comparator來指定元素的排序規則。元素按照升序排列。但它會在初始化的時候指定一個初始化的長度DEFAULT_INITIAL_CAPACITY,code

/**
 * Creates a {@code PriorityBlockingQueue} with the default
 * initial capacity (11) that orders its elements according to
 * their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

這個初始化的長度會動態調整的,動態調整的邏輯以下面這段代碼,

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expand by about 50%), giving up (allowing retry)
 * on contention (which we expect to be rare). Call only while
 * holding lock.
 *
 * @param array the heap array
 * @param oldCap the length of the array
 */
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                    0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                    (oldCap + 2) : // grow faster if small
                    (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。咱們能夠將DelayQueue運用在如下應用場景:

  • 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合於傳遞性場景,好比在一個線程中使用的數據,傳遞給另一個線程使用,SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。 transfer方法,若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。 transfer方法的關鍵代碼以下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代碼是試圖把存放當前元素的s節點做爲tail節點。第二行代碼是讓CPU自旋等待消費者消費元素。由於自旋會消耗CPU,因此自旋必定的次數後使用Thread.yield()方法來暫停當前正在執行的線程,並執行其餘線程。

tryTransfer方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回。而transfer方法是必須等到消費者消費了才返回。

對於帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回true。

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。雙端隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素。另外插入方法add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是Jdk的bug,使用時仍是用帶有First和Last後綴的方法更清楚。在初始化LinkedBlockingDeque時能夠設置容量防止其過渡膨脹。另外雙向阻塞隊列能夠運用在「工做竊取」模式中。

/**
 * Creates a {@code LinkedBlockingDeque} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

/**
 * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
 *
 * @param capacity the capacity of this deque
 * @throws IllegalArgumentException if {@code capacity} is less than 1
 */
public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

==========END==========

相關文章
相關標籤/搜索