java concurrent包介紹及使用

說一說java的concurrent包1-concurrent包簡介

前面一個系列的文章都在圍繞hash展開,今天準備先說下concurrent包,這個系列可能會以使用場景說明爲主,concurrent包自己的代碼分析可能比較少; 我在這方面的實踐經驗較爲有限,有錯誤歡迎批評指正 html

不過前一個系列並未結束,還有一些文章沒有放出來,歡迎關注核桃博客 

concurrent包是jdk1.5引入的重要的包,主要代碼由大牛Doug Lea完成,實際上是在jdk1.4時代,因爲java語言內置對多線程編程的支持比較基礎和有限,因此他寫了這個,由於實在太過於優秀,因此被加入到jdk之中; 

一般所說的concurrent包基本有3個package組成 
java.util.concurrent:提供大部分關於併發的接口和類,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等 
java.util.concurrent.atomic:提供全部原子操做的類, 如AtomicInteger, AtomicLong等; 
java.util.concurrent.locks:提供鎖相關的類, 如Lock, ReentrantLock, ReadWriteLock, Condition等; 

concurrent包的優勢: 
1. 首先,功能很是豐富,諸如線程池(ThreadPoolExecutor),CountDownLatch等併發編程中須要的類已經有現成的實現,不須要本身去實現一套; 畢竟jdk1.4對多線程編程的主要支持幾乎就只有Thread, Runnable,synchronized等 

2. concurrent包裏面的一些操做是基於硬件級別的CAS(compare and swap),就是在cpu級別提供了原子操做,簡單的說就能夠提供無阻塞、無鎖定的算法; 而現代cpu大部分都是支持這樣的算法的;java

說一說java的concurrent包2-等待多個線程完成執行的CountDownLatch 

前面一篇說了concurrent包的基本結構,接下來首先看一下一個很是有用的類,CountDownLatch, 能夠用來在一個線程中等待多個線程完成任務的類; 算法

前面一篇說了concurrent包的基本結構,接下來首先看一下一個很是有用的類,CountDownLatch, 能夠用來在一個線程中等待多個線程完成任務的類; 
一般的使用場景是,某個主線程接到一個任務,起了n個子線程去完成,可是主線程須要等待這n個子線程都完成任務了之後纔開始執行某個操做; 

下面是一段演示代碼 

Java代碼  編程

@Test  
public void demoCountDown()  
{  
    int count = 10;  
  
    final CountDownLatch l = new CountDownLatch(count);  
    for(int i = 0; i < count; ++i)  
    {  
        final int index = i;  
        new Thread(new Runnable() {  
  
            @Override  
            public void run() {  
  
                try {  
                    Thread.currentThread().sleep(20 * 1000);  
                } catch (InterruptedException e) {  
  
                    e.printStackTrace();  
                }  
  
                System.out.println("thread " + index + " has finished...");  
  
                l.countDown();  
  
            }  
        }).start();  
    }  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println("now all threads have finished");  
  
}
運行的結果 
thread 1 has finished... 
thread 3 has finished... 
thread 4 has finished... 
thread 6 has finished... 
thread 8 has finished... 
thread 0 has finished... 
thread 7 has finished... 
thread 9 has finished... 
thread 2 has finished... 
thread 5 has finished... 
now all threads have finished 

前面10個線程的執行完成順序會變化,可是最後一句始終會等待前面10個線程都完成以後纔會執行數組

說一說java的concurrent包3-線程安全而且無阻塞的Atomic類 

有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來咱們看下Atomic相關的類, 好比AtomicLong, AtomicInteger等這些; 緩存

有了CountDownLatch,涉及到多線程同步的演示就比較容易了,接下來咱們看下Atomic相關的類, 好比AtomicLong, AtomicInteger等這些; 
簡單的說,這些類都是線程安全的,支持無阻塞無鎖定的 

Java代碼  安全

set()  多線程

get()  併發

getAndSet()  jvm

getAndIncrement()  

getAndDecrement()  

getAndAdd()  


等操做 

下面是一個測試代碼 

Java代碼  

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.atomic.AtomicLong;  
  
import org.junit.Test;  
/** 
 * 
 * by http://www.hetaoblog.com 
 * @author hetaoblog 
 * 
 */  
public class AtomicTest {  
  
    @Test  
    public void testAtomic()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final NonSafeSeq seq1 = new NonSafeSeq();  
        final SafeSeq seq2 = new SafeSeq();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq1.inc();  
                        seq2.inc();  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("NonSafeSeq:" + seq1.get());  
        System.out.println("SafeSeq with atomic: " + seq2.get());  
  
    }  
}  
  
class NonSafeSeq{  
    private long count = 0;  
    public void inc()  
    {  
        count++;  
    }  
  
    public long  get()  
    {  
        return count;  
    }  
}  
  
class SafeSeq{  
    private AtomicLong count  = new AtomicLong(0);  
  
    public void inc()  
    {  
        count.incrementAndGet();  
    }  
  
    public long get()  
    {  
        return count.longValue();  
    }  
}
其中NonSafeSeq是做爲對比的類,直接放一個private long count不是線程安全的,而SafeSeq裏面放了一個AtomicLong,是線程安全的;能夠直接調用incrementAndGet來增長 

運行代碼,能夠獲得相似這樣的結果 
finished : 1 
finished : 0 
finished : 3 
finished : 2 
finished : 5 
finished : 4 
finished : 6 
finished : 8 
finished : 9 
finished : 7 
both have finished.... 
NonSafeSeq:91723 
SafeSeq with atomic: 100000 

能夠看到,10個線程,每一個線程運行了10,000次,理論上應該有100,000次增長,使用了普通的long是非線程安全的,而使用了AtomicLong是線程安全的; 

注意,這個例子也說明,雖然long自己的單個設置是原子的,要麼成功要麼不成功,可是諸如count++這樣的操做就不是線程安全的;由於這包括了讀取和寫入兩步操做;

說一說java的concurrent包4--能夠代替synchronized關鍵字的ReentrantLock 

在jdk 1.4時代,線程間的同步主要依賴於synchronized關鍵字,本質上該關鍵字是一個對象鎖,能夠加在不一樣的instance上或者class上,從使用的角度則分別能夠加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法; 

在jdk 1.4時代,線程間的同步主要依賴於synchronized關鍵字,本質上該關鍵字是一個對象鎖,能夠加在不一樣的instance上或者class上,從使用的角度則分別能夠加在非靜態方法,靜態方法,以及直接synchronized(MyObject)這樣的用法; 
concurrent包提供了一個能夠替代synchronized關鍵字的ReentrantLock, 
簡單的說你能夠new一個ReentrantLock, 而後經過lock.lock和lock.unlock來獲取鎖和釋放鎖;注意必須將unlock放在finally塊裏面, 
reentrantlock的好處 
1. 是更好的性能, 
2. 提供同一個lock對象上不一樣condition的信號通知 
3. 還提供lockInterruptibly這樣支持響應中斷的加鎖過程,意思是說你試圖去加鎖,可是當前鎖被其餘線程hold住,而後你這個線程能夠被中斷; 

簡單的一個例子: 

Java代碼  

package com.hetaoblog.concurrent.test;  
  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.locks.ReentrantLock;  
  
import org.junit.Test;  
  
public class ReentrantLockDemo {  
  
    @Test  
    public void demoLock()  
    {  
        final int loopcount = 10000;  
        int threadcount = 10;  
  
        final SafeSeqWithLock seq = new SafeSeqWithLock();  
  
        final CountDownLatch l = new CountDownLatch(threadcount);  
  
        for(int i = 0; i < threadcount; ++i)  
        {  
            final int index = i;  
            new Thread(new Runnable() {  
  
                @Override  
                public void run() {  
                    for(int j = 0; j < loopcount; ++j)  
                    {  
  
                        seq.inc();  
  
                    }  
  
                    System.out.println("finished : " + index);  
                    l.countDown();  
  
                }  
            }).start();  
        }  
  
        try {  
            l.await();  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println("both have finished....");  
  
        System.out.println("SafeSeqWithLock:" + seq.get());  
  
    }  
}  
  
class SafeSeqWithLock{  
    private long count = 0;  
  
    private ReentrantLock lock = new ReentrantLock();  
  
    public void inc()  
    {  
        lock.lock();  
  
        try{  
            count++;  
        }  
        finally{  
            lock.unlock();  
        }  
    }  
  
    public long get()  
    {  
        return count;  
    }  
}
一樣之前面的相似Sequence的類舉例,經過對inc操做加鎖,保證了線程安全; 
固然,這裏get()我沒有加鎖,對於這樣直接讀取返回原子類型的函數,我認爲不加鎖是沒問題的,至關於返回最近成功操做的值; 

運行結果相似這樣, 
finished : 7 
finished : 2 
finished : 6 
finished : 1 
finished : 5 
finished : 3 
finished : 0 
finished : 9 
finished : 8 
finished : 4 
both have finished.... 

SafeSeqWithLock:100000

說一說java的concurrent包5--讀寫鎖ReadWriteLock 

concurrent包裏面還提供了一個很是有用的鎖,讀寫鎖ReadWriteLock 

concurrent包裏面還提供了一個很是有用的鎖,讀寫鎖ReadWriteLock 
下面是ReadWriteLock接口的說明: 
A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive. 

意思是說讀鎖能夠有不少個鎖同時上鎖,只要當前沒有寫鎖; 
寫鎖是排他的,上了寫鎖,其餘線程既不能上讀鎖,也不能上寫鎖;一樣,須要上寫鎖的前提是既沒有讀鎖,也沒有寫鎖; 
兩個寫鎖不能同時得到無需說明,下面一段程序說明下上了讀鎖之後,其餘線程須要上寫鎖也沒法得到 

Java代碼  

@Test  
public void testRWLock_getw_onr()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    final Lock rlock = lock.readLock();  
    final Lock wlock = lock.writeLock();  
  
    final CountDownLatch l  = new CountDownLatch(2);  
  
    // start r thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get rlock");  
            rlock.lock();  
  
            try {  
                Thread.currentThread().sleep(20 * 1000);  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
  
            System.out.println(new Date() + "now to unlock rlock");  
            rlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    // start w thread  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
  
            System.out.println(new Date() + "now to get wlock");  
            wlock.lock();  
  
            System.out.println(new Date() + "now to unlock wlock");  
            wlock.unlock();  
  
            l.countDown();  
        }  
    }).start();  
  
    try {  
        l.await();  
    } catch (InterruptedException e) {  
  
        e.printStackTrace();  
    }  
  
    System.out.println(new Date() + "finished");  
}
這代碼在我機器上打印的結果是, 也就是試圖得到寫鎖的線程只有當另一個線程將讀鎖釋放了之後才能夠得到 
Tue Feb 28 23:18:13 CST 2012now to get rlock 
Tue Feb 28 23:18:13 CST 2012now to get wlock 
Tue Feb 28 23:18:33 CST 2012now to unlock rlock 
Tue Feb 28 23:18:33 CST 2012now to unlock wlock 
Tue Feb 28 23:18:33 CST 2012finished 

ReadWriteLock的實現是ReentrantReadWriteLock, 
有趣的是,在一個線程中,讀鎖不能直接升級爲寫鎖,可是寫鎖能夠降級爲讀鎖; 
這意思是,若是你已經有了讀鎖,再去試圖得到寫鎖,將會沒法得到, 一直堵住了; 
可是若是你有了寫鎖,再去試圖得到讀鎖,沒問題; 

下面是一段降級的代碼, 

Java代碼  

@Test  
public void testRWLock_downgrade()  
{  
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
    Lock rlock = lock.readLock();  
    Lock wlock = lock.writeLock();  
  
    System.out.println("now to get wlock");  
  
    wlock.lock();  
    System.out.println("now to get rlock");  
    rlock.lock();  
  
    System.out.println("now to unlock wlock");  
  
    wlock.unlock();  
  
    System.out.println("now to unlock rlock");  
    rlock.unlock();  
  
    System.out.println("finished");  
  
}
能夠正常打印出 
now to get wlock 
now to get rlock 
now to unlock wlock 
now to unlock rlock 
finished 

下面是一段升級的代碼, 

Java代碼  

@Test  
    public void testRWLock_upgrade()  
    {  
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();  
  
        Lock rlock = lock.readLock();  
        Lock wlock = lock.writeLock();  
  
        System.out.println("now to get rlock");  
        rlock.lock();  
  
        System.out.println("now to get wlock");  
        wlock.lock();  
  
        System.out.println("now to unlock wlock");  
        wlock.unlock();  
  
        System.out.println("now to unlock rlock");  
        rlock.unlock();  
  
        System.out.println("finished");  
  
    }
只能打印出下面兩句,後面就一直掛住了 
now to get rlock 

now to get wlock

說一說java的concurrent包6–java裏面的線程基礎類Thread 

有網友建議我在介紹concurrent包以前先介紹下jdk1.5以前的多線程知識,這是個至關不錯的想法, 這篇就先介紹下Thread類; 

有網友建議我在介紹concurrent包以前先介紹下jdk1.5以前的多線程知識,這是個至關不錯的想法, 這篇就先介紹下Thread類; 
Thread類是java中的線程,幾乎全部的多線程都在Thread這個類的基礎以後展開; 
下面介紹這個類的基本用法,Thread類的最基本函數就是run函數 
public void run() 
簡單的說來,基本的建立一個完成本身功能的線程能夠繼承Thread類,而後override這個run方法, 以下所示 

Java代碼  

public class ThreadDemo {  
  
    @Test  
    public void testThread()  
    {  
        SimpleThread t = new SimpleThread();  
        t.start();  
  
    }   
  
}  
class SimpleThread extends Thread{  
  
    @Override  
    public void run() {  
  
        System.out.println( Thread.currentThread().getName() + " is running  ");  
    }  
}
一般在run方法裏面實現本身要作的功能,這裏簡單的打印了了一句話, 運行結果是 
Thread-0 is running 
啓動一個線程就是new一個本身的Thread對象,而後調用其中的start方法啓動這個線程;注意, run()方法運行結束以後這個線程的生命週期就結束了; 

上面舉的例子是說啓動一個線程就去完成一個任務,有的時候咱們須要一個線程始終在跑,按期執行一些任務,而後在某個時刻中止這個線程的運行; 那麼能夠有相似下面的一段代碼: 

Java代碼  

public class ThreadDemo {  
  
    public static void main(String[] args)  
    {  
        PeriodicalRunningThread t = new PeriodicalRunningThread();  
        t.start();  
  
        System.out.println("main thread is going to sleep...");  
        try {  
            Thread.currentThread().sleep(20 * 1000);  
  
        } catch (InterruptedException e) {  
  
            e.printStackTrace();  
        }  
  
        System.out.println(new Date() + " now to stop PeriodicalRunningThread");  
        t.setRunning(false);  
  
    }  
  
}   
  
class PeriodicalRunningThread extends Thread{  
  
    private volatile boolean running = true;  
  
    @Override  
    public void run() {  
  
        while(running)  
        {  
            System.out.println(new Date() + " " + Thread.currentThread().getName() +  " is running " + new Date());  
  
            try {  
                Thread.currentThread().sleep(5 * 1000);  
  
            } catch (InterruptedException e) {  
  
                e.printStackTrace();  
            }  
        }  
  
        System.out.println(new Date() + " " + Thread.currentThread().getName() + " will end");  
    }  
  
    public void setRunning(boolean running) {  
        this.running = running;  
    }  
  
}
這段代碼的打印結果是: 
main thread is going to sleep… 
Wed Feb 29 21:10:39 CST 2012 Thread-0 is running Wed Feb 29 21:10:39 CST 2012 
Wed Feb 29 21:10:44 CST 2012 Thread-0 is running Wed Feb 29 21:10:44 CST 2012 
Wed Feb 29 21:10:49 CST 2012 Thread-0 is running Wed Feb 29 21:10:49 CST 2012 
Wed Feb 29 21:10:54 CST 2012 Thread-0 is running Wed Feb 29 21:10:54 CST 2012 
Wed Feb 29 21:10:59 CST 2012 now to stop PeriodicalRunningThread 
Wed Feb 29 21:10:59 CST 2012 Thread-0 will end 

這裏經過一個volatile的boolean值來做爲標識表示這個線程的中止; 
關於這裏的volatile關鍵字的使用,若有興趣能夠先看這個,核桃博客也會在這個系列的後續文章中對這個關鍵字作說明 
http://www.ibm.com/developerworks/cn/java/j-jtp06197.html 

這樣,在這個running標識爲true的時候,該線程一直在跑,可是完成一段任務後會sleep一段時間,而後繼續執行;

說一說java的concurrent包7–Thread和Runnable 

這篇仍是Thread和Runnable的基礎 

這篇仍是Thread和Runnable的基礎 
在前面一篇的代碼裏面已經介紹了Thread類的其餘幾個經常使用的方法, 
1. sleep函數,做用是讓當前線程sleep一段時間,單位以毫秒計算; 
public static void sleep(long millis) 
2. 靜態方法Thread.currentThread(), 獲得當前線程 
public static Thread currentThread() 
3. getName方法,獲得當前線程名稱 
public final String getName() 

這個名稱能夠在構造Thread的時候傳入, 也能夠經過setName()方法設置;這個在多線程調試的時候是比較有用的,設置當前線程名,而後在log4j的輸出字符串格式裏面加入%t,就能夠在日誌中打印當前線程名稱,方便看到當前的日誌是從哪裏來的; 

如今介紹下多線程裏面另一個重要的接口Runnable, 這個接口表示能夠被一個線程執行的任務,事實上Thread類也實現了這個Runnable接口; 
這個接口只有一個函數, 實現者只要在裏面調用代碼就能夠了 
void run() 
同時, Thread類有個構造函數是傳入一個Runnable實現的; 
經常使用的一個用法就是經過匿名內部類來建立線程執行簡單任務,避免寫太多的類,外部須要的變量能夠經過加final修飾符後傳入, 代碼例子以下: 

Java代碼  

public static void testThreadWithRunnable()  
{  
    final String word = "hello,world";  
    new Thread(new Runnable() {  
  
        @Override  
        public void run() {  
            System.out.println(word);  
  
        }  
    }).start();  
}  
  
public static void main(String[] args)  
{  
    //periodicalThreadTest();  
  
    testThreadWithRunnable();  
  
}
上面的代碼會打印 

hello,world

說一說java的concurrent包8–用在一個lock上的多個Condition 

concurrent系列的前一篇說到說一說java的concurrent包7–thread和runnable,如今繼續,今天介紹下Condtion這個接口,能夠用在一個lock上的多個不一樣的狀況; 

在jdk的線程同步代碼中,不管的synchronized關鍵字,或者是lock上的await/signal等,都只能在一個鎖上作同步通知; 
假設有3個線程,要對一個資源作同步,通常只能有一個鎖來作同步通知操做,那麼通知的時候沒法作到精確的通知3個線程中的某一個的; 
由於你調用了wait()/notify()的時候,具體的調度是jvm決定的; 

可是有的時候的確須要須要對一個鎖作多種不一樣狀況的精確通知, 好比一個緩存,滿了和空了是兩種不一樣的狀況,能夠分別通知取數據的線程和放數據的線程; 

Condition的基本使用以下: 
* Condition是個接口,基本的方法就是await()和signal()方法; 
* Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition() 
* 調用Condition的await()和signal()方法,都必須在lock保護以內,就是說必須在lock.lock()和lock.unlock之間才能夠 
* 和Object.wait()方法同樣,每次調用Condition的await()方法的時候,當前線程就自動釋放了對當前鎖的擁有權 

固然,Condition實際上是個接口,上面說的這幾點,在實現Condition的時候能夠自由控制一點;可是jdk的javadoc說了,若是有啥特別的實現,必需要清楚的說明的; 

下一節我會結合具體的代碼來介紹下Condition的使用;

說一說java的concurrent包9–Condition的代碼例子BoundedBuffer 

面說了Condition的基本含義,今天這篇說下Condition的一個代碼例子; 
javadoc裏面對Condition有一個絕佳的例子,BoundedBuffer類,就是一個線程安全的有界限的緩存;很是巧妙的利用了Condition,根據來通知不一樣的線程作不一樣的事情; 
下面先看下具體代碼: 

Java代碼  

class BoundedBuffer {  
  
   final Lock lock = new ReentrantLock();  
  
   final Condition notFull  = lock.newCondition();   
  
   final Condition notEmpty = lock.newCondition();   
  
  
  
   final Object[] items = new Object[100];  
  
   int putptr, takeptr, count;  
  
  
  
   public void put(Object x) throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == items.length)   
  
         notFull.await();  
  
       items[putptr] = x;   
  
       if (++putptr == items.length) putptr = 0;  
  
       ++count;  
  
       notEmpty.signal();  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }  
  
  
  
   public Object take() throws InterruptedException {  
  
     lock.lock();  
  
     try {  
  
       while (count == 0)   
  
         notEmpty.await();  
  
       Object x = items[takeptr];   
  
       if (++takeptr == items.length) takeptr = 0;  
  
       --count;  
  
       notFull.signal();  
  
       return x;  
  
     } finally {  
  
       lock.unlock();  
  
     }  
  
   }   
  
 }
代碼意思不復雜,一個有界的buffer,裏面是個數組,能夠往裏面放數據和取數據; 
因爲該buffer被多個線程共享,因此每次放和取操做的時候都用一個lock保護起來; 
每次取數據(take)的時候, 
a. 若是當前個數是0(用一個count計數), 那麼就調用notEmpty.await等待,鎖就釋放了; 
b. 取數據的索引專門有一個,每次向前一步; 若是到頭了就從0開始循環使用 
c.若是有數據,那就取一個數據,將count減1,同時調用notfull.signal(), 

每次放數據(put)的時候 
a.若是count和length相等,也就是滿了,那就調用notFull.await等待,釋放了鎖; 等待有一些take()調用完成以後纔會進入 
b. 放數據也有一個索引putptr, 放入數據; 若是到頭了也從0開始循環使用 

c. 調用notempty.signal(); 若是有線程在take()的時候await住了,那麼就會被通知到,能夠繼續進行操做

說一說java的concurrent包10–Condition和BoundedBuffer的測試代碼

前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer; 

前面一篇說了Condition和BoundedBuffer的基本代碼,下面寫一個簡單的程序測試下這個BoundedBuffer; 

這段程序的目的是測試先put()後take()的操做, 
1. 我將BoundedBuffer的大小設置成5,同時在每次進入notFull和notEmpty的await()的時候打印一下表示當前線程正在等待; 
2. 先開啓10個線程作put()操做,預計有5個線程能夠完成,另外5個會進入等待 
3. 主線程sleep10秒中,而後啓動10個線程作take()操做; 

這個時候,首先第一個take()必然成功完成,在這以前等待的5個put()線程都不會被喚醒, 接下來的事情就很差說了; 
剩下的5個put()線程和9個take()線程中的任何一個均可能會被jvm調度; 
好比可能出現 
a. 開始take()的時候,有5個連續的take()線程完成操做; 而後又進入put()和take()交替的狀況 
b. 第一個take()以後,馬上會有一個put()線程被notFull().signal()喚醒; 而後繼續有take()和put()交替的狀況; 

其中take()線程也可能進入notEmpty.await()操做; 
可是任什麼時候候,未完成的take()線程始終>=未完成的put()線程, 這個也是很天然的; 



Java代碼  

package com.hetaoblog.concurrent.test;  
  
  
  
import java.util.Date;  
  
import java.util.concurrent.CountDownLatch;  
  
import java.util.concurrent.locks.Condition;  
  
import java.util.concurrent.locks.Lock;  
  
import java.util.concurrent.locks.ReentrantLock;  
  
  
  
import org.junit.Test;  
  
  
  
public class BoundedBufferTest {  
  
      
  
      
  
      
  
    @Test  
  
    public void testPutTake()  
  
    {  
  
          
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + "  put finished:  " + index);  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to take for count: " + count);  
  
          
  
          
  
        for(int i =0; i < count; ++i)  
  
        {  
  
              
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " take get: " + o);  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
          
  
    }  
  
  
  
}  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " put  is to wait....");  
  
             notFull.await();  
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " take is going to wait..");  
  
             notEmpty.await();  
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }
下面是這段程序在我機器上的運行結果: 

這是其中一個執行結果,正好對應前面說的狀況a, 5個take()先完成;這裏出現了take()線程調用notEmpty.await()的狀況 
Thu Mar 15 21:15:13 CST 2012 now try to call put for 10 
Thu Mar 15 21:15:13 CST 2012 put finished: 0 
Thu Mar 15 21:15:13 CST 2012 put finished: 2 
Thu Mar 15 21:15:13 CST 2012 put finished: 3 
Thu Mar 15 21:15:13 CST 2012 put finished: 1 
Thu Mar 15 21:15:13 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:15:13 CST 2012 put finished: 4 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:13 CST 2012 put is to wait.... 
Thu Mar 15 21:15:23 CST 2012 now try to take for count: 10 
Thu Mar 15 21:15:23 CST 2012 take get: 3 
Thu Mar 15 21:15:23 CST 2012 take get: 2 
Thu Mar 15 21:15:23 CST 2012 take get: 1 
Thu Mar 15 21:15:23 CST 2012 take get: 0 
Thu Mar 15 21:15:23 CST 2012 take get: 4 
Thu Mar 15 21:15:23 CST 2012 put finished: 5 
Thu Mar 15 21:15:23 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:15:23 CST 2012 take is going to wait.. 
Thu Mar 15 21:15:23 CST 2012 take get: 5 
Thu Mar 15 21:15:23 CST 2012 put finished: 6 
Thu Mar 15 21:15:23 CST 2012 put finished: 8 
Thu Mar 15 21:15:23 CST 2012 put finished: 7 
Thu Mar 15 21:15:23 CST 2012 put finished: 9 
Thu Mar 15 21:15:23 CST 2012 take get: 6 
Thu Mar 15 21:15:23 CST 2012 take get: 7 
Thu Mar 15 21:15:23 CST 2012 take get: 8 
Thu Mar 15 21:15:23 CST 2012 take get: 9 
Thu Mar 15 21:15:23 CST 2012 all threads finished 

這是另外一個執行結果: 
Thu Mar 15 21:02:49 CST 2012 now try to call put for 10 
Thu Mar 15 21:02:49 CST 2012 put finished: 3 
Thu Mar 15 21:02:49 CST 2012 put finished: 1 
Thu Mar 15 21:02:49 CST 2012 put finished: 0 
Thu Mar 15 21:02:49 CST 2012 put finished: 2 
Thu Mar 15 21:02:49 CST 2012 put finished: 4 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:49 CST 2012 put is to wait.... 
Thu Mar 15 21:02:59 CST 2012 now try to take for count: 10 
Thu Mar 15 21:02:59 CST 2012 take get: 1 
Thu Mar 15 21:02:59 CST 2012 take get: 0 
Thu Mar 15 21:02:59 CST 2012 take get: 3 
Thu Mar 15 21:02:59 CST 2012 take get: 4 
Thu Mar 15 21:02:59 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 put finished: 5 
Thu Mar 15 21:02:59 CST 2012 take get: 2 
Thu Mar 15 21:02:59 CST 2012 take get: 5 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 take is going to wait.. 
Thu Mar 15 21:02:59 CST 2012 put finished: 7 
Thu Mar 15 21:02:59 CST 2012 put finished: 6 
Thu Mar 15 21:02:59 CST 2012 put finished: 8 
Thu Mar 15 21:02:59 CST 2012 put finished: 9 
Thu Mar 15 21:02:59 CST 2012 take get: 7 
Thu Mar 15 21:02:59 CST 2012 take get: 6 
Thu Mar 15 21:02:59 CST 2012 take get: 8 
Thu Mar 15 21:02:59 CST 2012 take get: 9 
Thu Mar 15 21:02:59 CST 2012 all threads finished 

執行結果2: 
Thu Mar 15 21:14:30 CST 2012 now try to call put for 10 
Thu Mar 15 21:14:30 CST 2012 main thread is going to sleep for 10 seconds 
Thu Mar 15 21:14:30 CST 2012 put finished: 8 
Thu Mar 15 21:14:30 CST 2012 put finished: 6 
Thu Mar 15 21:14:30 CST 2012 put finished: 2 
Thu Mar 15 21:14:30 CST 2012 put finished: 0 
Thu Mar 15 21:14:30 CST 2012 put finished: 4 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:30 CST 2012 put is to wait.... 
Thu Mar 15 21:14:40 CST 2012 now try to take for count: 10 
Thu Mar 15 21:14:40 CST 2012 take get: 8 
Thu Mar 15 21:14:40 CST 2012 take get: 6 
Thu Mar 15 21:14:40 CST 2012 take get: 4 
Thu Mar 15 21:14:40 CST 2012 take get: 2 
Thu Mar 15 21:14:40 CST 2012: main thread is to wait for all threads 
Thu Mar 15 21:14:40 CST 2012 take get: 0 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 take is going to wait.. 
Thu Mar 15 21:14:40 CST 2012 put finished: 1 
Thu Mar 15 21:14:40 CST 2012 put finished: 5 
Thu Mar 15 21:14:40 CST 2012 put finished: 3 
Thu Mar 15 21:14:40 CST 2012 put finished: 9 
Thu Mar 15 21:14:40 CST 2012 take get: 1 
Thu Mar 15 21:14:40 CST 2012 put finished: 7 
Thu Mar 15 21:14:40 CST 2012 take get: 5 
Thu Mar 15 21:14:40 CST 2012 take get: 3 
Thu Mar 15 21:14:40 CST 2012 take get: 7 
Thu Mar 15 21:14:40 CST 2012 take get: 9 
Thu Mar 15 21:14:40 CST 2012 all threads finished 

在幾回不一樣的執行中,始終能夠觀察到任什麼時候候,未完成的take()線程數>= 未完成的put()線程; 在未完成的線程數相等的狀況下,即便jvm首先調度到了take()線程,也會進入notEmpty.await()釋放鎖,進入等待

說一說java的concurrent包11–Condition和BoundedBuffer的測試代碼2 

前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操做,這篇說一下先take()再put()的操做; 

前面一篇說了一個Condition和BoundedBuffer的測試代碼例子,前面測試的是先put()再take()的操做,這篇說一下先take()再put()的操做; 
固然,必須先要說明的是,這篇和前面這篇在打印日誌的時候實際上是有錯誤的,這個錯誤在前面一篇並不明顯,不會致使明顯的問題; 
可是一樣的緣由致使如今這個先take()再put()的操做會出現明顯的錯誤,看上去會顯得難以想象; 
具體狀況留到下一篇詳細說明,這裏先上測試目的,測試代碼和運行結果; 
同時說明多線程編程須要很是謹慎,不然極易出錯 

測試目的: 
1. 我將BoundedBuffer的大小設置成5,同時在每次進入notFull和notEmpty的await()的時候打印一下表示當前線程正在等待; 
2. 先開啓10個線程作take()操做,因爲開始BoundedBuffer裏面沒有東西,因此10個線程所有調用await進入等待 
3. 主線程sleep10秒中,而後啓動10個線程作put()操做; 
在第一個put()完成以後,接下來應該會有部分put()線程和take()線程前後完成; 
理論上, 
a. 任何一個元素的put()都會發生在take()以前; 
b. 若是X表示某個操做成功的次數,在X(put)-X(take)<5的時候,put線程不會進入等待狀態 


下面是測試代碼: 



Java代碼  

    @Test  
  
    public void testTakePut()  
  
    {  
  
  
  
        final BoundedBuffer bb = new BoundedBuffer();  
  
          
  
          
  
        int count = 10;  
  
        final CountDownLatch c = new CountDownLatch(count * 2);  
  
          
  
  
  
        System.out.println(new Date() + " first try to call take for count: " + count);  
  
        for(int i =0; i < count; ++i)  
  
        {  
  
            final int index = i;  
  
            Thread t= new Thread(new Runnable() {  
  
                  
  
                @Override  
  
                public void run() {  
  
                      
  
                    try {  
  
                          
  
                        Thread.currentThread().setName(" TAKE " + index);  
  
                          
  
                          
  
                        Object o = bb.take();  
  
                          
  
                        System.out.println(new Date() + " " + " take get: " + o );  
  
                    } catch (InterruptedException e) {  
  
                          
  
                        e.printStackTrace();  
  
                    }  
  
                      
  
                      
  
                      
  
                      
  
                      
  
                      
  
                    c.countDown();  
  
                      
  
                }  
  
            });  
  
              
  
            t.start();  
  
        }  
  
          
  
        try {  
  
            System.out.println(new Date() + " main thread is going to sleep for 10 seconds");  
  
            Thread.sleep(10 * 1000);  
  
        } catch (InterruptedException e1) {  
  
              
  
            e1.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " now try to call put for " + count );  
  
          
  
        for(int i = 0; i < count ; ++i)  
  
        {  
  
            final int index = i;  
  
            try {  
  
                Thread t = new Thread(new Runnable() {  
  
                      
  
                    @Override  
  
                    public void run() {  
  
                          
  
                          
  
                        Thread.currentThread().setName(" PUT " + index);  
  
                          
  
                        try {  
  
                            bb.put(index);  
  
                            System.out.println(new Date() + " " + "  put finished:  " + index );  
  
                        } catch (InterruptedException e) {  
  
                              
  
                            e.printStackTrace();  
  
                        }  
  
                          
  
                        c.countDown();  
  
                    }  
  
                });  
  
  
  
                t.start();  
  
                  
  
                  
  
            } catch (Exception e) {  
  
                  
  
                e.printStackTrace();  
  
            }  
  
        }  
  
          
  
          
  
        try {  
  
              
  
            System.out.println(new Date() + ": main thread is to wait for all threads");  
  
            c.await();  
  
              
  
              
  
        } catch (InterruptedException e) {  
  
              
  
            e.printStackTrace();  
  
        }  
  
          
  
        System.out.println(new Date() + " all threads finished");  
  
    }  
  
  
  
  
  
class BoundedBuffer {  
  
       final Lock lock = new ReentrantLock();  
  
       final Condition notFull  = lock.newCondition();   
  
       final Condition notEmpty = lock.newCondition();   
  
  
  
       final Object[] items = new Object[5];  
  
       int putptr, takeptr, count;  
  
  
  
       public void put(Object x) throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == items.length)   
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " put  is to wait....: " + System.currentTimeMillis());    
  
             notFull.await();  
  
               
  
           }  
  
           items[putptr] = x;   
  
           if (++putptr == items.length) putptr = 0;  
  
           ++count;  
  
             
  
           notEmpty.signal();  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }  
  
  
  
       public Object take() throws InterruptedException {  
  
         lock.lock();  
  
         try {  
  
           while (count == 0)  
  
           {  
  
               System.out.println(new Date() + " " + Thread.currentThread().getName() + " take is going to wait.. " + System.currentTimeMillis());    
  
             notEmpty.await();  
  
               
  
           }  
  
           Object x = items[takeptr];   
  
           if (++takeptr == items.length) takeptr = 0;  
  
           --count;  
  
             
  
             
  
             
  
           notFull.signal();  
  
           return x;  
  
         } finally {  
  
           lock.unlock();  
  
         }  
  
       }   
  
     }
運行結果1:  Fri Mar 16 20:50:10 CST 2012 first try to call take for count: 10  Fri Mar 16 20:50:10 CST 2012 TAKE 0 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 1 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 2 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 3 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 5 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 main thread is going to sleep for 10 seconds  Fri Mar 16 20:50:10 CST 2012 TAKE 4 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 7 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 6 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 9 take is going to wait..  Fri Mar 16 20:50:10 CST 2012 TAKE 8 take is going to wait..  Fri Mar 16 20:50:20 CST 2012 now try to call put for 10  Fri Mar 16 20:50:20 CST 2012: main thread is to wait for all threads  Fri Mar 16 20:50:20 CST 2012 PUT 7 put finished: 7  Fri Mar 16 20:50:20 CST 2012 PUT 9 put finished: 9  Fri Mar 16 20:50:20 CST 2012 PUT 8 put finished: 8  Fri Mar 16 20:50:20 CST 2012 PUT 3 put is to wait....  Fri Mar 16 20:50:20 CST 2012 PUT 1 put is to wait....  Fri Mar 16 20:50:20 CST 2012 PUT 5 put finished: 5  Fri Mar 16 20:50:20 CST 2012 PUT 4 put is to wait....  Fri Mar 16 20:50:20 CST 2012 TAKE 0 take get: 8  Fri Mar 16 20:50:20 CST 2012 TAKE 2 take get: 9  Fri Mar 16 20:50:20 CST 2012 TAKE 3 take get: 0  Fri Mar 16 20:50:20 CST 2012 TAKE 5 take get: 6  Fri Mar 16 20:50:20 CST 2012 TAKE 4 take get: 5  Fri Mar 16 20:50:20 CST 2012 PUT 2 put finished: 2  Fri Mar 16 20:50:20 CST 2012 PUT 3 put finished: 3  Fri Mar 16 20:50:20 CST 2012 PUT 1 put finished: 1  Fri Mar 16 20:50:20 CST 2012 TAKE 7 take get: 2  Fri Mar 16 20:50:20 CST 2012 TAKE 6 take get: 3  Fri Mar 16 20:50:20 CST 2012 TAKE 9 take get: 1  Fri Mar 16 20:50:20 CST 2012 TAKE 8 take get: 4  Fri Mar 16 20:50:20 CST 2012 PUT 6 put finished: 6  Fri Mar 16 20:50:20 CST 2012 PUT 0 put finished: 0  Fri Mar 16 20:50:20 CST 2012 PUT 4 put finished: 4  Fri Mar 16 20:50:20 CST 2012 TAKE 1 take get: 7  Fri Mar 16 20:50:20 CST 2012 all threads finished  注意到紅色部分:  第一個加爲紅色是由於按照打印結果,put()只完成了3次,就開始有put()進入等待了,而BoundedBuffer的大小是5,理論上應該沒有滿的!  第二個加爲紅色是由於元素4居然先被take,而後再被put! 顯然程序有地方出錯了!具體緣由分析,歡迎關注核桃博客:)
相關文章
相關標籤/搜索