Java多線程(十):BlockingQueue實現生產者消費者模型

BlockingQueue

BlockingQueue、解決了多線程中,如何高效安全「傳輸」數據的問題。程序員無需關心何時阻塞線程,何時喚醒線程,該喚醒哪一個線程。java

方法介紹


BlockingQueue是Queue的子類程序員

void put(E e)

插入指定元素,當BlockingQueue爲滿,則線程阻塞,進入Waiting狀態,直到BlockingQueue有空閒空間再繼續。
這裏以ArrayBlockingQueue爲例進行分析

數組

void take()

隊首出隊,當BlockingQueue爲空,則線程阻塞,進入Waiting狀態,直到BlockingQueue不爲空再繼續。

安全

int drainTo(Collection<? super E> c)

從隊列中批量取出數據,並放入到另外一個集合中,返回轉移數據的數量,只需一次加鎖和解鎖。多線程

BlockingQueue的實現類

ArrayBlockingQueue

/*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

基於數組實現的BlockingQueue,須要指定隊列容量,能夠指定是否爲公平鎖;只有一個ReentrantLock,生產者和消費者不能異步執行。異步

LinkedBlockingQueue

/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

基於鏈表實現的BlockingQueue,能夠指定隊列容量,不指定隊列容量默認爲Integer.MAX_VALUE;有兩個ReentrantLock,生產者和消費者能夠異步執行。線程

BlockingQueue實現生產者消費者模型

  • 緩衝區能夠存放大量數據
  • 生產者和消費者速度各不相同
public class MyThread42 {
    public static void main(String[] args)
    {
        final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
        Runnable producerRunnable = new Runnable()
        {
            int i = 0;
            public void run()
            {
                while (true)
                {
                    try
                    {
                        System.out.println("我生產了一個" + i++);
                        bq.put(i + "");
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };

        Runnable customerRunnable = new Runnable()
        {
            public void run()
            {
                while (true)
                {
                    try
                    {
                        System.out.println("我消費了一個" + bq.take());
                        Thread.sleep(3000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread producerThread = new Thread(producerRunnable);
        Thread customerThread = new Thread(customerRunnable);
        producerThread.start();
        customerThread.start();
    }
}

輸出結果以下3d

我生產了一個0
我消費了一個1
我生產了一個1
我生產了一個2
我消費了一個2
我生產了一個3
我生產了一個4
我生產了一個5
我消費了一個3
我生產了一個6
我生產了一個7
我生產了一個8
我消費了一個4
我生產了一個9
我生產了一個10
我生產了一個11
我消費了一個5
我生產了一個12
我生產了一個13
我生產了一個14
我消費了一個6
我生產了一個15
我生產了一個16
我消費了一個7
我生產了一個17
我消費了一個8
我生產了一個18
我消費了一個9
我生產了一個19
我消費了一個10
我生產了一個20
我消費了一個11
我生產了一個21
我消費了一個12
我生產了一個22
我消費了一個13
我生產了一個23
我消費了一個14
我生產了一個24

······

生產者沒有生產到BlockingQueue的容量(極限是10)以前,生產3個,消費1個,再生產到BlockingQueue的容量以後,生產一個消費一個,由於不能超過BlockingQueue的容量。code

相關文章
相關標籤/搜索