BlockingQueue學習

引言

    在Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。同時,BlockingQueue也用於java自帶線程池的緩衝隊列中,瞭解BlockingQueue也有助於理解線程池的工做模型。java

 

一 BlockingQueue接口

    該接口屬於隊列,因此繼承了Queue接口,該接口最重要的五個方法分別是offer方法,poll方法,put方法,take方法和drainTo方法。node

    offer方法和poll方法分別有一個靜態重載方法,分別是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意義是在限定時間內存入或取出對象,若是不能存入取出則返回false。編程

    put方法會在當隊列存儲對象達到限定值時阻塞線程,而在隊列不爲空時喚醒被take方法所阻塞的線程。take方法是相反的。數組

    drainTo方法可批量獲取隊列中的元素。緩存

 

二 常見的BlockingQueue實現

一 LinkedBlockingQueue

    LinkedBlockingQueue是比較常見的BlockingQueue的實現,他是基於鏈表的阻塞隊列。在建立該對象時若是不指定可存儲對象個數大小時,默認爲Integer.MAX_VALUE。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者當即返回;只有當隊列緩衝區達到最大值緩存容量時,纔會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對於消費者這端的處理也基於一樣的原理。安全

    LinkedBlockingQueue內部使用了獨立的兩把鎖來控制數據同步,這也意味着在高併發的狀況下生產者和消費者能夠並行地操做隊列中的數據,以此來提升整個隊列的併發性能。多線程

put方法和offer方法:併發

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
}

這兩個方法的區別是put方法在容量達到上限時會阻塞,而offer方法則會直接返回false。高併發

 

二 ArrayBlockingQueue

    ArrayBlockingQueue是基於數組的阻塞隊列,除了有一個定長數組外,ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味着二者沒法真正並行運行,這點尤爲不一樣於LinkedBlockingQueue。ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不一樣之處在於,前者在插入或刪除元素時不會產生或銷燬任何額外的對象實例,然後者則會生成一個額外的Node對象。性能

 

三 SynchronousQueue

    是一種沒有緩衝的阻塞隊列,在生產者put的同時必需要有一個消費者進行take,不然就會阻塞。聲明一個SynchronousQueue有兩種不一樣的方式,它們之間有着不太同樣的行爲。公平模式和非公平模式的區別:  若是採用公平模式:SynchronousQueue會採用公平鎖,並配合一個FIFO隊列來阻塞多餘的生產者和消費者,從而體系總體的公平策略;但若是是非公平模式(SynchronousQueue默認):SynchronousQueue採用非公平鎖,同時配合一個LIFO隊列來管理多餘的生產者和消費者,然後一種模式,若是生產者和消費者的處理速度有差距,則很容易出現飢渴的狀況,便可能有某些生產者或者是消費者的數據永遠都得不處處理。

 

四 PriorityBlockingQueue和DelayQueue

    PriorityBlockingQueue是基於優先級的阻塞隊列,該隊列不會阻塞生產者,只會阻塞消費者。

    DelayQueue隊列存儲的對象只有指定的延遲時間到了才能被取出,該隊列也不會阻塞生產者。

 

三 BlockingQueue的使用

   在處理多線程生產者消費者問題時的演示代碼:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Created by gavin on 15-8-30.
 */
public class BlockingQueueTest {

    public static void main(String[] args)
    {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
        Thread p1 = new Thread(new Producer(queue),"producer1");
        Thread p2 = new Thread(new Producer(queue),"producer2");
        Thread c1 = new Thread(new Consumer(queue),"consumer1");
        Thread c2 = new Thread(new Consumer(queue),"consumer2");

        p1.start();
        p2.start();
        c1.start();
        c2.start();
    }
}



class Producer implements Runnable{
    private BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                queue.put(Thread.currentThread().getName()+" product "+i);
            } catch (InterruptedException e) {
                System.err.println(Thread.currentThread().getName() + " error");
            }
            i++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {

            }
        }
    }
}
class Consumer implements Runnable{
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                String str = queue.take();
                System.out.println(str);
            } catch (InterruptedException e) {

            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {

            }
        }
    }
}

 

四 總結

BlockingQueue在併發編程中扮演着重要的角色,既能夠本身用來解決生產者消費者問題,也用於java自帶線程池的緩衝隊列。

參考:http://wsmajunfeng.iteye.com/blog/1629354 BlockingQueue

 

更多文章:https://blog.gavinzh.com

相關文章
相關標籤/搜索