前面一個系列的文章都在圍繞hash展開,今天準備先說下concurrent包,這個系列可能會以使用場景說明爲主,concurrent包自己的代碼分析可能比較少; 我在這方面的實踐經驗較爲有限,有錯誤歡迎批評指正 html
concurrent包是jdk1.5引入的重要的包,主要代碼由大牛Doug Lea完成,實際上是在jdk1.4時代,因爲java語言內置對多線程編程的支持比較基礎和有限,因此他寫了這個,由於實在太過於優秀,因此被加入到jdk之中;
java.util.concurrent:提供大部分關於併發的接口和類,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等
java.util.concurrent.atomic:提供全部原子操做的類, 如AtomicInteger, AtomicLong等;
java.util.concurrent.locks:提供鎖相關的類, 如Lock, ReentrantLock, ReadWriteLock, Condition等;
1. 首先,功能很是豐富,諸如線程池(ThreadPoolExecutor),CountDownLatch等併發編程中須要的類已經有現成的實現,不須要本身去實現一套; 畢竟jdk1.4對多線程編程的主要支持幾乎就只有Thread, Runnable,synchronized等 java
2. concurrent包裏面的一些操做是基於硬件級別的CAS(compare and swap),就是在cpu級別提供了原子操做,簡單的說就能夠提供無阻塞、無鎖定的算法; 而現代cpu大部分都是支持這樣的算法的;算法
前面一篇說了concurrent包的基本結構,接下來首先看一下一個很是有用的類,CountDownLatch, 能夠用來在一個線程中等待多個線程完成任務的類;
Java代碼 緩存
@Test public void demoCountDown() { int count = 10; final CountDownLatch l = new CountDownLatch(count); for(int i = 0; i < count; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("thread " + index + " has finished..."); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("now all threads have finished"); }
thread 1 has finished...
thread 3 has finished...
thread 4 has finished...
thread 6 has finished...
thread 8 has finished...
thread 0 has finished...
thread 7 has finished...
thread 9 has finished...
thread 2 has finished...
thread 5 has finished...
now all threads have finished
有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來咱們看下Atomic相關的類, 好比AtomicLong, AtomicInteger等這些;
簡單的說,這些類都是線程安全的,支持無阻塞無鎖定的 jvm
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; /** * * by http://www.hetaoblog.com * @author hetaoblog * */ public class AtomicTest { @Test public void testAtomic() { final int loopcount = 10000; int threadcount = 10; final NonSafeSeq seq1 = new NonSafeSeq(); final SafeSeq seq2 = new SafeSeq(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq1.inc(); seq2.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("NonSafeSeq:" + seq1.get()); System.out.println("SafeSeq with atomic: " + seq2.get()); } } class NonSafeSeq{ private long count = 0; public void inc() { count++; } public long get() { return count; } } class SafeSeq{ private AtomicLong count = new AtomicLong(0); public void inc() { count.incrementAndGet(); } public long get() { return count.longValue(); } }
其中NonSafeSeq是做爲對比的類,直接放一個private long count不是線程安全的,而SafeSeq裏面放了一個AtomicLong,是線程安全的;能夠直接調用incrementAndGet來增長
finished : 1
finished : 0
finished : 3
finished : 2
finished : 5
finished : 4
finished : 6
finished : 8
finished : 9
finished : 7
both have finished....
SafeSeq with atomic: 100000
在jdk 1.4時代,線程間的同步主要依賴於synchronized關鍵字,本質上該關鍵字是一個對象鎖,能夠加在不一樣的instance上或者class上,從使用的角度則分別能夠加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法;
簡單的說你能夠new一個ReentrantLock, 而後經過lock.lock和lock.unlock來獲取鎖和釋放鎖;注意必須將unlock放在finally塊裏面,
1. 是更好的性能,
2. 提供同一個lock對象上不一樣condition的信號通知
3. 還提供lockInterruptibly這樣支持響應中斷的加鎖過程,意思是說你試圖去加鎖,可是當前鎖被其餘線程hold住,而後你這個線程能夠被中斷;
package com.hetaoblog.concurrent.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class ReentrantLockDemo { @Test public void demoLock() { final int loopcount = 10000; int threadcount = 10; final SafeSeqWithLock seq = new SafeSeqWithLock(); final CountDownLatch l = new CountDownLatch(threadcount); for(int i = 0; i < threadcount; ++i) { final int index = i; new Thread(new Runnable() { @Override public void run() { for(int j = 0; j < loopcount; ++j) { seq.inc(); } System.out.println("finished : " + index); l.countDown(); } }).start(); } try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("both have finished...."); System.out.println("SafeSeqWithLock:" + seq.get()); } } class SafeSeqWithLock{ private long count = 0; private ReentrantLock lock = new ReentrantLock(); public void inc() { lock.lock(); try{ count++; } finally{ lock.unlock(); } } public long get() { return count; } }
finished : 7
finished : 2
finished : 6
finished : 1
finished : 5
finished : 3
finished : 0
finished : 9
finished : 8
finished : 4
both have finished....
A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
@Test public void testRWLock_getw_onr() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final Lock rlock = lock.readLock(); final Lock wlock = lock.writeLock(); final CountDownLatch l = new CountDownLatch(2); // start r thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get rlock"); rlock.lock(); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "now to unlock rlock"); rlock.unlock(); l.countDown(); } }).start(); // start w thread new Thread(new Runnable() { @Override public void run() { System.out.println(new Date() + "now to get wlock"); wlock.lock(); System.out.println(new Date() + "now to unlock wlock"); wlock.unlock(); l.countDown(); } }).start(); try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + "finished"); }
這代碼在我機器上打印的結果是, 也就是試圖得到寫鎖的線程只有當另一個線程將讀鎖釋放了之後才能夠得到
Tue Feb 28 23:18:13 CST 2012now to get rlock
Tue Feb 28 23:18:13 CST 2012now to get wlock
Tue Feb 28 23:18:33 CST 2012now to unlock rlock
Tue Feb 28 23:18:33 CST 2012now to unlock wlock
Tue Feb 28 23:18:33 CST 2012finished
這意思是,若是你已經有了讀鎖,再去試圖得到寫鎖,將會沒法得到, 一直堵住了;
@Test public void testRWLock_downgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }
now to get wlock
now to get rlock
now to unlock wlock
now to unlock rlock
@Test public void testRWLock_upgrade() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Lock rlock = lock.readLock(); Lock wlock = lock.writeLock(); System.out.println("now to get rlock"); rlock.lock(); System.out.println("now to get wlock"); wlock.lock(); System.out.println("now to unlock wlock"); wlock.unlock(); System.out.println("now to unlock rlock"); rlock.unlock(); System.out.println("finished"); }
now to get rlock
now to get wlock
有網友建議我在介紹concurrent包以前先介紹下jdk1.5以前的多線程知識,這是個至關不錯的想法, 這篇就先介紹下Thread類;
public void run()
簡單的說來,基本的建立一個完成本身功能的線程能夠繼承Thread類,而後override這個run方法, 以下所示
public class ThreadDemo { @Test public void testThread() { SimpleThread t = new SimpleThread(); t.start(); } } class SimpleThread extends Thread{ @Override public void run() { System.out.println( Thread.currentThread().getName() + " is running "); } }
一般在run方法裏面實現本身要作的功能,這裏簡單的打印了了一句話, 運行結果是
Thread-0 is running
啓動一個線程就是new一個本身的Thread對象,而後調用其中的start方法啓動這個線程;注意, run()方法運行結束以後這個線程的生命週期就結束了;
上面舉的例子是說啓動一個線程就去完成一個任務,有的時候咱們須要一個線程始終在跑,按期執行一些任務,而後在某個時刻中止這個線程的運行; 那麼能夠有相似下面的一段代碼:
public class ThreadDemo { public static void main(String[] args) { PeriodicalRunningThread t = new PeriodicalRunningThread(); t.start(); System.out.println("main thread is going to sleep..."); try { Thread.currentThread().sleep(20 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " now to stop PeriodicalRunningThread"); t.setRunning(false); } } class PeriodicalRunningThread extends Thread{ private volatile boolean running = true; @Override public void run() { while(running) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " is running " + new Date()); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end"); } public void setRunning(boolean running) { this.running = running; } }
main thread is going to sleep…
Wed Feb 29 21:10:39 CST 2012 Thread-0 is running Wed Feb 29 21:10:39 CST 2012
Wed Feb 29 21:10:44 CST 2012 Thread-0 is running Wed Feb 29 21:10:44 CST 2012
Wed Feb 29 21:10:49 CST 2012 Thread-0 is running Wed Feb 29 21:10:49 CST 2012
Wed Feb 29 21:10:54 CST 2012 Thread-0 is running Wed Feb 29 21:10:54 CST 2012
Wed Feb 29 21:10:59 CST 2012 now to stop PeriodicalRunningThread
Wed Feb 29 21:10:59 CST 2012 Thread-0 will end
1. sleep函數,做用是讓當前線程sleep一段時間,單位以毫秒計算;
public static void sleep(long millis)
2. 靜態方法Thread.currentThread(), 獲得當前線程
public static Thread currentThread()
3. getName方法,獲得當前線程名稱
public final String getName()
這個名稱能夠在構造Thread的時候傳入, 也能夠經過setName()方法設置;這個在多線程調試的時候是比較有用的,設置當前線程名,而後在log4j的輸出字符串格式裏面加入%t,就能夠在日誌中打印當前線程名稱,方便看到當前的日誌是從哪裏來的;
如今介紹下多線程裏面另一個重要的接口Runnable, 這個接口表示能夠被一個線程執行的任務,事實上Thread類也實現了這個Runnable接口;
這個接口只有一個函數, 實現者只要在裏面調用代碼就能夠了
void run()
同時, Thread類有個構造函數是傳入一個Runnable實現的;
經常使用的一個用法就是經過匿名內部類來建立線程執行簡單任務,避免寫太多的類,外部須要的變量能夠經過加final修飾符後傳入, 代碼例子以下:
public static void testThreadWithRunnable() { final String word = "hello,world"; new Thread(new Runnable() { @Override public void run() { System.out.println(word); } }).start(); } public static void main(String[] args) { //periodicalThreadTest(); testThreadWithRunnable(); }
可是有的時候的確須要須要對一個鎖作多種不一樣狀況的精確通知, 好比一個緩存,滿了和空了是兩種不一樣的狀況,能夠分別通知取數據的線程和放數據的線程;
* Condition是個接口,基本的方法就是await()和signal()方法;
* Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition()
* 調用Condition的await()和signal()方法,都必須在lock保護以內,就是說必須在lock.lock()和lock.unlock之間才能夠
* 和Object.wait()方法同樣,每次調用Condition的await()方法的時候,當前線程就自動釋放了對當前鎖的擁有權
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
a. 若是當前個數是0(用一個count計數), 那麼就調用notEmpty.await等待,鎖就釋放了;
b. 取數據的索引專門有一個,每次向前一步; 若是到頭了就從0開始循環使用
a.若是count和length相等,也就是滿了,那就調用notFull.await等待,釋放了鎖; 等待有一些take()調用完成以後纔會進入
b. 放數據也有一個索引putptr, 放入數據; 若是到頭了也從0開始循環使用
c. 調用notempty.signal(); 若是有線程在take()的時候await住了,那麼就會被通知到,能夠繼續進行操做
1. 我將BoundedBuffer的大小設置成5,同時在每次進入notFull和notEmpty的await()的時候打印一下表示當前線程正在等待;
2. 先開啓10個線程作put()操做,預計有5個線程能夠完成,另外5個會進入等待
3. 主線程sleep10秒中,而後啓動10個線程作take()操做;
這個時候,首先第一個take()必然成功完成,在這以前等待的5個put()線程都不會被喚醒, 接下來的事情就很差說了;
a. 開始take()的時候,有5個連續的take()線程完成操做; 而後又進入put()和take()交替的狀況
b. 第一個take()以後,馬上會有一個put()線程被notFull().signal()喚醒; 而後繼續有take()和put()交替的狀況;
可是任什麼時候候,未完成的take()線程始終>=未完成的put()線程, 這個也是很天然的;
package com.hetaoblog.concurrent.test; import java.util.Date; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; public class BoundedBufferTest { @Test public void testPutTake(){ final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i) { final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { try { bb.put(index); System.out.println(new Date() + " put finished: " + index); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to take for count: " + count); for(int i =0; i < count; ++i) { Thread t= new Thread(new Runnable() { @Override public void run() { try { Object o = bb.take(); System.out.println(new Date() + " take get: " + o); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { System.out.println(new Date() + " put is to wait...."); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " take is going to wait.."); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
這是其中一個執行結果,正好對應前面說的狀況a, 5個take()先完成;這裏出現了take()線程調用notEmpty.await()的狀況
Thu Mar 15 21:15:13 CST 2012 now try to call put for 10
Thu Mar 15 21:15:13 CST 2012 put finished: 0
Thu Mar 15 21:15:13 CST 2012 put finished: 2
Thu Mar 15 21:15:13 CST 2012 put finished: 3
Thu Mar 15 21:15:13 CST 2012 put finished: 1
Thu Mar 15 21:15:13 CST 2012 main thread is going to sleep for 10 seconds
Thu Mar 15 21:15:13 CST 2012 put finished: 4
Thu Mar 15 21:15:13 CST 2012 put is to wait....
Thu Mar 15 21:15:13 CST 2012 put is to wait....
Thu Mar 15 21:15:13 CST 2012 put is to wait....
Thu Mar 15 21:15:13 CST 2012 put is to wait....
Thu Mar 15 21:15:13 CST 2012 put is to wait....
Thu Mar 15 21:15:23 CST 2012 now try to take for count: 10
Thu Mar 15 21:15:23 CST 2012 take get: 3
Thu Mar 15 21:15:23 CST 2012 take get: 2
Thu Mar 15 21:15:23 CST 2012 take get: 1
Thu Mar 15 21:15:23 CST 2012 take get: 0
Thu Mar 15 21:15:23 CST 2012 take get: 4
Thu Mar 15 21:15:23 CST 2012 put finished: 5
Thu Mar 15 21:15:23 CST 2012: main thread is to wait for all threads
Thu Mar 15 21:15:23 CST 2012 take is going to wait..
Thu Mar 15 21:15:23 CST 2012 take get: 5
Thu Mar 15 21:15:23 CST 2012 put finished: 6
Thu Mar 15 21:15:23 CST 2012 put finished: 8
Thu Mar 15 21:15:23 CST 2012 put finished: 7
Thu Mar 15 21:15:23 CST 2012 put finished: 9
Thu Mar 15 21:15:23 CST 2012 take get: 6
Thu Mar 15 21:15:23 CST 2012 take get: 7
Thu Mar 15 21:15:23 CST 2012 take get: 8
Thu Mar 15 21:15:23 CST 2012 take get: 9
Thu Mar 15 21:15:23 CST 2012 all threads finished
Thu Mar 15 21:02:49 CST 2012 now try to call put for 10
Thu Mar 15 21:02:49 CST 2012 put finished: 3
Thu Mar 15 21:02:49 CST 2012 put finished: 1
Thu Mar 15 21:02:49 CST 2012 put finished: 0
Thu Mar 15 21:02:49 CST 2012 put finished: 2
Thu Mar 15 21:02:49 CST 2012 put finished: 4
Thu Mar 15 21:02:49 CST 2012 put is to wait....
Thu Mar 15 21:02:49 CST 2012 put is to wait....
Thu Mar 15 21:02:49 CST 2012 put is to wait....
Thu Mar 15 21:02:49 CST 2012 main thread is going to sleep for 10 seconds
Thu Mar 15 21:02:49 CST 2012 put is to wait....
Thu Mar 15 21:02:49 CST 2012 put is to wait....
Thu Mar 15 21:02:59 CST 2012 now try to take for count: 10
Thu Mar 15 21:02:59 CST 2012 take get: 1
Thu Mar 15 21:02:59 CST 2012 take get: 0
Thu Mar 15 21:02:59 CST 2012 take get: 3
Thu Mar 15 21:02:59 CST 2012 take get: 4
Thu Mar 15 21:02:59 CST 2012: main thread is to wait for all threads
Thu Mar 15 21:02:59 CST 2012 take is going to wait..
Thu Mar 15 21:02:59 CST 2012 take is going to wait..
Thu Mar 15 21:02:59 CST 2012 put finished: 5
Thu Mar 15 21:02:59 CST 2012 take get: 2
Thu Mar 15 21:02:59 CST 2012 take get: 5
Thu Mar 15 21:02:59 CST 2012 take is going to wait..
Thu Mar 15 21:02:59 CST 2012 take is going to wait..
Thu Mar 15 21:02:59 CST 2012 put finished: 7
Thu Mar 15 21:02:59 CST 2012 put finished: 6
Thu Mar 15 21:02:59 CST 2012 put finished: 8
Thu Mar 15 21:02:59 CST 2012 put finished: 9
Thu Mar 15 21:02:59 CST 2012 take get: 7
Thu Mar 15 21:02:59 CST 2012 take get: 6
Thu Mar 15 21:02:59 CST 2012 take get: 8
Thu Mar 15 21:02:59 CST 2012 take get: 9
Thu Mar 15 21:02:59 CST 2012 all threads finished
Thu Mar 15 21:14:30 CST 2012 now try to call put for 10
Thu Mar 15 21:14:30 CST 2012 main thread is going to sleep for 10 seconds
Thu Mar 15 21:14:30 CST 2012 put finished: 8
Thu Mar 15 21:14:30 CST 2012 put finished: 6
Thu Mar 15 21:14:30 CST 2012 put finished: 2
Thu Mar 15 21:14:30 CST 2012 put finished: 0
Thu Mar 15 21:14:30 CST 2012 put finished: 4
Thu Mar 15 21:14:30 CST 2012 put is to wait....
Thu Mar 15 21:14:30 CST 2012 put is to wait....
Thu Mar 15 21:14:30 CST 2012 put is to wait....
Thu Mar 15 21:14:30 CST 2012 put is to wait....
Thu Mar 15 21:14:30 CST 2012 put is to wait....
Thu Mar 15 21:14:40 CST 2012 now try to take for count: 10
Thu Mar 15 21:14:40 CST 2012 take get: 8
Thu Mar 15 21:14:40 CST 2012 take get: 6
Thu Mar 15 21:14:40 CST 2012 take get: 4
Thu Mar 15 21:14:40 CST 2012 take get: 2
Thu Mar 15 21:14:40 CST 2012: main thread is to wait for all threads
Thu Mar 15 21:14:40 CST 2012 take get: 0
Thu Mar 15 21:14:40 CST 2012 take is going to wait..
Thu Mar 15 21:14:40 CST 2012 take is going to wait..
Thu Mar 15 21:14:40 CST 2012 take is going to wait..
Thu Mar 15 21:14:40 CST 2012 put finished: 1
Thu Mar 15 21:14:40 CST 2012 put finished: 5
Thu Mar 15 21:14:40 CST 2012 put finished: 3
Thu Mar 15 21:14:40 CST 2012 put finished: 9
Thu Mar 15 21:14:40 CST 2012 take get: 1
Thu Mar 15 21:14:40 CST 2012 put finished: 7
Thu Mar 15 21:14:40 CST 2012 take get: 5
Thu Mar 15 21:14:40 CST 2012 take get: 3
Thu Mar 15 21:14:40 CST 2012 take get: 7
Thu Mar 15 21:14:40 CST 2012 take get: 9
Thu Mar 15 21:14:40 CST 2012 all threads finished
在幾回不一樣的執行中,始終能夠觀察到任什麼時候候,未完成的take()線程數>= 未完成的put()線程; 在未完成的線程數相等的狀況下,即便jvm首先調度到了take()線程,也會進入notEmpty.await()釋放鎖,進入等待
3. 主線程sleep10秒中,而後啓動10個線程作put()操做;
2. 先開啓10個線程作take()操做,因爲開始BoundedBuffer裏面沒有東西,因此10個線程所有調用await進入等待
3. 主線程sleep10秒中,而後啓動10個線程作put()操做;
a. 任何一個元素的put()都會發生在take()以前;
b. 若是X表示某個操做成功的次數,在X(put)-X(take)<5的時候,put線程不會進入等待狀態
@Test public void testTakePut(){ final BoundedBuffer bb = new BoundedBuffer(); int count = 10; final CountDownLatch c = new CountDownLatch(count * 2); System.out.println(new Date() + " first try to call take for count: " + count); for(int i =0; i < count; ++i){ final int index = i; Thread t= new Thread(new Runnable() { @Override public void run() { try { Thread.currentThread().setName(" TAKE " + index); Object o = bb.take(); System.out.println(new Date() + " " + " take get: " + o ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } try { System.out.println(new Date() + " main thread is going to sleep for 10 seconds"); Thread.sleep(10 * 1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println(new Date() + " now try to call put for " + count ); for(int i = 0; i < count ; ++i){ final int index = i; try { Thread t = new Thread(new Runnable() { @Override public void run() { Thread.currentThread().setName(" PUT " + index); try { bb.put(index); System.out.println(new Date() + " " + " put finished: " + index ); } catch (InterruptedException e) { e.printStackTrace(); } c.countDown(); } }); t.start(); } catch (Exception e) { e.printStackTrace(); } } try { System.out.println(new Date() + ": main thread is to wait for all threads"); c.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date() + " all threads finished"); } class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length){ System.out.println(new Date() + " " + Thread.currentThread().getName() + " put is to wait....: " + System.currentTimeMillis()); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis()); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
運行結果1: Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10 Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait.. Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait.. Fri Mar 16 20:50:20 CST 2012 now try to call put for 10 Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7 Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9 Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8 Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait.... Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait.... Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5 Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait.... Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8 Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9 Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0 Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6 Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5 Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2 Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3 Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1 Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2 Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3 Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1 Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4 Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6 Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0 Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4 Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7 Fri Mar 16 20:50:20 CST 2012 all threads finished 注意到紅色部分: 第一個加爲紅色是由於按照打印結果,put()只完成了3次,就開始有put()進入等待了,而BoundedBuffer的大小是5,理論上應該沒有滿的! 第二個加爲紅色是由於元素4居然先被take,而後再被put! 顯然程序有地方出錯了!具體緣由分析,歡迎關注核桃博客:)