生產者-消費者模式的三種實現方式

一、查看程序中線程的名稱與狀態java

/**
	 * 4:Signal Dispatcher:RUNNABLE
	 * 3:Finalizer:WAITING
	 * 2:Reference Handler:WAITING
	 * 1:main:RUNNABLE
	 */
	@Test
	public void testName(){
		ThreadMXBean tb = ManagementFactory.getThreadMXBean();
		ThreadInfo[] infos = tb.dumpAllThreads(false, false);
		for(ThreadInfo info : infos){
			System.out.println(info.getThreadId()+":"
+info.getThreadName()+":"
+info.getThreadState());
		}
	}

二、等待通知機制this

一個線程修改一個對象的值,而另外一個線程感知到了變化,而後進行相應的操做,這個過程始於一個線程,而最終執行又是另外一個線程。前者是生產者,後者就是消費者。spa

生產者-消費者模式隔離了「作什麼」和「怎麼作」,在功能層面上實現瞭解耦,所以具有很好的擴展伸縮能力。線程

 

三、等待通知基本步驟code

A:等待方和通知方遵循以下原則對象

1)獲取對象的鎖(生產者和消費者必須是同一把鎖)隊列

2)若是條件不知足,則執行鎖的wait方法,等待被notify,notify後仍要繼續檢查條件,因此是whileip

3)若是條件知足,則執行對應的邏輯以及執行鎖的notify。rem

B:僞代碼get

synchronized(lock){  // 獲取對象的鎖
			while(list.size() - num < 0){ // 使用while作條件判斷
				try {
					lock.wait(); // 條件不知足則等待
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				list.remove(0); // 改變條件
			}
			lock.notifyAll(); // 執行邏輯以後調用notify
		}

 

四、生產者-消費者模式的具體實現

生產者-消費者模式的核心在於容器的容量,若是容器已經滿了則通知生產者不要生產;若是容器已經空了則通知消費者不要繼續消費。所以容器的加減應該作同步,容器的容量做爲通知的條件。這其實就是阻塞隊列的實現。

1)使用Object的 wait 和 notify 實現

2)使用Lock的condition中 await 和 signal 實現

3)使用阻塞隊列實現

使用Object的 wait 和 notify 實現 以下:

package more_service.base_info;

import java.util.ArrayList;
import java.util.List;

public class ThreadWaitNotifyTest {
	
	private static List<String> list = new ArrayList<String>();
	
	private static final int MAX = 15;
	
	private static Object lock = new Object();
	
	public static void main(String[] args) {
		Thread p1 = new Thread(new ProductThread(5));
		Thread p2 = new Thread(new ProductThread(5));
		Thread p3 = new Thread(new ProductThread(5));
		Thread p4 = new Thread(new ProductThread(5));
		
		Thread c1 = new Thread(new ConsumerThread(10));
		Thread c2 = new Thread(new ConsumerThread(10));
		
		c1.start();
		c2.start();
		p1.start();
		p2.start();
		p3.start();
		p4.start();
		
		
	}
	
	public static void product(int num){
		synchronized(lock){
			while(list.size() + num > MAX){
				try {
					lock.wait();
					System.out.println(Thread.currentThread().getName()+"---- 已經滿了,不能繼續生產");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				list.add(Thread.currentThread().getName()+":"+i);
				System.out.println(Thread.currentThread().getName()+":"+i);
			}
			lock.notifyAll();
		}
	}
	
	public static void consume(int num){
		synchronized(lock){
			while(list.size() - num < 0){
				try {
					lock.wait();
					System.out.println(Thread.currentThread().getName()+"---- 已經空了,不能繼續消費了");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			for(int i = 0; i < num ; i++){
				System.out.println(Thread.currentThread().getName()+":"+list.remove(0));
			}
			lock.notifyAll();
		}
	}
	
	
	static class ProductThread implements Runnable{
		
		private int num;
		
		public ProductThread(int num){
			this.num = num;
		}
		
		public void run() {
			product(this.num);
		}
	}
	
	
	static class ConsumerThread implements Runnable{

		private int num;
		
		public ConsumerThread(int num){
			this.num = num;
		}
		
		public void run() {
			consume(this.num);
		}
		
	}
	

}

使用Lock的condition中 await 和 signal 實現以下:

/**
 * <p>項目名稱:mvn
 * <p>Package名稱:com.hnust.test
 * 文件名稱:LockTest.java 
 * 版本:1.00 
 * 建立日期:2015年9月13日
 * Copyright©2014 HNUST .All Rights Reserved.
 */
package com.hnust.test;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
 
/**
 *@author:Heweipo
 *@version 1.00
 *
 */
public class LockTest {
     
    public static void main(String[] args) {
         
        Data data = new Data();
        MyTask1 t1 = new MyTask1(data);
        MyTask2 t2 = new MyTask2(data);
         
        t1.start();
        t2.start();
         
    }
}
 
class Data {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
     
    public Data(){
        System.out.println( c1 == c2);
        lock = new ReentrantReadWriteLock().readLock();
    }
     
    public int increase(){
        lock.lock();
        try{
            if(number != 0){
                try {
                    c1.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            number++;
            c2.signal();
        }finally{
            lock.unlock();
        }
        return number;
    }
     
    public int decrease(){
        lock.lock();
        try{
            if(number != 1){
                try {
                    c2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            number--;
            c1.signal();
        }finally{
            lock.unlock();
        }
        return number;
    }
     
}
 
class MyTask1 extends Thread{
     
    private Data data;
     
    public MyTask1(Data data){
        this.data = data;
    }
     
    public void run() {
        for(int i = 0 ; i < 10 ; i ++){
            System.out.println("decrease:"+data.decrease());
        }
    }
     
}
 
class MyTask2 extends Thread{
     
    private Data data;
     
    public MyTask2(Data data){
        this.data = data;
    }
     
    public void run() {
        for(int i = 0 ; i < 10 ; i ++){
            System.out.println("increase:"+data.increase());
        }
    }
     
}

使用阻塞隊列實現以下:

package more_service.base_info;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import more_service.base_info.ThreadWaitNotifyTest.ConsumerThread;
import more_service.base_info.ThreadWaitNotifyTest.ProductThread;

public class BlockingQueueTest {
	
	public static BlockingQueue<String> queue = new LinkedBlockingQueue(3);
	
	public static void main(String[] args) {
		Thread p1 = new Thread(new ProductThread("1"));
		Thread p2 = new Thread(new ProductThread("2"));
		Thread p3 = new Thread(new ProductThread("3"));
		Thread p4 = new Thread(new ProductThread("4"));
		
		Thread c1 = new Thread(new ConsumerThread());
		Thread c2 = new Thread(new ConsumerThread());
		Thread c3 = new Thread(new ConsumerThread());
		Thread c4 = new Thread(new ConsumerThread());
		
		c1.start();
		c2.start();
		c3.start();
		c4.start();
		p1.start();
		p2.start();
		p3.start();
		p4.start();
	}
	
	static class ProductThread implements Runnable{
		
		private String name;
		
		public ProductThread(String name){
			this.name = name;
		}
		
		public void run() {
			try {
				System.out.println("put:"+name);
				queue.put(name);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	static class ConsumerThread implements Runnable{
		public void run() {
			try {
				System.out.println("take:"+queue.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}
	
	

}
相關文章
相關標籤/搜索