多線程-阻塞隊列

1.阻塞隊列

阻塞隊列

阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素

 

 

阻塞隊列提供了四種處理方法:

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。

  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,如果沒有則返回null

  • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。

  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

成員方法

隊列 有界性 數據結構
ArrayBlockingQueue bounded(有界) 加鎖 arrayList
LinkedBlockingQueue optionally-bounded 加鎖 linkedList
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap
SynchronousQueue bounded 加鎖
LinkedTransferQueue unbounded 加鎖 heap
LinkedBlockingDeque unbounded 無鎖 heap

下面分別簡單介紹一下:

  • ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】

  • LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度爲Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。

  • PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。

  • DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)

  • SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閒線程則會重複使用,線程空閒了60秒後會被回收。

  • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。

  • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程併發時,可以將鎖的競爭最多降到一半。

ArrayBlockingQueue

package demo.queue;
​
import java.util.concurrent.ArrayBlockingQueue;
​
public class BlockingQueue1 {
​
    public static void main(String[] args) {
​
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//給定初始容量
        // add/remove 拋出異常
        arrayBlockingQueue.add("a");
        arrayBlockingQueue.add("b");
        arrayBlockingQueue.add("c");
​
        //容量爲3 當我們加入第四個值時則會拋出異常
        arrayBlockingQueue.add("d");
​
    }
​
}

輸出:

 

package demo.queue;
​
import java.util.concurrent.ArrayBlockingQueue;
​
public class BlockingQueue1 {
​
    public static void main(String[] args) {
​
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);//給定初始容量
        // add/remove 拋出異常
        arrayBlockingQueue.add("a");
        arrayBlockingQueue.add("b");
        arrayBlockingQueue.add("c");
​
        //容量爲3 當我們加入第四個值時則會拋出異常
        //arrayBlockingQueue.add("d");
​
        //remove
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        //當隊列沒有元素時拋出異常
        System.out.println(arrayBlockingQueue.remove());
​
    }
​
}

輸出:

 

add方法和offer方法最終調用的是enqueue(E x)方法 看源碼可知

SynchronousQueue

SynchronousQueue , 只有一個容量!

每一個put操作,就需要有一個 take操作!

package demo.queue;
​
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
​
public class BlockingQueue2 {
​
    public static void main(String[] args) {
​
        SynchronousQueue<String> arrayBlockingQueue = new SynchronousQueue<>();
​
        // A 存
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + "put a");
                arrayBlockingQueue.put("a");
                System.out.println(Thread.currentThread().getName() + "put b");
                arrayBlockingQueue.put("b");
                System.out.println(Thread.currentThread().getName() + "put c");
                arrayBlockingQueue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"A:").start();
​
        // B 取
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
​
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
​
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + arrayBlockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"B:").start();
​
​
​
​
    }
​
}

輸出: