CountdownLatch和CyclicBarrier都屬於線程同步的工具,不過具體的實現以及使用的狀況有所不一樣,咱們先來看看不一樣的使用狀況bash
顧名思義CountdownLatch能夠當作一個計數器來使用,好比某線程須要等待其餘幾個線程都執行過某個時間節點後才能繼續執行 咱們來模擬一個場景,某公司一共有十我的,門衛要等十我的都來上班之後,才能夠休息,代碼實現以下微信
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
//lambda中只能只用final的變量
final int times = i;
new Thread(() -> {
try {
System.out.println("子線程" + Thread.currentThread().getName() + "正在趕路");
Thread.sleep(1000 * times);
System.out.println("子線程" + Thread.currentThread().getName() + "到公司了");
//調用latch的countDown方法使計數器-1
latch.countDown();
System.out.println("子線程" + Thread.currentThread().getName() + "開始工做");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
try {
System.out.println("門衛等待員工上班中...");
//主線程阻塞等待計數器歸零
latch.await();
System.out.println("員工都來了,門衛去休息了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製代碼
運行後結果以下app
子線程Thread-0正在趕路
子線程Thread-2正在趕路
子線程Thread-0到公司了
子線程Thread-0開始工做
子線程Thread-1正在趕路
門衛等待員工上班中...
子線程Thread-4正在趕路
子線程Thread-9正在趕路
子線程Thread-5正在趕路
子線程Thread-6正在趕路
子線程Thread-7正在趕路
子線程Thread-8正在趕路
子線程Thread-3正在趕路
子線程Thread-1到公司了
子線程Thread-1開始工做
子線程Thread-2到公司了
子線程Thread-2開始工做
子線程Thread-3到公司了
子線程Thread-3開始工做
子線程Thread-4到公司了
子線程Thread-4開始工做
子線程Thread-5到公司了
子線程Thread-5開始工做
子線程Thread-6到公司了
子線程Thread-6開始工做
子線程Thread-7到公司了
子線程Thread-7開始工做
子線程Thread-8到公司了
子線程Thread-8開始工做
子線程Thread-9到公司了
子線程Thread-9開始工做
員工都來了,門衛去休息了
複製代碼
能夠看到子線程並無由於調用latch.countDown而阻塞,會繼續進行該作的工做,只是通知計數器-1,即完成了咱們如上說的場景,只須要在全部進程都進行到某一節點後纔會執行被阻塞的進程.若是咱們想要多個線程在同一時間進行就要用到CyclicBarrier了工具
咱們從新模擬一個新的場景,就用已經被說爛的跑步場景吧,十名運動員各自準備比賽,須要等待全部運動員都準備好之後,裁判才能說開始而後全部運動員一塊兒跑,代碼實現以下ui
public static void main(String[] args) {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{
System.out.println("全部人都準備好了裁判開始了");
});
for (int i = 0; i < 10; i++) {
//lambda中只能只用final的變量
final int times = i;
new Thread(() -> {
try {
System.out.println("子線程" + Thread.currentThread().getName() + "正在準備");
Thread.sleep(1000 * times);
System.out.println("子線程" + Thread.currentThread().getName() + "準備好了");
cyclicBarrier.await();
System.out.println("子線程" + Thread.currentThread().getName() + "開始跑了");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
複製代碼
執行結果以下this
子線程Thread-0正在準備
子線程Thread-2正在準備
子線程Thread-1正在準備
子線程Thread-3正在準備
子線程Thread-4正在準備
子線程Thread-0準備好了
子線程Thread-5正在準備
子線程Thread-6正在準備
子線程Thread-7正在準備
子線程Thread-8正在準備
子線程Thread-9正在準備
子線程Thread-1準備好了
子線程Thread-2準備好了
子線程Thread-3準備好了
子線程Thread-4準備好了
子線程Thread-5準備好了
子線程Thread-6準備好了
子線程Thread-7準備好了
子線程Thread-8準備好了
子線程Thread-9準備好了
全部人都準備好了裁判開始了
子線程Thread-9開始跑了
子線程Thread-0開始跑了
子線程Thread-2開始跑了
子線程Thread-1開始跑了
子線程Thread-7開始跑了
子線程Thread-6開始跑了
子線程Thread-5開始跑了
子線程Thread-4開始跑了
子線程Thread-3開始跑了
子線程Thread-8開始跑了
複製代碼
能夠看到全部線程在其餘線程沒有準備好以前都在被阻塞中,等到全部線程都準備好了才繼續執行 咱們在建立CyclicBarrier對象時傳入了一個方法,當調用CyclicBarrier的await方法後,當前線程會被阻塞等到全部線程都調用了await方法後 調用傳入CyclicBarrier的方法,而後讓全部的被阻塞的線程一塊兒運行spa
應用場景咱們說完了,接下來看看兩個工具的具體實現線程
咱們先來看看CountdownLatch的構造方法code
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
複製代碼
首先保證了count必定要大於零,而後初始化了一個Sync對象,在看看這個Sync對象是個什麼cdn
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
複製代碼
Sync是CountdownLatch的靜態內部類,繼承了AbstractQueuedSynchronizer(即AQS,提供了一種實現阻塞鎖和一系列依賴FIFO等待隊列的同步器的工具,回頭單講)抽象類, 在Sync的構造方法中,調用了setState方法,能夠視做初始化了一個標記來記錄當前計數器的數量
咱們來看CountdownLatch的兩個核心方法,await和countdown,先來看await
public void await() throws InterruptedException {
//能夠視做將線程阻塞
sync.acquireSharedInterruptibly(1);
}
複製代碼
await調用的是AQS的方法,能夠視做阻塞線程,具體實如今分析AQS的章節中展開 再來看看countdown方法
public void countDown() {
sync.releaseShared(1);
}
複製代碼
調用了sync的一個方法,再來看看這個方法的實現
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
再來看這個tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
//獲取標記位
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//用cas的方式更新標記位
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
複製代碼
能夠看到在調用tryReleaseShared其實是將標記位-1而且返回標記位是否爲0,若是標記位爲0 那麼調用的doReleaseShared能夠視做將阻塞的線程放行,這樣整個的流程就通了
老規矩先看構造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
複製代碼
這邊傳入了兩個對象簡單的記錄了一下存值,咱們直接查看一下關鍵的await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
複製代碼
再來看dowait的實現
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** 省略部分代碼 **/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//判斷是否被打斷
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//將計數器-1 即在構造方法中賦值的count
int index = --count;
if (index == 0) { // tripped
//若是全部的線程都執行完畢即count=0時
boolean ranAction = false;
try {
//執行傳入的方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//喚醒全部線程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//若是count沒有到0那麼阻塞當前線程
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } 複製代碼
從代碼中能夠看到,CyclicBarrier是利用Lock的condition方法來進行線程的阻塞和喚醒,相似Object.wait()和notifyAll()在count不爲0時阻塞,在count=0時喚醒全部線程
1,CountdownLatch適用於全部線程經過某一點後通知方法,而CyclicBarrier則適合讓全部線程在同一點同時執行 2,CountdownLatch利用繼承AQS的共享鎖來進行線程的通知,利用CAS來進行--,而CyclicBarrier則利用ReentrantLock的Condition來阻塞和通知線程
感謝閱讀
有興趣能夠關注個人我的微信公衆號,會按期推送關於Java的技術文章,並且目前不恰飯都是乾貨哈哈哈哈