在前面的文章ArrayBlockingQueue源碼分析中,已經對JDK中的BlockingQueue中的作了一個回顧,同時對ArrayBlockingQueue中的核心方法做了說明,而LinkedBlockingQueue做爲JDK中BlockingQueue家族系列中一員,因爲其做爲固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列,分析它的目的主要在於2點:
(1) 與ArrayBlockingQueue進行類比學習,加深各類數據結構的理解
(2) 瞭解底層實現,可以更好地理解每一種阻塞隊列對線程池性能的影響,作到真正的知其然,且知其因此然node
LinkedBlockingQueue,見名之意,它是由一個基於鏈表的阻塞隊列,首先看一下的核心組成:數組
// 全部的元素都經過Node這個靜態內部類來進行存儲,這與LinkedList的處理方式徹底同樣 static class Node<E> { //使用item來保存元素自己 E item; //保存當前節點的後繼節點 Node<E> next; Node(E x) { item = x; } } /** 阻塞隊列所能存儲的最大容量 用戶能夠在建立時手動指定最大容量,若是用戶沒有指定最大容量 那麼最默認的最大容量爲Integer.MAX_VALUE. */ private final int capacity; /** 當前阻塞隊列中的元素數量 PS:若是你看過ArrayBlockingQueue的源碼,你會發現 ArrayBlockingQueue底層保存元素數量使用的是一個 普通的int類型變量。其緣由是在ArrayBlockingQueue底層 對於元素的入隊列和出隊列使用的是同一個lock對象。而數 量的修改都是在處於線程獲取鎖的狀況下進行操做,所以不 會有線程安全問題。 而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個 不一樣的lock對象,所以不管是在入隊列仍是出隊列,都會涉及對元素數 量的併發修改,(以後經過源碼能夠更加清楚地看到)所以這裏使用了一個原子操做類 來解決對同一個變量進行併發修改的線程安全問題。 */ private final AtomicInteger count = new AtomicInteger(0); /** * 鏈表的頭部 * LinkedBlockingQueue的頭部具備一個不變性: * 頭部的元素老是爲null,head.item==null */ private transient Node<E> head; /** * 鏈表的尾部 * LinkedBlockingQueue的尾部也具備一個不變性: * 即last.next==null */ private transient Node<E> last; /** 元素出隊列時線程所獲取的鎖 當執行take、poll等操做時線程須要獲取的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 當隊列爲空時,經過該Condition讓從隊列中獲取元素的線程處於等待狀態 */ private final Condition notEmpty = takeLock.newCondition(); /** 元素入隊列時線程所獲取的鎖 當執行add、put、offer等操做時線程須要獲取鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 當隊列的元素已經達到capactiy,經過該Condition讓元素入隊列的線程處於等待狀態 */ private final Condition notFull = putLock.newCondition();
經過上面的分析,咱們能夠發現LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味着它們之間的操做不會存在互斥操做。在多個CPU的狀況下,它們能夠作到真正的在同一時刻既消費、又生產,可以作到並行處理。安全
下面讓咱們看下LinkedBlockingQueue的構造方法:數據結構
/** * 若是用戶沒有顯示指定capacity的值,默認使用int的最大值 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** 能夠看到,當隊列中沒有任何元素的時候,此時隊列的頭部就等於隊列的尾部, 指向的是同一個節點,而且元素的內容爲null */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } /* 在初始化LinkedBlockingQueue的時候,還能夠直接將一個集合 中的元素所有入隊列,此時隊列最大容量依然是int的最大值。 */ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; //獲取鎖 putLock.lock(); // Never contended, but necessary for visibility try { //迭代集合中的每個元素,讓其入隊列,而且更新一下當前隊列中的元素數量 int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); //參考下面的enqueue分析 enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { //釋放鎖 putLock.unlock(); } } /** * 我去,這代碼其實可讀性不怎麼樣啊。 * 其實下面的代碼等價於以下內容: * last.next=node; * last = node; * 其實也沒有什麼花樣: 就是讓新入隊列的元素成爲原來的last的next,讓進入的元素稱爲last * */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
在分析完LinkedBlockingQueue的核心組成以後,下面讓咱們再看下核心的幾個操做方法,首先分析一下元素入隊列的過程:併發
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var /*注意上面這句話,約定全部的put/take操做都會預先設置本地變量, 能夠看到下面有一個將putLock賦值給了一個局部變量的操做 */ int c = -1; Node<E> node = new Node(e); /* 在這裏首先獲取到putLock,以及當前隊列的元素數量 即上面所描述的預設置本地變量操做 */ final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; /* 執行可中斷的鎖獲取操做,即意味着若是線程因爲獲取 鎖而處於Blocked狀態時,線程是能夠被中斷而再也不繼 續等待,這也是一種避免死鎖的一種方式,不會由於 發現到死鎖以後而因爲沒法中斷線程最終只能重啓應用。 */ putLock.lockInterruptibly(); try { /* 當隊列的容量到底最大容量時,此時線程將處於等待狀 態,直到隊列有空閒的位置才繼續執行。使用while判 斷依舊是爲了放置線程被"僞喚醒」而出現的狀況,即當 線程被喚醒時而隊列的大小依舊等於capacity時,線 程應該繼續等待。 */ while (count.get() == capacity) { notFull.await(); } //讓元素進行隊列的末尾,enqueue代碼在上面分析過了 enqueue(node); //首先獲取原先隊列中的元素個數,而後再對隊列中的元素個數+1. c = count.getAndIncrement(); /*注:c+1獲得的結果是新元素入隊列以後隊列元素的總和。 當前隊列中的總元素個數小於最大容量時,此時喚醒其餘執行入隊列的線程 讓它們能夠放入元素,若是新加入元素以後,隊列的大小等於capacity, 那麼就意味着此時隊列已經滿了,也就沒有必需要喚醒其餘正在等待入隊列的線程,由於喚醒它們以後,它們也仍是繼續等待。 */ if (c + 1 < capacity) notFull.signal(); } finally { //完成對鎖的釋放 putLock.unlock(); } /*當c=0時,即意味着以前的隊列是空隊列,出隊列的線程都處於等待狀態, 如今新添加了一個新的元素,即隊列再也不爲空,所以它會喚醒正在等待獲取元素的線程。 */ if (c == 0) signalNotEmpty(); } /* 喚醒正在等待獲取元素的線程,告訴它們如今隊列中有元素了 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //經過notEmpty喚醒獲取元素的線程 notEmpty.signal(); } finally { takeLock.unlock(); } }
看完put方法,下面再看看下offer是如何處理的方法:源碼分析
/** 在BlockingQueue接口中除了定義put方法外(當隊列元素滿了以後就會阻塞, 直到隊列有新的空間能夠方法線程纔會繼續執行),還定義一個offer方法, 該方法會返回一個boolean值,當入隊列成功返回true,入隊列失敗返回false。 該方法與put方法基本操做基本一致,只是有細微的差別。 */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; /* 當隊列已經滿了,它不會繼續等待,而是直接返回。 所以該方法是非阻塞的。 */ if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { /* 當獲取到鎖時,須要進行二次的檢查,由於可能當隊列的大小爲capacity-1時, 兩個線程同時去搶佔鎖,而只有一個線程搶佔成功,那麼此時 當線程將元素入隊列後,釋放鎖,後面的線程搶佔鎖以後,此時隊列 大小已經達到capacity,因此將它沒法讓元素入隊列。 下面的其他操做和put都同樣,此處再也不詳述 */ if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
BlockingQueue還定義了一個限時等待插入操做,即在等待必定的時間內,若是隊列有空間能夠插入,那麼就將元素入隊列,而後返回true,若是在過完指定的時間後依舊沒有空間能夠插入,那麼就返回false,下面是限時等待操做的分析:性能
/** 經過timeout和TimeUnit來指定等待的時長 timeout爲時間的長度,TimeUnit爲時間的單位 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); //將指定的時間長度轉換爲毫秒來進行處理 long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //若是等待的剩餘時間小於等於0,那麼直接返回 if (nanos <= 0) return false; /* 經過condition來完成等待,此時當前線程會完成鎖的,而且處於等待狀態 直到被其餘線程喚醒該線程、或者當前線程被中斷、 等待的時間截至纔會返回,該返回值爲從方法調用到返回所經歷的時長。 注意:上面的代碼是condition的awitNanos()方法的通用寫法, 能夠參看Condition.awaitNaos的API文檔。 下面的其他操做和put都同樣,此處再也不詳述 */ nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
經過上面的分析,咱們應該比較清楚地知道了LinkedBlockingQueue的入隊列的操做,其主要是經過獲取到putLock鎖來完成,當隊列的數量達到最大值,此時會致使線程處於阻塞狀態或者返回false(根據具體的方法來看);若是隊列還有剩餘的空間,那麼此時會新建立出一個Node對象,將其設置到隊列的尾部,做爲LinkedBlockingQueue的last元素。學習
在分析完入隊列的過程以後,咱們接下來看看LinkedBlockingQueue出隊列的過程;因爲BlockingQueue的方法都具備對稱性,此處就只分析take方法的實現,其他方法的實現都一模一樣:this
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //經過takeLock獲取鎖,而且支持線程中斷 takeLock.lockInterruptibly(); try { //當隊列爲空時,則讓當前線程處於等待 while (count.get() == 0) { notEmpty.await(); } //完成元素的出隊列 x = dequeue(); /* 隊列元素個數完成原子化操做-1,能夠看到count元素會 在插入元素的線程和獲取元素的線程進行併發修改操做。 */ c = count.getAndDecrement(); /* 當一個元素出隊列以後,隊列的大小依舊大於1時 當前線程會喚醒其餘執行元素出隊列的線程,讓它們也 能夠執行元素的獲取 */ if (c > 1) notEmpty.signal(); } finally { //完成鎖的釋放 takeLock.unlock(); } /* 當c==capaitcy時,即在獲取當前元素以前, 隊列已經滿了,而此時獲取元素以後,隊列就會 空出一個位置,故當前線程會喚醒執行插入操做的線 程通知其餘中的一個能夠進行插入操做。 */ if (c == capacity) signalNotFull(); return x; } /** * 讓頭部元素出隊列的過程 * 其最終的目的是讓原來的head被GC回收,讓其的next成爲head * 而且新的head的item爲null. * 由於LinkedBlockingQueue的頭部具備一致性:即元素爲null。 */ private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
LinkedBlockingQueue出隊列大體過程.pngspa
對於LinkedBlockingQueue的源碼分析就到這裏,下面讓咱們將LinkedBlockingQueue與ArrayBlockingQueue進行一個比較。
ArrayBlockingQueue因爲其底層基於數組,而且在建立時指定存儲的大小,在完成後就會當即在內存分配固定大小容量的數組元素,所以其存儲一般有限,故其是一個「有界「的阻塞隊列;而LinkedBlockingQueue能夠由用戶指定最大存儲容量,也能夠無需指定,若是不指定則最大存儲容量將是Integer.MAX_VALUE,便可以看做是一個「無界」的阻塞隊列,因爲其節點的建立都是動態建立,而且在節點出隊列後能夠被GC所回收,所以其具備靈活的伸縮性。可是因爲ArrayBlockingQueue的有界性,所以其可以更好的對於性能進行預測,而LinkedBlockingQueue因爲沒有限制大小,當任務很是多的時候,不停地向隊列中存儲,就有可能致使內存溢出的狀況發生。
其次,ArrayBlockingQueue中在入隊列和出隊列操做過程當中,使用的是同一個lock,因此即便在多核CPU的狀況下,其讀取和操做的都沒法作到並行,而LinkedBlockingQueue的讀取和插入操做所使用的鎖是兩個不一樣的lock,它們之間的操做互相不受干擾,所以兩種操做能夠並行完成,故LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。
/** 下面的代碼是Executors建立固定大小線程池的代碼,其使用了 LinkedBlockingQueue來做爲任務隊列。 */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
JDK中選用LinkedBlockingQueue做爲阻塞隊列的緣由就在於其無界性。由於線程大小固定的線程池,其線程的數量是不具有伸縮性的,當任務很是繁忙的時候,就勢必會致使全部的線程都處於工做狀態,若是使用一個有界的阻塞隊列來進行處理,那麼就很是有可能很快致使隊列滿的狀況發生,從而致使任務沒法提交而拋出RejectedExecutionException,而使用無界隊列因爲其良好的存儲容量的伸縮性,能夠很好的去緩衝任務繁忙狀況下場景,即便任務很是多,也能夠進行動態擴容,當任務被處理完成以後,隊列中的節點也會被隨之被GC回收,很是靈活。
至此,LinkedBlockingQueue的分析就到這裏,若是您發現有任何編寫不對的地方,請指出(萬分感謝!)。