Java併發編程實戰筆記3:基礎構建模塊

在上文已經說明,委託是構造線程安全類的一個最有效策略,也就是讓現有的線程安全類管理全部的狀態便可。如下將介紹這些基礎構建模塊。html

同步容器類

同步容器類包括Vector和Hashtable以及由Collections.synchronizedXxx等工廠方法建立的同步封裝器類。這些類實現線程安全的方式是:將它們的狀態封裝起來,並對每一個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態。同步容器對全部容器狀態的訪問都串行化,嚴重下降了併發性;當多個線程競爭鎖時,吞吐量嚴重降低。java

同步容器類存在的問題

同步容器類都是線程安全的,可是在某些狀況下可能須要額外的客戶端加鎖來保護複合操做。git

好比,在Vecotr中,getLast()和deleteLast()操做,若是是在多線程的環境下運行,若是不加鎖,會產生異常狀況。一個線程在getLast()後,另外一個線程deleteLast(),而後該線程繼續執行,進行deleteLast()操做,此時會拋出下標越界的異常。編程

又好比,在迭代的過程當中,使用get(index)的操做,若是有多個線程運行,可能會刪除其中元素,一樣會形成異常。設計模式

對於如上的狀況,咱們須要經過客戶端加鎖來解決線程安全的問題。如在迭代時加鎖:數組

synchronized(vector){
    for(int i=0;i<vector.size();i++){
        vector.get(i);
    }
}
複製代碼

迭代器

在迭代或者for-each循環語法時,對容器類進行迭代的標準方式都是使用Iterator。然而,在設計同步容器類的迭代器時並無考慮到併發修改的問題,而且它們表現出的行爲時「及時失敗」的,也就是當它們發現容器在迭代過程當中被修改時,就會拋出ConcurrentModificationException。緩存

若是在迭代期間,對容器加鎖,首先會下降效率,提升線程的等待時間;而後還可能會產生死鎖;下降了吞吐量和CPU的利用率。安全

若是不但願在迭代期間加鎖,可使用克隆容器的方法,並在克隆副本上進行迭代。多線程

加鎖能夠防止迭代器拋出ConcurrentModificationException,可是要在全部對容器進行迭代的地方都要加鎖。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器爲參數時,都會對容器進行迭代。這些間接的迭代操做可能拋出ConcurrentModificationException。併發

併發容器

Java 5.0提供了多種併發容器類來改進同步容器的性能。同步容器對全部容器狀態的訪問都串行化,嚴重下降了併發性;當多個線程競爭鎖時,吞吐量嚴重降低。

併發容器是針對多個線程併發訪問設計的。經過併發容器來替代同步容器,能夠極大地提升伸縮性並下降風險。併發容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。

ConcurrentHashMap

同步容器類在執行每一個操做期間都持有一個鎖。ConcurrentHashMap採用了不一樣的加鎖策略來提供更高的併發性和伸縮性。它並非將每一個方法都在同一個鎖上同步,而是使用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱爲分段鎖。

分段鎖機制使得任意數量的讀取線程能夠併發訪問Map,執行讀取操做的線程和執行寫入操做的線程能夠併發訪問Map,而且必定數量的寫入線程能夠併發地修改Map,所以提升了併發訪問的吞吐量。

併發容器加強了同步容器類,它們提供的迭代器不會拋出ConcurrentModificationException,所以不須要在迭代過程當中對容器加鎖。其迭代器具備弱一致性,能夠容忍併發的修改,在建立迭代器時會遍歷已有元素,並能夠(可是不保證)在迭代器被構造後將修改操做反映給容器。size(),isEmpty()等方法返回的是一個近似值。

因爲ConcurrentHashMap與Hashtable和synchronizedMap有更多的優點,所以大多數狀況應該使用併發容器類,至於當須要對整個容器加鎖進行獨佔訪問時,才應該放棄使用併發容器。

注意,此時不能再經過客戶端加鎖新建新的原子操做了,客戶端只能對併發容器自身加鎖,但併發容器內部使用的並非自身鎖。

CopyOnWriteArrayList

寫入時複製容器,在每次修改時都會加鎖並建立和從新發佈一個新的容器副本,直接修改容器引用,從而實現可見性。 寫操做在一個複製的數組上進行,讀操做仍是在原始數組中進行,讀寫分離,互不影響。寫操做須要加鎖,防止併發寫入時致使寫入數據丟失。寫操做結束以後須要把原始數組指向新的複製數組。

CopyOnWriteArrayList 在寫操做的同時容許讀操做,大大提升了讀操做的性能,所以很適合讀多寫少的應用場景。 可是 CopyOnWriteArrayList 有其缺陷:

  • 內存佔用:在寫操做時須要複製一個新的數組,使得內存佔用爲原來的兩倍左右;
  • 數據不一致:讀操做不能讀取實時性的數據,由於部分寫操做的數據還未同步到讀數組中。

阻塞隊列

阻塞隊列支持生產者-消費者模式。簡化了開發過程,消除了生產者和消費者之間的代碼依賴性。阻塞隊列簡化了生產者-消費者設計的實現過程。一種常見的生產者-消費者設計模式就是線程池與工做隊列的組合。

阻塞隊列提供了四種處理方法:

  1. 拋出異常,使用add(e)插入,remove()刪除,element()查詢。當阻塞隊列滿時,插入元素;當隊列空,刪除元素都會拋出異常。
  2. 返回特殊值,使用offer(e)插入,poll()刪除,peek()查詢。插入時,若是成功返回true,移除時,若是沒有對應的元素返回null。
  3. 阻塞,使用put(e)插入,take()刪除。隊列滿,插入元素時會阻塞;隊列空,取元素會阻塞。
  4. 超時退出:使用offer(e,time,unit)插入,poll(time,unit)刪除。當隊列滿時,會阻塞,超過必定的時間,線程會退出。

阻塞隊列有多種實現。

  • ArrayBlokcingQueue和LinkedBlockingQueue分別是數組和鏈表結構組成的有界的FIFO阻塞隊列。
  • PriorityBlockingQueue是一個支持優先級排序的無界阻塞隊列。
  • SynchronousQueue是一個不存儲元素的阻塞隊列,它不會爲隊列中元素維護存儲空間。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

雙端隊列與工做密取

Java 6提供了Dqueue和BlockingDeque,是雙端隊列,實現了在隊列頭和隊列尾的高效插入和移除。雙端隊列適用於工做密取模式。在工做密取中,每一個消費者都有各自的雙端隊列。若是一個消費者完成了本身的雙端隊列的所有工做,能夠從其餘消費者雙端隊列末尾祕密的獲取工做。由於工做者線程不會再單個共享的任務隊列上發生競爭。適用於既是生產者又是消費者問題。

阻塞方法與中斷方法

線程會阻塞或暫停執行。被阻塞的線程必須等待某個不受它控制的事件發生後才能繼續執行。當在代碼中調用一個能夠拋出InterruptedException的方法時,本身的方法就編程了阻塞方法,必須處理中斷的響應。若是這個方法被中斷,那麼它將努力提早結束狀態。

處理中斷的響應有兩種基本選擇:

  1. 傳遞InterruptedException,把該異常拋出給方法的調用者。
  2. 恢復中斷,捕獲異常,並調用當前線程的interrupt方法恢復中斷,引起更高層的代碼中斷。
public void run(){
    try{
        something();
    }catch(InterruptedException e){
        Thread.currentThread().interrupt();
    }
}
複製代碼

同步工具類

同步工具類能夠是任何一個對象,只要它根據其自身的狀態來協調線程的控制流。包括阻塞隊列,信號量,柵欄以及閉鎖。

閉鎖

閉鎖用來確保某些活動直到其餘活動都完成了才繼續執行。若是有多個線程,其中一個線程須要等到其餘全部線程活動結束後才繼續執行,使用閉鎖。

CountDownLatch是一種閉鎖的實現,可使得一個或者多個線程等待一組事情發生。包括一個計數器,表示須要等待的事件數量;countDown方法用來遞減計數器,表示有一個事件已經發生了;await方法等待計數器爲0,表示全部須要等待的事情已經發生。

// 初始化閉鎖,並設置資源個數
CountDownLatch latch = new CountDownLatch(2);

Thread t1 = new Thread( new Runnable(){
    public void run(){
        // 加載資源1
        加載資源的代碼……
        // 本資源加載完後,閉鎖-1
        latch.countDown();
    }
} ).start();

Thread t2 = new Thread( new Runnable(){
    public void run(){
        // 加載資源2
        資源加載代碼……
        // 本資源加載完後,閉鎖-1
        latch.countDown();
    }
} ).start();

Thread t3 = new Thread( new Runnable(){
    public void run(){
        // 本線程必須等待全部資源加載完後才能執行
        latch.await();
        // 當閉鎖數量爲0時,await返回,執行接下來的任務
        任務代碼……
    }
} ).start();
複製代碼

柵欄(同步屏障)

閉鎖是一次性對象,一旦進入終止狀態,就不能被重置。柵欄相似於閉鎖,能阻塞一組進程直到某個時間發生。柵欄與閉鎖的區別在於,全部線程必須同時到達柵欄位置,才能繼續執行。

如有多條線程,他們到達屏障時將會被阻塞,只有當全部線程都到達屏障時才能打開屏障,全部線程同時執行,如有這樣的需求可使用同步屏障。此外,當屏障打開的同時還能指定執行的任務。

閉鎖只會阻塞一條線程,目的是爲了讓該條任務線程知足條件後執行; 而同步屏障會阻塞全部線程,目的是爲了讓全部線程同時執行(實際上並不會同時執行,而是儘可能把線程啓動的時間間隔降爲最少)。

// 建立同步屏障對象,並制定須要等待的線程個數 和 打開屏障時須要執行的任務
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
    public void run(){
        //當全部線程準備完畢後觸發此任務
    }
});

// 啓動三條線程
for( int i=0; i<3; i++ ){
    new Thread( new Runnable(){
        public void run(){
            // 等待,(每執行一次barrier.await,同步屏障數量-1,直到爲0時,打開屏障)
            barrier.await();
            // 任務
            任務代碼……
        }
    } ).start();
}
複製代碼

信號量

信號量用於控制同時訪問某個特定資源的操做數量,或者執行某個指定操做的數量。計數信號量還能夠用來實現某種資源池,或者對容器施加邊界。

信號量能夠用於實現資源池,也能夠用於將容器變爲有界阻塞容器。信號量管理着一組虛擬的許可,在執行操做時首先獲取許可,並在使用之後釋放許可。若是沒有許可,將阻塞直到有許可或被中斷,超時。

信號量的使用場景是,有m個資源,n個線程,且n>m,同一時刻只能容許m條線程訪問資源。

// 建立信號量對象,並給予3個資源
Semaphore semaphore = new Semaphore(3);

// 開啓10條線程
for ( int i=0; i<10; i++ ) {
    new Thread( new Runnbale(){
        public void run(){
            // 獲取資源,若此時資源被用光,則阻塞,直到有線程歸還資源
            semaphore.acquire();
            // 任務代碼
            ……
            // 釋放資源
            semaphore.release();
        }
    } ).start();
}
複製代碼

FutureTask

能夠用做閉鎖,是一種能夠生成結果的Runnable,能夠處於如下三種狀態:等待運行,正在運行和運行完成。當FutureTask進入完成狀態後,它會中止在這個狀態上。

FutureTask在Executor框架中表示異步任務,此外還能夠用來表示一些時間較長的運算,這些計算能夠在使用計算結構以前啓動。

實戰:構建緩存

首先,使用HashMap和同步機制來初始化緩存。

public interface Computable<A,V> {
    V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunc implements Computable<String,BigInteger> {

    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        return new BigInteger(arg);
    }
}
public class Memoizer1<A,V> implements Computable<A,V> {
    private final Map<A,V> cache=new HashMap<>();
    private final Computable<A,V> c;

    public Memoizer1(Computable<A,V> c){
        this.c=c;
    }

    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result=cache.get(arg);
        if(result==null){
            result=c.compute(arg);
            cache.put(arg,result);
        }
        return result;
    }
}
複製代碼

在這種實現方法中,使用HashMap保存以前計算的結果。首先檢查須要的結果是否已經在緩存中,若是存在則返回以前計算,不然將計算結果緩存到HashMap再返回。

爲了確保線程安全,將整個compute方法進行同步。可是這樣伸縮性差,緩存的性能並無獲得提高。

下面使用ConcurrentHashMap替換HashMap。可是,這種方法存在一些不足,當兩個線程同時調用compute時,可能會致使計算獲得相同的值。這樣是低效的,由於緩存的做用就是避免相同的數據被計算屢次。其問題在於,若是某個線程啓動了一個計算,而其餘線程並不知道這個計算正在進行,極可能會重複這個計算。

針對如上問題,咱們考慮可使用FutureTask來解決。使用該類來表示計算的過程,若是有結果可用,則返回結果,不然一直阻塞。

public class Memo2 <A,V> implements Computable<A,V>{
    private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
    private final Computable<A,V>c;
    public Memo2(Computable<A,V>c){
        this.c=c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> future=cache.get(arg);
        if(future==null){
            Callable<V> eval=new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft=new FutureTask<>(eval);
            future=cache.putIfAbsent(arg,ft);
            if(future==null){
                future=ft;
                ft.run();
            }
        }
        try{
            return future.get();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
        return null;
    }

}
複製代碼

參考資料

相關文章
相關標籤/搜索