以前由於找實習的緣故,博客1個多月沒有寫了。找實習的經歷總算告一段落,如今從新更新博客,此次的內容是分析Java併發包中的阻塞隊列
關於阻塞隊列,我以前是一直充滿好奇,很好奇這個阻塞是怎麼實現。如今咱們先看一個該抽象類的實現類ArrayBlockingQueue。下面所有的代碼均在githubjava
ArrayBlockingQueue顧名思義是一種數組形式的阻塞隊列,其天然就有數組的特色,即隊列的長度不可改變,只有初始化的時候指定。
下面,咱們看一下例子。node
public class ArrayBlock { private BlockingQueue<String> blockingQueue; public ArrayBlock(){ blockingQueue = new ArrayBlockingQueue<String>(3); } public BlockingQueue<String> getBlockingQueue() { return blockingQueue; } }
建立一個大小爲3的ArrayBlockingQueue,下面是一個生產者和消費者,經過ArrayBlockingQueue實現生產者/消費者模型。git
public class Producer extends Thread { private BlockingQueue<String> blockingQueue; @Override public void run() { super.run(); for (int i = 0 ; i < 5;i++) { try { blockingQueue.put(i + ""); System.out.println(getName() + " 生產數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } public Producer(ArrayBlock arrayBlock){ this.setName("Producer"); blockingQueue = arrayBlock.getBlockingQueue(); } } public class Costumer extends Thread{ private BlockingQueue<String> blockingQueue; public Costumer(ArrayBlock arrayBlock) { blockingQueue = arrayBlock.getBlockingQueue(); this.setName("Costumer"); } @Override public void run() { super.run(); while (true) { try { Thread.sleep(6000); String str = blockingQueue.take(); System.out.println(getName() + " 取出數據 " + str); } catch (InterruptedException e) { e.printStackTrace(); } } } }
測試過程就不放了,直接放出結果:github
Producer 生產數據 Producer 生產數據 Producer 生產數據 Costumer 取出數據 0 Producer 生產數據 Costumer 取出數據 1 Producer 生產數據 Costumer 取出數據 2 Costumer 取出數據 3 Costumer 取出數據 4
這能夠看出put方法與take方法均是阻塞的方法。當隊列已經滿的時候,就會阻塞放入方法,當隊列爲空的時候,就會阻塞取出方法。
下面,咱們主要看這個兩個方法,到底是如何實現阻塞的。算法
** put方法 **數組
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
put方法是將元素放入到隊列中,這裏面能夠看出是用過Lock類與Condition類來實現的,即經過等待/通知機制實現的阻塞隊列。這裏notFull是一個條件,當隊列已經滿的時候,就會執行await方法,若是沒有滿就執行入隊(enqueue)方法。這裏,判斷隊列已滿用的是count == items.length
。接下來,咱們看一下take方法,來看看取數據的阻塞。緩存
** take方法**安全
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
這裏,與put方法相似,當元素爲0時,就會執行await方法,上面方法中都沒有直接說明signal方法的執行。其實該方法是入隊與出隊的方法中實現的。也就是當執行notFull.await()
時,是經過dequeue()
方法來通知中止等待的,能夠放入元素。當執行到notEmpty.await()
時,是經過enqueue
來通知結束阻塞,能夠取出元素。併發
LinkedBlockingQueue顧名思義是一個鏈表形式的阻塞隊列,不一樣於ArrayBlockingQueue。若是不指定容量,則默認是Integer.MAX_VALUE。也就是說他是一個無界阻塞隊列。他的例子與上面的相似,可是其put與take方法實現不一樣於ArrayBlockingQueue,但二者大體思路一致。咱們只看一下put實現:less
** put方法**
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
這裏阻塞的本質實現也是經過Condition類的等待/通知機制。可是有幾點不一樣:
第一 這裏用了一個原子類的count計數,官方的給的註釋是即便沒有鎖來提供保護,也能保證線程安全,實現wait guard。
第二 ArrayBlockingQueue的通知是在入隊與出隊的方法中,LinkedBlockingQueue則不是,而且插入以後不滿的時候,還有通知其餘await的線程。
第三 ArrayBlockingQueue的lock一直是一個,也就是put/take是用的一個鎖,放與取沒法實現並行。可是LinkedBlockingQueue是兩個鎖,放一個鎖,取一個鎖,能夠實現put/take的並行,要高效一些。
SynchronousQueue顧名思義是同步隊列,特色不一樣於上面的阻塞隊列,他是一個無界非緩存的隊列,準確說他不存儲元素,放入的元素,只有等待取走元素以後才能放入。也就是說任意時刻:
元素並不會被生產者存在隊列中,而是直接生產者與消費者進行交互。
其實現是利用無鎖算法,能夠參考SynchronousQueue實現
還有一點須要注意,同步隊列支持公平性與非公平性。公平性是利用隊列來管理多餘生產者與消費者,非公平性是利用棧來管理多餘生產者與消費者。