CyclicBarrier從字面上能夠直接理解爲線程運行的屏障,它可讓一組線程執行到一個共同的屏障點時被阻塞,直到最後一個線程執行到指定位置,你設置的執行線程就會觸發運行;同時CyclicBarrier相比與CountDownLatch,它是能夠被重置的;下面咱們經過一個簡單例子看下CyclicBarrier的使用;安全
實例化一個CyclicBarrier對象並傳入你要控制的線程內部;微信
public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(3, new Runnable() { public void run() { System.out.println("全部線程集合"); } }); for (int i = 0; i < 3; i++) { new CyclicBarrierThread(i + "", cb).start(); } }
計數線程代碼,每當計數到偶數時調用CyclicBarrier的await()方法app
public class CyclicBarrierThread extends Thread{ private CyclicBarrier barrier; private String name; private int count; public CyclicBarrierThread(String name,CyclicBarrier barrier) { this.name=name; this.barrier=barrier; this.count=0; } public void run() { try { for(int i=0;i<10;i++) { Thread.sleep(100); count++; System.out.println(name+"號線程---"+Thread.currentThread().getName()+"開始計數:"+count); if(count%2==0) {//每計數到偶數次時集合一次 barrier.await(); System.out.println(name+"號線程---"+Thread.currentThread().getName()+"集合完畢,繼續計數"); } } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
查看代碼輸出函數
2號線程---Thread-2開始計數:1 0號線程---Thread-0開始計數:1 1號線程---Thread-1開始計數:1 2號線程---Thread-2開始計數:2 1號線程---Thread-1開始計數:2 0號線程---Thread-0開始計數:2 全部線程集合 2號線程---Thread-2集合完畢,繼續計數 1號線程---Thread-1集合完畢,繼續計數 0號線程---Thread-0集合完畢,繼續計數 2號線程---Thread-2開始計數:3 1號線程---Thread-1開始計數:3 0號線程---Thread-0開始計數:3 2號線程---Thread-2開始計數:4 0號線程---Thread-0開始計數:4 1號線程---Thread-1開始計數:4 全部線程集合 1號線程---Thread-1集合完畢,繼續計數 2號線程---Thread-2集合完畢,繼續計數 0號線程---Thread-0集合完畢,繼續計數 0號線程---Thread-0開始計數:5 2號線程---Thread-2開始計數:5 1號線程---Thread-1開始計數:5 0號線程---Thread-0開始計數:6 1號線程---Thread-1開始計數:6 2號線程---Thread-2開始計數:6 全部線程集合 2號線程---Thread-2集合完畢,繼續計數 0號線程---Thread-0集合完畢,繼續計數 1號線程---Thread-1集合完畢,繼續計數 0號線程---Thread-0開始計數:7 1號線程---Thread-1開始計數:7 2號線程---Thread-2開始計數:7 1號線程---Thread-1開始計數:8 0號線程---Thread-0開始計數:8 2號線程---Thread-2開始計數:8 全部線程集合 2號線程---Thread-2集合完畢,繼續計數 0號線程---Thread-0集合完畢,繼續計數 1號線程---Thread-1集合完畢,繼續計數 0號線程---Thread-0開始計數:9 1號線程---Thread-1開始計數:9 2號線程---Thread-2開始計數:9 1號線程---Thread-1開始計數:10 0號線程---Thread-0開始計數:10 2號線程---Thread-2開始計數:10 全部線程集合 1號線程---Thread-1集合完畢,繼續計數 2號線程---Thread-2集合完畢,繼續計數 0號線程---Thread-0集合完畢,繼續計數
經過輸出結果能夠看到,計數線程每計數到偶數次時使用CyclicBarrier的await()方法,線程都會進入阻塞等待的狀態,直到最後一個線程到達屏障點時,觸發你定義的執行線程,並且CyclicBarrier的await()方法是能夠重複使用的。oop
下面咱們就對CyclicBarrier內部的源碼實現進行一些分析與總結源碼分析
首先看下CyclicBarrier的構造函數this
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); //攔截的線程數量 this.parties = parties; //用於計數的count值,每有一個線程執行到屏障點,就會遞減1 this.count = parties; //定義的攔截線程 this.barrierCommand = barrierAction; }
CyclicBarrier的構造函數很簡單就是接收你要攔截的線程數量與定義的執行線程。spa
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
咱們看下具體實現dowait方法的實現線程
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獲取可重入鎖 final ReentrantLock lock = this.lock; //加鎖 lock.lock(); try { //CyclicBarrier內部定義的一個Generation類 final Generation g = generation; //判斷Generation的broken狀態 if (g.broken) throw new BrokenBarrierException(); //若是線程被中斷 if (Thread.interrupted()) { //Generation的broken置爲true,count值重置,並喚醒全部線程 breakBarrier(); throw new InterruptedException(); } //count值減一 int index = --count; if (index == 0) { // 若是conunt爲0,說明最後一個線程到大屏障 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run();//執行你傳入的線程 ranAction = true; nextGeneration();//喚醒全部阻塞的線程,同時重置count值與Generation return 0; } finally { if (!ranAction) //攔截線程沒有正常執行,喚醒全部線程,同時重置count值,Generation的broken置爲true breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { //是否設置阻塞的超時時間 if (!timed) //釋放當前鎖 trip.await();//false 表示不設置,一直阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos);//true 設置阻塞的超時時間 } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放鎖 lock.unlock(); } }
dowait方法的實現流程是很清晰的,經過ReentrantLock的Condition接口與count值相互配合,主要完成如下功能:code
一、當須要攔截的線程到達屏障點調用await方法後獲取ReentrantLock鎖,保證線程安全;
二、檢查count值是否爲0,判斷是不是最後一個線程到達屏障,若是是的話執行須要觸發執行的線程,調用Condition的signalAll方法喚醒全部阻塞的線程,並重置count值與Generation類,保障CyclicBarrier的重複可用;
三、若是不是最後一個線程的話,根據傳入的參數調用Condition的await方法釋放鎖資源並進入阻塞等待,直到被喚醒;
能夠用來主動重置CyclicBarrier的狀態
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //generation.broken設置爲true,喚醒全部線程,count值重置 breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
breakBarrier()與nextGeneration(),這兩個方法的主要區別就在於前者會把generation.broken設置爲true,也就是說若是調用reset方法主動重置CyclicBarrier類的狀態,當前正在使用CyclicBarrier類同步的線程都會被喚醒或拋出異常;
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
很明顯getNumberWaiting方法使用來獲取當前已經運行至屏蔽點並阻塞等待的線程數量的;
經過上面分析能夠看到CyclicBarrier的實現原理相對仍是比較簡單與清晰的,主要是基於ReentrantLock與計數器相結合來實現多個線程的同步控制的。以上就是對CyclicBarrier類的使用與內部實現進行的分析,其中若有不足與不正確的地方還望指出與海涵。
關注微信公衆號,查看更多技術文章。