CountDownLatch和CyclicBarrier

一、多線程開發中,常常會遇到一個線程等待一個或多個線程的狀況,遇到這樣的場景如何解決?
一個線程等待一個線程:能夠經過wait和notify實現
一個線程等待多個線程:能夠經過CountDownLatch實現
多個線程之間互相等待:能夠經過CyclicBarrier實現java

二、countDownLatch和CyclicBarrier的區別:
countDownLatch    多線程

減計數方式
調用countDown()計數減1,調用await()只阻塞線程,對計數無影響
計數爲0時,釋放全部等待的線程
計數爲0時,不可重複利用dom

CyclicBarrieride

加計數方式
調用await方法,計數加1,若加1後的值不等於構造方法的值,則線程阻塞
計數達到指定值時,計數從新置爲0,釋放全部阻塞線程
計數達到指定值時,可重複利用.函數

三、CountDownLatch的僞代碼以下所示:測試

//Main thread startthis

//Create CountDownLatch for N threadsspa

//Create and start N threads線程

//Main thread wait on latchcode

//N threads completes there tasks are returns

//Main thread resume execution

四、CountDownLatch如何工做

CountDownLatch.java類中定義的構造函數:

//Constructs a CountDownLatch initialized with the given count.

public void CountDownLatch(int count) {...}

與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值。

其餘N 個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。

五、CountDownLatch使用例子

在這個例子中,模擬了一個應用程序啓動類,它開始時啓動了n個線程類,這些線程將檢查外部系統並通知閉鎖,而且啓動類一直在閉鎖上等待着。一旦驗證和檢查了全部外部服務,那麼啓動類恢復執行。

BaseHealthChecker.java:這個類是一個Runnable,負責全部特定的外部服務健康的檢測。

public abstract class BaseHealthChecker implements Runnable{

    private CountDownLatch countDownLatch;
    private String serviceName;
    private boolean serviceUp;
    
    public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch){
        this.serviceName = serviceName;
        this.countDownLatch = countDownLatch;
        this.serviceUp = false;
    }
    
    @Override
    public void run() {
        try {
            serviceVerify();
            serviceUp = true;
        } catch (Exception e) {
            e.printStackTrace();
            serviceUp = false;
        } finally {
            if (null != countDownLatch) {
                countDownLatch.countDown();
            }
        }
    }
    
    public String getServiceName() {
        return serviceName;
    }

    public boolean isServiceUp() {
        return serviceUp;
    }

    public abstract void serviceVerify() throws Exception;    
}

NetworkHealthChecker.java:這個類繼承了BaseHealthChecker,實現了verifyService()方法。DatabaseHealthChecker.javaCacheHealthChecker.java除了服務名和休眠時間外,與NetworkHealthChecker.java是同樣的。

public class NetworkHealthChecker extends BaseHealthChecker{

    public NetworkHealthChecker(CountDownLatch countDownLatch) {
        super("Network Service", countDownLatch);
    }

    @Override
    public void serviceVerify() throws InterruptedException {
        System.out.println("Checking " + this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName() + " is checked");
    }

}

ApplicationStartupUtil.java:這個類是一個主啓動類,它負責初始化閉鎖,而後等待,直到全部服務都被檢測完。

public class ApplicatiionStartupUtil {

    private static List<BaseHealthChecker> checkers;
    private ApplicatiionStartupUtil(){
        
    }

    private final ApplicatiionStartupUtil INSTANCE = new ApplicatiionStartupUtil();
    public ApplicatiionStartupUtil getInstance(){
        return INSTANCE;
    }
    
    public static boolean checkExternalService() throws InterruptedException{
        CountDownLatch countDownLatch = new CountDownLatch(3);
        checkers = new ArrayList<BaseHealthChecker>();
        checkers.add(new NetworkHealthChecker(countDownLatch));
        checkers.add(new CacheHealthChecker(countDownLatch));
        checkers.add(new DateBasekHealthChecker(countDownLatch));
        
        ExecutorService executor = Executors.newFixedThreadPool(checkers.size());
            
        for(final BaseHealthChecker checker : checkers){
            executor.execute(checker);
        }
        countDownLatch.await();
        executor.shutdown();
        
        for(final BaseHealthChecker checker : checkers){
            if (!checker.isServiceUp()) {
                return false;
            }
        }
        return true;
    }
}

測試代碼:

public class Main {

    public static void main(String[] args) {
        boolean result = false;
        try {
            result = ApplicatiionStartupUtil.checkExternalService();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: "+ result);
    }
}

運行結果

Checking Network Service
Checking DateBase Service
Checking Cache Service
Network Service is checked
Cache Service is checked
DateBase Serviceis Checked
External services validation completed !! Result was :: true

六、CyclicBarrier使用的例子

public class CyclicBarrierTest {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());
    public static void main(String[] args) {
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(new Random().nextInt(5) * 1000);
                    System.out.println("thread is prepared...." + new Date());
                    cyclicBarrier.await();
                    System.out.println(1);
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        try {
            Thread.sleep(new Random().nextInt(5) * 1000);
            System.out.println("main is prepared...." + new Date());
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e ) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
    
    static class A implements Runnable{

        @Override
        public void run() {
            System.out.println("A.......");
        }
        
    }
}

運行結果:

thread is prepared....Fri Oct 27 11:08:44 CST 2017 main is prepared....Fri Oct 27 11:08:47 CST 2017 A....... 1 2

相關文章
相關標籤/搜索