經過ReentrantLock與Condition的設計,以數組爲基礎,能夠實現簡單的隊列和棧的數據結構,臨界阻塞的效果。java
ReentrantLock相對於synchronized比較大的一個區別是有條件變量:Condition,很大一個程度上是爲了解決Object.wait/notify/notifyAll難以使用的問題。Condition(也稱爲條件隊列 或條件變量)爲線程提供了一個含義,以便在某個狀態條件如今可能爲 true 的另外一個線程通知它以前,一直掛起該線程(即讓其「等待」)。由於訪問此共享狀態信息發生在不一樣的線程中,因此它必須受保護,所以要將某種形式的鎖與該條件相關聯。等待提供一個條件的主要屬性是:以原子方式 釋放相關的鎖,並掛起當前線程,就像 Object.wait
作的那樣。多個Condition須要綁定到同一鎖上,能夠實現隊列與棧。數組
隊列:先進先出的原則緩存
棧:先進後出的原則數據結構
類一:模擬隊列的讀寫操做ide
1 package reentranlock; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 public class BoundedBufferQueue { 8 9 static Lock lock = new ReentrantLock(); 10 static Condition read = lock.newCondition(); 11 static Condition write = lock.newCondition(); 12 static Object [] data = new Object [10];// 構造一個緩存隊列 13 14 private static int count = 0;// 用來標識隊列中存放的數據量 15 private static int readIndex = 0;// 標識讀取的下標 16 private static int writeIndex = 0;// 標識寫入的下標 17 18 public static void put(Integer num) throws InterruptedException { 19 try { 20 lock.lock(); 21 if (count == 10) { 22 write.await();// 數據量滿了則阻塞寫的操做 23 } 24 data[writeIndex] = num; 25 count++; 26 if (++writeIndex == 10) {// 循環寫入數據 27 writeIndex = 0; 28 } 29 read.signal();// 觸發讀操做 30 } finally { 31 lock.unlock(); 32 } 33 } 34 35 public static Object take() throws InterruptedException { 36 Object result = null; 37 try { 38 lock.lock(); 39 if (count == 0) {// 若是隊列無數據量則阻塞讀操做 40 read.await(); 41 } 42 result = (Integer) data[readIndex]; 43 count--; 44 if (++readIndex == 10) {// 循環取數據 45 readIndex = 0; 46 } 47 write.signal();// 觸發寫操做 48 } finally { 49 lock.unlock(); 50 } 51 return result; 52 } 53 54 // 下面是模擬讀寫操做過程,能夠經過操做時間不一樣來驗證隊列讀取。 55 public static void main(String[] args) throws InterruptedException { 56 57 Runnable readThread = new Runnable() { 58 @Override 59 public void run() { 60 while(true){ 61 for(int i=1;i<Integer.MAX_VALUE;i++){ 62 try { 63 Integer o = (Integer) take(); 64 System.out.println("讀取:"+o); 65 Thread.sleep(3000); 66 } catch (InterruptedException e) { 67 e.printStackTrace(); 68 } 69 } 70 } 71 72 } 73 }; 74 75 Runnable writeThread = new Runnable() { 76 @Override 77 public void run() { 78 while(true){ 79 for(int i=1;i<Integer.MAX_VALUE;i++){ 80 try { 81 put(i); 82 System.out.println("寫入:"+i); 83 Thread.sleep(1000); 84 } catch (InterruptedException e) { 85 e.printStackTrace(); 86 } 87 } 88 } 89 90 } 91 }; 92 93 Thread read = new Thread(readThread); 94 Thread write = new Thread(writeThread); 95 96 read.start(); 97 Thread.currentThread().join(1000); 98 write.start(); 99 } 100 101 }
類二:模擬棧的讀寫操做spa
1 package reentranlock; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 public class BoundedBufferStack { 8 9 static Lock lock = new ReentrantLock(); 10 static Condition read = lock.newCondition(); 11 static Condition write = lock.newCondition(); 12 static Object [] data = new Object [10];// 構造一個緩存棧 13 14 private static int count = 0;// 用來標識棧中存放的數據量 15 private static int index = 0;// 標識的下標 16 17 public static void put(Integer num) throws InterruptedException { 18 try { 19 lock.lock(); 20 if (count == 10) {// 數據量滿了則阻塞寫操做 21 write.await(); 22 } 23 data[index] = num; 24 count++; 25 index++; 26 if (index == 10) { 27 index = 0; 28 } 29 read.signal();// 觸發讀操做 30 } finally { 31 lock.unlock(); 32 } 33 } 34 35 public static Object take() throws InterruptedException { 36 Object result = null; 37 try { 38 lock.lock(); 39 if (count == 0) {// 數據量爲空則阻塞讀操做 40 read.await(); 41 } 42 if(index == 0 && count == 10){// 爲了仿造棧的後進先出的模式,取最後寫入的數據 43 index = 9; 44 }else{ 45 index --; 46 } 47 result = (Integer) data[index]; 48 count--; 49 if (index == 0) { 50 index = 0; 51 } 52 write.signal();// 觸發寫操做 53 } finally { 54 lock.unlock(); 55 } 56 return result; 57 } 58 59 // 下面是模擬讀寫操做過程,能夠經過操做時間不一樣來驗證棧的讀取。 60 public static void main(String[] args) throws InterruptedException { 61 62 Runnable readThread = new Runnable() { 63 @Override 64 public void run() { 65 while(true){ 66 for(int i=1;i<Integer.MAX_VALUE;i++){ 67 try { 68 Integer o = (Integer) take(); 69 System.out.println("讀取:"+o); 70 Thread.sleep(5000); 71 } catch (InterruptedException e) { 72 e.printStackTrace(); 73 } 74 } 75 } 76 77 } 78 }; 79 80 Runnable writeThread = new Runnable() { 81 @Override 82 public void run() { 83 while(true){ 84 for(int i=1;i<Integer.MAX_VALUE;i++){ 85 try { 86 put(i); 87 System.out.println("寫入:"+i); 88 Thread.sleep(1000); 89 } catch (InterruptedException e) { 90 e.printStackTrace(); 91 } 92 } 93 } 94 95 } 96 }; 97 98 Thread read = new Thread(readThread); 99 Thread write = new Thread(writeThread); 100 101 write.start(); 102 Thread.currentThread().join(1000); 103 read.start(); 104 } 105 106 }
ArrayBlockingQueue也是這種設計 "經過平衡生產者和消費者的處理能力來提升總體處理數據的速度",只不過運用ArrayBlockingQueue不要擔憂非單一輩子產者/消費者場景下的系統假死問題,緩衝區空、緩衝區滿的場景BlockingQueue都是定義了不一樣的Condition,因此不會喚醒本身的同類。線程