Java裏的生產者-消費者模型(Producer and Consumer Pattern in Java)

生產者-消費者模型是多線程問題裏面的經典問題,也是面試的常見問題。有以下幾個常見的實現方法:html

1. wait()/notify()面試

2. lock & condition多線程

3. BlockingQueue併發

 

下面來逐一分析。ide

 

1. wait()/notify()工具

第一種實現,利用根類Object的兩個方法wait()/notify(),來中止或者喚醒線程的執行;這也是最原始的實現。this

 1 public class WaitNotifyBroker<T> implements Broker<T> {
 2 
 3     private final Object[] items;
 4 
 5     private int takeIndex;
 6     private int putIndex;
 7     private int count;
 8 
 9     public WaitNotifyBroker(int capacity) {
10         this.items = new Object[capacity];
11     }
12 
13     @SuppressWarnings("unchecked")
14     @Override
15     public T take() {
16         T tmpObj = null;
17         try {
18             synchronized (items) {
19                 while (0 == count) {
20                     items.wait();
21                 }
22                 tmpObj = (T) items[takeIndex];
23                 if (++takeIndex == items.length) {
24                     takeIndex = 0;
25                 }
26                 count--;
27                 items.notify();
28             }
29         } catch (InterruptedException e) {
30             e.printStackTrace();
31         }
32 
33         return tmpObj;
34     }
35 
36     @Override
37     public void put(T obj) {
38         try {
39             synchronized (items) {
40                 while (items.length == count) {
41                     items.wait();
42                 }
43 
44                 items[putIndex] = obj;
45                 if (++putIndex == items.length) {
46                     putIndex = 0;
47                 }
48                 count++;
49                 items.notify();
50             }
51         } catch (InterruptedException e) {
52             e.printStackTrace();
53         }
54 
55     }
56 
57 }

這裏利用Array構造一個Buffer去存取數據,並利用count, putIndex和takeIndex來保證First-In-First-Out。spa

若是利用LinkedList來代替Array,相對來講會稍微簡單些。線程

LinkedList的實現,能夠參考《Java 7 Concurrency Cookbook》第2章wait/notify。code

 

 

2. lock & condition

lock & condition,實際上也實現了相似synchronized和wait()/notify()的功能,但在加鎖和解鎖、暫停和喚醒方面,更加細膩和可控。

在JDK的BlockingQueue的默認實現裏,也是利用了lock & condition。此文也詳細介紹了怎麼利用lock&condition寫BlockingQueue,這裏換LinkedList再實現一次:

 1 public class LockConditionBroker<T> implements Broker<T> {
 2 
 3     private final ReentrantLock lock;
 4     private final Condition notFull;
 5     private final Condition notEmpty;
 6     private final int capacity;
 7     private LinkedList<T> items;
 8 
 9     public LockConditionBroker(int capacity) {
10         this.lock = new ReentrantLock();
11         this.notFull = lock.newCondition();
12         this.notEmpty = lock.newCondition();
13         this.capacity = capacity;
14 
15         items = new LinkedList<T>();
16     }
17 
18     @Override
19     public T take() {
20         T tmpObj = null;
21         lock.lock();
22         try {
23             while (items.size() == 0) {
24                 notEmpty.await();
25             }
26 
27             tmpObj = items.poll();
28             notFull.signalAll();
29 
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32         } finally {
33             lock.unlock();
34         }
35         return tmpObj;
36     }
37 
38     @Override
39     public void put(T obj) {
40         lock.lock();
41         try {
42             while (items.size() == capacity) {
43                 notFull.await();
44             }
45 
46             items.offer(obj);
47             notEmpty.signalAll();
48 
49         } catch (InterruptedException e) {
50             e.printStackTrace();
51         } finally {
52             lock.unlock();
53         }
54 
55     }
56 }

 

 

3. BlockingQueue

最後這種方法,也是最簡單最值得推薦的。利用併發包提供的工具:阻塞隊列,將阻塞的邏輯交給BlockingQueue。

實際上,上述1和2的方法實現的Broker類,也能夠視爲一種簡單的阻塞隊列,不過沒有標準包那麼完善。

 1 public class BlockingQueueBroker<T> implements Broker<T> {
 2 
 3     private final BlockingQueue<T> queue;
 4 
 5     public BlockingQueueBroker() {
 6         this.queue = new LinkedBlockingQueue<T>();
 7     }
 8 
 9     @Override
10     public T take() {
11         try {
12             return queue.take();
13         } catch (InterruptedException e) {
14             e.printStackTrace();
15         }
16 
17         return null;
18     }
19 
20     @Override
21     public void put(T obj) {
22         try {
23             queue.put(obj);
24         } catch (InterruptedException e) {
25             e.printStackTrace();
26         }
27     }
28 
29 }

咱們的隊列封裝了標註包裏的LinkedBlockingQueue,十分簡單高效。

 

接下來,就是一個1P2C的例子:

 1 public interface Broker<T> {
 2 
 3     T take();
 4 
 5     void put(T obj);
 6 
 7 }
 8 
 9 
10 public class Producer implements Runnable {
11 
12     private final Broker<Integer> broker;
13     private final String name;
14 
15     public Producer(Broker<Integer> broker, String name) {
16         this.broker = broker;
17         this.name = name;
18     }
19 
20     @Override
21     public void run() {
22         try {
23             for (int i = 0; i < 5; i++) {
24                 broker.put(i);
25                 System.out.format("%s produced: %s%n", name, i);
26                 Thread.sleep(1000);
27             }
28             broker.put(-1);
29             System.out.println("produced termination signal");
30         } catch (InterruptedException e) {
31             e.printStackTrace();
32             return;
33         }
34 
35     }
36 
37 }
38 
39 
40 public class Consumer implements Runnable {
41 
42     private final Broker<Integer> broker;
43     private final String name;
44 
45     public Consumer(Broker<Integer> broker, String name) {
46         this.broker = broker;
47         this.name = name;
48     }
49 
50     @Override
51     public void run() {
52         try {
53             for (Integer message = broker.take(); message != -1; message = broker.take()) {
54                 System.out.format("%s consumed: %s%n", name, message);
55                 Thread.sleep(1000);
56             }
57             System.out.println("received termination signal");
58         } catch (InterruptedException e) {
59             e.printStackTrace();
60             return;
61         }
62 
63     }
64 
65 }
66 
67 
68 public class Main {
69 
70     public static void main(String[] args) {
71         Broker<Integer> broker = new WaitNotifyBroker<Integer>(5);
72 //         Broker<Integer> broker = new LockConditionBroker<Integer>(5);
73 //         Broker<Integer> broker = new BlockingQueueBroker<Integer>();
74 
75         new Thread(new Producer(broker, "prod 1")).start();
76         new Thread(new Consumer(broker, "cons 1")).start();
77         new Thread(new Consumer(broker, "cons 2")).start();
78 
79     }
80 
81 }

 

除了上述的方法,其實還有不少第三方的併發包能夠解決這個問題。例如LMAX Disruptor和Chronicle等

 

本文完。

 

參考:

《Java 7 Concurrency Cookbook》

相關文章
相關標籤/搜索