java多線程併發系列之閉鎖(Latch)和柵欄(CyclicBarrier)

-閉鎖(Latch)

 

閉鎖(Latch):一種同步方法,能夠延遲線程的進度直到線程到達某個終點狀態。通俗的講就是,一個閉鎖至關於一扇大門,在大門打開以前全部線程都被阻斷,一旦大門打開全部線程都將經過,可是一旦大門打開,全部線程都經過了,那麼這個閉鎖的狀態就失效了,門的狀態也就不能變了,只能是打開狀態。也就是說閉鎖的狀態是一次性的,它確保在閉鎖打開以前全部特定的活動都須要在閉鎖打開以後才能完成。

 

 

應用場景:

 

  • 確保某個計算在其須要的全部資源都被初始化以後才繼續執行。二元閉鎖(包括兩個狀態)能夠用來表示「資源R已經被初始化」,而全部須要R的操做都必須先在這個閉鎖上等待。
  • 確保某個服務在其依賴的全部其餘服務都已經啓動以後才啓動。
  • 等待直到某個操做的全部參與者都就緒在繼續執行。(例如:多人遊戲中須要全部玩家準備才能開始)

 

CountDownLatch是JDK 5+裏面閉鎖的一個實現,容許一個或者多個線程等待某個事件的發生。CountDownLatch有一個正數計數器,countDown方法對計數器作減操做,await方法等待計數器達到0。全部await的線程都會阻塞直到計數器爲0或者等待線程中斷或者超時。

 


-柵欄(CyclicBarrier)

 

柵欄相似於閉鎖,它能阻塞一組線程直到某個事件發生。 柵欄與閉鎖的關鍵區別在於,全部的線程必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其餘線程。

 

 

場景: 應用一些協議,好比幾個家庭成員決定在某個地方集合,全部人在6:00在某地集合,到了之後要等待其餘人,以後才能討論去哪裏吃飯。 並行迭代,將一個問題分紅不少子問題,當一系列的子問題都解決以後(全部子問題線程都已經await()),此時將柵欄打開,全部子問題線程被釋放,而柵欄位置能夠留着下次使用。

 


-例子:兩個分別關於CountDownlatch和CyclicBarrier的例子

一、CountDownLatchjava

 

有三個工人在爲老闆幹活,這個老闆有一個習慣,就是當三個工人把一天的活都幹完了的時候,他就來檢查全部工人所幹的活。記住這個條件:三個工人先所有幹完活,老闆才檢查。因此在這裏用Java代碼設計兩個類,Worker表明工人,Boss表明老闆,具體的代碼實現以下:

 

工人:app

package LatchAndCyclicBarrier; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class Work implements Runnable{ private CountDownLatch downLatch; private String name; public Work(CountDownLatch downLatch, String name){ this.downLatch = downLatch; this.name = name; } public void run() { this.doWork(); try{ TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){ } System.out.println(this.name + "活幹完了!"); this.downLatch.countDown(); } private void doWork(){ System.out.println(this.name + "正在幹活!"); } } 

老闆:dom

package LatchAndCyclicBarrier; import java.util.concurrent.CountDownLatch; public class Boss implements Runnable{ private CountDownLatch downLatch; public Boss(CountDownLatch downLatch){ this.downLatch = downLatch; } public void run() { System.out.println("老闆正在等全部的工人幹完活......"); try { this.downLatch.await(); } catch (InterruptedException e) { } System.out.println("工人活都幹完了,老闆開始檢查了!"); } } 

測試代碼:ide

package LatchAndCyclicBarrier; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestLatch { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(3); Work w1 = new Work(latch,"張三"); Work w2 = new Work(latch,"李四"); Work w3 = new Work(latch,"王二"); Boss boss = new Boss(latch); executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss); executor.shutdown(); } } 

執行結果:函數

李四正在幹活!
老闆正在等全部的工人幹完活......
王二正在幹活!
張三正在幹活!
李四活幹完了!
王二活幹完了!
張三活幹完了!
工人活都幹完了,老闆開始檢查了!

二、CyclicBarrieroop

 

接着上面的例子,仍是這三個工人,不過這一次,這三個工人自由了,老闆不用檢查他們任務了,他們三個合做建橋,有三個樁,每人打一個,同時打完以後才能一塊兒搭橋(搭橋須要三人一塊兒合做)。也就是說三我的都打完樁以後才能繼續工做。

 

package LatchAndCyclicBarrier; import java.util.concurrent.CyclicBarrier; public class CycWork implements Runnable { private CyclicBarrier cyclicBarrier ; private String name ; public CycWork(CyclicBarrier cyclicBarrier,String name) { this .name =name; this .cyclicBarrier =cyclicBarrier; } @Override public void run() { // TODO Auto-generated method stub System. out .println(name +"正在打樁,畢竟不輕鬆。。。。。" ); try { Thread. sleep(5000); System. out .println(name +"不容易,終於把樁打完了。。。。" ); cyclicBarrier .await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } System. out .println(name +":其餘逗b把樁都打完了,又得忙活了。。。" ); } } 

測試程序測試

package LatchAndCyclicBarrier; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CycTest { public static void main(String[] args) { ExecutorService executorpool=Executors. newFixedThreadPool(3); CyclicBarrier cyclicBarrier= new CyclicBarrier(3); CycWork work1= new CycWork(cyclicBarrier, "張三" ); CycWork work2= new CycWork(cyclicBarrier, "李四" ); CycWork work3= new CycWork(cyclicBarrier, "王五" ); executorpool.execute(work1); executorpool.execute(work2); executorpool.execute(work3); executorpool.shutdown(); } } 

運行結果:ui

李四正在打樁,畢竟不輕鬆。。。。。
張三正在打樁,畢竟不輕鬆。。。。。
王五正在打樁,畢竟不輕鬆。。。。。
李四不容易,終於把樁打完了。。。。
張三不容易,終於把樁打完了。。。。
王五不容易,終於把樁打完了。。。。
王五:其餘逗b把樁都打完了,又得忙活了。。。
李四:其餘逗b把樁都打完了,又得忙活了。。。
張三:其餘逗b把樁都打完了,又得忙活了。。。

CountDownlatch和CyclicBarrierde 源碼部分this

一、CountDownLatch中的兩個關鍵方法spa

public void countDown() { //對計數器減一 表示有一個事件已經發生了 sync.releaseShared(1); } 
public void await() throws InterruptedException { //等到計數器爲0 sync.acquireSharedInterruptibly(1); } 

 

await方法調用了AbstractQueuedSynchronizer中的acquireSharedInterruptibly

 

public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 
public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); } 

二、CyclicBarrier中的await()方法

public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 

 

上面dowait方法中有一個index,index=--count而count的值在源碼中來自

 

count = parties;

提到 parties就不得不看看構造函數了

public CyclicBarrier(int parties) { this(parties, null); } 

如上例子,咱們構造了CyclicBarrier(3)那麼此時的 count值爲3,接着dowait源碼,當index==0時,後面執行的

final Runnable command = barrierCommand; 

實際上是能夠設置的,這個Runnable能夠傳進來,當咱們但願全部線程都達到某一時刻以後,用什麼線程執行接下來的工做,當沒有傳Runnable進來時,就繼續執行(喚醒其餘線程),不然就runnable.run()(喚醒其餘線程以前執行)

相關文章
相關標籤/搜索