Java系列筆記(6) - 併發(上)

目錄html

1,基本概念java

2,volatileandroid

3,atom程序員

4,ThreadLocal算法

5,CountDownLatch和CyclicBarrier數據庫

6,信號量編程

7,Condition數組

8,Exchanger緩存

 

在Java中,JVM、併發、容器、IO/NIO是我認爲最重要的知識點,本章將介紹其中的併發,這也是從「會Java」到精通Java所必須經歷的一步。本章承接上一張《Java系列筆記(5) - 線程》,其中介紹了Java線程的相關知識,是本章介紹內容的基礎,若是對於線程不熟悉的,能夠先閱讀如下這篇博客瞭解一下。安全

在上一篇博客《線程》中,咱們講到了Java的內存模型,事實上,Java內存模型的創建,是圍繞着一個原則來進行的:在保證線程間合做的基礎上,避免線程的不良影響。而這一原則,也是本章所介紹的併發機制的最終目的。

本文大量參考了系列文章《深刻淺出Java Concurrency,http://www.blogjava.net/xylz/archive/2010/07/08/325587.html》,這是一系列十分優秀也十分明晰的文章,我在學習java併發過程當中,對這個系列的文章讀了不少遍,這個系列的文章做者寫的很容易理解並且很詳盡,想要進一步理解java併發的同窗,能夠仔細去拜讀一下《深刻淺出Java Concurrency》這一系列文章。

在原來的計劃中,鎖和Condition等概念是放在一塊兒的,不過限於篇幅問題,拆成兩個部分,上部分是本文,講解基本的併發概念、CountDownLatch,原子類、信號量等類,下部分集中講鎖和併發異常。

本文在編寫過程當中,參考、引用和總結了《深刻淺出Java Concurrency》的內容以及其餘Java併發書籍博客的內容,篇幅有限,因此可能總結的不到位,敬請指正。

 

1,基本概念


Java併發的重要性毋庸置疑,Java併發的設計目的在於3個方面:

簡單,意味着程序員儘量少的操做底層或者實現起來要比較容易;

高效,意味着耗用資源要少,程序處理速度要快;

線程安全,意味着在多線程下能保證數據的正確性。

在Java併發中,有幾個常見概念,須要在講述併發以前進行解釋:

 

臨界資源和臨界區

臨界資源是通常是一種內存資源,一個時刻只容許一個進程(在java中,是線程)訪問,一個線程正在使用臨界資源的時候,另外一個線程不能使用。臨界資源是非可剝奪性資源,即便是操做系統(或JVM)也沒法阻止這種資源的獨享行爲。

臨界區是一種進程中範文臨界資源的那段程序代碼,注意,是程序代碼,不是內存資源了,這就是臨界資源與臨界區的區別。咱們規定臨界區的使用原則(也即同步機制應遵循的準則)十六字訣:「空閒讓進,忙則等待,有限等待,讓權等待」–strling。讓咱們分別來解釋一下:

(1)空閒讓進:臨界資源空閒時必定要讓進程進入,不發生「互斥禮讓」行爲。

(2)忙則等待:臨界資源正在使用時外面的進程等待。

(3)有限等待:進程等待進入臨界區的時間是有限的,不會發生「餓死」的狀況。

(4)讓權等待:進程等待進入臨界區是應該放棄CPU的使用。

併發

狹義的只就Java而言,Java多線程在訪問同一資源時,出現競爭的問題,叫作併發問題,Java併發模型是圍繞着在併發過程當中如何處理原子性、可見性、有序性這3個特徵來設計的。

 

線程安全

若是一個操做序列,不考慮耗時和資源消耗,在單線程執行和多線程執行的狀況下,最終獲得的結果永遠是相同的,則這個操做序列叫作線程安全的。

若是存在不相同的機率,則就是非線程安全的。

 

原子性(Atomicity)

若是一個操做時不可分割的,那就是一個原子操做,也叫這個操做具備原子性。相反的,一個操做時能夠分割的(如a++,它其實是a=a+1),則就是非原子操做;原子操做是線程安全的,非原子操做都是非線程安全的,可是咱們能夠經過同步技術(lock)或同步數據模型(Concurrent容器等)把非原子操做序列變成線程安全的原子操做。

事實上,java併發主要研究的就是3個方面的問題:

1,怎麼更好的使用原子操做;

2,怎麼把非原子操做變得線程安全;

3,怎麼提升原子操做和非原子操做的效率並減小資源消耗。

 

可見性(Visibility)

一個變量被多個線程共享,若是一個線程修改了這個變量的值,其它線程可以當即得知這個修改,則咱們稱這個修改具備可見性。

(可參考上一章《Java系列筆記(5)-線程》中的Java線程內存模型部分),Java線程內存模型的設計,是每一個線程擁有一份本身的工做內存,當變量修改以後,將新值同步到主內存。可是對於普通變量而言,這種同步,並不能保證及時性,因此可能出現工做內存以及更改,主內存還沒有同步的狀況。

Java中,最簡單的方法是使用volatile實現強制同步,它的實現方式是保證了變量在修改後當即同步到主內存,且每次使用該變量前,都先從主內存刷新該值。

另外,能夠採用synchronized或final關鍵字實現可見性;

synchronized的實現原理在於,一個變量若是要執行unlock操做,必須先把改變量同步到主內存中(執行store和write)。所以一個變量若是被synchronized實現強制同步,則即便不用volatile,也能夠實現強制可見性。

final的實現原理在於,一個變量被final修飾,則其值(或引用地址)不能夠再被修改,因此其它線程若是能看到,也只是能看到這個變量的這個惟一值(對於對象而言,是惟一引用)。

須要注意,一個操做被保證了可見性,並不必定能保證原子性,好比:

volatile int a;
a++;

在上面這段代碼中,a是知足可見性的,可是a++仍然不是原子性操做。當有多個線程執行a++時,仍然存在併發問題。

 

有序性(Ordering)

Java線程的有序性,表現爲兩個方面:

在一個線程內部觀察,全部操做都是有序的,全部指令按照「串行(as-if-serial,字面意思是「像排了序同樣」,as-if-serial的真正含義是無論怎麼重排序,一個單線程程序的執行結果都必須相同)」 的方式執行。

 

在線程間觀察,也就是從某個線程觀察另外一個線程,則全部其餘線程均可以交叉並行執行,是正序的,惟一例外的是被同步方法、同步塊、volatile等字段修飾的強制同步的代碼,須要在線程間保持有序。

 

注:關於指令重排序、as-if-serial、happens-before等,能夠參考上一章《Java系列筆記(5)-線程》,也能夠參考網上的衆多資料,這裏再也不敘述。

 

JUC

java.util.concurrent包,這個包是從JDK1.5開始引入的,在此以前,這個包獨立存在着,它是由Doug Lea開發的,名字叫backport-util-concurrent,在1.5開始引入java,命名路徑爲java.util.concurrent,其中的基本實現方式,也有所改變。主要包括如下類:(來源於:深刻淺出Java Concurreny(http://www.blogjava.net/xylz/archive/2010/06/30/324915.html))

 

JNI

Java native interface,java本地方法接口,因爲Java自己是與平臺無關的,因此在性能等方面有可能存在影響(雖然隨着java的發展,這種狀況不多),爲了解決這種問題,使用C/C++編寫了JNI接口,在java中能夠直接調用這些代碼的產生的機器碼,從而避免嚴重影響性能的代碼段。關於JNI,能夠參考這篇文章:http://www.cnblogs.com/mandroid/archive/2011/06/15/2081093.html

 

CAS

CAS,compare and swap,比較和替換(也有人直接理解爲compare and set,實際上是同樣的)。CAS是一種樂觀鎖作法,並且整個JUC的實現都是基於CAS機制的。

若是直接用synchronized加鎖,這是一種悲觀鎖作法,所謂悲觀鎖,就是悲觀的認爲線程是絕對不安全的,必須保證在swap值以前,沒有任何其它線程操做當前值。synchronized是一種獨佔鎖,性能受限於這種悲觀策略。這一點將在後面詳述。

而CAS是一種樂觀鎖機制,所謂樂觀鎖,就是相信在compare 和swap之間,被其它線程影響的可能性不大,只要compare校驗經過,就能夠進行swap。

在Java中,compareAndSet的基本代碼以下:

1 public final boolean compareAndSet(int expect, int update) {
2   return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
3 }

 

從代碼中看,java的compareAndSet使用使用JNI中的unsafe接口來實現的,這是由於,現代CPU基本都提供了特殊的指令,可以作到自動更新共享數據的同時,檢測其它線程的干擾,也就是說,CPU自己提供了compareAndSet功能。因此才能提供JNI的CAS接口。

有了JNI的CAS接口,基於該接口的JUC就能得到更高性能。

在 Intel 處理器中,比較並交換經過指令cmpxchg實現。比較是否和給定的數值一致,若是一致則修改,不一致則不修改。

 

AQS

AbstractQueuedSynchronizer,簡稱AQS,是J.U.C最複雜的一個類。這個類是CountDownLatch/FutureTask /ReentrantLock/RenntrantReadWriteLock/Semaphore的基礎,是Lock和Executor實現的前提。參考:(http://www.blogjava.net/xylz/archive/2010/07/06/325390.html)

非阻塞算法

任何一個線程的失敗或掛起不該該影響其餘線程的失敗或掛起的算法叫作非阻塞算法。現代CPU可以提供非阻塞功能,它能夠在自動更新共享數據的同時,檢查其它線程的干擾。

 

2,volatile


正如前面所述,java中volatile字段的做用是保證併發過程當中某個變量的可見性。而volatile保證可見性的方法以下:

1,Java內存模型不會對volatile指令進行重排序,從而保證對volatile變量的執行順序,永遠是按照其出現順序執行的。重排序的依據是happens-before法則,happens-before法則共8條,其中有一條與volatile相關,就是:「對volatile字段的寫入操做happens-before於每個後續的同一個字段的讀操做」。

注:happens-before法則:http://www.blogjava.net/xylz/archive/2010/07/03/325168.html

2,volatile變量不會被緩存在寄存器中(只有擁有線程可見)或者其餘對CPU不可見的地方,每次老是從主存中讀取volatile變量的結果。

不過須要注意的是:volatile字段只能保證可見性,不能保證原子性,也不能保證線程安全。

 

volatile的工做原理

下面這段話摘自《深刻理解Java虛擬機》:

  「觀察加入volatile關鍵字和沒有加入volatile關鍵字時所生成的彙編代碼發現,加入volatile關鍵字時,會多出一個lock前綴指令」

  lock前綴指令實際上至關於一個內存屏障(也成內存柵欄),內存屏障會提供3個功能:

  1)它確保指令重排序時不會把其後面的指令排到內存屏障以前的位置,也不會把前面的指令排到內存屏障的後面;即在執行到內存屏障這句指令時,在它前面的操做已經所有完成;

  2)它會強制將對緩存的修改操做當即寫入主存;

  3)若是是寫操做,它會致使其餘CPU中對應的緩存行無效。

上面的說法解釋了volatile的工做原理的起源。不過,建議你們複習一下本系列文章第3章JVM內存分配和第5章線程的內容,來理解下面的解釋。與前面這兩章中宏觀的講解內存分配和線程內存模型相區別,下面的部分專一於解析java內存模型和volatile的工做原理,但也能更好的理解之前的知識:

注:下面的圖參考了:http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html,其中描述了內存模型的6種操做,比上一章中介紹的8種操做少了lock、unlock 2 種,這6種操做都是原子性的。

 

在上圖中,若是是普通變量:

1,變量值從主內存(在堆中)load到本地內存(在當前線程的棧楨中);

2,以後,線程就再也不和該變量在主內存中的值由任何關係,而是直接操做在副本變量上(這樣速度很快),這時,若是主存中的count或本地內存中的副本發生任何變化,都不會影響到對方,也正是這個緣由致使併發狀況下出現數據不一致;

3,在修改完的某個時刻(線程退出以前),自動把本地內存中的變量副本值回寫到對象在堆中的對應變量。

 

這6步操做中:

read和load是將主存變量加載到當前本地內存;

use和assign是執行線程代碼,改變副本值,能夠屢次執行;

store和write是用本地內存回寫到主存;

 

若是是volatile修飾的變量:

volatile仍然在執行一個從主存加載到工做內存,而且將變動的值寫回主存的操做,可是:

1,volatile保證每次讀取該變量前,都判斷當前值是否已經失效(便是否已經與主存不一致),若是已經失效,則從主存load最新的變量;

2,volatile保證每次對該變量作出修改時,都當即寫入主存;

 

須要注意的是,雖然volatile保證了上面的特性,可是它只是保證了可見性,卻沒有保證原子性,也就是說,read-load-use-assign-store-write,這些操做序列組合起來仍然是非原子操做。舉個例子:

共享變量當前在主存中的值爲count=10;線程1和線程2都對該值進行自增操做,按以下步驟進行:

1,線程1和2都讀取最新值,獲得值爲count=10;

2,線程1被阻塞;

3,線程2執行自增,寫回count=11;

4,線程1喚醒,因爲以前已經完成了讀取變量的操做,因此這裏直接進行自增。因而也自增到11,回寫主存,最終count=11;

與咱們指望的兩次自增count=12衝突;

目前來講,要保證原子性,只能經過synchronized、Lock接口、Atomic*來實現。

 

說了這麼多,有同窗可能會問,爲何volatile這也不行那也不行,陷阱這麼多,咱們還要用它呢?

volatile相對於synchronized,最大的好處是某些狀況下它的性能高,並且使用起來直觀簡便。並且,若是你的「代碼自己能保證原子性」,那麼用volatile是個不錯的選擇:

這裏所說的代碼自己能保證原子性,是指:

1,對變量的寫操做,不依賴於當前的值(就是說,不會先讀取當前值,而後在當前值的基礎上進行改變,好比,不是自增,而是賦值);

2,變量沒有包含在其它變量的不變式中(這一點不是很好理解,能夠參考這裏:http://www.ibm.com/developerworks/cn/java/j-jtp06197.html)

 

一個最多見的volatile的應用場景是boolean的共享狀態標誌位,或者單例模式的雙重檢查鎖(參考Java併發編程:volatile關鍵字解析,http://www.cnblogs.com/dolphin0520/p/3920373.html)

 

另外,有一個關於volatile的常見的坑就是:從上面的描述能夠看出,volatile對於基本數據類型(值直接從主內存向工做內存copy)纔有用。可是對於對象來講,彷佛沒有用,由於volatile只是保證對象引用的可見性,而對對象內部的字段,它保證不了任何事。即使是在使用ThreadLocal時,每一個線程都有一份變量副本,這些副本自己也是存儲在堆中的,線程棧楨中保存的仍然是基本數據類型和變量副本的引用。

因此,千萬不要期望有了volatile修飾對象,對象就會像基本數據類型同樣總體呈現原子性的工做了。

事實上,若是一個對象被volatile修飾,那麼就表示它的引用具備了可見性。從而使得對於變量引用的任何變動,都在線程間可見。

這一點在後面將要介紹的AtomicReference中就有應用。

 

3,Atom


java中,可能有一些場景,操做很是簡單,可是容易存在併發問題,好比i++,此時,若是依賴鎖機制,可能帶來性能損耗等問題,因而,如何更加簡單的實現原子性操做,就成爲java中須要面對的一個問題。

java中的atom操做,好比AtomicInteger,AtomicLong,AtomicBoolean,AtomicReference,AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray;這些操做中旺旺提供一些原子化操做,好比incrementAndGet(至關於i++),compareAndSet(安全賦值)等,相關方法和用法就再也不贅述,網上有不少相似資料,或者直接讀源代碼也很容易懂。

在backport-util-concurrent沒有被引入java1.5併成爲JUC以前,這些原子類和原子操做方法,都是使用synchronized實現的。不過JUC出現以後,這些原子操做基於JNI提供了新的實現,以AtomicInteger爲例,看看它是怎麼作到的:

若是是讀取值,很簡單,將value聲明爲volatile的,就能夠保證在沒有鎖的狀況下,數據是線程可見的:

1     private volatile int value;public final int get() {
2       return value;
3     }

 

那麼,涉及到值變動的操做呢?以AtomicInteger實現:++i爲例:

複製代碼

1     public final int incrementAndGet() {
2      for (;;) {
3        int current = get();
4        int next = current + 1;  
5        if (compareAndSet(current, next))  
6          return next;  
7        }
8     }

複製代碼

 

在這裏採用了CAS操做,每次從內存中讀取數據而後將此數據和+1後的結果進行CAS操做,若是成功就返回結果,不然重試直到成功爲止。

而這裏的comparAndSet(current,next)就是前面介紹CAS的時候所說的依賴JNI實現的樂觀鎖作法:

public final boolean compareAndSet(int expect, int update) {  
       return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

 

除了基本數據類型的原子化操做之外,JUC還提供了數組的原子化、引用的原子化,以及Updater的原子化,分別爲:

下面主要介紹這3類原子化操做爲何要原子化以及分別是怎麼實現的。

 

數組原子化

注意,Java中Atomic*Array,並非對整個數組對象實現原子化(也沒有必要這樣作),而是對數組中的某個元素實現原子化。例如,對於一個整型原子數組,其中的原子方法,都是對每一個元素的:

複製代碼

1 public final int getAndDecrement(int i) {
2       while (true) {
3        int current = get(i);
4        int next = current - 1;
5        if (compareAndSet(i, current, next))
6             return current;
7       }
8 }

複製代碼

 

引用原子化

有些同窗可能會疑惑,引用的操做自己不就是原子的嗎?一個對象的引用,從A切換到B,自己也不會出現非原子操做啊?這種想法自己沒有什麼問題,可是考慮下嘛的場景:對象a,當前執行引用a1,線程X指望將a的引用設置爲a2,也就是a=a2,線程Y指望將a的引用設置爲a3,也就是a=a3。

若是線程X和線程Y都不在乎a究竟是從哪一個引用經過賦值改變過來的,也就是說,他們不在乎a1->a2->a3,或者a1->a3->a2,那麼就徹底沒有關係。

可是,若是他們在意呢?

X要求,a必須從a1變爲a2,也就是說compareAndSet(expect=a1,setValue=a2);Y要求a必須從a1變爲a3,也就是說compareAndSet(expect=a1,setValue=a3)。若是嚴格遵循要求,應該出現X把a的引用設置爲a2後,Y線程操做失敗的狀況,也就是說:

X:a==a1--> a=a2;

Y:a!=a1 --> Exception;

可是若是沒有原子化,那麼Y會直接將a賦值爲a3,從而致使出現髒數據。

 

這就是原子引用AtomicReference存在的緣由。

複製代碼

1      public final V getAndSet(V newValue) {
2            while (true) {
3                V x = get();
4                if (compareAndSet(x, newValue))
5                    return x;
6            }
7        }

複製代碼

注意,AtomicReference要求引用也是volatile的。

Updater原子化

其它幾個Atomic類,都是對被volatile修飾的基本數據類型的自身數據進行原子化操做,可是若是一個被volatile修飾的變量自己已經存在在類中,那要如何提供原子化操做呢?好比,一個Person,其中有個屬性爲age,private volatile int age;,如何對age提供原子化操做呢?

1 private AtomicIntegerFieldUpdater<Person> updater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");  
2 updater.getAndIncrement(5);//加5歲
3 updater.compareAndSet(person, 30, 35)//若是一我的的年齡是30,設置爲35。

 

4,ThreadLocal


對於多線程的Java程序而言,不免存在多線程競爭資源的狀況。對於競爭的資源,解決的方式每每分爲以時間換空間或以空間換時間兩種方式。

1,後者的作法是將一份資源複製成多份,佔用多份空間,可是每一個線程本身訪問本身的資源,從而消除競爭,這種作法是ThreadLocal的作法,它雖然消除了競爭,但它是經過數據隔離的方法實現的,因此被隔離的各份數據是沒法同步的,本節就要介紹這種作法。

2,也有不少資源是沒法複製成多份或者不適合複製成多份的,如打印機資源。所以以時間換空間的作法就是隻有一份資源,你們按照必定的順序串行的去訪問這個資源。這種方式的主要作法,就是在資源上加鎖,加鎖的方法,將在後面第9節介紹。

 

示例

下面經過一個典型的ThreadLocal的應用案例做爲入口,來分析ThreadLocal的原理和用法(更詳細代碼請參考《Java併發編程:深刻剖析ThreadLocal》http://www.cnblogs.com/dolphin0520/p/3920407.html):

設想下面的場景:

編寫一個數據庫鏈接器(或 http session管理器),要求多個線程可以鏈接和關閉數據庫,優先考慮下面的方案:

複製代碼

1     class ConnectionManager {
 2        private static Connection connect = null;
 3        public static Connection openConnection() {
 4            if(connect == null){
 5                connect = DriverManager.getConnection();
 6            }
 7            return connect;
 8        }
 9        public static void closeConnection() {
10            if(connect!=null)
11                connect.close();
12        }
13     }

複製代碼

這個方案中,多個線程公用ConnectionManager.openConnection()和ConnectionManager.closeConnnection(),因爲沒有同步控制,因此很容易出現併發問題,好比,同時建立了多個鏈接,或者線程1openConnection時,線程2剛好在執行closeConnection。

解決這個問題有兩種方案:

1,對connectionManager中openConnection和closeConection加synchronized強制同步。這種方案解決了併發,卻帶來了新問題,因爲synchronized致使了同一只可只有一個線程能訪問被鎖對象,因此其它線程只能等待。

2,去掉ConnectionManager中的static,使得每次訪問Connectionmanager,都必須new一個對象,這樣每一個線程都用本身的獨立對象,相互不影響。eg:

複製代碼

1     public void insert() {
2            ConnectionManager connectionManager = new ConnectionManager();
3            Connection connection = connectionManager.openConnection();
4            
5            //使用connection進行操做
6            
7            connectionManager.closeConnection();
8     }

複製代碼

 

這個確實解決了併發,而且也能夠多線程同步執行,可是它存在嚴重的性能問題,每執行一次操做,就須要new一個對象而後再銷燬。

ThreadLocal的引入,恰當的解決了上面的問題,ThreadLocal不是線程,它是一種變量,不過,它是線程變量的副本,它是一個泛型對象,例如,線程A建立時,初始化了一個對象user,那麼ThreadLocal<User> userLocal就是user在線程A中的一個副本,userLocal中的值在初始時與user相同,可是在線程A運行過程當中,userlocal的任何變化不會同步到user上,不會影響user的值。

若是採用ThreadLocal,上面的數據庫鏈接管理器問題的解決方案是:

複製代碼

1     class ConnectionManager {
 2         private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
 3              public Connection initialValue() {
 4         return DriverManager.getConnection(DB_URL);
 5              }
 6         };
 7         public static Connection getConnection() {
 8              return connectionHolder.get();
 9         }
10        
11        public static void closeConnection() {
12            if(connectionHolder.get()!=null)
13               connectionHolder.get().close();
14        }
15     }

複製代碼

 

ThreadLocal的方法

ThreadLocal提供的方法很簡單,主要有:

  1. public T get() { }
  2. public void set(T value) { }
  3. public void remove() { }
  4. protected T initialValue() { }

 

ThreadLocal的原理

分析ThreadLocal的源代碼(分析過程參考這裏:http://www.cnblogs.com/dolphin0520/p/3920407.html),可得,ThreadLocal的原理是:

1,在每一個線程Thread內部有一個ThreadLocal.ThreadLocalMap類型的成員變量threadLocals,這個threadLocals就是用來存儲實際的變量副本的,鍵值爲當前ThreadLocal變量,value爲變量副本(即T類型的變量)。

2,初始時,在Thread裏面,threadLocals爲空,當經過ThreadLocal變量調用get()方法或者set()方法,就會對Thread類中的threadLocals進行初始化,而且以當前ThreadLocal變量爲鍵值,以ThreadLocal要保存的副本變量爲value,存到threadLocals。

3,注意,通常在get以前,須要先執行set(),以保證threadlocals中有值,若是在get()以前,沒有執行過set(),則ThreadLocal會自動調用setInitialValue()方法,setInitialValue()的源代碼是這樣的:

複製代碼

1         private T setInitialValue() {
 2            T value = initialValue();
 3            Thread t = Thread.currentThread();
 4            ThreadLocalMap map = getMap(t);
 5            if (map != null)
 6                map.set(this, value);
 7            else
 8                createMap(t, value);
 9            return value;
10        }

複製代碼

 

它先取出當前線程的ThreadLocalMap,即threadLocals變量,而後將value set進去,因此,若是沒有提早執行過set方法,initialValue()默認返回的又是null,因此可能致使運行過程當中出現NPE。建議最好在聲明ThreadLocal變量時,重寫initialValue()方法,這樣即便沒有提早執行set,也能有個初始值(如前面ConnectionHolder中的代碼)。

4,而後在當前線程裏面,若是要使用副本變量,就能夠經過get方法在threadLocals裏面查找。

ThreadLocal泛型的變量類型,不能是基本數據類型,只能是類,若是必定要將基本上數據類型作泛型參數,則能夠採用Integer、Long、Double等類。

 

使用ThreadLocal的步驟

1,、在多線程的類(如ThreadDemo類)中,建立一個ThreadLocal<Object>對對象xxxLocal,用來保存線程間須要隔離處理的對象xxx。

二、在ThreadDemo類中,建立一個獲取要隔離訪問的數據的方法getXxx(),在方法中判斷,若ThreadLocal對象爲null時候,應該new()一個隔離訪問類型的對象,並強制轉換爲要應用的類型。

三、在ThreadDemo類的run()方法中,經過getXxx()方法獲取要操做的數據,這樣能夠保證每一個線程對應一個數據對象,在任什麼時候刻都操做的是這個對象。

 

ThreadLocal實現變量副本的方法

ThreadLocal實現變量副本,並無真的將原來的變量clone一份出來,而是採用了一種很靈活的方法,假設對每一個單獨的線程ThreadA而言,當前ThreadLocal爲localXx(這是key),初始外部變量爲va(這是value):

1,第一次執行set時,new了一個Entry(localXx, va),並添加到localXx的ThreadLocalMap中,此時,Entry.value的引用就是指向va的強引用;

2,此時若是執行localXx.get(),會獲得va

3,此時,若是在當前線程ThreadA直接對va執行set操做,仍然會更新外部變量va的值,但若是在另一個線程ThreadB中但願對va進行操做,則因爲此時ThreadB直接執行get獲得的是null,因此沒法訪問va,除非咱們將va聲明爲final的,並set到ThreadB中;

3,後續再進行set時,好比set進來的新值爲va1,則直接替換Entry中的value,獲得Entry(localXx, va1),此時原來的va在ThreadLocal這裏,已經獲得釋放了,當前ThreadLocal跟原來的va已經沒有任何關係了。

4,若是此時再執行get操做,獲得的就是新的va1;

 

從上面的步驟能夠看出,ThreadLocal只是用原變量va作爲初始值,可是它並未真的複製va,後續執行ThreadLocal.set以後,ThreadLocal中存放的已是新set的對象了;

這也是爲何ThreadLocal只能對類對象有效的緣由了,由於它的set,改變的是value的引用。

具體例子能夠參考下面的代碼:

下面的例子中User包含兩個屬性:name、age,重寫了toString方法;

複製代碼

1     public class ThreadLocalTest {
 2      ThreadLocal<User > userLocal = new ThreadLocal <User>();
 3      public void set(User user) {
 4        userLocal.set(user);
 5      }
 6      public User get() {
 7        return userLocal.get();
 8      }
 9       public static void main( String[] args) throws InterruptedException {
10         final ThreadLocalTest test = new ThreadLocalTest();
11         final User user1 = new User( "AAA", 5 );//注意這個user1被聲明成final的了
12         test.set(user1);
13         System.out.println(test.get()); //這裏獲得的是user1的初始值:AAA,5
14         Thread thread1 = new Thread() {
15           public void run() {
16            test.set(user1);
17            test.get().setName( "BBB");//這裏get()獲得的是user1,因此會影響外部主線程
18            System.out.println(test.get()); //BBB,5
19            User user2 = new User("CCC" , 5);
20            test.set(user2); //這裏thread1的ThreadLocal.userLocal中存儲的值變爲user2了,外部主線程中仍然是user1
21            System.out.println(test.get()); // CCC, 5
22            test.get().setName( "DDD");//這裏get()獲得的是user2,不會影響外部主線程
23            System.out.println(test.get()); //DDD,5
24          };
25        };
26        thread1.start();
27        thread1.join();
28        // 這裏獲得的值user1,已經在上面設置BBB的時候已經被更新過了
29        // 可是不會受thread中更新CCC和DDD的影響,因此這裏獲得的是BBB,5
30        System.out.println(test.get());
31      }
32     }

複製代碼

 

獲得的結果爲:

[AAA,5]

[BBB,5]

[CCC,5]

[DDD,5]

[BBB,5]

 

ThreadLocal的內存泄露問題

在第一次將T類型的變量value set到ThreadLocal時,它是將value set到ThreadLocalMap 中去的,可是須要注意ThreadLocalMap並非Map接口的子類,它是一個ThreadLocal的內部類,其中的Entry是一種特殊實現:static class Entry extends WeakReference< ThreadLocal> 

對ThreadLocal.ThreadLocalMap.Entry執行set操做時,若是之前這個Entry(key,value)不存在,則會new一個Entry。若是這個Entry已經存在,則直接替換Entry.value的引用爲新的value;

下面的分析和圖來自於:http://www.cnblogs.com/onlywujun/p/3524675.html

以下圖,每一個thread中都存在一個map, map的類型是ThreadLocal.ThreadLocalMap. Map中的key爲一個threadlocal實例. 這個Map的確使用了WeakReference(虛線),不過弱引用只是針對key. 每一個key都弱引用指向threadlocal. 當把threadlocal實例置爲null之後(或threadLocal實例被GC回收了,弱引用會被回收),沒有任何強引用指向threadlocal實例,因此threadlocal將會被gc回收. 可是,咱們的value卻不能回收,由於存在一條從current thread鏈接過來的強引用. 只有當前thread結束之後, current thread就不會存在棧中,強引用斷開, Current Thread, Map, value將所有被GC回收.

 因此得出一個結論就是隻要這個線程對象被gc回收,就不會出現內存泄露,但在threadLocal設爲null和線程結束這段時間不會被回收的,就發生了咱們認爲的內存泄露。其實這是一個對概念理解的不一致,也沒什麼好爭論的。最要命的是線程對象不被回收的狀況,這就發生了真正意義上的內存泄露。好比使用線程池的時候,線程結束是不會銷燬的,會再次使用的。就可能出現內存泄露。

注意:Java爲了最小化減小內存泄露的可能性和影響,在ThreadLocal的get,set的時候都會執行一個for循環,遍歷其中全部的entiry,清除線程Map裏全部key爲null的value。這也大大減少了出現內存泄露的風險。但最怕的狀況就是,threadLocal對象設null了,開始發生「內存泄露」,而後使用線程池,這個線程結束,線程放回線程池中不銷燬,這個線程一直不被使用,或者分配使用了又再也不調用get,set方法,那麼這個期間就會發生真正的內存泄露。

關於ThreadLocal內存泄露問題的數據,有興趣的能夠參考這裏:http://liuinsect.iteye.com/blog/1827012

 

5,CountDownLatch和CyclicBarrier


CountDownLatch

CountDownLatch是一種Latch(門閂),它的操做相似於泄洪,或聚會。主要有兩種場景:

1,泄洪:即一個門閂(計數器爲1)擋住全部線程,放開後全部線程開始執行。在門閂打開以前,全部線程在池子裏等着,等着這個門閂的計數器減小到0,門閂打開以後,全部線程開始同時執行;

這種場景一個典型例子是併發測試器(啓動多個線程去執行測試用例,一聲令下,同步開始執行,即下面的beginLatch):

複製代碼

1         int threadNum =10; //併發線程數
 2        CountDownLatch beginLatch = new CountDownLatch(1 );// 用於觸發各線程同時開始
 3        CountDownLatch waitLatch = new CountDownLatch(threadNum);// 用於等待各線程執行結束
 4        ExecutorService executor = Executors. newFixedThreadPool(threadNum);
 5        for (int i = 0; i < threadNum; i++) {
 6      Callable<String> thread = new SubTestThread(beginLatch, waitLatch, method, notifier);
 7        executor. submit(thread);
 8       }
 9        beginLatch.countDown(); // 開始執行!
10        waitLatch.await(); // 等待結束
11     private class SubTestThread implements Callable< String> {
12        private CountDownLatch begin;
13        private CountDownLatch wait;
14        private FrameworkMethod method;
15        public SubTestThread(CountDownLatch begin, CountDownLatch wait, FrameworkMethod method) {
16         this.begin = begin;
17         this.wait = wait;
18         this.method = method;
19       }
20        @Override
21        public String call() throws Exception {
22         try {
23          begin.await();
24          runTest(method);
25        } catch (Exception e) {
26       throw e;
27        } finally {
28          wait.countDown();
29        }
30         return null ;
31       }
32      }

複製代碼

 

2,聚會:即N個線程正在執行,一個門閂(計數器爲N)擋住了後續操做,每一個線程執行完畢後,計數器減1,當門閂計數器減到0時,表示全部線程都執行完畢(全部人到齊,party開始),能夠開始執行後續動做了。

這種場景一個典型的例子是記帳彙總,即多個子公司的帳目,都要一一算完以後,才彙總到一塊兒算總帳。上面例子中的waitLatch就是這樣的latch;

 

countDownLatch的真正原理在於latch是一種計數器,它的兩個方法分別是countDown()和await(),其中countDown()是減數1,await()是等待減到0,當每次調用countDown()時,當前latch計數器減1,減到0以前,當前線程的await()會一直卡着(阻塞,WAITING狀態),當計數器減小到0,喚醒當前線程,繼續執行await()後面的代碼;

await(long timeout, TimeUtil unit)是另外一個await方法,特色是能夠指定wait的時間,

-若是超出指定的等待時間,await()再也不等待,返回值爲false;

-若是在指定時間內,計數器減到0,則返回值爲true;

-若是線程在等待中被中斷或進入方法時已經設置了中斷狀態,則拋出InterruptedException異常。

 

CyclicBarrier是一種迴環柵欄,它的做用相似於上面例子中的的waitLatch,即等到多個線程達到同一個點才繼續執行後續操做,如:

複製代碼

1     public class CyclicBarrierTest {
 2     public static class ComponentThread implements Runnable {
 3      CyclicBarrier barrier;// 計數器
 4      int ID; // 組件標識
 5      int[] array; // 數據數組
 6      // 構造方法
 7      public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
 8       this.barrier = barrier;
 9       this.ID = ID;
10       this.array = array;
11      }
12      public void run() {
13       try {
14        array[ID] = new Random().nextInt(100);
15        System.out.println("Component " + ID + " generates: " + array[ID]);
16        // 在這裏等待Barrier處
17        System.out.println("Component " + ID + " sleep");
18        barrier.await();
19        System.out.println("Component " + ID + " awaked");
20        // 計算數據數組中的當前值和後續值
21        int result = array[ID] + array[ID + 1];
22        System.out.println("Component " + ID + " result: " + result);
23       } catch (Exception ex) {
24       }
25      }
26     }
27     /**
28      * 測試CyclicBarrier的用法
29      */
30     public static void testCyclicBarrier() {
31      final int[] array = new int[3];
32      CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
33       // 在全部線程都到達Barrier時執行
34       public void run() {
35        System.out.println("testCyclicBarrier run");
36        array[2] = array[0] + array[1];
37       }
38      });
39      // 啓動線程
40      new Thread(new ComponentThread(barrier, array, 0)).start();
41      new Thread(new ComponentThread(barrier, array, 1)).start();
42     }
43     public static void main(String[] args) {
44      CyclicBarrierTest.testCyclicBarrier();
45     }
46     }

複製代碼

 

可見,cyclicBarrier與countDownLatch的後一種使用方法(聚會)很像,其實二者可以達到相同的目的。區別在於,cyclicBarrier能夠重複使用,也就是說,當一次cyclicBarrier到達彙總點以後,能夠再次開始,每次cyclicbarrier減數到0以後,會觸發彙總任務執行,而後,會把計數器再恢復成原來的值,這也是「迴環」的由來。

CountDownLatch的做用是容許1或N個線程等待其餘線程完成執行;而CyclicBarrier則是容許N個線程相互等待。

在實現方式上也有所不一樣,CountDownLatch是直接基於AQS編寫的,他的await和countDown過程,分別是一次acquireShared和releaseShared的過程;而cyclicBarrier是基於鎖、condition來實現的,讓當前線程阻塞,直到「有parties個線程到達barrier」 或 「當前線程被中斷」 或 「超時」這3者之一發生,當前線程才繼續執行。

(CyclicBarrier的原理,參考:Java多線程系列--「JUC鎖」10之 CyclicBarrier原理和示例:http://www.cnblogs.com/skywang12345/p/3533995.html?utm_source=tuicool)


6,信號量


信號量(Semaphore)與鎖相似,鎖是一次容許一次一個線程訪問(readWrite鎖除外),而信號量用來控制一組資源有多個線程訪問,好比一個店鋪最多能接受5個客戶 ,有10個客戶要求訪問的話,那麼能夠用信號量來控制。

Semaphore能夠控同時訪問的線程個數,經過 acquire() 獲取一個許可,若是沒有就等待,而 release() 釋放一個許可。

Semaphore類位於java.util.concurrent包下,它提供了2個構造器:

複製代碼

1     public Semaphore(int permits) {          //參數permits表示許可數目,即同時能夠容許多少線程進行訪問
2        sync = new NonfairSync(permits);
3     }
4     public Semaphore(int permits, boolean fair) {    //這個多了一個參數fair表示是不是公平的,即等待時間越久的越先獲取許可
5        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
6     }

複製代碼

 

下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:

  1. public void acquire() throws InterruptedException {  }     //獲取一個許可
  2. public void acquire(int permits) throws InterruptedException { }    //獲取permits個許可
  3. public void release() { }          //釋放一個許可
  4. public void release(int permits) { }    //釋放permits個許可

acquire()用來獲取一個許可,若無許可可以得到,則會一直等待,直到得到許可。

release()用來釋放許可。注意,在釋放許可以前,必須先獲得到許可。

這4個方法都會被阻塞,若是想當即獲得執行結果,可使用下面幾個方法:

  1. public boolean tryAcquire() { };    //嘗試獲取一個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false
  2. public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //嘗試獲取一個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false
  3. public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false
  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false

另外還能夠經過availablePermits()方法獲得可用的許可數目。

下面經過一個例子來看一下Semaphore的具體使用:

倘若一個工廠有5臺機器,可是有8個工人,一臺機器同時只

能被一個工人使用,只有使用完了,其餘工人才能繼續使用。那麼咱們就能夠經過Semaphore來實現:

複製代碼

1     public class Test {
 2     public static void main(String[] args) {
 3      int N = 8;   //工人數
 4      Semaphore semaphore = new Semaphore(5); //機器數目
 5      for(int i=0;i<N;i++)
 6       new Worker(i,semaphore).start();
 7     }
 8     static class Worker extends Thread{
 9      private int num;
10      private Semaphore semaphore;
11      public Worker(int num,Semaphore semaphore){
12       this.num = num;
13       this.semaphore = semaphore;
14      }
15      @Override
16      public void run() {
17       try {
18        semaphore.acquire();
19        System.out.println("工人"+this.num+"佔用一個機器在生產...");
20        Thread.sleep(2000);
21        System.out.println("工人"+this.num+"釋放出機器");
22        semaphore.release();    
23       } catch (InterruptedException e) {
24        e.printStackTrace();
25       }
26      }
27     }
28     }

複製代碼

 

執行結果:

工人0佔用一個機器在生產... 

工人1佔用一個機器在生產... 

工人2佔用一個機器在生產... 

工人4佔用一個機器在生產... 

工人5佔用一個機器在生產... 

工人0釋放出機器 工人2釋放出機器 

工人3佔用一個機器在生產... 

工人7佔用一個機器在生產... 

工人4釋放出機器 工人5釋放出機器 

工人1釋放出機器 工人6佔用一個機器在生產... 

工人3釋放出機器 工人7釋放出機器 

工人6釋放出機器

7,Condition


在上一章「Java系列筆記(5)-線程」中,咱們曾經說過,線程間通訊並非靠消息,而是靠共享內存,不過,本節要介紹一種更加高效的通訊方式:Condition。

Condition 與上一章介紹的線程間通訊的wait、notify等方法有類似之處,但也有不一樣。其類似之處在於,都創建與鎖的基礎上,

wait、notify都是在同步代碼塊中,創建在synchronized所做用的對象上。

而Condition直接做用在Lock對象上,所以創建一個Condition對象,必須經過lock.newCondition()來構造。

  1. Lock lock = new ReentrantLock();  
  2. Condition condition = lock.newCondition();

 在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通訊方式,Condition均可以實現。

複製代碼

1     lock.lock();    //synchronized
 2      try {  
 3        while(bool) {                
 4           condition.await();//this.wait();  
 5        }  
 6        System.out.println("this is condition test" );
 7        condition.signal();//this.notify();  
 8     } finally {  
 9        lock.unlock();  
10     }

複製代碼

 

並且,對於wait、notify機制而言,只能做用於當前同步代碼塊,不能創建多重通訊條件,而,使用Condition機制,能夠創建多重通訊條件。

下面的例子,是一個頗有意思的併發緩衝區,其中用Condition創建了兩個條件,一個寫條件,一個讀條件,這個例子的具體用法和意義,參考這裏:http://blog.csdn.net/ghsau/article/details/7481142

 

複製代碼

1 class BoundedBuffer {  
 2       final Lock lock = new ReentrantLock();//鎖對象  
 3       final Condition notFull  = lock.newCondition();//寫線程條件  
 4       final Condition notEmpty = lock.newCondition();//讀線程條件  
 5      
 6       final Object[] items = new Object[100];//緩存隊列  
 7       int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;  
 8      
 9       public void put(Object x) throws InterruptedException {  
10         lock.lock();  
11         try {  
12           while (count == items.length)//若是隊列滿了  
13             notFull.await();//阻塞寫線程  
14           items[putptr] = x;//賦值  
15           if (++putptr == items.length) putptr = 0;//若是寫索引寫到隊列的最後一個位置了,那麼置爲0  
16           ++count;//個數++  
17           notEmpty.signal();//喚醒讀線程  
18         } finally {  
19           lock.unlock();  
20         }  
21       }  
22      
23       public Object take() throws InterruptedException {  
24         lock.lock();  
25         try {  
26           while (count == 0)//若是隊列爲空  
27             notEmpty.await();//阻塞讀線程  
28           Object x = items[takeptr];//取值  
29           if (++takeptr == items.length) takeptr = 0;//若是讀索引讀到隊列的最後一個位置了,那麼置爲0  
30           --count;//個數--  
31           notFull.signal();//喚醒寫線程  
32           return x;  
33         } finally {  
34           lock.unlock();  
35         }  
36       }  
37     }

複製代碼

 

可見,用兩個條件,能夠靈活的肯定應該喚醒寫線程仍是讀線程,這就是使用Condition的靈活之處。

8,Exchanger


從JDK1.5開始,Java開始提供一個叫Exchanger的工具套件,這個工具套件,能夠真正用於兩個線程之間交換數據。

Exchanger類容許在2個線程間定義同步點,當2個線程到達這個點,他們相互交換數據類型,使用第一個線程的數據類型變成第二個的,而後第二個線程的數據類型變成第一個的。

Exchanger提供的方法很是簡單,其接口就是兩個方法:

 

  1. public V exchange(V x) throws InterruptedException
  2. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

從官方的javadoc能夠知道,當一個線程到達exchange調用點時,若是它的夥伴線程此前已經調用了此方法,那麼它的夥伴會被調度喚醒並與之進行 對象交換,而後各自返回。若是它的夥伴還沒到達交換點,那麼當前線程將會被掛起,直至夥伴線程到達——完成交換正常返回;或者當前線程被中斷——拋出中斷 異常;又或者是等候超時——拋出超時異常。

Exchanger如今用的並很少,由於它的場景比較特定,並且,就算是真的有數據交換,用到Exchanger的地方,也能夠用其它更加直觀的方式替代,好比用共享變量+鎖同步的方式,所以Exchanger在實際使用中比較少見,有興趣的同窗,能夠參考這一篇文章:http://lixuanbin.iteye.com/blog/2166772

參考資料


《深刻理解Java虛擬機:JVM高級特效與最佳實現》

深刻理解Java虛擬機筆記---原子性、可見性、有序性 :http://blog.csdn.net/xtayfjpk/article/details/41969915?utm_source=tuicool

深刻淺出 Java Concurrency (1) : J.U.C的總體認識 : http://www.blogjava.net/xylz/archive/2010/06/30/324915.html

JAVA基礎之理解JNI原理:http://www.cnblogs.com/mandroid/archive/2011/06/15/2081093.html

深刻淺出 Java Concurrency (7): 鎖機制 part 2 AQS:http://www.blogjava.net/xylz/archive/2010/07/06/325390.html

Java併發編程:volatile關鍵字解析:http://www.cnblogs.com/dolphin0520/p/3920373.html

java中volatile關鍵字的含義:http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html

Java 理論與實踐: 正確使用 Volatile 變量: http://www.ibm.com/developerworks/cn/java/j-jtp06197.html

Java Atomic: http://wsmajunfeng.iteye.com/blog/1520705

聊聊併發(二)——Java SE1.6中的Synchronized:http://www.infoq.com/cn/articles/java-se-16-synchronized

Java併發編程:深刻剖析ThreadLocal》http://www.cnblogs.com/dolphin0520/p/3920407.html

ThreadLocal可能引發的內存泄露:http://www.cnblogs.com/onlywujun/p/3524675.html

ThreadLocal內存泄露分析:http://liuinsect.iteye.com/blog/1827012

CountDownLatch的介紹和使用:http://www.itzhai.com/the-introduction-and-use-of-a-countdownlatch.html

Java併發編程:CountDownLatch、CyclicBarrier和Semaphore:http://www.cnblogs.com/dolphin0520/p/3920397.html

CyclicBarrier使用詳解:http://xijunhu.iteye.com/blog/713433

Java系列筆記:http://www.cnblogs.com/skywang12345/p/java_threads_category.html

用信號量解決進程的同步與互斥探討: http://blog.jobbole.com/86709/

Java線程(九):Condition-線程通訊更高效的方式:http://blog.csdn.net/ghsau/article/details/7481142

java.util.concurrent.Exchanger應用範例與原理淺析:http://lixuanbin.iteye.com/blog/2166772

相關文章
相關標籤/搜索