原文地址:https://www.cnblogs.com/java-jun-world2099/articles/10165949.htmlhtml
========================================================================前端
在Java多線程應用中,隊列的使用率很高,多數生產消費模型的首選數據結構就是隊列。Java提供的線程安全的Queue能夠分爲阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是java
ConcurrentLinkedQueue,在實際應用中要根據實際須要選用阻塞隊列或者非阻塞隊列。node
注:什麼叫線程安全?這個首先要明確。線程安全的類 ,指的是類內共享的全局變量的訪問必須保證是不受多線程形式影響的。若是因爲多線程的訪問(好比修改、遍歷、查看)而使這些變量結構被破壞或者針對這些變量操做的原子性被破壞,則這個類就不是線程安全的。算法
在併發的隊列上jdk提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列,一個是以BlockingQueue接口爲表明的阻塞隊列,不管在那種都繼承自Queue。
今天就聊聊這兩種Queue:編程
Queue是什麼就不須要多說了吧,一句話:隊列是先進先出。相對的,棧是後進先出。若是不熟悉的話先找本基礎的數據結構的書看看吧。 後端
BlockingQueue,顧名思義,「阻塞隊列」:能夠提供阻塞功能的隊列。
首先,看看BlockingQueue提供的經常使用方法: 數組
從上表能夠很明顯看出每一個方法的做用,這個不用多說。我想說的是:緩存
BlockingQueue做爲線程容器,能夠爲線程同步提供有力的保障。安全
BlockingQueue定義的經常使用方法:
拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查 element() peek() 不可用 不可用
基於數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個經常使用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。
ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue徹底能夠採用分離鎖,從而實現生產者和消費者操做的徹底並行運行。Doug Lea之因此沒這樣去作,也許是由於ArrayBlockingQueue的數據寫入和獲取操做已經足夠輕巧,以致於引入獨立的鎖機制,除了給代碼帶來額外的複雜性外,其在性能上徹底佔不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。而在建立ArrayBlockingQueue時,咱們還能夠控制對象的內部鎖是否採用公平鎖,默認採用非公平鎖。
基於鏈表的阻塞隊列,同ArrayListBlockingQueue相似,其內部也維持着一個數據緩衝隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時(LinkedBlockingQueue能夠經過構造函數指定該值),纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。而LinkedBlockingQueue之因此可以高效的處理併發數據,還由於其對於生產者端和消費者端分別採用了獨立的鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。
做爲開發者,咱們須要注意的是,若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小的容量(Integer.MAX_VALUE),這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。
阻塞隊列:線程安全
按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,而且隊列檢索操做會得到位於隊列頭部的元素。連接隊列的吞吐量一般要高於基於數組的隊列,可是在大多數併發應用程序中,其可預知的性能要低。
注意:
一、必需要使用take()方法在獲取的時候達成阻塞結果
二、使用poll()方法將產生非阻塞效果
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class BlockingDeque { //阻塞隊列,FIFO private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Producer("producer1")); executorService.submit(new Producer("producer2")); executorService.submit(new Producer("producer3")); executorService.submit(new Consumer("consumer1")); executorService.submit(new Consumer("consumer2")); executorService.submit(new Consumer("consumer3")); } static class Producer implements Runnable { private String name; public Producer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { System.out.println(name+ " 生產: " + i); //concurrentLinkedQueue.add(i); try { concurrentLinkedQueue.put(i); Thread.sleep(200); //模擬慢速的生產,產生阻塞的效果 } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } } static class Consumer implements Runnable { private String name; public Consumer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { try { //必需要使用take()方法在獲取的時候阻塞 System.out.println(name+"消費: " + concurrentLinkedQueue.take()); //使用poll()方法 將產生非阻塞效果 //System.out.println(name+"消費: " + concurrentLinkedQueue.poll()); //還有一個超時的用法,隊列空時,指定阻塞時間後返回,不會一直阻塞 //但有一個疑問,既然能夠不阻塞,爲啥還叫阻塞隊列? //System.out.println(name+" Consumer " + concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS)); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
基於優先級的阻塞隊列(優先級的判斷經過構造函數傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現Comparable接口),在實現PriorityBlockingQueue時,內部控制線程同步的鎖採用的是公平鎖,他也是一個無界的隊列。
Task.java
UsePriorityBlockingQueue.java
打印結果:
帶有延遲時間的Queue,其中的元素只有當其指定的延遲時間到了,纔可以從隊列中獲取到該元素。DelayQueue中的元素必須實現Delayed接口,DelayQueue是一個沒有大小限制的隊列,應用場景不少,好比對緩存超時的數據進行移除、任務超時處理、空閒鏈接的關閉等等。
Wangmin.java
WangBa.java
打印結果:
LinkedBlockingDeque是一個線程安全的雙端隊列實現,由鏈表結構組成的雙向阻塞隊列,便可以從隊列的兩端插入和移除元素。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。能夠說他是最爲複雜的一種隊列,在內部實現維護了前端和後端節點,可是其沒有實現讀寫分離,所以同一時間只能有一個線程對其講行操做。在高併發中性能要遠低於其它BlockingQueue。更要低於ConcurrentLinkedQueue,jdk早期有一個非線程安全的Deque就是ArryDeque了, java6裏添加了LinkBlockingDeque來彌補多線程場景下線程安全的問題。
相比於其餘阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first結尾的方法,表示插入、獲取獲移除雙端隊列的第一個元素。以last結尾的方法,表示插入、獲取獲移除雙端隊列的最後一個元素。
此外,LinkedBlockingDeque仍是可選容量的,防止過分膨脹,默認等於Integer.MAX_VALUE。
主要方法:
akeFirst()和takeLast():分別返回類表中第一個和最後一個元素,返回的元素會從類表中移除。若是列表爲空,調用的方法的線程將會被阻塞直達列表中有可用元素。
getFirst()和getLast():分別返回類表中第一個和最後一個元素,返回的元素不會從列表中移除。若是列表爲空,則拋出NoSuckElementException異常。
peek()、peekFirst()和peekLast():分別返回列表中第一個元素和最後一個元素,返回元素不會被移除。若是列表爲空返回null。
poll()、pollFirst()和pollLast():分別返回類表中第一個和最後一個元素,返回的元素會從列表中移除。若是列表爲空,返回Null。
public class UseDeque { public static void main(String[] args) { LinkedBlockingDeque<String> dq = new LinkedBlockingDeque<String>(10); dq.addFirst("a"); dq.addFirst("b"); dq.addFirst("c"); dq.addFirst("d"); dq.addFirst("e"); dq.addLast("f"); dq.addLast("g"); dq.addLast("h"); dq.addLast("i"); dq.addLast("j"); //dq.offerFirst("k"); System.out.println("查看頭元素:" + dq.peekFirst()); System.out.println("獲取尾元素:" + dq.pollLast()); Object [] objs = dq.toArray(); for (int i = 0; i < objs.length; i++) { System.out.print(objs[i] + " -- "); } } }
打印結果:
1)LinkedBlockingQeque
先看它的結構基本字段:
和LinkedBlockingDeque的區別之一就是,LinkedBlockingQueue採用了兩把鎖來對隊列進行操做,也就是隊尾添加的時候,
隊頭仍然能夠刪除等操做。接下來看典型的操做。
put操做
主要的思想仍是比較容易理解的,如今看看enqueue
方法:
private void enqueue(Node<E> node) { //入對操做。 last = last.next = node; //隊尾進 }
再看看signalNotEmpty
方法:
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); //加鎖 try { notEmpty.signal(); //用於signal,notEmpty } finally { takeLock.unlock(); } }
take操做
take操做,就是從隊列裏面彈出一個元素,下面看它的詳細代碼:
public E take() throws InterruptedException { E x; int c = -1; //設定一個記錄變量 final AtomicInteger count = this.count; //得到count final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); //加鎖 try { while (count.get() == 0) { //若是沒有元素,那麼就阻塞性等待 notEmpty.await(); } x = dequeue(); //必定能夠拿到。 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); //報告還有元素,喚醒隊列 } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); //解鎖 return x; }
接下來看dequeue
方法:
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC 指向本身,幫助gc回收 head = first; E x = first.item; //從隊頭出。 first.item = null; //將head.item設爲null。 return x; }
對於LinkedBlockingQueue來講,有兩個ReentrantLock分別控制隊頭和隊尾,這樣就可使得添加操做分開來作,通常的操做是獲取一把鎖就能夠,但有些操做例如remove操做,則須要同時獲取兩把鎖:
public boolean remove(Object o) { if (o == null) return false; fullyLock(); //獲取鎖 try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //依次循環遍歷 if (o.equals(p.item)) { //找到了 unlink(p, trail); //解除連接 return true; } } return false; //沒找到,或者解除失敗 } finally { fullyUnlock(); } }
固然,除了上述的remove方法外,在Iterator的next方法,remove方法以及LBQSpliterator分割迭代器中也是須要加全鎖進行操做的。
2)LinkedBlockingDeque
LinkedBlockingDeque類有三個構造方法:
public LinkedBlockingDeque() public LinkedBlockingDeque(int capacity) public LinkedBlockingDeque(Collection<? extends E> c)
LinkedBlockingDeque類中的數據都被封裝成了Node對象:
static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } }
LinkedBlockingDeque類中的重要字段以下:
// 隊列雙向鏈表首節點 transient Node<E> first; // 隊列雙向鏈表尾節點 transient Node<E> last; // 雙向鏈表元素個數 private transient int count; // 雙向鏈表最大容量 private final int capacity; // 全局獨佔鎖 final ReentrantLock lock = new ReentrantLock(); // 非空Condition對象 private final Condition notEmpty = lock.newCondition(); // 非滿Condition對象 private final Condition notFull = lock.newCondition();
LinkedBlockingDeque類的底層實現和LinkedBlockingQueue類很類似,都有一個全局獨佔鎖,和兩個Condition對象,用來阻塞和喚醒線程。
LinkedBlockingDeque類對元素的操做方法比較多,咱們下面以putFirst、putLast、pollFirst、pollLast方法來對元素的入隊、出隊操做進行分析。
入隊
putFirst(E e)方法是將指定的元素插入雙端隊列的開頭,源碼以下:
public void putFirst(E e) throws InterruptedException { // 若插入元素爲null,則直接拋出NullPointerException異常 if (e == null) throw new NullPointerException(); // 將插入節點包裝爲Node節點 Node<E> node = new Node<E>(e); // 獲取全局獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(node)) notFull.await(); } finally { // 釋放全局獨佔鎖 lock.unlock(); } }
入隊操做是經過linkFirst(E e)方法來完成的,以下所示:
private boolean linkFirst(Node<E> node) { // assert lock.isHeldByCurrentThread(); // 元素個數超出容量。直接返回false if (count >= capacity) return false; // 獲取雙向鏈表的首節點 Node<E> f = first; // 將node設置爲首節點 node.next = f; first = node; // 若last爲null,設置尾節點爲node節點 if (last == null) last = node; else // 更新原首節點的前驅節點 f.prev = node; ++count; // 喚醒阻塞在notEmpty上的線程 notEmpty.signal(); return true; }
若入隊成功,則linkFirst(E e)方法返回true,不然,返回false。若該方法返回false,則當前線程會阻塞在notFull條件上。
putLast(E e)方法是將指定的元素插入到雙端隊列的末尾,源碼以下:
public void putLast(E e) throws InterruptedException { // 若插入元素爲null,則直接拋出NullPointerException異常 if (e == null) throw new NullPointerException(); // 將插入節點包裝爲Node節點 Node<E> node = new Node<E>(e); // 獲取全局獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(node)) notFull.await(); } finally { // 釋放全局獨佔鎖 lock.unlock(); } }
該方法和putFirst(E e)方法幾乎同樣,不一樣點在於,putLast(E e)方法經過調用linkLast(E e)方法來插入節點:
private boolean linkLast(Node<E> node) { // assert lock.isHeldByCurrentThread(); // 元素個數超出容量。直接返回false if (count >= capacity) return false; // 獲取雙向鏈表的尾節點 Node<E> l = last; // 將node設置爲尾節點 node.prev = l; last = node; // 若first爲null,設置首節點爲node節點 if (first == null) first = node; else // 更新原尾節點的後繼節點 l.next = node; ++count; // 喚醒阻塞在notEmpty上的線程 notEmpty.signal(); return true; }
若入隊成功,則linkLast(E e)方法返回true,不然,返回false。若該方法返回false,則當前線程會阻塞在notFull條件上。
出隊
pollFirst()方法是獲取並移除此雙端隊列的首節點,若不存在,則返回null,源碼以下:
public E pollFirst() { // 獲取全局獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); } finally { // 釋放全局獨佔鎖 lock.unlock(); } }
移除首節點的操做是經過unlinkFirst()方法來完成的:
private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); // 獲取首節點 Node<E> f = first; // 首節點爲null,則返回null if (f == null) return null; // 獲取首節點的後繼節點 Node<E> n = f.next; // 移除first,將首節點更新爲n E item = f.item; f.item = null; f.next = f; // help GC first = n; // 移除首節點後,爲空隊列 if (n == null) last = null; else // 將新的首節點的前驅節點設置爲null n.prev = null; --count; // 喚醒阻塞在notFull上的線程 notFull.signal(); return item; }
pollLast()方法是獲取並移除此雙端隊列的尾節點,若不存在,則返回null,源碼以下:
public E pollLast() { // 獲取全局獨佔鎖 final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); } finally { // 釋放全局獨佔鎖 lock.unlock(); } }
移除尾節點的操做是經過unlinkLast()方法來完成的:
private E unlinkLast() { // assert lock.isHeldByCurrentThread(); // 獲取尾節點 Node<E> l = last; // 尾節點爲null,則返回null if (l == null) return null; // 獲取尾節點的前驅節點 Node<E> p = l.prev; // 移除尾節點,將尾節點更新爲p E item = l.item; l.item = null; l.prev = l; // help GC last = p; // 移除尾節點後,爲空隊列 if (p == null) first = null; else // 將新的尾節點的後繼節點設置爲null p.next = null; --count; // 喚醒阻塞在notFull上的線程 notFull.signal(); return item; }
其實LinkedBlockingDeque類的入隊、出隊操做都是經過linkFirst、linkLast、unlinkFirst、unlinkLast這幾個方法來實現的,源碼讀起來也比較簡單。
基於連接節點的、無界的、線程安全。此隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列檢索操做從隊列頭部得到元素。當許多線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不容許 null 元素。
ConcurrentLinkedQueue是一個適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue。
ConcurrentLinkedQueue重要方法:
add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,這兩個方法投有任何區別)
poll()和peek()都是取頭元素節點,區別在於前者會刪除元素,後者不會,至關於查看。
public class UseQueue_ConcurrentLinkedQueue { public static void main(String[] args) throws Exception { //高性能無阻塞無界隊列:ConcurrentLinkedQueue ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.add("e"); System.out.println("從頭部取出元素,並從隊列裏刪除 >> "+q.poll()); //a 從頭部取出元素,並從隊列裏刪除 System.out.println("刪除後的長度 >> "+q.size()); //4 System.out.println("取出頭部元素 >> "+q.peek()); //b System.out.println("長度 >> "+q.size()); //4 } }
打印結果:
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; public class NoBlockQueue { private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Producer("producer1")); executorService.submit(new Producer("producer2")); executorService.submit(new Producer("producer3")); executorService.submit(new Consumer("consumer1")); executorService.submit(new Consumer("consumer2")); executorService.submit(new Consumer("consumer3")); } static class Producer implements Runnable { private String name; public Producer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { System.out.println(name+ " start producer " + i); concurrentLinkedQueue.add(i); try { Thread.sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } //System.out.println(name+"end producer " + i); } } } static class Consumer implements Runnable { private String name; public Consumer(String name) { this.name = name; } public void run() { for (int i = 1; i < 10; ++i) { try { System.out.println(name+" Consumer " + concurrentLinkedQueue.poll()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // System.out.println(); // System.out.println(name+" end Consumer " + i); } } } }
在併發編程中,通常推薦使用阻塞隊列,這樣實現能夠儘可能地避免程序出現意外的錯誤。阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,而後解析線程不斷從隊列取數據解析。還有其餘相似的場景,只要符合生產者-消費者模型的均可以使用阻塞隊列。
使用非阻塞隊列,雖然能即時返回結果(消費結果),但必須自行編碼解決返回爲空的狀況處理(以及消費重試等問題)。
另外它們都是線程安全的,不用考慮線程同步問題。
=====================================
留做參考文章