ReentrantLock與Condition構造有界緩存隊列與數據棧

經過ReentrantLock與Condition的設計,以數組爲基礎,能夠實現簡單的隊列和棧的數據結構,臨界阻塞的效果。java

ReentrantLock相對於synchronized比較大的一個區別是有條件變量:Condition,很大一個程度上是爲了解決Object.wait/notify/notifyAll難以使用的問題。Condition(也稱爲條件隊列 或條件變量)爲線程提供了一個含義,以便在某個狀態條件如今可能爲 true 的另外一個線程通知它以前,一直掛起該線程(即讓其「等待」)。由於訪問此共享狀態信息發生在不一樣的線程中,因此它必須受保護,所以要將某種形式的鎖與該條件相關聯。等待提供一個條件的主要屬性是:以原子方式 釋放相關的鎖,並掛起當前線程,就像 Object.wait 作的那樣。多個Condition須要綁定到同一鎖上,能夠實現隊列與棧。數組

隊列:先進先出的原則緩存

棧:先進後出的原則數據結構

類一:模擬隊列的讀寫操做ide

  1 package reentranlock;
  2 
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.Lock;
  5 import java.util.concurrent.locks.ReentrantLock;
  6 
  7 public class BoundedBufferQueue {
  8 
  9     static Lock lock = new ReentrantLock();
 10     static Condition read = lock.newCondition();
 11     static Condition write = lock.newCondition();
 12     static Object [] data = new Object [10];// 構造一個緩存隊列
 13     
 14     private static int count = 0;// 用來標識隊列中存放的數據量
 15     private static int readIndex = 0;// 標識讀取的下標
 16     private static int writeIndex = 0;// 標識寫入的下標
 17     
 18     public static void put(Integer num) throws InterruptedException {
 19         try {
 20             lock.lock();
 21             if (count == 10) {
 22                 write.await();// 數據量滿了則阻塞寫的操做
 23             }
 24             data[writeIndex] = num;
 25             count++;
 26             if (++writeIndex == 10) {// 循環寫入數據
 27                 writeIndex = 0;
 28             }
 29             read.signal();// 觸發讀操做
 30         } finally {
 31             lock.unlock();
 32         }
 33     }
 34     
 35     public static Object take() throws InterruptedException {
 36         Object result = null;
 37         try {
 38             lock.lock();
 39             if (count == 0) {// 若是隊列無數據量則阻塞讀操做
 40                 read.await();
 41             }
 42             result = (Integer) data[readIndex];
 43             count--;
 44             if (++readIndex == 10) {// 循環取數據
 45                 readIndex = 0;
 46             }
 47             write.signal();// 觸發寫操做
 48         } finally {
 49             lock.unlock();
 50         }
 51         return result;
 52     }
 53     
 54     // 下面是模擬讀寫操做過程,能夠經過操做時間不一樣來驗證隊列讀取。
 55     public static void main(String[] args) throws InterruptedException {
 56         
 57         Runnable readThread = new Runnable() {
 58             @Override
 59             public void run() {
 60                 while(true){
 61                     for(int i=1;i<Integer.MAX_VALUE;i++){
 62                         try {
 63                             Integer o = (Integer) take();
 64                             System.out.println("讀取:"+o);
 65                             Thread.sleep(3000);
 66                         } catch (InterruptedException e) {
 67                             e.printStackTrace();
 68                         }
 69                     }
 70                 }
 71                 
 72             }
 73         };
 74         
 75         Runnable writeThread = new Runnable() {
 76             @Override
 77             public void run() {
 78                 while(true){
 79                     for(int i=1;i<Integer.MAX_VALUE;i++){
 80                         try {
 81                             put(i);
 82                             System.out.println("寫入:"+i);
 83                             Thread.sleep(1000);
 84                         } catch (InterruptedException e) {
 85                             e.printStackTrace();
 86                         }
 87                     }
 88                 }
 89                 
 90             }
 91         };
 92 
 93         Thread read = new Thread(readThread);
 94         Thread write = new Thread(writeThread);
 95         
 96         read.start();
 97         Thread.currentThread().join(1000);
 98         write.start();
 99     }
100 
101 }

 

類二:模擬棧的讀寫操做spa

  1 package reentranlock;
  2 
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.Lock;
  5 import java.util.concurrent.locks.ReentrantLock;
  6 
  7 public class BoundedBufferStack {
  8 
  9     static Lock lock = new ReentrantLock();
 10     static Condition read = lock.newCondition();
 11     static Condition write = lock.newCondition();
 12     static Object [] data = new Object [10];// 構造一個緩存棧
 13     
 14     private static int count = 0;// 用來標識棧中存放的數據量
 15     private static int index = 0;// 標識的下標
 16     
 17     public static void put(Integer num) throws InterruptedException {
 18         try {
 19             lock.lock();
 20             if (count == 10) {// 數據量滿了則阻塞寫操做
 21                 write.await();
 22             }
 23             data[index] = num;
 24             count++;
 25             index++;
 26             if (index == 10) {
 27                 index = 0;
 28             }
 29             read.signal();// 觸發讀操做
 30         } finally {
 31             lock.unlock();
 32         }
 33     }
 34     
 35     public static Object take() throws InterruptedException {
 36         Object result = null;
 37         try {
 38             lock.lock();
 39             if (count == 0) {// 數據量爲空則阻塞讀操做
 40                 read.await();
 41             }
 42             if(index == 0 && count == 10){// 爲了仿造棧的後進先出的模式,取最後寫入的數據
 43                 index = 9;
 44             }else{
 45                 index --;
 46             }
 47             result = (Integer) data[index];
 48             count--;
 49             if (index == 0) {
 50                 index = 0;
 51             }
 52             write.signal();// 觸發寫操做
 53         } finally {
 54             lock.unlock();
 55         }
 56         return result;
 57     }
 58     
 59     // 下面是模擬讀寫操做過程,能夠經過操做時間不一樣來驗證棧的讀取。
 60     public static void main(String[] args) throws InterruptedException {
 61         
 62         Runnable readThread = new Runnable() {
 63             @Override
 64             public void run() {
 65                 while(true){
 66                     for(int i=1;i<Integer.MAX_VALUE;i++){
 67                         try {
 68                             Integer o = (Integer) take();
 69                             System.out.println("讀取:"+o);
 70                             Thread.sleep(5000);
 71                         } catch (InterruptedException e) {
 72                             e.printStackTrace();
 73                         }
 74                     }
 75                 }
 76                 
 77             }
 78         };
 79         
 80         Runnable writeThread = new Runnable() {
 81             @Override
 82             public void run() {
 83                 while(true){
 84                     for(int i=1;i<Integer.MAX_VALUE;i++){
 85                         try {
 86                             put(i);
 87                             System.out.println("寫入:"+i);
 88                             Thread.sleep(1000);
 89                         } catch (InterruptedException e) {
 90                             e.printStackTrace();
 91                         }
 92                     }
 93                 }
 94                 
 95             }
 96         };
 97 
 98         Thread read = new Thread(readThread);
 99         Thread write = new Thread(writeThread);
100         
101         write.start();
102         Thread.currentThread().join(1000);
103         read.start();
104     }
105 
106 }

 ArrayBlockingQueue也是這種設計 "經過平衡生產者和消費者的處理能力來提升總體處理數據的速度",只不過運用ArrayBlockingQueue不要擔憂非單一輩子產者/消費者場景下的系統假死問題,緩衝區空、緩衝區滿的場景BlockingQueue都是定義了不一樣的Condition,因此不會喚醒本身的同類。線程

相關文章
相關標籤/搜索