詳解CountDownLatch

CountDownLatch是什麼? 

jdk1.5開始concurrent包裏提供的,併發編程工具類。 node

CountDownLatch這個類可以使一個線程等待其餘線程完成各自的工做後再執行。CountDownLatch容許一個或多個線程等待其餘線程完成操做。 算法

例如,應用程序的主線程但願在負責啓動框架服務的線程已經啓動全部的框架服務以後再執行。編程

CountDownLatch很是適合於對任務進行拆分,使其並行執行,好比某個任務執行2s,其對數據的請求能夠分爲五個部分,那麼就能夠將這個任務拆分爲5個子任務,分別交由五個線程執行,執行完成以後再由主線程進行彙總,此時,總的執行時間將決定於執行最慢的任務,平均來看,仍是大大減小了總的執行時間。bash

CountDownLatch是不能複用的,不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。
多線程

CountDownLatch如何工做?

CountDownLatch是經過維護一個計數器 cnt 來實現的,計數器的初始值爲線程的數量。每當一個線程完成了本身的任務後,調用 countDown() 方法會讓計數器的值就會減1。當計數器值到達0時,它表示全部的線程已經完成了任務,那些由於調用 await() 方法而在等待的線程就會被喚醒。
API
  • countDownLatch.countDown(); //使CountDownLatch初始值N減1;
  • countDownLatch.await(); //調用該方法的線程等到構造方法傳入的N減到0的時候,才能繼續往下執行;
  • await(long timeout, TimeUnit unit); //與上面的await方法功能一致,只不過這裏有了時間限制,調用該方法的線程等到指定的timeout時間後,無論N是否減至爲0,都會繼續往下執行;
  • long getCount(); //獲取當前CountDownLatch維護的值;

CountDownLatch底層原理

CountDownLatch經過AQS(AbstractQueuedSynchronizer)裏面的共享鎖來實現的。
ReentrantLock也是使用AQS

CountDownLatch是基於AbstractQueuedSynchronizer實現的,在AbstractQueuedSynchronizer中維護了一個volatile類型的整數state,volatile能夠保證多線程環境下該變量的修改對每一個線程均可見,而且因爲該屬性爲整型,於是對該變量的修改也是原子的。 建立一個CountDownLatch對象時,所傳入的整數n就會賦值給state屬性,當countDown()方法調用時,該線程就會嘗試對state減一,而調用await()方法時,當前線程就會判斷state屬性是否爲0,若是爲0,則繼續往下執行,若是不爲0,則使當前線程進入等待狀態,直到某個線程將state屬性置爲0,其就會喚醒在await()方法中等待的線程。以下是countDown()方法的源代碼:

public void countDown() {
  sync.releaseShared(1);
}複製代碼
這裏sync也即一個繼承了AbstractQueuedSynchronizer的類實例,該類是CountDownLatch的一個內部類,其聲明以下:

private static final class Sync extends AbstractQueuedSynchronizer {
  private static final long serialVersionUID = 4982264981922014374L;

  Sync(int count) {
    setState(count);
  }

  int getCount() {
    return getState();
  }

  protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
  }

  protected boolean tryReleaseShared(int releases) {
    for (;;) {
      int c = getState();   // 獲取當前state屬性的值
      if (c == 0)   // 若是state爲0,則說明當前計數器已經計數完成,直接返回
        return false;
      int nextc = c-1;
      if (compareAndSetState(c, nextc)) // 使用CAS算法對state進行設置
        return nextc == 0;  // 設置成功後返回當前是否爲最後一個設置state的線程
    }
  }
}複製代碼
      這裏tryReleaseShared(int)方法即對state屬性進行減一操做的代碼。能夠看到,CAS也即compare and set的縮寫,jvm會保證該方法的原子性,其會比較state是否爲c,若是是則將其設置爲nextc(自減1),若是state不爲c,則說明有另外的線程在getState()方法和compareAndSetState()方法調用之間對state進行了設置,當前線程也就沒有成功設置state屬性的值,其會進入下一次循環中,如此往復,直至其成功設置state屬性的值,即countDown()方法調用成功。

在countDown()方法中調用的sync.releaseShared(1)調用時實際仍是調用的tryReleaseShared(int)方法,以下是releaseShared(int)方法的實現:

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}複製代碼
        能夠看到,在執行sync.releaseShared(1)方法時,其在調用tryReleaseShared(int)方法時會在無限for循環中設置state屬性的值,設置成功以後其會根據設置的返回值(此時state已經自減了一),即當前線程是否爲將state屬性設置爲0的線程,來判斷是否執行if塊中的代碼。doReleaseShared()方法主要做用是喚醒調用了await()方法的線程。須要注意的是,若是有多個線程調用了await()方法,這些線程都是以共享的方式等待在await()方法處的,試想,若是以獨佔的方式等待,那麼當計數器減小至零時,就只有一個線程會被喚醒執行await()以後的代碼,這顯然不符合邏輯。以下是doReleaseShared()方法的實現代碼:

private void doReleaseShared() {
  for (;;) {
    Node h = head;  // 記錄等待隊列中的頭結點的線程
    if (h != null && h != tail) {   // 頭結點不爲空,且頭結點不等於尾節點
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {  // SIGNAL狀態表示當前節點正在等待被喚醒
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))    // 清除當前節點的等待狀態
          continue;
        unparkSuccessor(h); // 喚醒當前節點的下一個節點
      } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;
    }
    if (h == head)  // 若是h仍是指向頭結點,說明前面這段代碼執行過程當中沒有其餘線程對頭結點進行過處理
      break;
  }
}複製代碼
在doReleaseShared()方法中(始終注意當前方法是最後一個執行countDown()方法的線程執行的),首先判斷頭結點不爲空,且不爲尾節點,說明等待隊列中有等待喚醒的線程,這裏須要說明的是,在等待隊列中,頭節點中並無保存正在等待的線程,其只是一個空的Node對象,真正等待的線程是從頭節點的下一個節點開始存放的,於是會有對頭結點是否等於尾節點的判斷。在判斷等待隊列中有正在等待的線程以後,其會清除頭結點的狀態信息,而且調用unparkSuccessor(Node)方法喚醒頭結點的下一個節點,使其繼續往下執行。以下是unparkSuccessor(Node)方法的具體實現:

private void unparkSuccessor(Node node) {
  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);   // 清除當前節點的等待狀態

  Node s = node.next;
  if (s == null || s.waitStatus > 0) {  // s的等待狀態大於0說明該節點中的線程已經被外部取消等待了
    s = null;
    // 從隊列尾部往前遍歷,找到最後一個處於等待狀態的節點,用s記錄下來
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread);   // 喚醒離傳入節點最近的處於等待狀態的節點線程
}複製代碼
能夠看到,unparkSuccessor(Node)方法的做用是喚醒離傳入節點最近的一個處於等待狀態的線程,使其繼續往下執行。前面咱們講到過,等待隊列中的線程可能有多個,而調用countDown()方法的線程只喚醒了一個處於等待狀態的線程,這裏剩下的等待線程是如何被喚醒的呢?其實這些線程是被當前喚醒的線程喚醒的。具體的咱們能夠看看await()方法的具體執行過程。以下是await()方法的代碼:

public void await() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}
複製代碼
     await()方法實際仍是調用了Sync對象的方法acquireSharedInterruptibly(int)方法,以下是該方法的具體實現:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}複製代碼

     能夠看到acquireSharedInterruptibly(int)方法判斷當前線程是否須要以共享狀態獲取執行權限,這裏tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一個模板方法,其具體實如今前面的Sync類中,能夠看到,其主要是判斷state是否爲零,若是爲零則返回1,表示當前線程不須要進行權限獲取,可直接執行後續代碼,返回-1則表示當前線程須要進行共享權限。具體的獲取執行權限的代碼在doAcquireSharedInterruptibly(int)方法中,以下是該方法的具體實現:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  final Node node = addWaiter(Node.SHARED); // 使用當前線程建立一個共享模式的節點
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();    // 獲取當前節點的前一個節點
      if (p == head) {  // 判斷前一個節點是否爲頭結點
        int r = tryAcquireShared(arg);  // 查看當前線程是否獲取到了執行權限
        if (r >= 0) {   // 大於0表示獲取了執行權限
          setHeadAndPropagate(node, r); // 將當前節點設置爲頭結點,而且喚醒後面處於等待狀態的節點
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      
      // 走到這一步說明沒有獲取到執行權限,就使當前線程進入「擱置」狀態
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}複製代碼
      在doAcquireSharedInterruptibly(int)方法中,首先使用當前線程建立一個共享模式的節點。而後在一個for循環中判斷當前線程是否獲取到執行權限,若是有(r >= 0判斷)則將當前節點設置爲頭節點,而且喚醒後續處於共享模式的節點;若是沒有,則對調用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使當前線程處於「擱置」狀態,該「擱置」狀態是由操做系統進行的,這樣能夠避免該線程無限循環而獲取不到執行權限,形成資源浪費,這裏也就是線程處於等待狀態的位置,也就是說當線程被阻塞的時候就是阻塞在這個位置。當有多個線程調用await()方法而進入等待狀態時,這幾個線程都將等待在此處。這裏回過頭來看前面將的countDown()方法,其會喚醒處於等待隊列中離頭節點最近的一個處於等待狀態的線程,也就是說該線程被喚醒以後會繼續從這個位置開始往下執行,此時執行到tryAcquireShared(int)方法時,發現r大於0(由於state已經被置爲0了),該線程就會調用setHeadAndPropagate(Node, int)方法,而且退出當前循環,也就開始執行awat()方法以後的代碼。這裏咱們看看setHeadAndPropagate(Node, int)方法的具體實現:

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  setHead(node);    // 將當前節點設置爲頭節點
  // 檢查喚醒過程是否須要往下傳遞,而且檢查頭結點的等待狀態
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())  // 若是下一個節點是嘗試以共享狀態獲取獲取執行權限的節點,則將其喚醒
      doReleaseShared();
  }
}複製代碼

     setHeadAndPropagate(Node, int)方法主要做用是設置當前節點爲頭結點,而且將喚醒工做往下傳遞,在傳遞的過程當中,其會判斷被傳遞的節點是不是以共享模式嘗試獲取執行權限的,若是不是,則傳遞到該節點處爲止(通常狀況下,等待隊列中都只會都是處於共享模式或者處於獨佔模式的節點)。也就是說,頭結點會依次喚醒後續處於共享狀態的節點,這也就是共享鎖與獨佔鎖的實現方式。這裏doReleaseShared()方法也就是咱們前面講到的會將離頭結點最近的一個處於等待狀態的節點喚醒的方法。

CountDownLatch的demo

public class CountdownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 10;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("run..");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..run..run..run..run..run..run..run..end複製代碼

總結

CountDownLatch的做用就是容許一個或多個線程等待其餘線程完成操做,看起來有點相似join() 方法,但其提供了比 join() 更加靈活的API。CountDownLatch能夠手動控制在n個線程裏調用n次countDown方法使計數器進行減一操做,也能夠在一個線程裏調用n次執行減一操做。而 join() 的實現原理是不停檢查join線程是否存活,若是 join 線程存活則讓當前線程永遠等待。因此二者之間相對來講仍是CountDownLatch使用起來較爲靈活。併發


參考自:《Java併發編程的藝術》和www.jianshu.com/p/128476015…框架

相關文章
相關標籤/搜索