Java阻塞隊列(BlockingQueue)實現 生產者/消費者 示例

Java阻塞隊列(BlockingQueue)實現 生產者/消費者 示例html

本文由 TonySpark 翻譯自 Javarevisited轉載請參見文章末尾的要求java

     Java.util.concurrent.BlockingQueue 是一個隊列實現類,支持這樣的操做:當從隊列中獲取或者移除元素時,若是隊列爲空,須要等待,直到隊列不爲空;同時若是向隊列中添加元素時,此時若是隊列無可用空間,也須要等待。安全

    BlockingQueue 類不接收Null值,若是你試圖向隊列中存入Null值將拋出NullPointerException.架構

    BlockingQueue的實現是線程安全的。全部隊列方法自己都是原子操做,使用併發控制的內部鎖或者其它形式。併發

    BlockingQueue這個接口是Java集合架構的一部分,它主要用於解決生產者/消費者問題。在BlockingQueue中,咱們不用擔憂生產者操做時是否有可用空間或者消費者操做時是否有可用的對像而等待這樣的問題,這些都會在它的實現類中進行處理。ide

    Java中提供了幾個對BlockingQueue的實現類,如: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 等this

    在處理生產者/消費者問題上 咱們將會使用ArrayBlockingQueue來實現,以下是咱們需知道的重要方法:spa

  • put(E e): 這個方法用於向隊列中插入元素,若是隊列已滿,須要等待可用的這間。
  • E take(): 這個方法用於從隊列頭部獲取或者移除元素,若是隊列爲空則須要等待可用的元素。

    如今我們看看用BlockingQueue來解決生產者/消費者問題。線程

   Message翻譯

   Producer產生的普通Java對象,並添加到隊列中。

   Message.java

 1 package com.journaldev.concurrency;
 2  
 3 public class Message {
 4     private String msg;
 5      
 6     public Message(String str){
 7         this.msg=str;
 8     }
 9  
10     public String getMsg() {
11         return msg;
12     }
13  
14 }

Producer

Producer這個類會產生消息並將其放入隊列中。

Producer.java

package com.journaldev.concurrency;
 
import java.util.concurrent.BlockingQueue;
 
public class Producer implements Runnable {
 
    private BlockingQueue<Message> queue;
     
    public Producer(BlockingQueue<Message> q){
        this.queue=q;
    }
    @Override
    public void run() {
        //生產消息
        for(int i=0; i<100; i++){
            Message msg = new Message(""+i);
            try {
                Thread.sleep(i);
                queue.put(msg);
                System.out.println("Produced "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //添加退出消息
        Message msg = new Message("exit");
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
}

Consumer

Consumer類會從隊列獲取消息進行處理。若是獲取的是退出消息則結束。

Consumer.java

package com.journaldev.concurrency;
 
import java.util.concurrent.BlockingQueue;
 
public class Consumer implements Runnable{
 
private BlockingQueue<Message> queue;
     
    public Consumer(BlockingQueue<Message> q){
        this.queue=q;
    }
 
    @Override
    public void run() {
        try{
            Message msg;
            //獲取並處理消息直到接收到「exit」消息
            while((msg = queue.take()).getMsg() !="exit"){
            Thread.sleep(10);
            System.out.println("Consumed "+msg.getMsg());
            }
        }catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ProducerConsumerService

生產者/消費者的服務類將會產生固定大小的BlockingQueue,生產者和消費者同時共享該BlockingQueue,該服務類會起啓動生產者和消費者線程。

ProducerConsumerService.java

 1 package com.journaldev.concurrency;
 2  
 3  
 4 import java.util.concurrent.ArrayBlockingQueue;
 5 import java.util.concurrent.BlockingQueue;
 6  
 7  
 8 public class ProducerConsumerService {
 9  
10     public static void main(String[] args) {
11         //建立大小爲10的 BlockingQueue 
12         BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
13         Producer producer = new Producer(queue);
14         Consumer consumer = new Consumer(queue);
15         //開啓 producer線程向隊列中生產消息
16         new Thread(producer).start();
17         //開啓 consumer線程 中隊列中消費消息
18         new Thread(consumer).start();
19         System.out.println("Producer and Consumer has been started");
20     }
21  
22 }

上面程序的運行結果:

 1 Producer and Consumer has been started
 2 Produced 0
 3 Produced 1
 4 Produced 2
 5 Produced 3
 6 Produced 4
 7 Consumed 0
 8 Produced 5
 9 Consumed 1
10 Produced 6
11 Produced 7
12 Consumed 2
13 Produced 8
14 ...

Thread sleep 使得生產者/消費者 生產、消費這此消息有必定的延遲。

 

原文連接: Javarevisited 翻譯: TonySpark
譯文連接: http://www.cnblogs.com/tonyspark/p/3722013.html

轉載請保留原文出處、譯者和譯文連接。]

相關文章
相關標籤/搜索