若是生產者的隊列滿了(while循環判斷是否滿),則等待。若是生產者的隊列沒滿,則生產數據並喚醒消費者進行消費。java
若是消費者的隊列空了(while循環判斷是否空),則等待。若是消費者的隊列沒空,則消費數據並喚醒生產者進行生產。dom
package test;ide
import java.util.Random;this import java.util.Vector;atom import java.util.concurrent.atomic.AtomicInteger;spa
public class Producer implements Runnable {線程
// true--->生產者一直執行,false--->停掉生產者隊列 private volatile boolean isRunning = true;內存
// 公共資源ci private final Vector sharedQueue;
// 公共資源的最大數量 private final int SIZE;
// 生產數據 private static AtomicInteger count = new AtomicInteger();
public Producer(Vector sharedQueue, int SIZE) { this.sharedQueue = sharedQueue; this.SIZE = SIZE; }
@Override public void run() { int data; Random r = new Random();
System.out.println("start producer id = " + Thread.currentThread().getId()); try { while (isRunning) { // 模擬延遲 Thread.sleep(r.nextInt(1000));
// 當隊列滿時阻塞等待 while (sharedQueue.size() == SIZE) { synchronized (sharedQueue) { System.out.println("Queue is full, producer " + Thread.currentThread().getId() + " is waiting, size:" + sharedQueue.size()); sharedQueue.wait(); } }
// 隊列不滿時持續創造新元素 synchronized (sharedQueue) { // 生產數據 data = count.incrementAndGet(); sharedQueue.add(data);
System.out.println("producer create data:" + data + ", size:" + sharedQueue.size()); sharedQueue.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupted(); } }
public void stop() { isRunning = false; } }
|
package test;
import java.util.Random; import java.util.Vector;
public class Consumer implements Runnable {
// 公共資源 private final Vector sharedQueue;
public Consumer(Vector sharedQueue) { this.sharedQueue = sharedQueue; }
@Override public void run() {
Random r = new Random();
System.out.println("start consumer id = " + Thread.currentThread().getId()); try { while (true) { // 模擬延遲 Thread.sleep(r.nextInt(1000));
// 當隊列空時阻塞等待 while (sharedQueue.isEmpty()) { synchronized (sharedQueue) { System.out.println("Queue is empty, consumer " + Thread.currentThread().getId() + " is waiting, size:" + sharedQueue.size()); sharedQueue.wait(); } } // 隊列不空時持續消費元素 synchronized (sharedQueue) { System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size()); sharedQueue.notifyAll(); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
|
package test;
import java.util.Vector; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class Test2 {
public static void main(String[] args) throws InterruptedException {
// 1.構建內存緩衝區 Vector sharedQueue = new Vector(); int size = 4;
// 2.創建線程池和線程 ExecutorService service = Executors.newCachedThreadPool(); Producer prodThread1 = new Producer(sharedQueue, size); Producer prodThread2 = new Producer(sharedQueue, size); Producer prodThread3 = new Producer(sharedQueue, size); Consumer consThread1 = new Consumer(sharedQueue); Consumer consThread2 = new Consumer(sharedQueue); Consumer consThread3 = new Consumer(sharedQueue); service.execute(prodThread1); service.execute(prodThread2); service.execute(prodThread3); service.execute(consThread1); service.execute(consThread2); service.execute(consThread3);
// 3.睡一下子而後嘗試中止生產者(結束循環) Thread.sleep(10 * 1000); prodThread1.stop(); prodThread2.stop(); prodThread3.stop();
// 4.再睡一下子關閉線程池 Thread.sleep(3000);
// 5.shutdown()等待任務執行完才中斷線程(由於消費者一直在運行的,因此會發現程序沒法結束) service.shutdown();
} } |