生產者-消費者(producer-consumer)問題,也稱做有界緩衝區(bounded-buffer)問題,兩個進程共享一個公共的固定大小的緩衝區。
其中一個是生產者,用於將消息放入緩衝區;另一個是消費者,用於從緩衝區中取出消息。
問題出如今當緩衝區已經滿了,而此時生產者還想向其中放入一個新的數據項的情形,其解決方法是讓生產者此時進行休眠,等待消費者從緩衝區中取走了一個或者多個數據後再去喚醒它。
一樣地,當緩衝區已經空了,而消費者還想去取消息,此時也可讓消費者進行休眠,等待生產者放入一個或者多個數據時再喚醒它。java
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成大相徑庭的對象,以便經過將這些對象與任意 Lock 實現組合使用,爲每一個對象提供多個等待 set (wait-set)。緩存
其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。多線程
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通訊方式,Condition均可以實現,這裏注意,Condition是被綁定到Lock上的,要建立一個Lock的Condition必須用newCondition()方法。app
這樣看來,Condition和傳統的線程通訊沒什麼區別,Condition的強大之處在於它能夠爲多個線程間創建不一樣的Condition,下面引入API中的一段代碼,加以說明。ide
代碼:spa
class BoundedBuffer { final Lock lock = new ReentrantLock();//鎖對象 final Condition notFull = lock.newCondition();//寫線程條件 final Condition notEmpty = lock.newCondition();//讀線程條件 final Object[] items = new Object[100];//緩存隊列 int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是隊列滿了 notFull.await();//阻塞寫線程 items[putptr] = x;//賦值 if (++putptr == items.length) putptr = 0;//若是寫索引寫到隊列的最後一個位置了,那麼置爲0 ++count;//個數++ notEmpty.signal();//喚醒讀線程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是隊列爲空 notEmpty.await();//阻塞讀線程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是讀索引讀到隊列的最後一個位置了,那麼置爲0 --count;//個數-- notFull.signal();//喚醒寫線程 return x; } finally { lock.unlock(); } } }
這是一個處於多線程工做環境下的緩存區,緩存區提供了兩個方法,put和take,put是存數據,take是取數據,內部有個緩存隊列,具體變量和方法說明見代碼,.net
這個緩存區類實現的功能:有多個線程往裏面存數據和從裏面取數據,其緩存隊列(先進先出後進後出)能緩存的最大數值是100,多個線程間是互斥的,線程
當緩存隊列中存儲的值達到100時,將寫線程阻塞,並喚醒讀線程,當緩存隊列中存儲的值爲0時,將讀線程阻塞,並喚醒寫線程,這也是ArrayBlockingQueue的內部實現。rest
下面分析一下代碼的執行過程:code
1. 一個寫線程執行,調用put方法;
2. 判斷count是否爲100,顯然沒有100;
3. 繼續執行,存入值;
4. 判斷當前寫入的索引位置++後,是否和100相等,相等將寫入索引值變爲0,並將count+1;
5. 僅喚醒讀線程阻塞隊列中的一個;
6. 一個讀線程執行,調用take方法;
7. ……
8. 僅喚醒寫線程阻塞隊列中的一個。
這就是多個Condition的強大之處,假設緩存隊列中已經存滿,那麼阻塞的確定是寫線程,喚醒的確定是讀線程,相反,阻塞的確定是讀線程,喚醒的確定是寫線程,那麼假設只有一個Condition會有什麼效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程仍是寫線程了,若是喚醒的是讀線程,皆大歡喜,若是喚醒的是寫線程,那麼線程剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了不少時間。
Condition的應用:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入鎖 final Lock lock = new ReentrantLock(); //第一個條件當屏幕上輸出到3 final Condition reachThreeCondition = lock.newCondition(); //第二個條件當屏幕上輸出到6 final Condition reachSixCondition = lock.newCondition(); //NumberWrapper只是爲了封裝一個數字,一邊能夠將數字對象共享,並能夠設置爲final //注意這裏不要用Integer, Integer 是不可變對象 final NumberWrapper num = new NumberWrapper(); //初始化A線程 Thread threadA = new Thread(new Runnable() { @Override public void run() { //須要先得到鎖 lock.lock(); try { System.out.println("threadA start write"); //A線程先輸出前3個數 while (num.value <= 3) { System.out.println(num.value); num.value++; } //輸出到3時要signal,告訴B線程能夠開始了 reachThreeCondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待輸出6的條件 reachSixCondition.await(); System.out.println("threadA start write"); //輸出剩餘數字 while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3輸出完畢的信號 reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { lock.lock(); //已經收到信號,開始輸出4,5,6 System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } //4,5,6輸出完畢,告訴A線程6輸出完了 reachSixCondition.signal(); } finally { lock.unlock(); } } }); //啓動兩個線程 threadB.start(); threadA.start(); } }
threadA start write 1 2 3 threadB start write 4 5 6 threadA start write 7 8 9
基本思路就是首先要A線程先寫1,2,3,這時候B線程應該等待reachThredCondition信號,而當A線程寫完3以後就經過signal告訴B線程「我寫到3了,該你了」,
這時候A線程要等reachSixCondition信號,同時B線程獲得通知,開始寫4,5,6,寫完4,5,6以後B線程通知A線程reachSixCondition條件成立了,這時候A線程就開始寫剩下的7,8,9了。
Java官方提供的例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { public static void main(String[] args) { final BoundedBuffer boundedBuffer = new BoundedBuffer(); Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("t1 run"); for (int i=0;i<20;i++) { try { System.out.println("putting.."); boundedBuffer.put(Integer.valueOf(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { for (int i=0;i<20;i++) { try { Object val = boundedBuffer.take(); System.out.println(val); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; t1.start(); t2.start(); } /** * BoundedBuffer 是一個定長100的集合,當集合中沒有元素時,take方法須要等待,直到有元素時才返回元素 * 當其中的元素數達到最大值時,要等待直到元素被take以後才執行put的操做 * @author yukaizhao * */ static class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { System .out.println("put wait lock"); lock.lock(); System.out.println("put get lock"); try { while (count == items.length) { System.out.println("buffer full, please wait"); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { System.out.println("take wait lock"); lock.lock(); System.out.println("take get lock"); try { while (count == 0) { System.out.println("no elements, please wait"); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } } }
t1 run putting.. take wait lock take get lock no elements, please wait put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock 0 take wait lock take get lock 1 take wait lock take get lock 2 take wait lock take get lock 3 take wait lock take get lock 4 take wait lock take get lock 5 take wait lock take get lock 6 take wait lock take get lock 7 take wait lock take get lock 8 take wait lock take get lock 9 take wait lock take get lock 10 take wait lock take get lock 11 take wait lock take get lock 12 take wait lock take get lock 13 take wait lock take get lock 14 take wait lock take get lock 15 take wait lock take get lock 16 take wait lock take get lock 17 take wait lock take get lock 18 take wait lock take get lock 19
也能夠修改t1,t2線程裏面的循環,改爲大於100,可能會碰到集合已滿的狀況。
這個示例中BoundedBuffer是一個固定長度的集合,
這個在其put操做時,若是發現長度已經達到最大長度,那麼要等待notFull信號才能繼續put,若是獲得notFull信號會像集合中添加元素,而且put操做會發出notEmpty的信號,
而在其take方法中若是發現集合長度爲空,那麼會等待notEmpty的信號,接受到notEmpty信號才能繼續take,同時若是拿到一個元素,那麼會發出notFull的信號。
Condition與Object中的wati,notify,notifyAll區別:
1.Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。
不一樣的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是須要與互斥鎖/共享鎖捆綁使用的。
2.Condition它更強大的地方在於:可以更加精細的控制多線程的休眠與喚醒。對於同一個鎖,咱們能夠建立多個Condition,在不一樣的狀況下使用不一樣的Condition。
例如,假如多線程讀/寫同一個緩衝區:當向緩衝區中寫入數據以後,喚醒"讀線程";當從緩衝區讀出數據以後,喚醒"寫線程";而且當緩衝區滿的時候,"寫線程"須要等待;當緩衝區爲空時,"讀線程"須要等待。
若是採用Object類中的wait(),notify(),notifyAll()實現該緩衝區,當向緩衝區寫入數據以後須要喚醒"讀線程"時,不可能經過notify()或notifyAll()明確的指定喚醒"讀線程",而只能經過notifyAll喚醒全部線程(可是notifyAll沒法區分喚醒的線程是讀線程,仍是寫線程)。 可是,經過Condition,就能明確的指定喚醒讀線程。
http://outofmemory.cn/java/java.util.concurrent/lock-reentrantlock-condition
http://blog.csdn.net/ghsau/article/details/7481142