BlockingQueue深刻分析

1.BlockingQueue定義的經常使用方法以下java

  拋出異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
檢查 element() peek() 不可用 不可用

 

1)add(anObject):把anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然招聘異常數組

2)offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然返回false.安全

3)put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續.數據結構

4)poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null併發

5)take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止函數

其中:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會拋出NullPointerException。null 被用做指示poll 操做失敗的警惕值。 性能

 

二、BlockingQueue的幾個注意點

【1】BlockingQueue 能夠是限定容量的。它在任意給定時間均可以有一個remainingCapacity,超出此容量,便沒法無阻塞地put 附加元素。沒有任何內部容量約束的BlockingQueue 老是報告Integer.MAX_VALUE 的剩餘容量。this

【2】BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持Collection 接口。所以,舉例來講,使用remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操做一般 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。spa

【3】BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖或其餘形式的併發控制來自動達到它們的目的。然而,大量的 Collection 操做(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明。所以,舉例來講,在只添加了c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。.net

【4】BlockingQueue 實質上不支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的end-of-stream 或poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。

 

三、簡要概述BlockingQueue經常使用的四個實現類

 

 

1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的

3)PriorityBlockingQueue:相似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的天然排序順序或者是構造函數的Comparator決定的順序.

4)SynchronousQueue:特殊的BlockingQueue,對其的操做必須是放和取交替完成的.

    

其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不同,致使LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue.  

 

下面主要看一下ArrayBlockingQueue的源碼:


 

public boolean offer(E e) {      
        if (e == null) throw new NullPointerException();      
        final ReentrantLock lock = this.lock;//每一個對象對應一個顯示的鎖      
        lock.lock();//請求鎖直到得到鎖(不能夠被interrupte)      
        try {      
            if (count == items.length)//若是隊列已經滿了      
                return false;      
            else {      
                insert(e);      
                return true;      
            }      
        } finally {      
            lock.unlock();//      
        }      
}      
看insert方法:      
private void insert(E x) {      
        items[putIndex] = x;      
        //增長全局index的值。      
        /*    
        Inc方法體內部:    
        final int inc(int i) {    
        return (++i == items.length)? 0 : i;    
            }    
        這裏能夠看出ArrayBlockingQueue採用從前到後向內部數組插入的方式插入新元素的。若是插完了,putIndex可能從新變爲0(在已經執行了移除操做的前提下,不然在以前的判斷中隊列爲滿)    
        */     
        putIndex = inc(putIndex);       
        ++count;      
        notEmpty.signal();//wake up one waiting thread      
}
public void put(E e) throws InterruptedException {      
        if (e == null) throw new NullPointerException();      
        final E[] items = this.items;      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();//請求鎖直到獲得鎖或者變爲interrupted      
        try {      
            try {      
                while (count == items.length)//若是滿了,當前線程進入noFull對應的等waiting狀態      
                    notFull.await();      
            } catch (InterruptedException ie) {      
                notFull.signal(); // propagate to non-interrupted thread      
                throw ie;      
            }      
            insert(e);      
        } finally {      
            lock.unlock();      
        }      
}
public boolean offer(E e, long timeout, TimeUnit unit)      
        throws InterruptedException {      
     
        if (e == null) throw new NullPointerException();      
    long nanos = unit.toNanos(timeout);      
        final ReentrantLock lock = this.lock;      
        lock.lockInterruptibly();      
        try {      
            for (;;) {      
                if (count != items.length) {      
                    insert(e);      
                    return true;      
                }      
                if (nanos <= 0)      
                    return false;      
                try {      
                //若是沒有被 signal/interruptes,須要等待nanos時間才返回      
                    nanos = notFull.awaitNanos(nanos);      
                } catch (InterruptedException ie) {      
                    notFull.signal(); // propagate to non-interrupted thread      
                    throw ie;      
                }      
            }      
        } finally {      
            lock.unlock();      
        }      
    }
public boolean add(E e) {      
    return super.add(e);      
}      
父類:      
public boolean add(E e) {      
        if (offer(e))      
            return true;      
        else     
            throw new IllegalStateException("Queue full");      
    }

該類中有幾個實例變量:takeIndex/putIndex/count

用三個數字來維護這個隊列中的數據變動:      
    /** items index for next take, poll or remove */     
    private int takeIndex;      
    /** items index for next put, offer, or add. */     
    private int putIndex;      
    /** Number of items in the queue */     
    private int count;
相關文章
相關標籤/搜索