一、多線程開發中,常常會遇到一個線程等待一個或多個線程的狀況,遇到這樣的場景如何解決?
一個線程等待一個線程:能夠經過wait和notify實現
一個線程等待多個線程:能夠經過CountDownLatch實現
多個線程之間互相等待:能夠經過CyclicBarrier實現java
二、countDownLatch和CyclicBarrier的區別:
countDownLatch 多線程
減計數方式
調用countDown()計數減1,調用await()只阻塞線程,對計數無影響
計數爲0時,釋放全部等待的線程
計數爲0時,不可重複利用dom
CyclicBarrieride
加計數方式
調用await方法,計數加1,若加1後的值不等於構造方法的值,則線程阻塞
計數達到指定值時,計數從新置爲0,釋放全部阻塞線程
計數達到指定值時,可重複利用.函數
三、CountDownLatch的僞代碼以下所示:測試
//Main thread start
this
//Create CountDownLatch for N threads
spa
//Create and start N threads
線程
//Main thread wait on latch
code
//N threads completes there tasks are returns
//Main thread resume execution
CountDownLatch.java類中定義的構造函數:
//Constructs a CountDownLatch initialized with the given count.
public
void
CountDownLatch(
int
count) {...}
與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值。
其餘N 個線程必須引用閉鎖對象,由於他們須要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是經過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。因此當N個線程都調 用了這個方法,count的值等於0,而後主線程就能經過await()方法,恢復執行本身的任務。
五、CountDownLatch使用例子
在這個例子中,模擬了一個應用程序啓動類,它開始時啓動了n個線程類,這些線程將檢查外部系統並通知閉鎖,而且啓動類一直在閉鎖上等待着。一旦驗證和檢查了全部外部服務,那麼啓動類恢復執行。
BaseHealthChecker.java:這個類是一個Runnable,負責全部特定的外部服務健康的檢測。
public abstract class BaseHealthChecker implements Runnable{ private CountDownLatch countDownLatch; private String serviceName; private boolean serviceUp; public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch){ this.serviceName = serviceName; this.countDownLatch = countDownLatch; this.serviceUp = false; } @Override public void run() { try { serviceVerify(); serviceUp = true; } catch (Exception e) { e.printStackTrace(); serviceUp = false; } finally { if (null != countDownLatch) { countDownLatch.countDown(); } } } public String getServiceName() { return serviceName; } public boolean isServiceUp() { return serviceUp; } public abstract void serviceVerify() throws Exception; }
NetworkHealthChecker.java:這個類繼承了BaseHealthChecker,實現了verifyService()方法。DatabaseHealthChecker.java和CacheHealthChecker.java除了服務名和休眠時間外,與NetworkHealthChecker.java是同樣的。
public class NetworkHealthChecker extends BaseHealthChecker{ public NetworkHealthChecker(CountDownLatch countDownLatch) { super("Network Service", countDownLatch); } @Override public void serviceVerify() throws InterruptedException { System.out.println("Checking " + this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName() + " is checked"); } }
ApplicationStartupUtil.java:這個類是一個主啓動類,它負責初始化閉鎖,而後等待,直到全部服務都被檢測完。
public class ApplicatiionStartupUtil { private static List<BaseHealthChecker> checkers; private ApplicatiionStartupUtil(){ } private final ApplicatiionStartupUtil INSTANCE = new ApplicatiionStartupUtil(); public ApplicatiionStartupUtil getInstance(){ return INSTANCE; } public static boolean checkExternalService() throws InterruptedException{ CountDownLatch countDownLatch = new CountDownLatch(3); checkers = new ArrayList<BaseHealthChecker>(); checkers.add(new NetworkHealthChecker(countDownLatch)); checkers.add(new CacheHealthChecker(countDownLatch)); checkers.add(new DateBasekHealthChecker(countDownLatch)); ExecutorService executor = Executors.newFixedThreadPool(checkers.size()); for(final BaseHealthChecker checker : checkers){ executor.execute(checker); } countDownLatch.await(); executor.shutdown(); for(final BaseHealthChecker checker : checkers){ if (!checker.isServiceUp()) { return false; } } return true; } }
測試代碼:
public class Main { public static void main(String[] args) { boolean result = false; try { result = ApplicatiionStartupUtil.checkExternalService(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("External services validation completed !! Result was :: "+ result); } }
運行結果
Checking Network Service
Checking DateBase Service
Checking Cache Service
Network Service is checked
Cache Service is checked
DateBase Serviceis Checked
External services validation completed !! Result was :: true
六、CyclicBarrier使用的例子
public class CyclicBarrierTest { static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { public void run() { try { Thread.sleep(new Random().nextInt(5) * 1000); System.out.println("thread is prepared...." + new Date()); cyclicBarrier.await(); System.out.println(1); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }).start(); try { Thread.sleep(new Random().nextInt(5) * 1000); System.out.println("main is prepared...." + new Date()); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e ) { e.printStackTrace(); } System.out.println(2); } static class A implements Runnable{ @Override public void run() { System.out.println("A......."); } } }
運行結果:
thread is prepared....Fri Oct 27 11:08:44 CST 2017 main is prepared....Fri Oct 27 11:08:47 CST 2017 A....... 1 2