生產者-消費者模型是多線程問題裏面的經典問題,也是面試的常見問題。有以下幾個常見的實現方法:html
1. wait()/notify()面試
2. lock & condition多線程
3. BlockingQueue併發
下面來逐一分析。ide
1. wait()/notify()工具
第一種實現,利用根類Object的兩個方法wait()/notify(),來中止或者喚醒線程的執行;這也是最原始的實現。this
1 public class WaitNotifyBroker<T> implements Broker<T> { 2 3 private final Object[] items; 4 5 private int takeIndex; 6 private int putIndex; 7 private int count; 8 9 public WaitNotifyBroker(int capacity) { 10 this.items = new Object[capacity]; 11 } 12 13 @SuppressWarnings("unchecked") 14 @Override 15 public T take() { 16 T tmpObj = null; 17 try { 18 synchronized (items) { 19 while (0 == count) { 20 items.wait(); 21 } 22 tmpObj = (T) items[takeIndex]; 23 if (++takeIndex == items.length) { 24 takeIndex = 0; 25 } 26 count--; 27 items.notify(); 28 } 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 33 return tmpObj; 34 } 35 36 @Override 37 public void put(T obj) { 38 try { 39 synchronized (items) { 40 while (items.length == count) { 41 items.wait(); 42 } 43 44 items[putIndex] = obj; 45 if (++putIndex == items.length) { 46 putIndex = 0; 47 } 48 count++; 49 items.notify(); 50 } 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 } 54 55 } 56 57 }
這裏利用Array構造一個Buffer去存取數據,並利用count, putIndex和takeIndex來保證First-In-First-Out。spa
若是利用LinkedList來代替Array,相對來講會稍微簡單些。線程
LinkedList的實現,能夠參考《Java 7 Concurrency Cookbook》第2章wait/notify。code
2. lock & condition
lock & condition,實際上也實現了相似synchronized和wait()/notify()的功能,但在加鎖和解鎖、暫停和喚醒方面,更加細膩和可控。
在JDK的BlockingQueue的默認實現裏,也是利用了lock & condition。此文也詳細介紹了怎麼利用lock&condition寫BlockingQueue,這裏換LinkedList再實現一次:
1 public class LockConditionBroker<T> implements Broker<T> { 2 3 private final ReentrantLock lock; 4 private final Condition notFull; 5 private final Condition notEmpty; 6 private final int capacity; 7 private LinkedList<T> items; 8 9 public LockConditionBroker(int capacity) { 10 this.lock = new ReentrantLock(); 11 this.notFull = lock.newCondition(); 12 this.notEmpty = lock.newCondition(); 13 this.capacity = capacity; 14 15 items = new LinkedList<T>(); 16 } 17 18 @Override 19 public T take() { 20 T tmpObj = null; 21 lock.lock(); 22 try { 23 while (items.size() == 0) { 24 notEmpty.await(); 25 } 26 27 tmpObj = items.poll(); 28 notFull.signalAll(); 29 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } finally { 33 lock.unlock(); 34 } 35 return tmpObj; 36 } 37 38 @Override 39 public void put(T obj) { 40 lock.lock(); 41 try { 42 while (items.size() == capacity) { 43 notFull.await(); 44 } 45 46 items.offer(obj); 47 notEmpty.signalAll(); 48 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } finally { 52 lock.unlock(); 53 } 54 55 } 56 }
3. BlockingQueue
最後這種方法,也是最簡單最值得推薦的。利用併發包提供的工具:阻塞隊列,將阻塞的邏輯交給BlockingQueue。
實際上,上述1和2的方法實現的Broker類,也能夠視爲一種簡單的阻塞隊列,不過沒有標準包那麼完善。
1 public class BlockingQueueBroker<T> implements Broker<T> { 2 3 private final BlockingQueue<T> queue; 4 5 public BlockingQueueBroker() { 6 this.queue = new LinkedBlockingQueue<T>(); 7 } 8 9 @Override 10 public T take() { 11 try { 12 return queue.take(); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 17 return null; 18 } 19 20 @Override 21 public void put(T obj) { 22 try { 23 queue.put(obj); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 }
咱們的隊列封裝了標註包裏的LinkedBlockingQueue,十分簡單高效。
接下來,就是一個1P2C的例子:
1 public interface Broker<T> { 2 3 T take(); 4 5 void put(T obj); 6 7 } 8 9 10 public class Producer implements Runnable { 11 12 private final Broker<Integer> broker; 13 private final String name; 14 15 public Producer(Broker<Integer> broker, String name) { 16 this.broker = broker; 17 this.name = name; 18 } 19 20 @Override 21 public void run() { 22 try { 23 for (int i = 0; i < 5; i++) { 24 broker.put(i); 25 System.out.format("%s produced: %s%n", name, i); 26 Thread.sleep(1000); 27 } 28 broker.put(-1); 29 System.out.println("produced termination signal"); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 return; 33 } 34 35 } 36 37 } 38 39 40 public class Consumer implements Runnable { 41 42 private final Broker<Integer> broker; 43 private final String name; 44 45 public Consumer(Broker<Integer> broker, String name) { 46 this.broker = broker; 47 this.name = name; 48 } 49 50 @Override 51 public void run() { 52 try { 53 for (Integer message = broker.take(); message != -1; message = broker.take()) { 54 System.out.format("%s consumed: %s%n", name, message); 55 Thread.sleep(1000); 56 } 57 System.out.println("received termination signal"); 58 } catch (InterruptedException e) { 59 e.printStackTrace(); 60 return; 61 } 62 63 } 64 65 } 66 67 68 public class Main { 69 70 public static void main(String[] args) { 71 Broker<Integer> broker = new WaitNotifyBroker<Integer>(5); 72 // Broker<Integer> broker = new LockConditionBroker<Integer>(5); 73 // Broker<Integer> broker = new BlockingQueueBroker<Integer>(); 74 75 new Thread(new Producer(broker, "prod 1")).start(); 76 new Thread(new Consumer(broker, "cons 1")).start(); 77 new Thread(new Consumer(broker, "cons 2")).start(); 78 79 } 80 81 }
除了上述的方法,其實還有不少第三方的併發包能夠解決這個問題。例如LMAX Disruptor和Chronicle等
本文完。
參考:
《Java 7 Concurrency Cookbook》