以前,介紹了一下 ThreadPoolExecutor 的各參數的含義(併發編程之線程池ThreadPoolExecutor),其中有一個 BlockingQueue,它是一個阻塞隊列。那麼,小夥伴們有沒有想過,爲何此處的線程池要用阻塞隊列呢?面試
咱們知道隊列是先進先出的。當放入一個元素的時候,會放在隊列的末尾,取出元素的時候,會從隊頭取。那麼,當隊列爲空或者隊列滿的時候怎麼辦呢。編程
這時,阻塞隊列,會自動幫咱們處理這種狀況。segmentfault
當阻塞隊列爲空的時候,從隊列中取元素的操做就會被阻塞。當阻塞隊列滿的時候,往隊列中放入元素的操做就會被阻塞。數組
然後,一旦空隊列有數據了,或者滿隊列有空餘位置時,被阻塞的線程就會被自動喚醒。併發
這就是阻塞隊列的好處,你不須要關心線程什麼時候被阻塞,也不須要關心線程什麼時候被喚醒,一切都由阻塞隊列自動幫咱們完成。咱們只須要關注具體的業務邏輯就能夠了。ide
而這種阻塞隊列常常用在生產者消費者模式中。(可參看:面試官讓我手寫一個生產者消費者模式)高併發
那麼,通常咱們用到的阻塞隊列有哪些呢。下面,經過idea的類圖,列出來經常使用的阻塞隊列,而後一個一個講解(不懂怎麼用的,能夠參考這篇文章:怎麼用IDEA快速查看類圖關係)。學習
阻塞隊列中,全部經常使用的方法都在 BlockingQueue 接口中定義。如測試
插入元素的方法: put,offer,add。移除元素的方法: remove,poll,take。this
它們有四種不一樣的處理方式,第一種是在失敗時拋出異常,第二種是在失敗時返回特殊值,第三種是一直阻塞當前線程,最後一種是在指定時間內阻塞,不然返回特殊值。(以上特殊值,是指在插入元素時,失敗返回false,在取出元素時,失敗返回null)
拋異常 | 特殊值 | 阻塞 | 超時 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
1) ArrayBlockingQueue
這是一個由數組結構組成的有界阻塞隊列。首先看下它的構造方法,有三個。
第一個能夠指定隊列的大小,第二個還能夠指定隊列是否公平,不指定的話,默認是非公平。它是使用 ReentrantLock 的公平鎖和非公平鎖實現的(後續講解AQS時,會詳細說明)。
簡單理解就是,ReentrantLock 內部會維護一個有前後順序的等待隊列,假若有五個任務一塊兒過來,都被阻塞了。若是是公平的,則等待隊列中等待最久的任務就會先進入阻塞隊列。若是是非公平的,那麼這五個線程就須要搶鎖,誰先搶到,誰就先進入阻塞隊列。
第三個構造方法,是把一個集合的元素初始化到阻塞隊列中。
另外,ArrayBlockingQueue 沒有實現讀寫分離,也就是說,讀和寫是不能同時進行的。由於,它讀寫時用的是同一把鎖,以下圖所示:
2) LinkedBlockingQueue
這是一個由鏈表結構組成的有界阻塞隊列。它的構造方法有三個。
能夠看到和 ArrayBlockingQueue 的構造方法大同小異,不過是,LinkedBlockingQueue 能夠不指定隊列的大小,默認值是 Integer.MAX_VALUE 。
可是,最好不要這樣作,建議指定一個固定大小。由於,若是生產者的速度比消費者的速度大的多的狀況下,這會致使阻塞隊列一直膨脹,直到系統內存被耗盡(此時,還沒達到隊列容量的最大值)。
此外,LinkedBlockingQueue 實現了讀寫分離,能夠實現數據的讀和寫互不影響,這在高併發的場景下,對於效率的提升無疑是很是巨大的。
3) SynchronousQueue
這是一個沒有緩衝的無界隊列。什麼意思,看一下它的 size 方法:
老是返回 0 ,由於它是一個沒有容量的隊列。
當執行插入元素的操做時,必須等待一個取出操做。也就是說,put元素的時候,必須等待 take 操做。
那麼,有的同窗就好奇了,這沒有容量,還叫什麼隊列啊,這有什麼意義呢。
個人理解是,這適用於併發任務不大,並且生產者和消費者的速度相差很少的場景下,直接把生產者和消費者對接,不用通過隊列的入隊出隊這一系列操做。因此,效率上會高一些。
能夠去查看一下 Excutors.newCachedThreadPool 方法用的就是這種隊列。
這個隊列有兩個構造方法,用於傳入是公平仍是非公平,默認是非公平。
4)PriorityBlockingQueue
這是一個支持優先級排序的無界隊列。有四個構造方法:
能夠指定初始容量大小(注意初始容量並不表明最大容量),或者不指定,默認大小爲 11。也能夠傳入一個比較器,把元素按必定的規則排序,不指定比較器的話,默認是天然順序。
PriorityBlockingQueue 是基於二叉樹最小堆實現的,每當取元素的時候,就會把優先級最高的元素取出來。咱們測試一下:
public class Person { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Person{" + "id=" + id + ", name='" + name + '\'' + '}'; } public Person(int id, String name) { this.id = id; this.name = name; } public Person() { } } public class QueueTest { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<Person> priorityBlockingQueue = new PriorityBlockingQueue<>(1, new Comparator<Person>() { @Override public int compare(Person o1, Person o2) { return o1.getId() - o2.getId(); } }); Person p2 = new Person(7, "李四"); Person p1 = new Person(9, "張三"); Person p3 = new Person(6, "王五"); Person p4 = new Person(2, "趙六"); priorityBlockingQueue.add(p1); priorityBlockingQueue.add(p2); priorityBlockingQueue.add(p3); priorityBlockingQueue.add(p4); //因爲二叉樹最小堆實現,用這種方式直接打印元素,不能保證有序 System.out.println(priorityBlockingQueue); System.out.println(priorityBlockingQueue.take()); System.out.println(priorityBlockingQueue); System.out.println(priorityBlockingQueue.take()); System.out.println(priorityBlockingQueue); } }
打印結果:
[Person{id=2, name='趙六'}, Person{id=6, name='王五'}, Person{id=7, name='李四'}, Person{id=9, name='張三'}] Person{id=2, name='趙六'} [Person{id=6, name='王五'}, Person{id=9, name='張三'}, Person{id=7, name='李四'}] Person{id=6, name='王五'} [Person{id=7, name='李四'}, Person{id=9, name='張三'}]
能夠看到,第一次取出的是 id 最小值 2, 第二次取出的是 6 。
5)DelayQueue
這是一個帶有延遲時間的無界阻塞隊列。隊列中的元素,只有等延時時間到了,才能取出來。此隊列通常用於過時數據的刪除,或任務調度。如下,模擬一下定長時間的數據刪除。
首先定義數據元素,須要實現 Delayed 接口,實現 getDelay 方法用於計算剩餘時間,和 CompareTo方法用於優先級排序。
public class DelayData implements Delayed { private int id; private String name; //數據到期時間 private long endTime; private TimeUnit timeUnit = TimeUnit.MILLISECONDS; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getEndTime() { return endTime; } public void setEndTime(long endTime) { this.endTime = endTime; } public DelayData(int id, String name, long endTime) { this.id = id; this.name = name; //須要把傳入的時間endTime 加上當前系統時間,做爲數據的到期時間 this.endTime = endTime + System.currentTimeMillis(); } public DelayData() { } @Override public long getDelay(TimeUnit unit) { return this.endTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { return o.getDelay(this.timeUnit) - this.getDelay(this.timeUnit) < 0 ? 1: -1; } }
模擬三條數據,分別設置不一樣的過時時間:
public class ProcessData { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayData> delayQueue = new DelayQueue<>(); DelayData a = new DelayData(5, "A", 5000); DelayData b = new DelayData(8, "B", 8000); DelayData c = new DelayData(2, "C", 2000); delayQueue.add(a); delayQueue.add(b); delayQueue.add(c); System.out.println("開始計時時間:" + System.currentTimeMillis()); for (int i = 0; i < 3; i++) { DelayData data = delayQueue.take(); System.out.println("id:"+data.getId()+",數據:"+data.getName()+"被移除,當前時間:"+System.currentTimeMillis()); } } }
最後結果:
開始計時時間:1583333583216 id:2,數據:C被移除,當前時間:1583333585216 id:5,數據:A被移除,當前時間:1583333588216 id:8,數據:B被移除,當前時間:1583333591216
能夠看到,數據是按過時時間長短,按順序移除的。C的時間最短 2 秒,而後過了 3 秒 A 也過時,再過 3 秒,B 過時。
若是本文對你有用,歡迎點贊,評論,轉發。
學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章推送。