突擊併發編程JUC系列-阻塞隊列 BlockingQueue

突擊併發編程JUC系列演示代碼地址: https://github.com/mtcarpenter/JavaTutorial前端

什麼是阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。java

  • 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
  • 支持阻塞的移除方法:意思是在隊列爲空時,獲取元素的線程會等待隊列變爲非空。

阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。git

插入和移除操做的4種處理方式

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用
  • 拋出異常:當隊列滿時,若是再往隊列裏插入元素,會拋出 IllegalStateException ("Queue full")異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException 異常。
  • 返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若是是移除方法,則是從隊列裏取出一個元素,若是沒有則返回 null 。
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏 put 元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,若是消費者線程從隊列裏 take 元素,隊列會阻塞住消費者線程,直到隊列不爲空。
  • 超時退出:當阻塞隊列滿時,若是生產者線程往隊列裏插入元素,隊列會阻塞生產者線程一段時間,若是超過了指定的時間,生產者線程就會退出。

若是是無界阻塞隊列,隊列不可能會出現滿的狀況,因此使用 put 或 offer 方法永遠不會被阻塞,並且使用offer方法時,該方法永遠返回 true。github

ArrayBlockingQueue

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。 默認狀況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。爲了保證公平性,一般會下降吞吐量。編程

阻塞式寫方法

ArrayBlockingQueue中提供了兩個阻塞式寫方法,分別以下(在該隊列中,不管是阻塞式寫方法仍是非阻塞式寫方法,都不容許寫入null)。後端

  • void put(E e):向隊列的尾部插入新的數據,當隊列已滿時調用該方法的線程會進入阻塞,直到有其餘線程對該線程執行了中斷操做,或者隊列中的元素被其餘線程消費。
  • boolean offer(E e, long timeout, TimeUnit unit):向隊列尾部寫入新的數據,當隊列已滿時執行該方法的線程在指定的時間單位內將進入阻塞,直到到了指定的超時時間後,或者在此期間有其餘線程對隊列數據進行了消費。

put() 方法示例數組

public class ArrayBlockingQueueExample1 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        try {
            queue.put("class 1");
            queue.put("class 2");
            queue.put("class 3");
            // 超過指定得容量當前線程阻塞
            queue.put("class 4");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

非阻塞式寫方法

當隊列已滿時寫入數據,若是不想使得當前線程進入阻塞,那麼就可使用非阻塞式的寫操做方法。緩存

  • boolean add(E e):向隊列尾部寫入新的數據,當隊列已滿時不會進入阻塞,可是該方法會拋出隊列已滿的異常。
  • boolean offer(E e):向隊列尾部寫入新的數據,當隊列已滿時不會進入阻塞,而且會當即返回 false。

add() 方法示例多線程

public class ArrayBlockingQueueExample2 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        //  超過指定容量 拋出異常
        queue.add("class 4");
    }
}
// 拋出異常

阻塞式讀方法

  • E take():從隊列頭部獲取數據,而且該數據會從隊列頭部移除,當隊列爲空時執行take方法的線程將進入阻塞,直到有其餘線程寫入新的數據,或者當前線程被執行了中斷操做。
  • E poll(long timeout, TimeUnit unit):從隊列頭部獲取數據而且該數據會從隊列頭部移除,若是隊列中沒有任何元素時則執行該方法,當前線程會阻塞指定的時間,直到在此期間有新的數據寫入,或者阻塞的當前線程被其餘線程中斷,當線程因爲超時退出阻塞時,返回值爲null。

take() 方法示例併發

public class ArrayBlockingQueueExample3 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        try {
            // 取出對頭元素
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 隊列大小 
        System.out.println(queue.size());
    }
}
//class 1
// 2

非阻塞式讀方法

  • E poll():從隊列頭部獲取數據而且該數據會從隊列頭部移除,當隊列爲空時,該方法不會使得當前線程進入阻塞,而是返回null值。
  • E peek():當隊列爲空時,該方法不會使得當前線程進入阻塞,而是返回null值。
public class ArrayBlockingQueueExample4 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 隊列無元素 直接返回 null
        System.out.println(queue.poll( ));
        System.out.println(queue.peek( ));
    }
}
// null
// null

部分源碼

public void put(E e) throws InterruptedException {
        // 檢查元素
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lockInterruptibly();
        try {
            // 元素滿 一直阻塞,隊列非滿時,被喚醒
            while (count == items.length)
                notFull.await();
            // 入隊
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
         // 獲取鎖
        lock.lockInterruptibly();
        try {
            // 隊列爲空 等待
            while (count == 0)
                notEmpty.await();
            // 出隊
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue

LinkedBlockingQueue 是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現compareTo()方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator 來對元素進行排序。須要注意的是不能保證同優先級元素的順序。

public class PriorityBlockingQueueExample1 {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue();
        queue.offer(1);
        queue.offer(12);
        queue.offer(21);
        queue.offer(6);
		// 內部排序
        System.out.println(queue.poll()); // 1
        System.out.println(queue.poll()); // 6
        System.out.println(queue.poll()); // 12
        System.out.println(queue.poll()); //21

    }
}

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。

DelayQueue很是有用,能夠將DelayQueue運用在如下應用場景。

  • 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  • 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。

DelayQueue隊列的元素必須實現Delayed接口。咱們能夠參考ScheduledThreadPoolExecutorScheduledFutureTask類的實現。

public class DelayQueueExample1 {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        // 延期3秒 處理
        queue.put(new DelayedEntry("A", 30000L));
        // 延期10 秒處理
        queue.add(new DelayedEntry("B", 10000L));
        // 延期 20 秒處理
        queue.add(new DelayedEntry("C", 20000L));
        int size = queue.size();
        System.out.println("當前時間是:" + LocalDateTime.now());
        // 從延時隊列中獲取元素, 將輸出 A,B,C
        for (int i = 0; i < size; i++) {
            System.out.println(queue.take() + " ------ " + LocalDateTime.now());
        }
    }
}

/**
 * 繼承 Delayed 接口
 */
class DelayedEntry implements Delayed {
    /**
     * 元素數據內容
     */
    private final String value;
    /**
     * 用於計算失效時間
     */
    private final long exeTime;

    DelayedEntry(String value, long exeTime) {
        this.value = value;
        this.exeTime = exeTime + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedEntry t = (DelayedEntry) o;
        if (this.exeTime < t.exeTime) {
            return -1;
        } else if (this.exeTime > t.exeTime) {
            return 1;
        } else {
            return 0;
        }

    }

    @Override
    public String toString() {
        return "DelayedEntry{" +
                "value=" + value +
                ", exeTime=" + exeTime +
                '}';
    }
}

//當前時間是:2020-10-15T16:26:37.167
//DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117
// DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105
//DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104

SynchronousQueue

SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。 它支持公平訪問隊列。默認狀況下線程採用非公平性策略訪問隊列。使用如下構造方法能夠建立公平性訪問的SynchronousQueue,若是設置爲true,則等待的線程會採用先進先出的順序訪問隊列。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfertransfer方法。

  • transfer方法

    若是當前有消費者正在等待接收元素(消費者使用 take() 方法或帶時間限制的poll()方法時),transfer 方法能夠把生產者傳入的元素馬上 transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer 方法會將元素存放在隊列的tail節點,並等到該元素被消費者消費了才返回。transfer 方法的關鍵代碼以下

  • tryTransfer方法

    tryTransfer方法是用來試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回false。和transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回,而transfer方法是必須等到消費者消費了才返回。 對於帶有時間限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回false,若是在超時時間內消費了元素,則返回 true。

LinkedBlockingDeque

LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是能夠從隊列的兩端插入和移出元素。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirstaddLastofferFirstofferLastpeekFirstpeekLast等方法,以First單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙端隊列的最後一個元素。另外,插入方法add等同於addLast,移除方法remove等效於removeFirst。可是take方法卻等同於takeFirst,不知道是否是JDK的 bug,使用時仍是用帶有FirstLast後綴的方法更清楚。 在初始化LinkedBlockingDeque 時能夠設置容量防止其過分膨脹。另外,雙向阻塞隊列能夠運用在「工做竊取」模式中。


歡迎關注公衆號 山間木匠 , 我是小春哥,從事 Java 後端開發,會一點前端、經過持續輸出系列技術文章以文會友,若是本文能爲您提供幫助,歡迎你們關注、點贊、分享支持,咱們下期再見!<br />

相關文章
相關標籤/搜索