BlockingQueue位於JDK5新增的concurrent包中,它很好地解決了多線程中,如何高效安全地「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。java
阻塞隊列,顧名思義,它首先它是一個隊列,在數據結構中,隊列是一種線性表。程序員
咱們經過一個共享的隊列,可使得數據由隊列的一端輸入,從另一端輸出。經常使用的隊列主要有如下兩種:
先進先出(FIFO):先插入的隊列的元素也最早出隊列,相似於排隊的功能。從某種程度上來講這種隊列也體現了一種公平性。
後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件,至關於棧。數組
在多線程環境中,經過隊列能夠很容易地實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。緩存
假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。可是,若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。安全
在JDK5的concurrent包發佈之前,在多線程環境下,程序員只能靠本身去人爲地控制這些實現細節,還要兼顧效率和線程安全,這會給咱們的程序帶來不小的複雜度。因而,強大的concurrent包橫空出世了,而且給咱們帶來了強大的BlockingQueue。(注:在多線程領域,所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒)。數據結構
阻塞隊列與咱們日常接觸的普通隊列(LinkedList或ArrayList等)的最大不一樣點,在於阻塞隊列提供了阻塞添加和阻塞刪除的方法。多線程
*阻塞添加 所謂阻塞添加,是指當阻塞隊列元素已滿時,隊列會阻塞加入元素的線程,直到隊列元素不滿時才從新喚醒線程執行元素加入操做。 *阻塞刪除 阻塞刪除是指在隊列元素爲空時,刪除隊列元素的線程將被阻塞,直到隊列不爲空時再執行刪除操做(通常都會返回被刪除的元素)。
做爲BlockingQueue的使用者,咱們不再用關心何時須要阻塞線程,何時須要喚醒線程,由於BlockingQueue把這一切都爲咱們包辦了。併發
BlockingQueue的核心方法:ide
插入方法:函數
刪除方法:
檢查方法
常見BlockingQueue:
①ArrayBlockingQueue
ArrayBlockingQueue是一個阻塞式的隊列,繼承自AbstractBlockingQueue,間接的實現了Queue接口和Collection接口。底層以數組的形式保存數據(實際上可看做一個循環數組)。經常使用的操做包括 add ,offer,put,remove,poll,take,peek。 前三者add offer put 是插入的操做。後面四個方法是取出的操做。
能夠說,ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法爲添加和刪除的阻塞方法。
須要注意的是,ArrayBlockingQueue內部的阻塞隊列是經過重入鎖ReenterLock和Condition條件隊列實現的,因此ArrayBlockingQueue中的元素存在公平訪問與非公平訪問的區別,對於公平訪問隊列,被阻塞的線程能夠按照阻塞的前後順序訪問隊列,即先阻塞的線程先訪問隊列。而非公平隊列,當隊列可用時,阻塞的線程將進入爭奪訪問資源的競爭中,也就是說誰先搶到誰就執行,沒有固定的前後順序。
②LinkedBlockingQueue
LinkedBlockingQueue是底層基於鏈表實現的阻塞隊列,內部維持着一個數據緩衝隊列(該隊列由鏈表構成)。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。結構圖以下:
LinkedBlockingQueue構造的時候若沒有指定大小,則默認大小爲Integer.MAX_VALUE,固然也能夠在構造函數的參數中指定大小。LinkedBlockingQueue不接受null。
LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
LinkedBlockingQueue中維持兩把鎖,一把鎖用於入隊,一把鎖用於出隊,這也就意味着,同一時刻,只能有一個線程執行入隊,其他執行入隊的線程將會被阻塞;同時,能夠有另外一個線程執行出隊,其他執行出隊的線程將會被阻塞。換句話說,雖然入隊和出隊兩個操做同時均只能有一個線程操做,可是能夠一個入隊線程和一個出隊線程共同執行,也就意味着可能同時有兩個線程在操做隊列,那麼爲了維持線程安全,LinkedBlockingQueue使用一個AtomicInterger類型的變量表示當前隊列中含有的元素個數,因此能夠確保兩個線程之間操做底層隊列是線程安全的。
LinkedBlockingQueue能夠指定容量,內部維持一個隊列,因此有一個頭節點head和一個尾節點last,內部維持兩把鎖,一個用於入隊,一個用於出隊,還有鎖關聯的Condition對象。重要字段有:
//容量,若是沒有指定,該值爲Integer.MAX_VALUE; private final int capacity; //當前隊列中的元素 private final AtomicInteger count = new AtomicInteger(); //隊列頭節點,始終知足head.item==null transient Node<E> head; //隊列的尾節點,始終知足last.next==null private transient Node<E> last; //用於出隊的鎖 private final ReentrantLock takeLock = new ReentrantLock(); //當隊列爲空時,保存執行出隊的線程 private final Condition notEmpty = takeLock.newCondition(); //用於入隊的鎖 private final ReentrantLock putLock = new ReentrantLock(); //當隊列滿時,保存執行入隊的線程 private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue的構造方法有三個:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//last和head在隊列爲空時都存在,因此隊列中至少有一個節點 } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
從LinkedBlockingQueue的構造方法中能夠看出:當調用無參的構造方法時,容量是int的最大值;隊列中至少包含一個節點,哪怕隊列對外表現爲空;LinkedBlockingQueue不支持null元素。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通、最經常使用的阻塞隊列。
LinkedBlockingQueue用一個鏈表保存元素,其內部有一個Node的內部類,其中有一個成員變量 Node next,這樣就造成了一個鏈表的結構,要獲取下一個元素,只要調用next就能夠了。而ArrayBlockingQueue則基於數組來保存元素。
LinkedBlockingQueue內部讀寫(插入獲取)各有一個鎖,而ArrayBlockingQueue則讀寫共享一個鎖。
③SynchronousQueue
不像ArrayBlockingQueue或LinkedBlockingQueue,SynchronousQueue內部並無數據緩存空間,你不能調用peek()方法來看隊列中是否有數據元素,由於數據元素只有當你試着取走的時候纔可能存在,不取走而只想偷窺一下是不行的,固然遍歷這個隊列的操做也是不容許的。隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據。數據是在配對的生產者和消費者線程之間直接傳遞的,並不會將數據緩衝到隊列中。能夠這樣來理解:生產者和消費者互相等待對方,握手,而後一塊兒離開。
SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。
④LinkedBlockingDeque
LinkedBlockingDeque是一個基於鏈表的雙端阻塞隊列。和LinkedBlockingQueue相似,區別在於該類實現了Deque接口,而LinkedBlockingQueue實現了Queue接口。
LinkedBlockingDeque是一個可選容量的阻塞隊列,若是沒有設置容量,那麼容量將是Int的最大值。
LinkedBlockingDeque的底層數據結構是一個雙端隊列,該隊列使用鏈表實現,如圖所示:
LinkedBlockingDeque的重要字段有以下幾個:
//隊列的頭節點 transient Node<E> first; //隊列的尾節點 transient Node<E> last; //隊列中元素的個數 private transient int count; //隊列中元素的最大個數 private final int capacity; //鎖 final ReentrantLock lock = new ReentrantLock(); //隊列爲空時,阻塞take線程的條件隊列 private final Condition notEmpty = lock.newCondition(); //隊列滿時,阻塞put線程的條件隊列 private final Condition notFull = lock.newCondition();
從上面的字段,能夠看到LinkedBlockingDeque內部只有一把鎖以及該鎖上關聯的兩個條件,因此能夠推斷同一時刻只有一個線程能夠在隊頭或者隊尾執行入隊或出隊操做。能夠發現這點和LinkedBlockingQueue不一樣,LinkedBlockingQueue能夠同時有兩個線程在兩端執行操做。
因爲LinkedBlockingDeque是一個雙端隊列,因此就能夠在隊頭執行入隊和出隊操做,也能夠在隊尾執行入隊和出隊操做。
LinkedBlockingDeque的構造方法有三個:
public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; } public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock lock = this.lock; lock.lock(); // Never contended, but necessary for visibility try { for (E e : c) { if (e == null) throw new NullPointerException(); if (!linkLast(new Node<E>(e))) throw new IllegalStateException("Deque full"); } } finally { lock.unlock(); } }
能夠看到這三個構造方法的結構和LinkedBlockingQueue是相同的。 可是LinkedBlockingQueue是存在一個哨兵節點維持頭節點的,而LinkedBlockingDeque中是沒有的。
LinkedBlockingDeque和LinkedBlockingQueue的相同點在於: ①基於鏈表 ②容量可選,不設置的話,就是Int的最大值 LinkedBlockingDeque和LinkedBlockingQueue的不一樣點在於: ①雙端鏈表和單鏈表 ②不存在哨兵節點 ③一把鎖+兩個條件 LinkedBlockingDeque和ArrayBlockingQueue的相同點在於:使用一把鎖+兩個條件維持隊列的同步。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;singleThreadExecutor.execute(new Runnable() { @Overridepublic void run() {try {System.out.println(index);Thread.sleep(2000);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}});}