Java多線程15:Queue、BlockingQueue以及利用BlockingQueue實現生產者/消費者模型

Queue是什麼java

隊列,是一種數據結構。除了優先級隊列和LIFO隊列外,隊列都是以FIFO(先進先出)的方式對各個元素進行排序的。不管使用哪一種排序方式,隊列的頭都是調用remove()或poll()移除元素的。在FIFO隊列中,全部新元素都插入隊列的末尾。node

 

Queue中的方法程序員

Queue中的方法不難理解,6個,每2對是一個也就是總共3對。看一下JDK API就知道了:數組

注意一點就好,Queue一般不容許插入Null,儘管某些實現(好比LinkedList)是容許的,可是也不建議。安全

 

BlockingQueue數據結構

一、BlockingQueue概述多線程

只講BlockingQueue,由於BlockingQueue是Queue中的一個重點,而且經過BlockingQueue咱們再次加深對於生產者/消費者模型的理解。其餘的Queue都不難,經過查看JDK API和簡單閱讀源碼徹底能夠理解他們的做用。spa

BlockingQueue,顧名思義,阻塞隊列。BlockingQueue是在java.util.concurrent下的,所以不難理解,BlockingQueue是爲了解決多線程中數據高效安全傳輸而提出的。線程

多線程中,不少場景均可以使用隊列實現,好比經典的生產者/消費者模型,經過隊列能夠便利地實現二者之間數據的共享,定義一個生產者線程,定義一個消費者線程,經過隊列共享數據就能夠了。code

固然現實不可能都是理想的,好比消費者消費速度比生產者生產的速度要快,那麼消費者消費到 必定程度上的時候,必需要暫停等待一下了(使消費者線程處於WAITING狀態)。BlockingQueue的提出,就是爲了解決這個問題的,他不用程序員去控制這些細節,同時還要兼顧效率和線程安全。

阻塞隊列所謂的"阻塞",指的是某些狀況下線程會掛起(即阻塞),一旦條件知足,被掛起的線程又會自動喚醒。使用BlockingQueue,不須要關心何時須要阻塞線程,何時須要喚醒線程,這些內容BlockingQueue都已經作好了

二、BlockingQueue中的方法

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已經列了。看一下BlockingQueue中特有的方法:

(1)void put(E e) throws InterruptedException

把e添加進BlockingQueue中,若是BlockingQueue中沒有空間,則調用線程被阻塞,進入等待狀態,直到BlockingQueue中有空間再繼續

(2)void take() throws InterruptedException

取走BlockingQueue裏面排在首位的對象,若是BlockingQueue爲空,則調用線程被阻塞,進入等待狀態,直到BlockingQueue有新的數據被加入

(3)int drainTo(Collection<? super E> c, int maxElements)

一次性取走BlockingQueue中的數據到c中,能夠指定取的個數。經過該方法能夠提高獲取數據效率,不須要屢次分批加鎖或釋放鎖

三、ArrayBlockingQueue

基於數組的阻塞隊列,必須指定隊列大小。比較簡單。ArrayBlockingQueue中只有一個ReentrantLock對象,這意味着生產者和消費者沒法並行運行(見下面的代碼)。另外,建立ArrayBlockingQueue時,能夠指定ReentrantLock是否爲公平鎖,默認採用非公平鎖。

/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

四、LinkedBlockingQueue

基於鏈表的阻塞隊列,和ArrayBlockingQueue差很少。不過LinkedBlockingQueue若是不指定隊列容量大小,會默認一個相似無限大小的容量,之因此說是相似是由於這個無限大小是Integer.MAX_VALUE,這麼說就好理解ArrayBlockingQueue爲何必需要制定大小了,若是ArrayBlockingQueue不指定大小的話就用Integer.MAX_VALUE,那將形成大量的空間浪費,可是基於鏈表實現就不同的,一個一個節點連起來而已。另外,LinkedBlockingQueue生產者和消費者都有本身的鎖(見下面的代碼),這意味着生產者和消費者能夠"同時"運行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

五、SynchronousQueue

比較特殊,一種沒有緩衝的等待隊列。什麼叫作沒有緩衝區,ArrayBlocking中有:

/** The queued items  */
private final E[] items;

數組用以存儲隊列。LinkedBlockingQueue中有:

/**
 * Linked list node class
 */
static class Node<E> {
    /** The item, volatile to ensure barrier separating write and read */
    volatile E item;
    Node<E> next;
    Node(E x) { item = x; }
}

將隊列以鏈表形式鏈接。

生產者/消費者操做數據實際上都是經過這兩個"中介"來操做數據的,可是SynchronousQueue則是生產者直接把數據給消費者(消費者直接從生產者這裏拿數據),好像又回到了沒有生產者/消費者模型的老辦法了。換句話說,每個插入操做必須等待一個線程對應的移除操做。SynchronousQueue又有兩種模式:

一、公平模式

採用公平鎖,並配合一個FIFO隊列(Queue)來管理多餘的生產者和消費者

二、非公平模式

採用非公平鎖,並配合一個LIFO棧(Stack)來管理多餘的生產者和消費者,這也是SynchronousQueue默認的模式

 

利用BlockingQueue實現生產者消費者模型

上一篇咱們寫的生產者消費者模型有侷限,侷限體如今:

  • 緩衝區內只能存放一個數據,實際生產者/消費者模型中的緩衝區內能夠存放大量生產者生產出來的數據
  • 生產者和消費者處理數據的速度幾乎同樣

OK,咱們就用BlockingQueue來簡單寫一個例子,而且讓生產者、消費者處理數據速度不一樣。子類選擇的是ArrayBlockingQueue,大小定爲10:

public static void main(String[] args)
{
    final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
    Runnable producerRunnable = new Runnable()
    {
        int i = 0;
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我生產了一個" + i++);
                    bq.put(i + "");
                    Thread.sleep(1000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Runnable customerRunnable = new Runnable()
    {
        public void run()
        {
            while (true)
            {
                try
                {
                    System.out.println("我消費了一個" + bq.take());
                    Thread.sleep(3000);
                } 
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread producerThread = new Thread(producerRunnable);
    Thread customerThread = new Thread(customerRunnable);
    producerThread.start();
    customerThread.start();
}

代碼的作法是讓生產者生產速度快於消費者消費速度的,看一下運行結果:

 1 我生產了一個0
 2 我消費了一個1
 3 我生產了一個1
 4 我生產了一個2
 5 我消費了一個2
 6 我生產了一個3
 7 我生產了一個4
 8 我生產了一個5
 9 我消費了一個3
10 我生產了一個6
11 我生產了一個7
12 我生產了一個8
13 我消費了一個4
14 我生產了一個9
15 我生產了一個10
16 我生產了一個11
17 我消費了一個5
18 我生產了一個12
19 我生產了一個13
20 我生產了一個14
21 我消費了一個6
22 我生產了一個15
23 我生產了一個16
24 我消費了一個7
25 我生產了一個17
26 我消費了一個8
27 我生產了一個18

分兩部分來看輸出結果:

一、第1行~第23行。這塊BlockingQueue未滿,因此生產者隨便生產,消費者隨便消費,基本上都是生產3個消費1個,消費者消費速度慢

二、第24行~第27行,從前面咱們能夠看出,生產到16,消費到6,說明到了ArrayBlockingQueue的極限10了,這時候沒辦法,生產者生產一個ArrayBlockingQueue就滿了,因此不能繼續生產了,只有等到消費者消費完才能夠繼續生產。因此以後的打印內容必定是一個生產者、一個消費者

這就是前面一章開頭說的"經過平衡生產者和消費者的處理能力來提升總體處理數據的速度",這給例子應該體現得很明顯。另外,也不要擔憂非單一輩子產者/消費者場景下的系統假死問題,緩衝區空、緩衝區滿的場景BlockingQueue都是定義了不一樣的Condition,因此不會喚醒本身的同類。

相關文章
相關標籤/搜索