/**
 * A bounded, blocking FIFO queue for producer/consumer hand-off.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}. Two conditions
 * created from that lock are used: one on which consumers wait while the
 * queue is empty, and one on which producers wait while the queue is full.
 *
 * <p>Note: a capacity of {@code 0} makes every {@link #push} wait
 * indefinitely, because the "full" test {@code size == maxSize} is true for
 * an empty queue.
 *
 * @param <T> element type
 */
public class MyQueue<T> {
    private final ReentrantLock lock = new ReentrantLock();
    /** Consumers wait here while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();
    /** Producers wait here while the queue is full. */
    private final Condition notFull = lock.newCondition();
    private final int maxSize;
    private final LinkedList<T> list = new LinkedList<>();

    /**
     * Creates a queue that holds at most {@code size} elements.
     *
     * @param size maximum number of elements held at once
     */
    public MyQueue(int size) {
        maxSize = size;
    }

    /**
     * Appends {@code t} to the tail of the queue, waiting while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the element is
     * NOT enqueued, the thread's interrupt status is restored, and the method
     * returns normally; callers can detect this via
     * {@link Thread#isInterrupted()}.
     *
     * @param t element to enqueue
     */
    public void push(T t) {
        lock.lock();
        try {
            // Loop (not "if") so the condition is re-checked after every wake-up.
            while (list.size() == maxSize) {
                // Releases the lock and suspends the current producer thread.
                notFull.await();
            }
            // addLast, not push(): LinkedList.push inserts at the HEAD, which
            // combined with poll() (removes the head) would yield LIFO order.
            list.addLast(t);
            System.out.printf("%s Push Size %d\n", Thread.currentThread().getName(), list.size());
            // Wake up all waiting consumer threads.
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head of the queue, waiting while
     * the queue is empty.
     *
     * @return the oldest element, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public T pull() {
        T t = null;
        lock.lock();
        try {
            while (list.size() == 0) {
                // Releases the lock and suspends the current consumer thread.
                notEmpty.await();
            }
            t = list.poll();
            System.out.printf("%s Pull Size %d\n", Thread.currentThread().getName(), list.size());
            // Wake up all waiting producer threads.
            notFull.signalAll();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return t;
    }
}
// (2) Create the consumer
/**
 * Demo consumer task: pulls 50 elements from the shared queue, sleeping a
 * random 0-499 ms after each pull.
 */
public class Consumer implements Runnable {
    private final MyQueue<Integer> myQueue;

    /**
     * @param myQueue the queue this consumer pulls from
     */
    public Consumer(MyQueue<Integer> myQueue) {
        this.myQueue = myQueue;
    }

    /**
     * Pulls up to 50 elements. If the thread is interrupted during a pause,
     * the interrupt status is restored and the task stops early.
     */
    @Override
    public void run() {
        // One generator for the whole run instead of a new one per iteration.
        Random random = new Random();
        for (int i = 0; i < 50; i++) {
            myQueue.pull();
            try {
                Thread.sleep(random.nextInt(500));
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag set and exit.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
public class Producer implements Runnable{ private MyQueue<Integer> myQueue; public Producer(MyQueue<Integer> myQueue) { this.myQueue = myQueue; } @Override public void run() { for (int i = 0; i < 50; i++) { myQueue.push(i); try { Random random=new Random(); Thread.sleep(random.nextInt(500)); } catch (InterruptedException e) { e.printStackTrace(); } } } }(4)Main,建立3個消費者,3個生產者
public class Main { public static void main(String[] args) { MyQueue<Integer> myQueue = new MyQueue<>(100); Thread[] producer = new Thread[3]; Thread[] consumer = new Thread[3]; for (int i=0;i<producer.length;i++) { producer[i] = new Thread(new Producer(myQueue)); } for (int j=0;j<consumer.length;j++) { consumer[j] = new Thread(new Consumer(myQueue)); } for (Thread c : consumer) { c.start(); } for (Thread p : producer) { p.start(); } } }因爲生產者和消費者同樣多,生產者生產的和消費者消費的恰好,因此最終程序會正常結束;