Java阻塞隊列

💛原文地址爲http://www.javashuo.com/article/p-ktssqkkn-gq.html,轉載請註明出處!html

什麼是阻塞隊列

原文地址爲,轉載請註明出處!
阻塞隊列是一個支持阻塞的插入和移除的隊列。java

  • 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 支持阻塞的移除方法:意思是隊列爲空時,獲取元素(同時移除元素)的線程會被阻塞,等到隊列變爲非空。

阻塞隊列用法

阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏獲取元素的線程。數組

當阻塞隊列不可用時,會有四種相應的處理方式:this

處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入操做 add(e) offer(e) put(e) offer(e, time, unit)
移除操做 remove() poll() take() poll(time, unit)
獲取操做 element() peek() 不可用 不可用
  • 返回特殊值:插入元素時,會返回是否插入成功,成功返回true。若是是移除方法,則是從隊列中取出一個元素,沒有則返回null。
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏面put元素,則生產者線程會被阻塞,知道隊列不滿或者響應中斷退出。當隊列爲空時,若是消費者線程從隊列裏take元素。
  • 超時退出:當阻塞隊列滿時,若是生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間,若是超過了指定時間,生產者線程就會退出。

若是是無界阻塞隊列,隊列則不會出現滿的狀況。線程

阻塞隊列

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列
  • LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列code

  • PriorityBlockingQueue:一個支持優先級排序無界阻塞隊列
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列
  • SynchronousQueue:一個不存儲元素的阻塞隊列
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列
    • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列

1.ArrayBlockingQueue

此隊列按照先進先出(FIFO)的原則對元素進行排序htm

默認狀況下不保證線程公平地訪問隊列(所謂公平是指當隊列可用時,先被阻塞的線程先訪問隊列)blog

爲了保證公平性一般會下降吞吐量。排序

公平鎖是利用了可重入鎖的公平鎖來實現。索引

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

2.ArrayBlockingQueue

此隊列按照先進先出(FIFO)的原則對元素進行排序

默認長度爲Integer.MAX_VALUE

3.PriorityBlockingQueue

默認狀況下元素採起天然順序升序排列

能夠自定義Comparator或者自定義類實現compareTo()方法來指定排序規則

不支持同優先級元素排序

4.DelayQueue

隊列使用PriorityQueue來實現,隊列中的元素必須實現Delayed接口

只有在延時期滿才能從隊列中提取元素

阻塞隊列原理

若是隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?

使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。

ArrayBlockingQueue爲例子

/** items 存放隊列中的元素*/
    final Object[] items;

    /** take 操做的索引 */
    int takeIndex;

    /** put 操做的索引 */
    int putIndex;

    /** 隊列中元素個數 */
    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** 控制生產者 takes 操做的 Condition */
    private final Condition notEmpty;

    /** 控制消費者 put 操做的 Condition */
    private final Condition notFull;

put操做

public void put(E e) throws InterruptedException {
        checkNotNull(e); //判斷 e == null
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //獲取鎖,與lock不一樣,能夠嘗試中斷阻塞
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

入隊操做,入隊以後喚醒消費者線程。

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

notFull.await();中其實調用了park方法,先使用setBlocker保存一下將要阻塞的線程,而後調用本地方法UNSAFE.park(boolean isAbsolute, long time)進行阻塞。

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
相關文章
相關標籤/搜索