JDK版本:Oracle Java 1.8.0_102
/**
 * Contract for the consuming side of the producer-consumer model.
 */
public interface Consumer {

    /**
     * Performs one consume step. Implementations may block.
     *
     * @throws InterruptedException if the calling thread is interrupted during the step
     */
    void consume() throws InterruptedException;
}
/**
 * Contract for the producing side of the producer-consumer model.
 */
public interface Producer {

    /**
     * Performs one produce step. Implementations may block.
     *
     * @throws InterruptedException if the calling thread is interrupted during the step
     */
    void produce() throws InterruptedException;
}
abstract class AbstractConsumer implements Consumer, Runnable { @Override public void run() { while (true) { try { consume(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } 複製代碼
abstract class AbstractProducer implements Producer, Runnable { @Override public void run() { while (true) { try { produce(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } 複製代碼
/**
 * Factory for the two kinds of workers in a producer-consumer model.
 */
public interface Model {

    /**
     * @return a {@link Runnable} acting as a consumer
     */
    Runnable newRunnableConsumer();

    /**
     * @return a {@link Runnable} acting as a producer
     */
    Runnable newRunnableProducer();
}
/**
 * Unit of work passed from producers to consumers, identified by a sequence number.
 */
public class Task {

    /**
     * Sequence number of this task. Declared {@code final} so a task cannot be modified
     * after construction while it is being handed from one thread to another.
     */
    public final int no;

    /**
     * @param no sequence number to assign to this task
     */
    public Task(int no) {
        this.no = no;
    }
}
public class BlockingQueueModel implements Model { private final BlockingQueue<Task> queue; private final AtomicInteger increTaskNo = new AtomicInteger(0); public BlockingQueueModel(int cap) { // LinkedBlockingQueue 的隊列是 lazy-init 的,但 ArrayBlockingQueue 在建立時就已經 init this.queue = new LinkedBlockingQueue<>(cap); } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { Task task = queue.take(); // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); Task task = new Task(increTaskNo.getAndIncrement()); System.out.println("produce: " + task.no); queue.put(task); } } public static void main(String[] args) { Model model = new BlockingQueueModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
produce: 0
produce: 4
produce: 2
produce: 3
produce: 5
consume: 0
produce: 1
consume: 4
produce: 7
consume: 2
produce: 8
consume: 3
produce: 6
consume: 5
produce: 9
consume: 1
produce: 10
consume: 7
// 舊的錯誤代碼
queue.put(task);
System.out.println("produce: " + task.no);

// 正確代碼
System.out.println("produce: " + task.no);
queue.put(task);

具體來說,生產日誌應放在入隊操作之前,消費日誌應放在出隊操作之後,以保障:
- 消費線程中 queue.take() 返回之後,對應生產線程(生產該 task 的線程)中 queue.put() 及之前的行為,對於消費線程來說都是可見的。
如果不能將併發與容量控制都封裝在緩衝區中,就只能由消費者與生產者完成。最簡單的方案是使用樸素的 wait && notify 機制。
public class WaitNotifyModel implements Model { private final Object BUFFER_LOCK = new Object(); private final Queue<Task> buffer = new LinkedList<>(); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public WaitNotifyModel(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { synchronized (BUFFER_LOCK) { while (buffer.size() == 0) { BUFFER_LOCK.wait(); } Task task = buffer.poll(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); BUFFER_LOCK.notifyAll(); } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); synchronized (BUFFER_LOCK) { while (buffer.size() == cap) { BUFFER_LOCK.wait(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); System.out.println("produce: " + task.no); BUFFER_LOCK.notifyAll(); } } } public static void main(String[] args) { Model model = new WaitNotifyModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
樸素的wait && notify
我們要保證理解 wait && notify 機制。實現時可以使用 Object 類提供的 wait() 方法與 notifyAll() 方法,但更推薦的方式是使用 java.util.concurrent 包提供的 Lock && Condition。
public class LockConditionModel1 implements Model { private final Lock BUFFER_LOCK = new ReentrantLock(); private final Condition BUFFER_COND = BUFFER_LOCK.newCondition(); private final Queue<Task> buffer = new LinkedList<>(); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel1(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { BUFFER_LOCK.lockInterruptibly(); try { while (buffer.size() == 0) { BUFFER_COND.await(); } Task task = buffer.poll(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); BUFFER_COND.signalAll(); } finally { BUFFER_LOCK.unlock(); } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); BUFFER_LOCK.lockInterruptibly(); try { while (buffer.size() == cap) { BUFFER_COND.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); System.out.println("produce: " + task.no); BUFFER_COND.signalAll(); } finally { BUFFER_LOCK.unlock(); } } } public static void main(String[] args) { Model model = new LockConditionModel1(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } } 複製代碼
實現三的併發瓶頸很明顯,因為在鎖 BUFFER_LOCK 看來,任何消費者線程與生產者線程都是一樣的。換句話說,同一時刻,最多只允許有一個線程(生產者或消費者,二選一)操作緩衝區 buffer。
那麼思路就簡單了:需要兩個鎖 CONSUME_LOCK 與 PRODUCE_LOCK。
public class LockConditionModel2 implements Model { private final Lock CONSUME_LOCK = new ReentrantLock(); private final Condition NOT_EMPTY = CONSUME_LOCK.newCondition(); private final Lock PRODUCE_LOCK = new ReentrantLock(); private final Condition NOT_FULL = PRODUCE_LOCK.newCondition(); private final Buffer<Task> buffer = new Buffer<>(); private AtomicInteger bufLen = new AtomicInteger(0); private final int cap; private final AtomicInteger increTaskNo = new AtomicInteger(0); public LockConditionModel2(int cap) { this.cap = cap; } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { int newBufSize = -1; CONSUME_LOCK.lockInterruptibly(); try { while (bufLen.get() == 0) { System.out.println("buffer is empty..."); NOT_EMPTY.await(); } Task task = buffer.poll(); newBufSize = bufLen.decrementAndGet(); assert task != null; // 固定時間範圍的消費,模擬相對穩定的服務器處理過程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); if (newBufSize > 0) { NOT_EMPTY.signalAll(); } } finally { CONSUME_LOCK.unlock(); } if (newBufSize < cap) { PRODUCE_LOCK.lockInterruptibly(); try { NOT_FULL.signalAll(); } finally { PRODUCE_LOCK.unlock(); } } } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期生產,模擬隨機的用戶請求 Thread.sleep((long) (Math.random() * 1000)); int newBufSize = -1; PRODUCE_LOCK.lockInterruptibly(); try { while (bufLen.get() == cap) { System.out.println("buffer is full..."); NOT_FULL.await(); } Task task = new Task(increTaskNo.getAndIncrement()); buffer.offer(task); newBufSize = bufLen.incrementAndGet(); System.out.println("produce: " + task.no); if (newBufSize < cap) { NOT_FULL.signalAll(); } } finally { 
PRODUCE_LOCK.unlock(); } if (newBufSize > 0) { CONSUME_LOCK.lockInterruptibly(); try { NOT_EMPTY.signalAll(); } finally { CONSUME_LOCK.unlock(); } } } } private static class Buffer<E> { private Node head; private Node tail; Buffer() { // dummy node head = tail = new Node(null); } public void offer(E e) { tail.next = new Node(e); tail = tail.next; } public E poll() { head = head.next; E e = head.item; head.item = null; return e; } private class Node { E item; Node next; Node(E item) { this.item = item; } } } public static void main(String[] args) { Model model = new LockConditionModel2(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } 複製代碼
需要注意的是,由於需要同時在非線程安全(UnThreadSafe)的緩衝區 buffer 上進行消費與生產,我們不能使用實現二、三中使用的隊列了,需要自己實現一個簡單的緩衝區 Buffer。Buffer 要滿足以下條件:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { ... /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); ... /** * Signals a waiting take. Called only from put/offer (which do not * otherwise ordinarily lock takeLock.) */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } /** * Links node at end of queue. * * @param node the node */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } /** * Removes a node from head of queue. * * @return the node */ private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } ... /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } ... /** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. 
* * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } ... public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ... } 複製代碼
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載、演繹或用於商業目的,但是必須保留本文的署名及鏈接。