阻塞隊列和生產者-消費者模式、DelayQueue

1.ArrayDeque, (數組雙端隊列) 
2.PriorityQueue, (優先級隊列) 
3.ConcurrentLinkedQueue, (基於鏈表的併發隊列)算法

4.DelayQueue,                                         (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口) 
5.ArrayBlockingQueue,           (基於數組的併發阻塞隊列) 
6.LinkedBlockingQueue,        (基於鏈表的FIFO阻塞隊列) 
7.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列) 
8.PriorityBlockingQueue,        (帶優先級的無界阻塞隊列) 
9.SynchronousQueue                       (併發同步阻塞隊列) 
—————————————————–設計模式

阻塞隊列和生產者-消費者模式數組

阻塞隊列(Blocking queue)提供了可阻塞的put和take方法,它們與可定時的offer和poll是等價的。若是Queue已經滿了,put方法會被阻塞直到有空間可用;若是Queue是空的,那麼take方法會被阻塞,直到有元素可用。Queue的長度能夠有限,也能夠無限;無限的Queue永遠不會充滿,因此它的put方法永遠不會阻塞。併發

阻塞隊列支持生產者-消費者設計模式。一個生產者-消費者設計分離了「生產產品」和「消費產品」。該模式不會發現一個工做便當即處理,而是把工做置於一個任務(「to do」)清單中,以備後期處理。生產者-消費者模式簡化了開發,由於它解除了生產者和消費者之間相互依賴的代碼。生產者和消費者以不一樣的或者變化的速度生產和消費數據,生產者-消費者模式將這些活動解耦,於是簡化了工做負荷的管理。工具

生產者-消費者設計是圍繞阻塞隊列展開的,生產者把數據放入隊列,並使數據可用,當消費者爲適當的行爲作準備時會從隊列中獲取數據。生產者不須要知道消費者的省份或者數量,甚至根本沒有消費者—它們只負責把數據放入隊列。相似地,消費者也不須要知道生產者是誰,以及是誰給它們安排的工做。BlockingQueue可使用任意數量的生產者和消費者,從而簡化了生產者-消費者設計的實現。最多見的生產者-消費者設計是將線程池與工做隊列相結合。性能

阻塞隊列簡化了消費者的編碼,由於take會保持阻塞直到可用數據出現。若是生產者不能足夠快地產生工做,讓消費者忙碌起來,那麼消費者只能一直等待,直到有工做可作。同時,put方法的阻塞特性也大大地簡化了生產者的編碼;若是使用一個有界隊列,那麼當隊列充滿的時候,生產者就會阻塞,暫不能生成更多的工做,從而給消費者時間來趕進進度。this

有界隊列是強大的資源管理工具,用來創建可靠的應用程序:它們遏制那些能夠產生過多工做量、具備威脅的活動,從而讓你的程序在面對超負荷工做時更加健壯。編碼

雖然生產者-消費者模式能夠把生產者和消費者的代碼相互解耦合,可是它們的行爲仍是間接地經過共享隊列耦合在一塊兒了。spa

類庫中包含一些BlockingQueue的實現,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊列,與 LinkedList和ArrayList類似,可是卻擁有比同步List更好的併發性能。PriorityBlockingQueue是一個按優先級順序排序的隊列,當你不但願按照FIFO的屬性處理元素時,這個PriorityBolckingQueue是很是有用的。正如其餘排序的容器同樣,PriorityBlockingQueue能夠比較元素自己的天然順序(若是它們實現了Comparable),也可使用一個 Comparator進行排序。線程

最後一個BlockingQueue的實現是SynchronousQueue,它根本上不是一個真正的隊列,由於它不會爲隊列元素維護任何存儲空間。不過,它維護一個排隊的線程清單,這些線程等待把元素加入(enqueue)隊列或者移出(dequeue)隊列。由於SynchronousQueue沒有存儲能力,因此除非另外一個線程已經準備好參與移交工做,不然put和take會一直阻止。SynchronousQueue這類隊列只有在消費者充足的時候比較合適,它們總能爲下一個任務做好準備。

生產者-消費者模式一樣帶來了一些性能方面的提升。生產者和消費者能夠併發地執行,若是一個受限於I/O,另外一個受限於CPU,那麼併發執行的所有產出會高於順序執行的產出。

———— 
Deque是一個雙端隊列,容許高效地在頭和尾部分別進行插入和移除。實現有ArrayDeque和LinkedBlockingDeque。 
正如阻塞隊列適用於生產者-消費者模式同樣,雙端隊列使它們自身與一種叫作竊取工做(work stealing)的模式相關聯。一個消費者生產者設計中,全部的消費者只共享一個工做隊列;在竊取工做的設計中,每個消費者都有一個本身的雙端隊列。若是一個消費者完成了本身雙端隊列中的所有工做,它能夠竊取其餘消費者的雙端隊列中的末尾任務。由於工做者線程並不會競爭一個共享的任務隊列,因此竊取工做模式比傳統的生產者-消費者設計有更佳的可伸縮性;大多數時候它們訪問本身的雙端隊列,減小競爭。當一個工做者必需要訪問另外一個隊列時,它會從尾部截取,而不是從頭部,從而進一步下降對雙端隊列的爭奪。 
竊取工做剛好適合用於解決消費者與生產者同體的問題—-當運行到一個任務的某單元時,可能會識別出更多的任務。好比垃圾回收時對堆作了記號,能夠並行使用竊取工做。當一個線程發現了一個新的任務單元時,它會把它放在本身隊列的末尾;當雙端隊列爲空時,它會去其餘隊列的隊尾尋找新的任務,這樣能確保每個線程都保持忙碌狀態。

———————— 
非阻塞算法

基於鎖的算法會帶來一些活躍度失敗的風險。若是線程在持有鎖的時候由於阻塞I/O,頁面錯誤,或其餘緣由發生延遲,極可能全部的線程都不能前進了。 
一個線程的失敗或掛起不該該影響其餘線程的失敗或掛起,這樣的算法成爲非阻塞(nonblocking)算法;若是算法的每個步驟中都有一些線程可以繼續執行,那麼這樣的算法稱爲鎖自由(lock-free)算法。在線程間使用CAS進行協調,這樣的算法若是能構建正確的話,它既是非阻塞的,又是鎖自由的。非競爭的CAS老是可以成功,若是多個線程以一個CAS競爭,總會有一個勝出並前進。非阻塞算法堆死鎖和優先級倒置有「免疫性」(但它們可能會出現飢餓和活鎖,由於它們容許重進入)。

非阻塞算法經過使用低層次的併發原語,好比比較交換,取代了鎖。原子變量類向用戶提供了這些底層級原語,也可以當作「更佳的volatile變量」使用,同時提供了整數類和對象引用的原子化更新操做。

——————————- 
/** 
*DelayQueue(延時隊列)是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象, 
*內部經過一個優先隊列(PriorityQueue)的引用實現相關數據操做。 
*其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即對頭對象的延遲到期的時間最長。 
*若是沒有任何到期,那麼就不會有任何頭元素,而且poll將返回null(正由於這樣,不能將null放入該隊列中) 
*Delayed接口有一個名爲getDelay()的方法,它用來告知延時到期有多長時間,或延遲在多長時間以前已經到期。 
*爲了排序,Delayed接口還繼承了Comparable接口,所以必須實現comparaTo(),使其產生合理比較。 
*/ 
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    private transient final ReentrantLock lock = new ReentrantLock();//鎖 
    /** 
    *接口Condition :鎖條件變量,其實例和特定的鎖綁定。提供了: 
    * await()、awaitUninterruptibly()、awaitNanos(long)、await(long, TimeUnit)、awaitUntil(Date) 
    * signal()、signalAll() 方法,實現了對線程的「等待」和「喚醒」操做 
    */ 
    private transient final Condition available = lock.newCondition();//鎖的條件變量,提供「等待」「喚醒」線程的操做 
    private final PriorityQueue<E> q = new PriorityQueue<E>();//內部有一個優先隊列(無界的隊列)的引用

    public DelayQueue() {}

    public DelayQueue(Collection<? extends E> c) { 
        this.addAll(c); 
    }

    /** 
     * 插入元素到延時隊列 (調用offer實現) 
     */ 
    public boolean add(E e) { 
        return offer(e); 
    }

    /** 
     * 插入元素(加鎖) 
     */ 
    public boolean offer(E e) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            E first = q.peek(); 
            q.offer(e); //調用優先隊列的實現 
            if (first == null || e.compareTo(first) < 0) 
                available.signalAll();//喚醒其餘的線程 
            return true; 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 插入元素 
     */ 
    public void put(E e) { 
        offer(e); 
    } 
    //因爲是無界的,不會對offer插入元素阻塞,參數unit無效 
    public boolean offer(E e, long timeout, TimeUnit unit) { 
        return offer(e); 
    }

    /** 
     * 頭元素出對列(加鎖) 
     */ 
    public E poll() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            E first = q.peek(); 
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                return null; 
            else { 
                E x = q.poll(); 
                assert x != null; 
                if (q.size() != 0) 
                    available.signalAll(); 
                return x; 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 得到並移除頭元素,在延時到期的狀況下。 
     */ 
    public E take() throws InterruptedException { 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            for (;;) { 
                E first = q.peek(); 
                if (first == null) { 
                    available.await(); 
                } else { 
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS); 
                    if (delay > 0) { 
                        long tl = available.awaitNanos(delay); 
                    } else { 
                        E x = q.poll(); 
                        assert x != null; 
                        if (q.size() != 0) 
                            available.signalAll(); // wake up other takers 
                        return x;

                    } 
                } 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 得到並移除頭元素,延遲到期或指定時間到期後。 
     */ 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        long nanos = unit.toNanos(timeout); 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            for (;;) { 
                E first = q.peek(); 
                if (first == null) { 
                    if (nanos <= 0) 
                        return null; 
                    else 
                        nanos = available.awaitNanos(nanos); 
                } else { 
                    long delay = first.getDelay(TimeUnit.NANOSECONDS); 
                    if (delay > 0) { 
                        if (nanos <= 0) 
                            return null; 
                        if (delay > nanos) 
                            delay = nanos; 
                        long timeLeft = available.awaitNanos(delay); 
                        nanos -= delay – timeLeft; 
                    } else { 
                        E x = q.poll(); 
                        assert x != null; 
                        if (q.size() != 0) 
                            available.signalAll(); 
                        return x; 
                    } 
                } 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

    /** 
     * 獲取但不移除元素(調用優先隊列的peek方法) 
     */ 
    public E peek() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.peek(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public int size() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.size(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    //把隊列在的元素「剪切到」集合c中 
    public int drainTo(Collection<? super E> c) { 
        if (c == null) 
            throw new NullPointerException(); 
        if (c == this) 
            throw new IllegalArgumentException(); 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            int n = 0; 
            for (;;) { 
                E first = q.peek(); 
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                    break; 
                c.add(q.poll()); 
                ++n; 
            } 
            if (n > 0) 
                available.signalAll(); 
            return n; 
        } finally { 
            lock.unlock(); 
        } 
    }

    //把maxElements個元素「剪切」到集合c中 
    public int drainTo(Collection<? super E> c, int maxElements) { 
        if (c == null) 
            throw new NullPointerException(); 
        if (c == this) 
            throw new IllegalArgumentException(); 
        if (maxElements <= 0) 
            return 0; 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            int n = 0; 
            while (n < maxElements) { 
                E first = q.peek(); 
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 
                    break; 
                c.add(q.poll()); 
                ++n; 
            } 
            if (n > 0) 
                available.signalAll(); 
            return n; 
        } finally { 
            lock.unlock(); 
        } 
    }

    public void clear() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            q.clear(); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public int remainingCapacity() { 
        return Integer.MAX_VALUE; 
    }

    public Object[] toArray() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.toArray(); 
        } finally { 
            lock.unlock(); 
        } 
    } 
    public <T> T[] toArray(T[] a) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.toArray(a); 
        } finally { 
            lock.unlock(); 
        } 
    }

    public boolean remove(Object o) { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return q.remove(o); 
        } finally { 
            lock.unlock(); 
        } 
    }

……

}

相關文章
相關標籤/搜索