Java併發編程筆記之 CountDownLatch閉鎖的源碼分析

JUC 中倒數計數器 CountDownLatch 的使用與原理分析,當須要等待多個線程執行完畢後在作一件事情時候 CountDownLatch 是比調用線程的 join 方法更好的選擇,CountDownLatch 與 線程的 join 方法區別是什麼?java

平常開發中常常會遇到須要在主線程中開啓多線程去並行執行任務,而且主線程須要等待全部子線程執行完畢後再進行彙總的場景,它的內部提供了一個計數器,在構造閉鎖時必須指定計數器的初始值,且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操做計數器的值,每調用一次countDown方法計數器都會減1,直到計數器的值減爲0時就表明條件已成熟,全部因調用await方法而阻塞的線程都會被喚醒。這就是CountDownLatch的內部機制,看起來很簡單,無非就是阻塞一部分線程讓其在達到某個條件以後再執行。可是CountDownLatch的應用場景卻比較普遍,只要你腦洞夠大利用它就能夠玩出各類花樣。最多見的一個應用場景是開啓多個線程同時執行某個任務,等到全部任務都執行完再統計彙總結果。下圖動態演示了閉鎖阻塞線程的整個過程。多線程

 

 

在CountDownLatch出現以前通常都是使用線程的join()方法來實現,可是join不夠靈活,不可以知足不一樣場景的需求。接下來咱們看看CountDownLatch的原理實現。ide

 

一.CountDownLatch原理探究函數

  從CountDownLatch的名字能夠猜想內部應該有個計數器,而且這個計數器是遞減的,下面就經過源碼看看JDK開發組是什麼時候初始化計數器,什麼時候遞減的,計數器變爲 0 的時候作了什麼操做,多個線程是如何經過計時器值實現同步的,首先咱們先看看CountDownLatch內部結構,類圖以下:測試

從類圖能夠知道CountDownLatch內部仍是使用AQS實現的,經過下面構造函數初始化計數器的值,可知其實是把計數器的值賦值給了AQS的state,也就是這裏AQS的狀態值來表示計數器值。ui

構造函數源碼以下:this

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

   Sync(int count) {
       setState(count);
   }

接下來主要看一下CountDownLatch中幾個重要的方法內部是如何調用AQS來實現功能的。atom

  1.void await()方法,當前線程調用了CountDownLatch對象的await方法後,當前線程會被阻塞,直到下面的狀況之一纔會返回:(1)當全部線程都調用了CountDownLatch對象的countDown方法後,spa

也就是說計時器值爲 0 的時候。(2)其餘線程調用了當前線程的interrupt()方法中斷了當前線程,當前線程會拋出InterruptedException異常後返回。接下來讓咱們看看await()方法內部是如何調用線程

AQS的方法的,源碼以下:

//CountDownLatch的await()方法
public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}
    //AQS的獲取共享資源時候可被中斷的方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    //若是線程被中斷則拋異常
    if (Thread.interrupted())
         throw new InterruptedException();
        //嘗試看當前是否計數值爲0,爲0則直接返回,否者進入AQS的隊列等待
    if (tryAcquireShared(arg) < 0)
         doAcquireSharedInterruptibly(arg);
}

 //sync類實現的AQS的接口
 protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
 }

  從上面代碼能夠看到await()方法委託sync調用了AQS的acquireSharedInterruptibly方法,該方法的特色是線程獲取資源的時候能夠被中斷,而且獲取到的資源是共享資源,這裏爲何要調用AQS的這個方法,而不是調用獨佔鎖的accquireInterruptibly方法呢?這是由於這裏狀態值須要的並非非 0 即 1 的效果,而是和初始化時候指定的計數器值有關係,好比你初始化的時候計數器值爲 8 ,那麼state的值應該就有 0 到 8 的狀態,而不是隻有  0  和  1 的獨佔效果。

  這裏await()方法調用acquireSharedInterruptibly的時候傳遞的是 1 ,就是說明要獲取一個資源,而這裏計數器值是資源總數,也就是意味着是讓總的資源數減 1 ,acquireSharedInterruptibly內部首先判斷若是當前線程被中斷了則拋出異常,不然調用sync實現的tryAcquireShared方法看當前狀態值(計數器值)是否爲 0  ,是則當前線程的await()方法直接返回,不然調用AQS的doAcquireSharedInterruptibly讓當前線程阻塞。另外調用tryAcquireShared的方法僅僅是檢查當前狀態值是否是爲 0 ,並無調用CAS讓當前狀態值減去 1 。

 

  2.boolean await(long timeout, TimeUnit unit),當線程調用了 CountDownLatch 對象的該方法後,當前線程會被阻塞,直到下面的狀況之一發生纔會返回: (1)當全部線程都調用了 CountDownLatch 對象的 countDown 方法後,也就是計時器值爲 0 的時候,這時候返回 true; (2) 設置的 timeout 時間到了,由於超時而返回 false; (3)其它線程調用了當前線程的 interrupt()方法中斷了當前線程,當前線程會拋出 InterruptedException 異常後返回。源碼以下:

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

 

  3.void countDown() 當前線程調用了該方法後,會遞減計數器的值,遞減後若是計數器爲 0 則會喚醒全部調用await 方法而被阻塞的線程,不然什麼都不作,接下來看一下countDown()方法內部是如何調用AQS的方法的,源碼以下:

   //CountDownLatch的countDown()方法
    public void countDown() {
       //委託sync調用AQS的方法
        sync.releaseShared(1);
    }
   //AQS的方法
    public final boolean releaseShared(int arg) {
        //調用sync實現的tryReleaseShared
        if (tryReleaseShared(arg)) {
            //AQS的釋放資源方法
            doReleaseShared();
            return true;
        }
        return false;
    }

如上面代碼能夠知道CountDownLatch的countDown()方法是委託sync調用了AQS的releaseShared方法,後者調用了sync 實現的AQS的tryReleaseShared,源碼以下:

//syn的方法
protected boolean tryReleaseShared(int releases) {
  //循環進行cas,直到當前線程成功完成cas使計數值(狀態值state)減一併更新到state
  for (;;) {
      int c = getState();

      //若是當前狀態值爲0則直接返回(1)
      if (c == 0)
          return false;

      //CAS設置計數值減一(2)
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
          return nextc == 0;
  }
}

如上代碼能夠看到首先獲取當前狀態值(計數器值),代碼(1)若是當前狀態值爲 0 則直接返回 false ,則countDown()方法直接返回;不然執行代碼(2)使用CAS設置計數器減一,CAS失敗則循環重試,不然若是當前計數器爲 0 則返回 true 。返回 true 後,說明當前線程是最後一個調用countDown()方法的線程,那麼該線程除了讓計數器減一外,還須要喚醒調用CountDownLatch的await 方法而被阻塞的線程。這裏的代碼(1)貌似是多餘的,其實否則,之因此添加代碼 (1) 是爲了防止計數器值爲 0 後,其餘線程又調用了countDown方法,若是沒有代碼(1),狀態值就會變成負數。

 

  4.long getCount() 獲取當前計數器的值,也就是 AQS 的 state 的值,通常在 debug 測試時候使用,源碼以下:

public long getCount() {
     return sync.getCount();
}

int getCount() {
     return getState();
}

如上代碼可知內部仍是調用了 AQS 的 getState 方法來獲取 state 的值(計數器當前值)。

 

到目前爲止原理理解的差很少了,接下來用一個例子進行講解CountDownLatch的用法,例子以下:

package com.hjc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by cong on 2018/7/6.
 */
public class CountDownLatchTest {

    private static AtomicInteger id = new AtomicInteger();

    // 建立一個CountDownLatch實例,管理計數爲ThreadNum
    private static volatile CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) throws InterruptedException {

        Thread threadOne = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();
            }
        });

        Thread threadTwo = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        Thread threadThree = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                System.out.println("【玩家" + id.getAndIncrement() + "】已入場");
                countDownLatch.countDown();

            }
        });

        // 啓動子線程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
        System.out.println("等待鬥地主玩家進場");

        // 等待子線程執行完畢,返回
        countDownLatch.await();

        System.out.println("鬥地主玩家已經滿人,開始發牌.....");

    }
}

運行結果以下:

 

 

如上代碼,建立了一個 CountDownLatch 實例,由於有兩個子線程因此構造函數參數傳遞爲 3,主線程調用 countDownLatch.await()方法後會被阻塞。子線程執行完畢後調用 countDownLatch.countDown() 方法讓 countDownLatch 內部的計數器減一,等全部子線程執行完畢調用 countDown()後計數器會變爲 0,這時候主線程的 await()纔會返回。

 

若是把上面的代碼中Thread.sleep和countDownLatch.await()的代碼註釋掉,運行幾遍,運行結果就可能會出現以下結果,以下圖:

 能夠看到在註釋掉latch.await()這行以後,就不能保證在全部玩家入場後纔開始發牌了。

 

總結:CountDownLatch 與 join 方法的區別,一個區別是調用一個子線程的 join()方法後,該線程會一直被阻塞直到該線程運行完畢,而 CountDownLatch 則使用計數器容許子線程運行完畢或者運行中時候遞減計數,也就是 CountDownLatch 能夠在子線程運行任什麼時候候讓 await 方法返回而不必定必須等到線程結束;另外使用線程池來管理線程時候通常都是直接添加 Runable 到線程池這時候就沒有辦法在調用線程的 join 方法了,countDownLatch 相比 Join 方法讓咱們對線程同步有更靈活的控制。

相關文章
相關標籤/搜索