【Java併發編程實戰】– 修改鎖的公平性 lock_3

1、概述

一、fairjava

ReentrantLock 和 ReentrantReadWriterLock 類的 構造器都含有一個布爾參數 fair,這個參數能夠容許你控制這兩個類的行爲。多線程

默認 fair 值是 false; 稱之爲 非公平模式(Non-Fair-Mode),在 非公平模式下,有不少線程組 等待鎖(ReentrantLock 和 ReentrantReadWriterLock)時,鎖將選擇它們中的一個來訪問臨界區,這個選擇是沒有任何約束的。ide

若 fair 值是 true,則稱爲公平模式(Fair Mode),在公平模式下,有不少線程組 等待鎖(ReentrantLock 和 ReentrantReadWriterLock)時,鎖將選擇它們中的一個來訪問臨界區,並且選擇的是 等待時間最長的。this

上面的 fair 2種模式只適用 lock() 和 unlock() 方法。而 Lock 接口的 tryLock() 方法沒有將線程置於休眠,fair 屬性並不影響這個方法。spa

二、Condition線程

Condition 將 Object的通訊方法(wait、notify 和 notifyAll)分解成大相徑庭的對象,以便經過將這些對象與任意 Lock 實現組合使用,爲每一個對象提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 通訊方法的使用。code

在Condition中,用 await() 替換wait(),用 signal() 替換 notify(),用 signalAll()替換 notifyAll(),傳統線程的通訊方式,Condition均可以實現,這裏注意,Condition是被綁定到Lock上的,要建立一個Lock的Condition必須用 newCondition() 方法。對象

Condition 的強大之處在於它能夠爲多個線程間創建不一樣的 Condition, 使用 synchronized/wait() 只有一個阻塞隊列,notifyAll會喚起全部阻塞隊列下的線程,而使用 lock/condition,能夠實現多個阻塞隊列,signalAll 只會喚起某個阻塞隊列下的阻塞線程。接口

一個鎖可能關聯一個或者多個條件,這些條件經過 Condition 接口聲明。目的是容許線程 獲取鎖而且查看等待的某一個條件是否知足,若是不知足就掛起直到某個線程喚醒它們。隊列

2、實現

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用lock/condition實現生產者消費者模式
 * Condition 
 * Condition 將 Object的通訊方法(wait、notify 和 notifyAll)分解成大相徑庭的對象,
 * 以便經過將這些對象與任意 Lock 實現組合使用,爲每一個對象提供多個等待 set (wait-set)。
 * 其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 通訊方法的使用。
 */
public class Buffer {

	private Lock lock;
    private Condition notFull;
    private Condition notEmpty;
    private int maxSize;
    private List<Date> storage;
    public Buffer(int size){
        //使用鎖lock,而且建立兩個condition,至關於兩個阻塞隊列
        lock = new ReentrantLock();
        notFull = lock.newCondition();
        notEmpty = lock.newCondition();
        maxSize = size;
        storage = new LinkedList<>();
    }
    public void put()  {
        lock.lock();
        try {   
            while (storage.size() == maxSize ){//若是隊列滿了
                System.out.print(Thread.currentThread().getName()+": wait \n");;
                notFull.await(); // 阻塞生產線程  
            }
            storage.add(new Date());
            System.out.print(Thread.currentThread().getName()+": put:"+storage.size()+ "\n");
            Thread.sleep(1000);         
            //當生產者執行put方法時,調用 notEmpty.signalAll()只會喚醒  notEmpty.await()下的消費者線程。 
            notEmpty.signalAll(); // 喚醒消費線程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{   
            lock.unlock();
        }
    }

    public  void take() {       
        lock.lock();
        try {  
            while (storage.size() == 0 ){//若是隊列滿了
                System.out.print(Thread.currentThread().getName()+": wait \n");;
                notEmpty.await(); // 阻塞消費線程
            }
            ((LinkedList<Date>)storage).poll();
            System.out.print(Thread.currentThread().getName()+": take:"+storage.size()+ "\n");
            Thread.sleep(1000); 
            //當消費者執行塔克方法時,調用notFull.signalAll()只會喚醒notFull.await()下的消費者線程。
            notFull.signalAll(); // 喚醒生產線程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally{
            lock.unlock();
        }
    }
	
}
import java.util.concurrent.TimeUnit;

/**
 * 生產者
 */
public class Producer implements Runnable{

	private Buffer buffer;
	
	public Producer(Buffer buffer) {
		this.buffer = buffer;
	}
	
	@Override
	public void run() {
	    while(true){
	    	buffer.put();
	    	try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
	    }	
	}
}
import java.util.concurrent.TimeUnit;

/**
 * 消費者
 */
public class Consumer implements Runnable{

	private Buffer buffer;
	
	public Consumer(Buffer buffer) {
		this.buffer = buffer;
	}
	
	@Override
	public void run() {
		while(true){
			buffer.take();
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
public class BufferTest {

	public static void main(String[] args) {
		Buffer buffer = new Buffer(5);
		Producer producer = new Producer(buffer);
		Consumer consumer = new Consumer(buffer);
		for(int i=0;i<3;i++){
			Thread producerThread = new Thread(producer, "producer_" + i);
			producerThread.start();
		}
		
		for(int i=0;i<5;i++){
			Thread consumerThread = new Thread(consumer, "consumer_" + i);
			consumerThread.start();
		}
	}
	
}
相關文章
相關標籤/搜索