從0學習java併發編程實戰-讀書筆記-基礎構建模塊(4)

同步容器類

同步容器類包括Vector和Hashtable,這兩個是早期JDK的一部分。此外還包括在JDK1.2中添加Collections.synchronizedXxx等工廠方法,這些類實現線程安全的方式是:將它們的狀態封裝起來,並對公有方法進行同步,使得每次只有一個線程能訪問容器的狀態。java

同步容器類的問題

同步容器類都是線程安全的,可是某些複合操做須要額外的客戶端加鎖來保護,常見的複合操做:算法

  • 迭代:反覆訪問元素,直到遍歷完容器中的全部元素
  • 跳轉:根據指定順序找到當前元素的下一個元素
  • 條件運算:例如「若沒有就添加」

因爲同步容器類要遵照同步策略,即支持客戶端加鎖,所以可能會創造一些新的操做,只要咱們知道應該使用哪一鎖,那麼這些新操做就與其餘操做同樣都是原子操做。數組

for (int i = 0; i < vector.size();i++){
    doSomething(vector.get(i))
 }

一個線程訪問,一個線程刪除,在這兩個線程交替執行的時候,代碼就可能出現問題,將拋出ArrayIndexOutOfBoundsException。
解決方式是將怎麼for循環加鎖,鎖爲vector對象。安全

迭代器與ConcurrentModificationException

不管是直接迭代仍是jdk5引入的for-each語法,對容器的標準訪問方式就是使用Iterator(迭代器),可是Iterator並無考慮到併發修改的問題,迭代器使用的策略是快速失敗(fail-fast)。當迭代器發現容器在迭代過程被修改了,就會拋出一個ConcurrentModificationException異常。
這種快速失敗機制並不算是完備的處理機制,只是捕獲了可能會出現併發錯誤,只能做爲一個併發問題預警指示器。要想避免ConcurrentModificationException異常,就必須在迭代過程持有容器的鎖。多線程

隱藏迭代器

在某些狀況下,迭代器會隱藏起來。例如`javaSystem.out.println("iterm:"+set);,編譯器將字符串的鏈接操做轉換爲調用StringBuilder.append(Object),而這個方法將會調用容器的toString方法,標準容器的toString方法將迭代容器。併發

正如封裝對象的狀態有助於維持不變性條件同樣,封裝對象的同步機制一樣有助於確保實施同步策略。

併發容器

同步容器經過對全部容器狀態的訪問都串行化,以實現它們的線程安全性。固然這種辦法的代價是嚴重下降併發性,當多個線程競爭容器的鎖時,吞吐量將嚴重下降。app

經過併發容器來代替同步容器,能夠極大的提升伸縮性並下降風險。
  • JDK5中增長了ConcurrentHashMap,用來代替同步且基於散列的Map。
  • 以及CopyOnWriteArrayList,用於在遍歷操做爲主要操做的狀況下代替同步的List。
  • 在新的ConCurrentMap接口中增長了一些經常使用的複合條件,如「若沒有就添加」,替換,以及有條件刪除等。
  • JDK5中增長了兩種新容器,QueueBlockingQueue,用來臨時保存一組等待處理的元素。它提供了幾種實現,包括:函數

    • ConcurrentLinkedQueue,這是一個傳統的先進先出隊列。
    • PriorityQueue,這是一個(非併發的)優先隊列。
  • Queue上的操做不會阻塞,若是隊列爲空,那麼獲取元素的操做將會返回空值。
  • 能夠用List來模擬Queue的行爲(Queue自己就是由LinkedList實現的),可是仍是須要一個Queue類,由於它能去掉List的隨機訪問需求,實現更加高效的併發。
  • BlockingQueue拓展了Queue,增長了可阻塞的插入和獲取等操做。工具

    • 若是隊列爲空,那麼獲取元素的操做將一直阻塞。
    • 若是隊列已滿,那麼插入元素的操做將一直阻塞,直到隊列中出現可用的空間。
  • JDK6引入了:性能

    • ConcurrentSkipListMap:同步的SortedMap的併發替代品
    • ConcurrentSkipListSet:同步的SortedSet的併發替代品

ConcurrentHashMap

同步容器類在執行每一個操做期間都持有一個鎖。與HashMap同樣,ConcurrentHashMap也是一個基於散列的Map,使用了一種不一樣的策略來提供更高的併發性和伸縮性:

  • ConcurrentHashMap並非將每一個方法都在同一個鎖上同步並使得同時只能由一個線程訪問容器,而是利用分段鎖(Locking Striping)作更細粒度的加鎖機制來實現更大程度的共享。這樣的好處是,在多線程併發訪問下將實現更高的吞吐量,而單線程環境只損失很是小的性能。
  • ConcurrentHashMap與其餘容器加強了同步容器類:他們提供的迭代器不會拋出ConcurrentModificationException異常,ConcurrentHashMap返回的迭代器具備弱一致性(Weakly Consistent)弱一致性的迭代器能夠容忍併發的修改,當建立迭代器的時候會遍歷已有的元素,能夠(並不保證)在迭代器在被構造後將修改操做反映給容器。
與Hashtable和synconizedMap相比,ConcurrentHashMap有着更多的優點和更少的劣勢,在大多數併發狀況下,用ConcurrentHashMap來代替同步Map能提升代碼的可伸縮性,只有當須要加鎖Map進行訪問時,才應該放棄使用concurrentHashMap。

額外的原子Map操做

因爲ConcurrentMap並非經過持有鎖來控制對象的獨佔訪問,因此咱們沒法靠加鎖新建原子操做。可是常見的如:「若沒有則添加」,「若相等則移除」,「若相等則替換」 等複合操做都已經實現(具體能夠看接口描述)。

CopyOnWriteArraryList

CopyOnWriteArraryList 用於替代同步List,在某些狀況下它能提供更好的併發性能,在迭代期間並不須要對容器進行復制或者加鎖。

  • CopyOnWrite(寫入時複製)容器的線程安全性在於,只要正確發佈一個事實不變的對象,那麼在訪問該對象的時候就不須要進一步的同步。
  • 在每次修改的時候,都會建立並從新發佈一個新的容器副本,從而實現可變性。 容器的迭代器保留一個指向底層數組的引用,這個數組當且位於迭代器的起始位置,因爲每次都會建立新對象,因此和讀進程互不干擾。
  • 因爲每次修改都須要進行復制,因此不適合須要常常修改或者容器規模很大的狀況。

阻塞隊列和生產者-消費者模式

阻塞隊列提供了:

  • 可阻塞的puttake方法:若是隊列滿了,那麼put隊列將阻塞至有空間可用;若是隊列爲空,take方法將會阻塞至有元素可用。
  • 定時的offerpoll方法

隊列能夠是有界的也能夠是無界的,無界隊列永遠也不會充滿,因此put方法永遠也不會阻塞。

阻塞隊列支持生產者-消費者模式,把 找出須要完成的工做執行工做這兩個過程分開,並將工做放入一個 待完成列表,以便後續處理,而不是找出當即處理。
  • 生產者-消費者模式能簡化開發過程,它消除了生產者和消費者直接的代碼依賴性,該模式還將生產數據的過程與使用數據的過程解耦來簡化工做負載的管理,由於這兩個過程在處理數據的速率不一樣。
  • 若是生產者生成工做的速率比消費者處理工做的速度快,那麼工做就會在隊列中累計起來,直到耗盡內存。put的阻塞特性極大的簡化了生產者的編碼。若是使用有界隊列,那麼當隊列充滿時,生產者將阻塞而且不能繼續生成工做,而消費者就有時間來遇上工做處理進度。
在構建高可靠的應用程序時,有界隊列是一種強大的資源管理工具:它們能抑制並防止產生過多的工做項,使應用程序在負荷過載的狀況下變得更加健壯。

類庫中的BlockingQueue實現

  • LinkedBlockingQueueArrayBlockingQueueFirst In,First Out(FIFO)隊列,分別和LinkedList和ArrayList類似。但比同步List有更好的併發性能。
  • PriorityBlockingQueue是一個優先隊列,能夠經過元素的天然順序比較,也可使用Comparator方法。
  • SynchronousQueue,實際上並不算是一個真正的隊列,由於它不會爲隊列中的元素維護存儲空間。與其餘隊列不一樣的是,它維護一組線程,這些線程在等着把元素加入或者移出隊列。這種實現隊列的方式看起來很奇怪,等於直接將工做交付給消費者線程,從而下降了將數據從生產者移動到消費者的延遲(避免串行入列和出列)。而且一旦工做被交付,生產者能夠當即獲得反饋。

串行線程封閉

  • 對於可變對象,生產者-消費者這種設計與阻塞隊列一塊兒,促進了串行線程封閉,從而將對象全部權從生產者交付給消費者。
  • 線程封閉對象只能由單個線程擁有,可是能夠經過安全發佈該對象來「轉移」全部權,而且轉移後,發佈對象的線程不會再訪問它,對象被封閉在新的線程裏。
  • 對象池利用了串行線程封閉,將對象「借給」一個請求線程,重要對象池保護足夠的內部同步來安全發佈池中的對象,而且客戶代碼自己不會發布池中的對象,且在將對象返回給對象池以後不會再使用它,那麼就能夠安全地在線程之間傳遞全部權。

雙端隊列與工做密取

jdk6增長了兩種容器類型:

  • Deque:對Queue進行了拓展
  • BlockingDeque:對BlockingQueue進行了拓展
Deque是一個 雙端隊列,實現了在隊列頭和隊列尾的高效插入和移除,具體實現有ArrayDeque和LinkedBlockingDeque。
正如阻塞隊列適用於生產者-消費者模式,雙端隊列適用於工做密取模式。
在生產者-消費者模式中,全部消費者都有一個共享的工做隊列。而工做密取中,每一個消費者都有一個本身的雙端隊列,若是一個消費者完成了本身雙端隊列中的所有工做,那麼它能夠從其餘消費者隊列的尾部來祕密獲取工做(G1中的處理DCQ就是使用工做密取模式)。

工做密取比生產者消費者模式具備更好的伸縮性,消費者基本不會在單個共享的任務隊列上發生競爭。當一個消費者線程要訪問另外一個隊列時,是從尾部而不是頭部獲取,進一步下降了隊列的競爭程度。

阻塞方法與中斷方法

線程可能會阻塞或暫停執行,緣由有多種:

  • 等待I/O操做結束
  • 等待得到一個鎖
  • 等待從Thread.sleep方法中醒來
  • 等待另外一個線程的計算結果

當線程阻塞時,它一般被掛起,並處於某種阻塞狀態:

  • BLOCK
  • WAITING
  • TIMED_WAITING

被阻塞的線程必須等待某個不受它控制的事件發生後才能繼續執行,例如等待的I/O操做已經完成,鎖可用了等等。當某個外部事件發生時,線程被置回RUNNABLE狀態,而且能夠執行調度。

Thread提供了interrupt方法,用於中斷線程或者查詢線程是否已經被中斷,每一個線程都有個boolean來表示線程的中斷狀態。

中斷是一種協做機制,一個線程不能強制其餘線程中止正在執行的操做而去執行其餘操做。
當線程A中斷B,A僅僅是要求B在執行到某個能夠暫停的地方中止正在執行的操做(前提是若是B願意停下)。最常使用的中斷的狀況就是取消某個操做。
當在代碼中調用了一個將拋出InterruptedException異常的方法時,你的方法就變成了阻塞方法,而且必須處理中斷的響應:

  • 傳遞InterruptedExcption:避開這個異常一般是最好的選擇,只須要將這個異常傳遞給調用者,並不捕獲或者恢復該異常,而後在執行某種簡單的清理工做後再次拋出這個異常。
  • 恢復中斷:有時候不能拋出這個異常,例如代碼是Runnable的一部分時,必須捕獲該異常,並經過當前線程上的interrupt方法恢復中斷。

同步工具類

閉鎖

閉鎖是一種同步工具類,能夠延遲線程的進度直到其到達終止狀態。閉鎖能夠用來確保某些活動直到其餘活動都完成後才繼續執行。
CountDownLatch是一種靈活的閉鎖實現,它可使一個或多個線程等待一組事件發生。閉鎖狀態有一個計數器,這個計數器被初始化爲一個正數,表示等待的事件數量。countDown方法遞減計數器,表示已經有一個事件發生了,而await方法會一直阻塞,直到計數器爲0,或者等待的線程中斷或者超時。
public class TestHarness {
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(){
              public void run(){
                  try {
                      startGate.await();
                      try{
                          task.run();
                      }finally {
                          endGate.countDown();
                      }
                  }catch (InterruptedException ignored){}
              }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;

    }
}
  • 第一個閉鎖:startGate,確保全部的線程都準備就緒後纔開始執行任務。
  • 第二個閉鎖:endGate,等到全部線程都執行完成後計算消耗的時間。

FutureTask

FutureTask也能夠用作爲閉鎖。FutureTask實現了Future語義,表示一種抽象的可生成結果的計算。FutureTask表示的計算是經過Callable來實現的。
FutureTask能夠處於如下三種狀態:
  • 等待運行(Waiting to run)
  • 正在運行 (Running)
  • 運行完成(Completed)

當FutureTask進入完成狀態後,它會永遠停在這個狀態上。

Future.get行爲取決於任務的狀態:

  • 任務完成:get當即返回結果
  • 任務還未完成:get將阻塞直到任務進入完成狀態,而後返回結果或是拋出異常

FutureTask將計算結果從執行計算的線程傳遞到獲取這個結果的線程,而且FutureTask可以保證這種傳遞過程能實現結果的安全發佈

FutureTask類實現了 RunnableFuture接口, RunnableFuture繼承了 Runnable接口和Future接口,因此它既能夠做爲Runnable被線程執行,又能夠做爲Future獲得Callable的返回值。 事實上,FutureTask是Future接口的一個惟一實現類。

Callable表示的任務能夠拋出受檢查的或未受檢查的異常,而且任何代碼均可能拋出一個Error。不管任務代碼拋出什麼異常,都會被封裝到一個ExecutionException中,並在Future.get中從新拋出。因此當ExcutionException時,多是如下三種狀況之一:

  • Callable拋出的受檢查異常
  • RuntimeException
  • Error

信號量

計數信號量(Counting Semaphore):用來控制訪問某個特定資源的操做數量,或者同時執行某個制定操做的數量。計數信號量還能夠用來實現某些資源池,或者對容器加邊界。

Semaphore中管理一組許可,許可的初始數量能夠經過構造函數來指定。在執行操做的時候,首先得到許可,在使用之後釋放許可。若是沒有許可,acquire將阻塞直到有許可(或者被中斷或超時)。

public class BoundedHashSet<T> {

    private final Set<T> set;

    private final Semaphore semaphore;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T t) throws InterruptedException {
        semaphore.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(t);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            semaphore.release();
        }
        return wasRemoved;
    }
}

這裏實現了一個有界阻塞Set容器,信號量的計數值初始化爲容器的最大值。每次添加元素以前,要得到一個許可,若是add失敗,並無成功添加上元素,就釋放許可。刪除成功也釋放許可,底層的set實現並不知道關於邊界的信息,都是由BoundedHashSet來處理的。

柵欄(Barrier)

閉鎖和柵欄的區別

  • 閉鎖是一次性對象,一旦進入終止狀態,就不能被重製
  • 柵欄能阻塞一組線程直到某個事件發生,全部線程必須同時到達柵欄位置,才能繼續執行。

閉鎖用於等待事件,而柵欄用來等待其餘線程。

CyclicBarrier可使必定數量的參與方反覆地在柵欄位置聚集,它在並行迭代算法中很是有用:

  • 一般能將一個問題拆分紅若干個相互獨立的子問題。
  • 當線程到達了柵欄時將調用await方法,這個方法將阻塞直到全部線程都到達柵欄位置。若是全部線程都到達了柵欄位置,那麼將釋放全部線程,重製柵欄以備下次使用。
  • 若是對await調用超時,或者await阻塞的線程被中斷,那麼認爲柵欄被破壞,全部阻塞的await調用都將終止並拋出BrokenBarrierException.
  • 若是成功經過柵欄,那麼await將爲每一個線程都返回一個惟一的到達索引號,利用這些索引來選舉產生一個新的領導線程,並在下一次迭代中由這個領導線程執行新的工做。
  • CyclicBarrier還能夠將一個柵欄操做傳遞給構造函數,這是一個Runnable,當成功經過柵欄,會從其中一個子任務線程(領導線程)中執行它,可是在阻塞線程被釋放以前是不會執行的。
相關文章
相關標籤/搜索