Java多線程:CountDownLatch、CyclicBarrier 和 Semaphore

場景描述:

  多線程設計過程當中,常常會遇到須要等待其它線程結束之後再作其餘事情的狀況。
有幾種方案:
 
  1.在主線程中設置一自定義全局計數標誌,在工做線程完成時,計數減1。主線程偵測該標誌是否爲0,一旦爲0,表示全部工做線程已經完成。
  2.使用Java標準的類CountDownLatch來完成這項工做,原理是同樣的,計數。
 
 

CountDownLatch

一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。 
其機制是:
  當多個(具體數量等於初始化CountDownLatch時的count參數的值)線程都達到了預期狀態或完成預期工做時觸發事件,其餘線程能夠等待這個事件來觸發本身的後續工做。這裏須要注意的是,等待的線程能夠是多個,即CountDownLatch是能夠喚醒多個等待的線程的。達到本身預期狀態的線程會調用CountDownLatch的countDown方法,而等待的線程會調用CountDownLatch的await方法。
CountDownLatch 很適合用來將一個任務分爲n個獨立的部分,等這些部分都完成後繼續接下來的任務,CountDownLatch 只能出發一次,計數值不能被重置。

流程圖

 

如上圖所示,當7個線程都完成latch.countDown調用後,最下面那條線程會從latch.await返回,繼續執行後面的代碼html

函數列表

  • CountDownLatch(int count) :構造一個用給定計數初始化的 CountDownLatch。
  • void await():使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。
  • boolean await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間。
  • void countDown() 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。

實現原理

 

實例

   咱們來看一個具體的例子。假設咱們使用一臺多核的機器對一組數據進行排序,那麼咱們能夠把這組數據分到不一樣線程中進行排序,而後合併;能夠利用線程池來管理多線程;能夠將CountDownLatch用做各個分組數據都排好序的通知。下面是代碼片斷:java

先看主線程編程

int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
int[] datas = new int[10204];
int step = datas.length / count;
for (int i=0; i < count; i++) {
    int start = i * step;
    int end = (i+1) * step;
    if (i == count - 1) end = datas.length;
    threadpool.execute(new MyRunnable(latch, datas, start, end));
}
latch.await();
//合併數據

咱們再看一下具體任務的代碼,即MyRunnable的run方法的實現:多線程

public void run() {
      //數據排序
     latch.countDown(); 
}

 

CyclicBarrier

能夠協同多個線程,讓多個線程在這個屏障前等待,直到全部線程都達到了這個屏障時,再一塊兒繼續執行後面的動做。
CyclicBarrier適用於多個線程有固定的多步須要執行,線程間互相等待,當都執行完了,再一塊兒執行下一步。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。
 

流程圖

 上圖中的7個線程各有一個barrier.await,那麼任何一個線程在執行到barrier.await時就會進入阻塞等待狀態,直到7個線程都到了barrier.await時纔會同時從await返回,繼續後面的工做。此外若是在構造CyclicBarrier時設置了一個Runnable實現,那麼最後一個到barrier.await的線程會執行這個Runnable的run方法,以完成一些預設的工做。
 
注意比較CountDownLatchCyclicBarrier
  (01) CountDownLatch的做用是容許1或N個線程等待其餘線程完成執行;而CyclicBarrier則是容許N個線程相互等待。
  (02) CountDownLatch的計數器沒法被重置;CyclicBarrier的計數器能夠被重置後使用,所以它被稱爲是循環的barrier。
CountDownLatch 適用於一組線程和另外一個主線程之間的工做協做。一個主線程等待一組工做線程的任務完畢才繼續它的執行是使用 CountDownLatch 的主要場景;CyclicBarrier 用於一組或幾組線程,好比一組線程須要在一個時間點上達成一致,例如同時開始一個工做。另外,CyclicBarrier 的循環特性和構造函數所接受的 Runnable 參數也是 CountDownLatch 所不具有的。
 
 
 
CountDownLatch
CyclicBarrier
適用場景
主線程等待其餘工做線程結束
多個線程相互等待,直到全部線程都達到一個障礙點Barrier
主要方法
CountDownLatch(int count) 主線程調用:初始化計數
 
await() 主線程調用 : 阻塞,直到等待計數爲0時解除阻塞 
 
countDown() 工做線程調用 : 計數減1
CyclicBarrier(int parties , Runnnable barrierAction) : 初始化參與者數量和障礙點執行Action,action可選,由主線程初始化
 
await() : 由工做線程調用,每被調用一次,計數便會減小1,並阻塞住當前線程 , 直到全部線程都達到障礙點
等待結束
各線程之間再也不相互影響, 能夠繼續作本身的事情, 再也不執行下一個工做目標。
在障礙點到達後, 容許全部線程繼續執行,到達下一個目標後,能夠恢復使用CyclicBarrier, barrier 在釋放等待線程後能夠重用
異常
 
若是其中一個線程因爲中斷、錯誤、或者超時致使永久離開障礙點,其餘線程也將拋出異常。
 

實例

   
int count = 10;
final CyclicBarrier barrier = new CyclicBarrier(count + 1);
int[] datas = new int[10204];
int step = datas.length / count;
for (int i=0; i < count; i++) {
    int start = i * step;
    int end = (i+1) * step;
    if (i == count - 1) end = datas.length;
    threadpool.execute(new MyRunnable(barrier, datas, start, end));
}
barrier.await();
//合併數據

能夠看到CyclicBarrier對象傳入的參數值比CountDownLatch大1,緣由是構造CountDownLatch的參數是調用countDown的數量,而CyclicBarrier的數量是await的數量併發

public void run() {
      //數據排序
     try {
         barrier.await(); 
    }catch (...)
}

 

Semaphore

Semaphore 信號量對象管理的信號就像令牌,構造時傳入個數,總數就是控制併發的數量 。咱們須要控制併發的代碼,執行前先獲取信號(經過acquire獲取信號許可),執行後歸還信號(經過release歸還信號許可)。每次acquire成功返回後,Semaphore可用的信號量就會減小一個,若是沒有可用的信號,acquire調用就會阻塞,等待有release調用釋放信號後,acquire纔會獲得信號並返回。
若是Semaphore管理的信號量爲1個,那麼就退化到互斥鎖了;若是多於一個信號量,則主要用於控制併發數。與經過控制線程數來控制併發數的方式相比,經過Semaphore來控制併發數能夠控制得更加細粒度,由於真正被控制最大併發的代碼放到acquire和release之間就好了。
  Semaphore類位於java.util.concurrent包下,它提供了2個構造器:
public Semaphore(int permits) {          //參數permits表示許可數目,即同時能夠容許多少線程進行訪問
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //這個多了一個參數fair表示是不是公平的,即等待時間越久的越先獲取許可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

實例

   例如咱們須要控制遠程方法的併發量,超過併發量的方法就等待有其餘方法執行返回後再執行,那麼其代碼以下:
semaphore.acquire();
try {
    //調用遠程通訊的方法
}
finally {
    semaphore.release();
}

 

 
 
 

參考資料:

相關文章
相關標籤/搜索