ReentrantLock配合Condition實現生產者消費者模式

ReentrantLock和Condition

ReentrantLock是一個可重入的獨佔鎖,和synchronized相比功能更增強大,synchronized底層是基於JVM虛擬機實現的,而ReentrantLock底層則是基於AQS(AbstractQueuedSynchronizer,Java中大量的同步API都是基於這個抽象類進行封裝)實現的。dom


ReentrantLock基本使用

初始化的時候能夠設置是否爲公平鎖,true爲公平鎖,false爲非公平鎖,默認是非公平鎖。ide

ReentrantLock lock = new ReentrantLock(true);

lock()方法和unlock()方法分別是加鎖和解鎖,由於它不會自動釋放鎖,因此使用的時候必定要在finally語句中調用unlock()方法。this

try {
     lock.lock();
     // 邏輯代碼
 } finally {
     lock.unlock();
 }

相比synchronized關鍵字,多了嘗試獲取鎖、設置超時時間,查看重入次數等功能。線程

// 嘗試獲取鎖,返回是否成功結果
boolean success = lock.tryLock();
// 嘗試獲取鎖,返回是否成功結果,設置超時時間
boolean success = lock.tryLock(1,TimeUnit.SECONDS);
// 統計當前線程獲取鎖的次數
int count = lock.getHoldCount();

Condition

相比於上面的功能,ReentrantLock最值得注意的就是Condition功能,能夠使用ReentrantLock鎖實例獲取一個Condition對象,await()/signal()/signalAll()三個是最核心的方法,分別是等待、喚醒單個線程和喚醒全部線程,這和wait()/notify()/notifyAll()很像,使用方法也很像,必須是得到到鎖才能夠調用。不過更加靈活的地方是能夠new多個Condition對象,而每一個Condition對象只能喚醒調用本身的await()方法的那個線程,好比ConditionA和ConditionB,線程A調用ConditionA的await()方法,那麼必須調用ConditionA的signal()/signalAll()方法才能夠喚醒,調用ConditionB的喚醒方法是沒效果的。code

public class ConditionTest {

 public static void main(String[] args) throws Exception{

     Lock lock = new Lock();
     // 開啓一個子線程
     new Thread(new MyRunnableA(lock)).start();
     TimeUnit.SECONDS.sleep(1);

     // signalB()沒法喚醒子線程,signalB()能夠
     // lock.signalB();
     lock.signalA();
 }

}

// 線程實現類
class MyRunnableA implements Runnable{

 private Lock lock = null;

 public MyRunnableA(Lock lock) {
     this.lock = lock;
 }

 @Override
 public void run() {
     System.out.println("線程等待...");
     // 調用ConditionA的await()方法
     lock.awaitA();
     System.out.println("線程喚醒...");
 }
}

// 鎖封裝類
class Lock {
 private ReentrantLock lock = new ReentrantLock();
 // 初始化兩個Condition對象
 private Condition conditionA = lock.newCondition();
 private Condition conditionB = lock.newCondition();

 //調用ConditionA的await()方法
 public void awaitA(){
     try {
         System.out.println("awaitA");
         lock.lock();
         System.out.println("awaitA獲得鎖");
         conditionA.await();
     }catch (Exception e){
         e.printStackTrace();
     }finally {
         lock.unlock();
     }
 }

 //調用ConditionA的signalAll()方法
 public void singalA(){
     try {
         System.out.println("singalA");
         lock.lock();
         System.out.println("singalA獲得鎖");
         conditionA.signalAll();
     }catch (Exception e){
         e.printStackTrace();
     }finally {
         lock.unlock();
     }
 }

 //調用ConditionB的await()方法
 public void awaitB(){
     try {
         System.out.println("awaitB");
         lock.lock();
         System.out.println("awaitB獲得鎖");
         conditionA.await();
     }catch (Exception e){
         e.printStackTrace();
     }finally {
         lock.unlock();
     }
 }

 //調用ConditionB的signalAll()方法
 public void singalB(){
     try {
         System.out.println("singalB");
         lock.lock();
         System.out.println("singalB獲得鎖");
         conditionB.signalAll();
     }catch (Exception e){
         e.printStackTrace();
     }finally {
         lock.unlock();
     }
 }
}

基於ReentrantLock和Condition實現一個簡單隊列

原理就是基於上面的Condition的等待和喚醒必須基於同一個Condition實例,消費者和生產者各有對應的Condition來控制等待和喚醒,消費者和生產者之間互相喚醒。對象

public class Test2 {

 public static void main(String[] args) {
     

     // 兩個生產者,一個消費者
     Queue queue = new Queue();
     Thread producer1 = new Thread(new Producer(queue));
     producer1.setName("Producer1");
     producer1.start();
     Thread producer2 = new Thread(new Producer(queue));
     producer2.setName("Producer2");
     producer2.start();
     Thread customer = new Thread(new Customer(queue));
     customer.setName("Customer");
     customer.start();
 }
}

// 隊列封裝類
class Queue {
 private int[] arr = new int[5];
 int size = 0;

 // 初始化鎖和兩個Condition
 private ReentrantLock lock = new ReentrantLock();
 public Condition pCondition = lock.newCondition();
 public Condition cCondition = lock.newCondition();
 public void lock() {
     lock.lock();
 }

 public void unLock() {
     lock.unlock();
 }

 public boolean isEmpty() {
     return size==0;
 }

 public boolean isFull() {
     return size==5;
 }

 public void put(Integer value,String name) throws Exception {

     try {
         lock.lock();
         if (isFull()){
             // 隊列滿了讓生產者等待
             pCondition.await();
         }
         arr[size % 5] = value;
         size++;
         // 生產完喚醒消費者
         cCondition.signalAll();
     } finally {
         System.out.println(name +"-put-" + Arrays.toString(arr));
         lock.unlock();
     }
 }

 public int take() throws Exception {
     try {
         lock.lock();
         // 隊列空了就讓生產者等待
         if (isEmpty()){
             cCondition.await();
         }
         int value = arr[size % 5];
         size--;
         // 消費完通知生產者
         pCondition.signalAll();
         return value;
     } finally {
         System.out.println("take-" + Arrays.toString(arr));
         lock.unlock();
     }
 }
}

class Producer implements Runnable {

 Queue queue = null;

 public Producer(Queue queue) {
     this.queue = queue;
 }

 @Override
 public void run() {
     String threadName = Thread.currentThread().getName();

     try {
         // 隔10秒輪詢生產一次
         while (true) {
             System.out.println("Producer");
             TimeUnit.SECONDS.sleep(10);
             queue.put(new Random().nextInt(100),threadName);
         }
     } catch (Exception e) {
         e.printStackTrace();
     }
 }
}

class Customer implements Runnable {

 Queue queue = null;

 public Customer(Queue queue) {
     this.queue = queue;
 }

 @Override
 public void run() {
     String threadName = Thread.currentThread().getName();
     try {

         // 隔3秒輪詢消費一次
         while (true) {
             System.out.println("Customer");
             TimeUnit.SECONDS.sleep(3);
             System.out.println("取到的值-" + queue.take());
         }
     } catch (Exception e) {
         e.printStackTrace();
     }
 }
}
相關文章
相關標籤/搜索