在上文已經說明,委託是構造線程安全類的一個最有效策略,也就是讓現有的線程安全類管理全部的狀態便可。如下將介紹這些基礎構建模塊。html
同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx
等工廠方法建立的同步封裝器類。這些類實現線程安全的方式是:將它們的狀態封裝起來,並對每一個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態。同步容器對全部容器狀態的訪問都串行化,嚴重下降了併發性;當多個線程競爭鎖時,吞吐量嚴重降低。java
同步容器類都是線程安全的,可是在某些狀況下可能須要額外的客戶端加鎖來保護複合操做。git
好比,在Vecotr中,getLast()和deleteLast()操做,若是是在多線程的環境下運行,若是不加鎖,會產生異常狀況。一個線程在getLast()後,另外一個線程deleteLast(),而後該線程繼續執行,進行deleteLast()操做,此時會拋出下標越界的異常。編程
又好比,在迭代的過程當中,使用get(index)的操做,若是有多個線程運行,可能會刪除其中元素,一樣會形成異常。設計模式
對於如上的狀況,咱們須要經過客戶端加鎖來解決線程安全的問題。如在迭代時加鎖:數組
synchronized(vector){
for(int i=0;i<vector.size();i++){
vector.get(i);
}
}
複製代碼
在迭代或者for-each循環語法時,對容器類進行迭代的標準方式都是使用Iterator。然而,在設計同步容器類的迭代器時並無考慮到併發修改的問題,而且它們表現出的行爲時「及時失敗」的,也就是當它們發現容器在迭代過程當中被修改時,就會拋出ConcurrentModificationException。緩存
若是在迭代期間,對容器加鎖,首先會下降效率,提升線程的等待時間;而後還可能會產生死鎖;下降了吞吐量和CPU的利用率。安全
若是不但願在迭代期間加鎖,可使用克隆容器的方法,並在克隆副本上進行迭代。多線程
加鎖能夠防止迭代器拋出ConcurrentModificationException,可是要在全部對容器進行迭代的地方都要加鎖。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器爲參數時,都會對容器進行迭代。這些間接的迭代操做可能拋出ConcurrentModificationException。併發
Java 5.0提供了多種併發容器類來改進同步容器的性能。同步容器對全部容器狀態的訪問都串行化,嚴重下降了併發性;當多個線程競爭鎖時,吞吐量嚴重降低。
併發容器是針對多個線程併發訪問設計的。經過併發容器來替代同步容器,能夠極大地提升伸縮性並下降風險。併發容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。
同步容器類在執行每一個操做期間都持有一個鎖。ConcurrentHashMap採用了不一樣的加鎖策略來提供更高的併發性和伸縮性。它並非將每一個方法都在同一個鎖上同步,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱爲分段鎖。
分段鎖機制使得任意數量的讀取線程能夠併發訪問Map,執行讀取操做的線程和執行寫入操做的線程能夠併發訪問Map,而且必定數量的寫入線程能夠併發地修改Map,所以提升了併發訪問的吞吐量。
併發容器加強了同步容器類,它們提供的迭代器不會拋出ConcurrentModificationException,所以不須要在迭代過程當中對容器加鎖。其迭代器具備弱一致性,能夠容忍併發的修改,在建立迭代器時會遍歷已有元素,並能夠(可是不保證)在迭代器被構造後將修改操做反映給容器。size(),isEmpty()等方法返回的是一個近似值。
因爲ConcurrentHashMap與Hashtable和synchronizedMap有更多的優點,所以大多數狀況應該使用併發容器類,至於當須要對整個容器加鎖進行獨佔訪問時,才應該放棄使用併發容器。
注意,此時不能再經過客戶端加鎖新建新的原子操做了,客戶端只能對併發容器自身加鎖,但併發容器內部使用的並非自身鎖。
寫入時複製容器,在每次修改時都會加鎖並建立和從新發佈一個新的容器副本,直接修改容器引用,從而實現可見性。 寫操做在一個複製的數組上進行,讀操做仍是在原始數組中進行,讀寫分離,互不影響。寫操做須要加鎖,防止併發寫入時致使寫入數據丟失。寫操做結束以後須要把原始數組指向新的複製數組。
CopyOnWriteArrayList 在寫操做的同時容許讀操做,大大提升了讀操做的性能,所以很適合讀多寫少的應用場景。 可是 CopyOnWriteArrayList 有其缺陷:
阻塞隊列支持生產者-消費者模式。簡化了開發過程,消除了生產者和消費者之間的代碼依賴性。阻塞隊列簡化了生產者-消費者設計的實現過程。一種常見的生產者-消費者設計模式就是線程池與工做隊列的組合。
阻塞隊列提供了四種處理方法:
阻塞隊列有多種實現。
Java 6提供了Dqueue和BlockingDeque,是雙端隊列,實現了在隊列頭和隊列尾的高效插入和移除。雙端隊列適用於工做密取模式。在工做密取中,每一個消費者都有各自的雙端隊列。若是一個消費者完成了本身的雙端隊列的所有工做,能夠從其餘消費者雙端隊列末尾祕密的獲取工做。由於工做者線程不會再單個共享的任務隊列上發生競爭。適用於既是生產者又是消費者問題。
線程會阻塞或暫停執行。被阻塞的線程必須等待某個不受它控制的事件發生後才能繼續執行。當在代碼中調用一個能夠拋出InterruptedException的方法時,本身的方法就編程了阻塞方法,必須處理中斷的響應。若是這個方法被中斷,那麼它將努力提早結束狀態。
處理中斷的響應有兩種基本選擇:
public void run(){
try{
something();
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
複製代碼
同步工具類能夠是任何一個對象,只要它根據其自身的狀態來協調線程的控制流。包括阻塞隊列,信號量,柵欄以及閉鎖。
閉鎖用來確保某些活動直到其餘活動都完成了才繼續執行。若是有多個線程,其中一個線程須要等到其餘全部線程活動結束後才繼續執行,使用閉鎖。
CountDownLatch是一種閉鎖的實現,可使得一個或者多個線程等待一組事情發生。包括一個計數器,表示須要等待的事件數量;countDown方法用來遞減計數器,表示有一個事件已經發生了;await方法等待計數器爲0,表示全部須要等待的事情已經發生。
// 初始化閉鎖,並設置資源個數
CountDownLatch latch = new CountDownLatch(2);
Thread t1 = new Thread( new Runnable(){
public void run(){
// 加載資源1
加載資源的代碼……
// 本資源加載完後,閉鎖-1
latch.countDown();
}
} ).start();
Thread t2 = new Thread( new Runnable(){
public void run(){
// 加載資源2
資源加載代碼……
// 本資源加載完後,閉鎖-1
latch.countDown();
}
} ).start();
Thread t3 = new Thread( new Runnable(){
public void run(){
// 本線程必須等待全部資源加載完後才能執行
latch.await();
// 當閉鎖數量爲0時,await返回,執行接下來的任務
任務代碼……
}
} ).start();
複製代碼
閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。柵欄相似於閉鎖,能阻塞一組進程直到某個時間發生。柵欄與閉鎖的區別在於,全部線程必須同時到達柵欄位置,才能繼續執行。
如有多條線程,他們到達屏障時將會被阻塞,只有當全部線程都到達屏障時才能打開屏障,全部線程同時執行,如有這樣的需求可使用同步屏障。此外,當屏障打開的同時還能指定執行的任務。
閉鎖只會阻塞一條線程,目的是爲了讓該條任務線程知足條件後執行; 而同步屏障會阻塞全部線程,目的是爲了讓全部線程同時執行(實際上並不會同時執行,而是儘可能把線程啓動的時間間隔降爲最少)。
// 建立同步屏障對象,並制定須要等待的線程個數 和 打開屏障時須要執行的任務
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
public void run(){
//當全部線程準備完畢後觸發此任務
}
});
// 啓動三條線程
for( int i=0; i<3; i++ ){
new Thread( new Runnable(){
public void run(){
// 等待,(每執行一次barrier.await,同步屏障數量-1,直到爲0時,打開屏障)
barrier.await();
// 任務
任務代碼……
}
} ).start();
}
複製代碼
信號量用於控制同時訪問某個特定資源的操做數量,或者執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界。
信號量能夠用於實現資源池,也能夠用於將容器變爲有界阻塞容器。信號量管理着一組虛擬的許可,在執行操做時首先獲取許可,並在使用之後釋放許可。若是沒有許可,將阻塞直到有許可或被中斷,超時。
信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能容許m條線程訪問資源。
// 建立信號量對象,並給予3個資源
Semaphore semaphore = new Semaphore(3);
// 開啓10條線程
for ( int i=0; i<10; i++ ) {
new Thread( new Runnbale(){
public void run(){
// 獲取資源,若此時資源被用光,則阻塞,直到有線程歸還資源
semaphore.acquire();
// 任務代碼
……
// 釋放資源
semaphore.release();
}
} ).start();
}
複製代碼
能夠用做閉鎖,是一種能夠生成結果的Runnable,能夠處於如下三種狀態:等待運行,正在運行和運行完成。當FutureTask進入完成狀態後,它會中止在這個狀態上。
FutureTask在Executor框架中表示異步任務,此外還能夠用來表示一些時間較長的運算,這些計算能夠在使用計算結構以前啓動。
首先,使用HashMap和同步機制來初始化緩存。
public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunc implements Computable<String,BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
return new BigInteger(arg);
}
}
public class Memoizer1<A,V> implements Computable<A,V> {
private final Map<A,V> cache=new HashMap<>();
private final Computable<A,V> c;
public Memoizer1(Computable<A,V> c){
this.c=c;
}
@Override
public synchronized V compute(A arg) throws InterruptedException {
V result=cache.get(arg);
if(result==null){
result=c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
複製代碼
在這種實現方法中,使用HashMap保存以前計算的結果。首先檢查須要的結果是否已經在緩存中,若是存在則返回以前計算,不然將計算結果緩存到HashMap再返回。
爲了確保線程安全,將整個compute方法進行同步。可是這樣伸縮性差,緩存的性能並無獲得提高。
下面使用ConcurrentHashMap替換HashMap。可是,這種方法存在一些不足,當兩個線程同時調用compute時,可能會致使計算獲得相同的值。這樣是低效的,由於緩存的做用就是避免相同的數據被計算屢次。其問題在於,若是某個線程啓動了一個計算,而其餘線程並不知道這個計算正在進行,極可能會重複這個計算。
針對如上問題,咱們考慮可使用FutureTask來解決。使用該類來表示計算的過程,若是有結果可用,則返回結果,不然一直阻塞。
public class Memo2 <A,V> implements Computable<A,V>{
private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
private final Computable<A,V>c;
public Memo2(Computable<A,V>c){
this.c=c;
}
@Override
public V compute(A arg) throws InterruptedException {
Future<V> future=cache.get(arg);
if(future==null){
Callable<V> eval=new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft=new FutureTask<>(eval);
future=cache.putIfAbsent(arg,ft);
if(future==null){
future=ft;
ft.run();
}
}
try{
return future.get();
}catch (ExecutionException e){
e.printStackTrace();
}
return null;
}
}
複製代碼