| 好看請贊,養成習慣html
- 你有一個思想,我有一個思想,咱們交換後,一我的就有兩個思想
- If you can NOT explain it simply, you do NOT understand it well enough
現陸續將Demo代碼和技術文章整理在一塊兒 Github實踐精選 ,方便你們閱讀查看,本文一樣收錄在此,以爲不錯,還請Starjava
若是按照用途與特性進行粗略的劃分,JUC 包中包含的工具大致能夠分爲 6 類:git
在併發系列中,主要講解了 執行者與線程池
,同步工具
,鎖
, 在分析源碼時,或多或少的說起到了「隊列」,隊列在 JUC 中也是多種多樣存在,因此本文就以「遠看」視角,幫助你們快速瞭解與區分這些看似「雜亂」的隊列github
Java 併發隊列按照實現方式來進行劃分能夠分爲 2 種:面試
若是你已經看完併發系列鎖的實現,你已經可以知道他們實現的區別:算法
前者就是基於鎖實現的,後者則是基於 CAS 非阻塞算法實現的
常見的隊列有下面這幾種:編程
瞬間懵逼?看到這個沒有人性的圖想直接走人? 客觀先別急,一會就柳暗花明了數組
當下你也許有個問題:緩存
爲何會有這麼多種隊列的存在?多線程
鎖有應對各類情形的鎖,隊列也天然有應對各類情形的隊列了, 是否是也有點單一職責原則的意思呢?
因此咱們要了解這些隊列究竟是怎麼設計的?以及用在了哪些地方?
先來看下圖
若是你在 IDE 中打開以上非阻塞隊列和阻塞隊列,查看其實現方法,你就會發現,阻塞隊列
較非阻塞隊列
額外支持兩種操做:
當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿
當隊列爲空時,獲取元素的線程會阻塞,直到隊列變爲非空
綜合說明入隊/出隊操做,看似雜亂的方法,用一個表格就能歸納了
拋出異常
返回特殊值
一直阻塞
關於阻塞,咱們其實早在 併發編程之等待通知機制 就已經充分說明過了,你還記得下面這張圖嗎?原理實際上是同樣同樣滴
超時退出
和鎖同樣,由於有阻塞,爲了靈活使用,就必定支持超時退出,阻塞時間達到超時時間,就會直接返回
至於爲啥插入和移除這麼多種單詞表示形式,我也不知道,爲了方便記憶,只須要記住阻塞的方法形式便可:
單詞put
和take
字母t
首位相連,一個放,一個拿
到這裏你應該對 Java 併發隊列有了個初步的認識了,原來看似雜亂的方法貌似也有了規律。接下來就到了瘋狂串知識點的時刻了,藉助前序章節的知識,分分鐘就理解所有隊列了
以前也說過,JDK中的命名仍是很講究滴,一看這名字,底層就是數組實現了,是否有界,那就看在構造的時候是否須要指定 capacity 值了
填鴨式的說明也容易忘,這些都是哪看到的呢?在全部隊列的 Java docs 的第一段,一句話就歸納了該隊列的主要特性,因此強烈建議你們本身在看源碼時,簡單瞄一眼 docs 開頭,心中就有多半個數了
在講 Java AQS隊列同步器以及ReentrantLock的應用 時咱們介紹了公平鎖與非公平鎖的概念,ArrayBlockingQueue 也有一樣的概念,看它的構造方法,就有 ReentrantLock 來輔助實現
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
默認狀況下,依舊是不保證線程公平訪問隊列(公平與否是指阻塞的線程可否按照阻塞的前後順序訪問隊列,先阻塞線訪問,後阻塞後訪問)
到這我也要臨時問一個說過屢次的面試送分題了:
爲何默認採用非公平鎖的方式?它較公平鎖方式有什麼好處,又可能帶來哪些問題?
知道了以上內容,結合上面表格中的方法,ArrayBlockingQueue 就能夠輕鬆過關了
和數組相對的天然是鏈表了
LinkedBlockingQueue 也算是一個有界阻塞隊列 ,從下面的構造函數中你也能夠看出,該隊列的默認和最大長度爲 Integer.MAX_VALUE ,這也就 docs 說 optionally-bounded 的緣由了
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
正如 Java 集合同樣,鏈表形式的隊列,其存取效率要比數組形式的隊列高。可是在一些併發程序中,數組形式的隊列因爲具備必定的可預測性,所以能夠在某些場景中得到更高的效率
看到 LinkedBlockingQueue 是否是也有些熟悉呢? 爲何要使用線程池? 就已經和它屢次照面了
建立單個線程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
建立固定個數線程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
面試送分題又來了
使用 Executors 建立線程池很簡單,爲何大廠嚴格要求禁用這種建立方式呢?
PriorityBlockingQueue 是一個支持優先級的無界的阻塞隊列,默認狀況下采用天然順序升序排列,固然也有非默認狀況自定義優先級,須要排序,那天然要用到 Comparator 來定義排序規則了
能夠定義優先級,天然也就有相應的限制,以及使用的注意事項
對於排序值相同的元素,其序列是不保證的,但你能夠繼續自定義其餘能夠區分出來優先級的值,若是你有嚴格的優先級區分,建議有更完善的比較規則,就像 Java docs 這樣
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { static final AtomicLong seq = new AtomicLong(0); final long seqNum; final E entry; public FIFOEntry(E entry) { seqNum = seq.getAndIncrement(); this.entry = entry; } public E getEntry() { return entry; } public int compareTo(FIFOEntry<E> other) { int res = entry.compareTo(other.entry); if (res == 0 && other.entry != this.entry) res = (seqNum < other.seqNum ? -1 : 1); return res; } }
PriorityBlockingQueue 也有 put 方法,這是一個阻塞的方法,由於它是無界的,天然不會阻塞,因此就有了下面比較聰明的作法
public void put(E e) { offer(e); // never need to block 請自行對照上面表格 }
能夠給定初始容量,這個容量會按照必定的算法自動擴充
// Default array capacity. private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }
這裏默認的容量是 11,因爲也是基於數組,那面試送分題又來了
你一般是怎樣定義容器/集合初始容量的?有哪些依據?
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列
看到這也許以爲這有點和 PriorityBlockingQueue
很像,沒錯,DelayQueue
的內部也是使用 PriorityQueue
上圖綠色框線也告訴你,DelayQueue 隊列的元素必需要實現 Depayed 接口:
因此從上圖能夠看出使用 DelayQueue 很是簡單,只須要兩步:
實現 getDelay() 方法,返回元素要延時多長時間
public long getDelay(TimeUnit unit) { // 最好採用納秒形式,這樣更精確 return unit.convert(time - now(), NANOSECONDS); }
實現 compareTo() 方法,比較元素順序
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
上面的代碼哪來的呢?若是你打開 ScheduledThreadPoolExecutor 裏的 ScheduledFutureTask,你就看到了 (ScheduledThreadPoolExecutor 內部就是應用 DelayQueue)
因此綜合來講,下面兩種狀況很是適合使用 DelayQueue
這是一個不存儲元素的阻塞隊列,不存儲元素還叫隊列?
沒錯,SynchronousQueue 直譯過來叫同步隊列,若是在隊列裏面呆久了應該就算是「異步」了吧
因此使用它,每一個put() 操做必需要等待一個 take() 操做,反之亦然,不然不能繼續添加元素
實際中怎麼用呢?假如你須要兩個線程之間同步共享變量,若是不用 SynchronousQueue 你可能會選擇用 CountDownLatch 來完成,就像這樣:
ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1); Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); }; Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
這點小事就用計數器來實現,顯然很不合適,用 SynchronousQueue 改造一下,感受瞬間就不同了
ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueue<Integer> queue = new SynchronousQueue<>(); Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } }; Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };
其實 Executors.newCachedThreadPool() 方法裏面使用的就是 SynchronousQueue
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
看到前面LinkedBlockingQueue
用在newSingleThreadExecutor
和newFixedThreadPool
上,而newCachedThreadPool
卻用SynchronousQueue
,這是爲何呢?
由於單線程池和固定線程池中,線程數量是有限的,所以提交的任務須要在LinkedBlockingQueue
隊列中等待空餘的線程;
而緩存線程池中,線程數量幾乎無限(上限爲Integer.MAX_VALUE
),所以提交的任務只須要在SynchronousQueue
隊列中同步移交給空餘線程便可, 因此有時也會說 SynchronousQueue
的吞吐量要高於 LinkedBlockingQueue
和 ArrayBlockingQueue
簡單來講,TransferQueue提供了一個場所,生產者線程使用 transfer
方法傳入一些對象並阻塞,直至這些對象被消費者線程所有取出。
你有沒有以爲,剛剛介紹的 SynchronousQueue 是否很像一個容量爲 0 的 TransferQueue。
但 LinkedTransferQueue 相比其餘阻塞隊列多了三個方法
若是當前有消費者正在等待消費元素,transfer 方法就能夠直接將生產者傳入的元素馬上 transfer (傳輸) 給消費者;若是沒有消費者等待消費元素,那麼 transfer 方法會把元素放到隊列的 tail(尾部)
節點,一直阻塞,直到該元素被消費者消費才返回
tryTransfer,很顯然是一種嘗試,若是沒有消費者等待消費元素,則立刻返回 false ,程序不會阻塞
帶有超時限制,嘗試將生產者傳入的元素 transfer 給消費者,若是超時時間到,尚未消費者消費元素,則返回 false
你瞧,全部阻塞的方法都是一個套路:
看到這你也許感受 LinkedTransferQueue 沒啥特色,其實它和其餘阻塞隊列的差異還挺大的:
BlockingQueue 是若是隊列滿了,線程纔會阻塞;可是 TransferQueue 是若是沒有消費元素,則會阻塞 (transfer 方法)
這也就應了 Doug Lea 說的那句話:
LinkedTransferQueue
is actually a superset ofConcurrentLinkedQueue
,SynchronousQueue
(in 「fair」 mode), and unbounded
LinkedBlockingQueues
. And it’s made better by allowing you to mix and
match those features as well as take advantage of higher-performance i
mplementation techniques.簡單翻譯:
LinkedTransferQueue
是ConcurrentLinkedQueue
,SynchronousQueue
(在公平模式下), 無界的LinkedBlockingQueues
等的超集; 容許你混合使用阻塞隊列的多種特性因此,在合適的場景中,請儘可能使用
LinkedTransferQueue
上面都看的是單向隊列 FIFO,接下來咱們看看雙向隊列
LinkedBlockingDeque
是一個由鏈表結構組成的雙向阻塞隊列,凡是後綴爲 Deque 的都是雙向隊列意思,後綴的發音爲deck——/dek/
, 剛接觸它時我覺得是這個冰激凌的發音
所謂雙向隊列值得就是能夠從隊列的兩端插入和移除元素。因此:
雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊是,也就會減小一半的競爭
隊列有頭,有尾,所以它又比其餘阻塞隊列多了幾個特殊的方法
這麼一看,雙向阻塞隊列確實很高效,
那雙向阻塞隊列應用在什麼地方了呢?
不知道你是否聽過 「工做竊取」模式,看似不太厚道的一種方法,實則是高效利用線程的好辦法。下一篇文章,咱們就來看看 ForkJoinPool 是如何應用 「工做竊取」模式的
到這關於 Java 隊列(其實主要介紹了阻塞隊列)就快速的區分完了,將看似雜亂的方法作了分類整理,方便快速理解其用途,同時也說明了這些隊列的實際用途。相信你帶着更高的視角來閱讀源碼會更加輕鬆,最後也但願你們認真看兩個隊列的源碼實現,在遇到隊列的問題,腦海中的畫面分分鐘就能夠搞定了
日拱一兵 | 原創