Java多線程高併發學習筆記——阻塞隊列

在探討可重入鎖以後,接下來學習阻塞隊列,這篇文章也是斷斷續續的寫了好久,由於最近開始學ssm框架,準備作一個本身的小網站,後續可能更新本身寫網站的技術分享。html

請尊重做者勞動成果,轉載請標明原文連接:java

http://www.cnblogs.com/superfj/p/7757876.html程序員

阻塞隊列是什麼?

首先了解隊列,隊列是數據先進先出的一種數據結構。阻塞隊列,關鍵字是阻塞,先理解阻塞的含義,在阻塞隊列中,線程阻塞有這樣的兩種狀況:編程

1.當阻塞隊列爲空時,獲取隊列元素的線程將等待,直到該則塞隊列非空;2.當阻塞隊列變滿時,使用該阻塞隊列的線程會等待,直到該阻塞隊列變成非滿。數組

爲何要使用阻塞隊列?

在常見的狀況下,生產者消費者模式須要用到隊列,生產者線程生產數據,放進隊列,而後消費從隊列中獲取數據,這個在單線程的狀況下沒有問題。可是當多線程的狀況下,某個特定時間下,(峯值高併發)出現消費者速度遠大於生產者速度,消費者必須阻塞來等待生產者,以保證生產者可以生產出新的數據;當生產者速度遠大於消費者速度時,一樣也是一個道理。這些狀況都要程序員本身控制阻塞,同時又要線程安全和運行效率。安全

阻塞隊列的出現使得程序員不須要關心這些細節,好比何時阻塞線程,何時喚醒線程,這些都由阻塞隊列完成了。數據結構

阻塞隊列的主要方法

 阻塞隊列的方法,在不能當即知足但可能在未來某一時刻知足的狀況下,按處理方式能夠分爲三類:多線程

拋出異常:拋出一個異常;併發

特殊值:返回一個特殊值(null或false,視狀況而定)框架

則塞:在成功操做以前,一直阻塞線程

超時:放棄前只在最大的時間內阻塞

 

工欲善其事必先利其器,學會用阻塞隊列,必需要知道它有哪些方法,怎麼用,有哪些注意事項,這樣到真正使用的時候,就能少踩雷了。

首先介紹插入操做:

1.public abstract boolean add(E paramE);

 將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true,若是當前沒有可用的空間,則拋出 IllegalStateException

若是該元素是NULL,則會拋出NullPointerException異常。

 

2.public abstract boolean offer(E paramE);

將指定元素插入此隊列中(若是當即可行且不會違反容量限制),成功時返回 true,若是當前沒有可用的空間,則返回 false

 

3.public abstract void put(E paramE) throws InterruptedException;

 將指定元素插入此隊列中,將等待可用的空間(若是有必要)

 

4.offer(E o, long timeout, TimeUnit unit)

能夠設定等待的時間,若是在指定的時間內,還不能往隊列中加入BlockingQueue,則返回失敗。

 

獲取數據操做:

1.poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null;

2.poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,若是在指定時間內,隊列一旦有數據可取,則當即返回隊列中的數據。不然知道時間

超時尚未數據可取,返回失敗。

3.take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到BlockingQueue有新的數據被加入; 

4.drainTo():一次性從BlockingQueue獲取全部可用的數據對象(還能夠指定獲取數據的個數),經過該方法,能夠提高獲取數據效率;不須要屢次分批加鎖或釋放鎖。

 

重點方法重點介紹

首先來看put方法

public void put(E paramE) throws InterruptedException { checkNotNull(paramE); ReentrantLock localReentrantLock = this.lock; localReentrantLock.lockInterruptibly(); try { while (this.count == this.items.length) this.notFull.await(); enqueue(paramE); localReentrantLock.unlock(); } finally { localReentrantLock.unlock(); } }

一行一行來看代碼,首先進行空校驗。checkNotNull(paramE);

private static void checkNotNull(Object paramObject) { if (paramObject != null) return; throw new NullPointerException(); }

這是一個私有方法,須要注意的就是若是put的參數爲空,則拋出空指針異常。(這個很值得咱們學習,先進行空校驗,在維護的時候就很容易定位錯誤),接着 ReentrantLock localReentrantLock = this.lock;實例化鎖,這個ReentrantLock 在我以前的博客中也介紹過,能夠共同探討一下。

下一行localReentrantLock.lockInterruptibly();這裏特別強調一下:

lockInterruptibly()容許在等待時由其餘線程的Thread.interrupt()方法來中斷等待線程而直接返回,這時是不用獲取鎖的,而會拋出一個InterruptException。而ReentrantLock.lock()方法則不容許Thread.interrupt()中斷,即便檢測到了Thread.interruptted同樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最後獲取鎖成功以後在把當前線程置爲interrupted狀態。

 

注意這裏已經鎖住,每次進行此操做時時候只有一個線程,回到代碼中,接着進行

while (this.count == this.items.length)
          this.notFull.await();

這裏向咱們說明了一個信息,當隊列滿的時候,將會等待。這裏使用了private final Condition notFull;這個實例化的Condition,這個Condition用來控制隊列滿的等待。

 接着執行了enqueue(paramE)方法,進入這個方法來繼續看

private void enqueue(E paramE) { Object[] arrayOfObject = this.items; arrayOfObject[this.putIndex] = paramE; if (++this.putIndex == arrayOfObject.length) this.putIndex = 0; this.count += 1; this.notEmpty.signal(); }

來看第一行,Object[] arrayOfObject = this.items;這個items是在構造器時候實例化的,final Object[] items = new Object[paramInt];將item賦值到arrayObject中

繼續 arrayOfObject[this.putIndex] = paramE;將put方法傳入的參數賦值到arrayOfObject中,這裏實際上是items也改變了,由於java是值引用的緣故。

if (++this.putIndex == arrayOfObject.length)
            this.putIndex = 0;

若是這個偏移值+1以後等於數組的長度,那麼偏移值變爲0。this.count += 1;count值加1;這個count表明數組的總數。this.notEmpty.signal();喚醒被Condition notEmpty阻塞的方法,最後 localReentrantLock.unlock();解鎖(這個操做不可以忘了)

這裏不由要問,是什麼方法被阻塞了呢?帶着這個疑問來看take方法。

 

public E take() throws InterruptedException { ReentrantLock localReentrantLock = this.lock; localReentrantLock.lockInterruptibly(); try { while (this.count == 0) this.notEmpty.await(); Object localObject1 = dequeue(); return localObject1; } finally { localReentrantLock.unlock(); } }

 首先看前兩行,和put方法同樣先上鎖,使得每次持有本段代碼的時候只有一個線程。

while (this.count == 0)
   this.notEmpty.await();

當數組的數量爲空時,也就是無任何數據供區出來的時候,notEmpty這個Condition就會進行阻塞,知道被notEmpty喚醒,還記得上文提到的嗎。就是在put方法中喚醒的,這裏能夠發現,只要成功進行一個put操做,就會喚醒一次。

 繼續看代碼,接着執行Object localObject1 = dequeue();獲取元素,跟進dequeue()方法繼續:

private E dequeue() {
        Object[] arrayOfObject = this.items;
        Object localObject = arrayOfObject[this.takeIndex];
        arrayOfObject[this.takeIndex] = null;
        if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;
        this.count -= 1;
        if (this.itrs != null)
            this.itrs.elementDequeued();
        this.notFull.signal();
        return localObject;
    }

Object[] arrayOfObject = this.items;進行值傳遞操做,takeIndex是取元素的時候的偏移值,因而可知,put和take操做的偏移量分別是由putIndex和takeIndex控制的。

Object localObject = arrayOfObject[this.takeIndex];取出在數組中的數據,而後 arrayOfObject[this.takeIndex] = null;將原來位置的數據b變成null.

if (++this.takeIndex == arrayOfObject.length)
            this.takeIndex = 0;

若是當前的++takeIndex等於該數組的長度,則takeIndex賦值0,結合put方法,這兩個操做是用數組造成隊列操做。接着喚醒持有notFull這個Condition的線程。

方法就總結到這裏,其實看put和take是有不少類似之處的,繼續看下一章節。

常見的阻塞隊列

首先來看這張圖,這個是阻塞隊列的繼承圖(雙端隊列,沒有列出來,沒有太大區別)

主要有ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue,DelayQueue這個五個實現類。

在這五個阻塞隊列中,比較經常使用的是ArrayBlockingQueue,LinkedBlockingQueue,本文也會重點介紹這兩個類。

ArrayBlockingQueue

在上面的源碼分析中就是分析的ArrayBlockingQueue的源碼。數組阻塞隊列必須傳入的參數是數組大小,還能夠指定是否公平性。公平性就是當隊列可用時,線程訪問隊列的順序按照它排隊時候的順序,非公平鎖則不按照這樣的順序,可是非公平隊列要比公平隊列執行的速度快。

繼續看ArrayBlockingQueue實際上是一個數組有界隊列,此隊列按照先進先出的原則維護數組中的元素順序,看源碼可知,是由兩個整形變量(上文提到的putIndex和takeIndex)分別指着頭和尾的位置。

LinkedBlockingQueue

LinkedBlockingQueue是基於鏈表的阻塞隊列,內部維持的數據緩衝隊列是由鏈表組成的,也是按照先進先出的原則。

若是構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個相似無限大小(Integer.Max_VALUE)的容量,這樣的話,若是生產者的速度一旦大於消費者的速度,也許尚未等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。

LinkedBlockingQueue之因此可以高效的處理併發數據,是由於take()方法和put(E param)方法使用了不一樣的可重入鎖,分別爲private final ReentrantLock putLock和private final ReentrantLock takeLock,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。

 

二者對比

1.ArrayBlockingQueue在put,take操做使用了同一個鎖,二者操做不能同時進行,而LinkedBlockingQueue使用了不一樣的鎖,put操做和take操做可同時進行。

2.ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象,這在長時間內須要高效併發地處理大批量數據的系統中,其對於GC的影響仍是存在必定的區別。

 

 其餘還有優先級阻塞隊列:PriorityBlockingQueue延時隊列:DelayQueue,SynchronousQueue等,由於使用頻率較低,這裏就不重點介紹了,有興趣的讀者能夠深刻研究。

 

用阻塞隊列實現生產者消費者

模擬洗盤子的經歷,洗碗工洗好一個盤子放在工做臺上,而後廚師看到工做臺上有空餘的盤子,便使用盤子。寫到代碼裏就是洗碗工就是一個生產者線程,廚師就是消費者線程,工做臺就是阻塞隊列。

public class TestBlockingQueue {
    /**
     * 生產和消費業務操做
     * 
     * @author tang
     *
     */
    protected class WorkDesk {

        BlockingQueue<String> desk = new LinkedBlockingQueue<String>(10);

        public void washDish() throws InterruptedException {
            desk.put("洗好一個盤子");
        }

        public String useDish() throws InterruptedException {
            return desk.take();
        }
    }

    /**
     * 生產者類
     * 
     * @author tang
     *
     */
    class Producer implements Runnable {

        private String producerName;
        private WorkDesk workDesk;

        public Producer(String producerName, WorkDesk workDesk) {
            this.producerName = producerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(producerName + "洗好一個盤子");
                    workDesk.washDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 消費者類
     * 
     * @author tang
     *
     */
    class Consumer implements Runnable {
        private String consumerName;
        private WorkDesk workDesk;

        public Consumer(String consumerName, WorkDesk workDesk) {
            this.consumerName = consumerName;
            this.workDesk = workDesk;
        }

        @Override
        public void run() {
            try {
                for (;;) {
                    System.out.println(consumerName + "使用一個盤子");
                    workDesk.useDish();
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        TestBlockingQueue testQueue = new TestBlockingQueue();
        WorkDesk workDesk = testQueue.new WorkDesk();

        ExecutorService service = Executors.newCachedThreadPool();
        //四個生產者線程
        Producer producer1 = testQueue.new Producer("生產者-1-", workDesk);
        Producer producer2 = testQueue.new Producer("生產者-2-", workDesk);
        Producer producer3 = testQueue.new Producer("生產者-3-", workDesk);
        Producer producer4 = testQueue.new Producer("生產者-4-", workDesk);
        //兩個消費者線程
        Consumer consumer1 = testQueue.new Consumer("消費者-1-", workDesk);
        Consumer consumer2 = testQueue.new Consumer("消費者-2-", workDesk);
        
        service.submit(producer1);
        service.submit(producer2);
        service.submit(producer3);
        service.submit(producer4);
        service.submit(consumer1);
        service.submit(consumer2);

    }

}

查看打印結果:

總的來講生產者的速度是會大於消費者的速度的,可是由於阻塞隊列的緣故,因此咱們不須要控制阻塞,當阻塞隊列滿的時候,生產者線程就會被阻塞,直到再也不滿。反之亦然,當消費者線程多於生產者線程時,消費者速度大於生產者速度,當隊列爲空時,就會阻塞消費者線程,直到隊列非空。

參考資料:

《Java編程思想》

http://www.cnblogs.com/studyLog-share/p/5390745.html

http://www.cnblogs.com/dolphin0520/p/3932906.html

相關文章
相關標籤/搜索