一、CountDownLatchjava
工人:app
package LatchAndCyclicBarrier; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class Work implements Runnable{ private CountDownLatch downLatch; private String name; public Work(CountDownLatch downLatch, String name){ this.downLatch = downLatch; this.name = name; } public void run() { this.doWork(); try{ TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){ } System.out.println(this.name + "活幹完了!"); this.downLatch.countDown(); } private void doWork(){ System.out.println(this.name + "正在幹活!"); } }
老闆:dom
package LatchAndCyclicBarrier; import java.util.concurrent.CountDownLatch; public class Boss implements Runnable{ private CountDownLatch downLatch; public Boss(CountDownLatch downLatch){ this.downLatch = downLatch; } public void run() { System.out.println("老闆正在等全部的工人幹完活......"); try { this.downLatch.await(); } catch (InterruptedException e) { } System.out.println("工人活都幹完了,老闆開始檢查了!"); } }
測試代碼:ide
package LatchAndCyclicBarrier; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestLatch { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(3); Work w1 = new Work(latch,"張三"); Work w2 = new Work(latch,"李四"); Work w3 = new Work(latch,"王二"); Boss boss = new Boss(latch); executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss); executor.shutdown(); } }
執行結果:函數
李四正在幹活! 老闆正在等全部的工人幹完活...... 王二正在幹活! 張三正在幹活! 李四活幹完了! 王二活幹完了! 張三活幹完了! 工人活都幹完了,老闆開始檢查了!
二、CyclicBarrieroop
package LatchAndCyclicBarrier; import java.util.concurrent.CyclicBarrier; public class CycWork implements Runnable { private CyclicBarrier cyclicBarrier ; private String name ; public CycWork(CyclicBarrier cyclicBarrier,String name) { this .name =name; this .cyclicBarrier =cyclicBarrier; } @Override public void run() { // TODO Auto-generated method stub System. out .println(name +"正在打樁,畢竟不輕鬆。。。。。" ); try { Thread. sleep(5000); System. out .println(name +"不容易,終於把樁打完了。。。。" ); cyclicBarrier .await(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } System. out .println(name +":其餘逗b把樁都打完了,又得忙活了。。。" ); } }
測試程序測試
package LatchAndCyclicBarrier; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CycTest { public static void main(String[] args) { ExecutorService executorpool=Executors. newFixedThreadPool(3); CyclicBarrier cyclicBarrier= new CyclicBarrier(3); CycWork work1= new CycWork(cyclicBarrier, "張三" ); CycWork work2= new CycWork(cyclicBarrier, "李四" ); CycWork work3= new CycWork(cyclicBarrier, "王五" ); executorpool.execute(work1); executorpool.execute(work2); executorpool.execute(work3); executorpool.shutdown(); } }
運行結果:ui
李四正在打樁,畢竟不輕鬆。。。。。 張三正在打樁,畢竟不輕鬆。。。。。 王五正在打樁,畢竟不輕鬆。。。。。 李四不容易,終於把樁打完了。。。。 張三不容易,終於把樁打完了。。。。 王五不容易,終於把樁打完了。。。。 王五:其餘逗b把樁都打完了,又得忙活了。。。 李四:其餘逗b把樁都打完了,又得忙活了。。。 張三:其餘逗b把樁都打完了,又得忙活了。。。
CountDownlatch和CyclicBarrierde 源碼部分this
一、CountDownLatch中的兩個關鍵方法spa
public void countDown() { //對計數器減一 表示有一個事件已經發生了 sync.releaseShared(1); }
public void await() throws InterruptedException { //等到計數器爲0 sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int arg) { throw new UnsupportedOperationException(); }
二、CyclicBarrier中的await()方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
count = parties;
提到 parties就不得不看看構造函數了
public CyclicBarrier(int parties) { this(parties, null); }
如上例子,咱們構造了CyclicBarrier(3)那麼此時的 count值爲3,接着dowait源碼,當index==0時,後面執行的
final Runnable command = barrierCommand;
實際上是能夠設置的,這個Runnable能夠傳進來,當咱們但願全部線程都達到某一時刻以後,用什麼線程執行接下來的工做,當沒有傳Runnable進來時,就繼續執行(喚醒其餘線程),不然就runnable.run()(喚醒其餘線程以前執行)