從JDK 1.5以後,在java.util.concurrent包下引入了好多的處理多線程的工具類,本文首先會介紹CyclicBarrier輔助工具類,其次將用CyclicBarrier工具類來完成一個實例,最後將給出CyclicBarrier和CountDownLatch的幾點比較。 html
以前關於CountDownLatch的博文,請參考使用CountDownLatch協調子線程java
之前在<<編寫高質量代碼-改善Java程序的151個建議>>一書中看到有一節的標題是「CyclicBarrier讓多線程齊步走」,以爲這標題挺不錯的,因此在寫這篇博文的時候也採用了這個名字。 多線程
CyclicBarrier是一個同步輔助工具類,它容許一組線程相互等待,直到到達一個公共的欄柵點。app
CyclicBarrier對於那些包含一組固定大小線程,而且這些線程必須不時地相互等待的程序很是有用。之因此將其稱之爲循環的Barrier是由於該Barrier在等待的線程釋放以後能夠重用。 less
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。
上面的介紹來自於CyclicBarrier類的註釋。 工具
/** * A synchronization aid that allows a set of threads to all wait for * each other to reach a common barrier point. CyclicBarriers are * useful in programs involving a fixed sized party of threads that * must occasionally wait for each other. The barrier is called * [i]cyclic[/i] because it can be re-used after the waiting threads * are released. * * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command * that is run once per barrier point, after the last thread in the party * arrives, but before any threads are released. * This [i]barrier action[/i] is useful * for updating shared-state before any of the parties continue. */
CyclicBarrier採用Condition和Lock來完成線程之間的同步。oop
相關的類圖是CyclicBarrier類內容以下:學習
/* * @(#)CyclicBarrier.java 1.12 06/03/30 * * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package java.util.concurrent; import java.util.concurrent.locks.*; /** * A synchronization aid that allows a set of threads to all wait for * each other to reach a common barrier point. CyclicBarriers are * useful in programs involving a fixed sized party of threads that * must occasionally wait for each other. The barrier is called * [i]cyclic[/i] because it can be re-used after the waiting threads * are released. * * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command * that is run once per barrier point, after the last thread in the party * arrives, but before any threads are released. * This [i]barrier action[/i] is useful * for updating shared-state before any of the parties continue. * * <p><b>Sample usage:</b> Here is an example of * using a barrier in a parallel decomposition design: * <pre> * class Solver { * final int N; * final float[][] data; * final CyclicBarrier barrier; * * class Worker implements Runnable { * int myRow; * Worker(int row) { myRow = row; } * public void run() { * while (!done()) { * processRow(myRow); * * try { * barrier.await(); * } catch (InterruptedException ex) { * return; * } catch (BrokenBarrierException ex) { * return; * } * } * } * } * * public Solver(float[][] matrix) { * data = matrix; * N = matrix.length; * barrier = new CyclicBarrier(N, * new Runnable() { * public void run() { * mergeRows(...); * } * }); * for (int i = 0; i < N; ++i) * new Thread(new Worker(i)).start(); * * waitUntilDone(); * } * } * </pre> * Here, each worker thread processes a row of the matrix then waits at the * barrier until all rows have been processed. When all rows are processed * the supplied {@link Runnable} barrier action is executed and merges the * rows. If the merger * determines that a solution has been found then <tt>done()</tt> will return * <tt>true</tt> and each worker will terminate. * * <p>If the barrier action does not rely on the parties being suspended when * it is executed, then any of the threads in the party could execute that * action when it is released. To facilitate this, each invocation of * {@link #await} returns the arrival index of that thread at the barrier. * You can then choose which thread should execute the barrier action, for * example: * <pre> if (barrier.await() == 0) { * // log the completion of this iteration * }</pre> * * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model * for failed synchronization attempts: If a thread leaves a barrier * point prematurely because of interruption, failure, or timeout, all * other threads waiting at that barrier point will also leave * abnormally via {@link BrokenBarrierException} (or * {@link InterruptedException} if they too were interrupted at about * the same time). * * <p>Memory consistency effects: Actions in a thread prior to calling * {@code await()} * [url=package-summary.html#MemoryVisibility]<i>happen-before</i>[/url] * actions that are part of the barrier action, which in turn * <i>happen-before</i> actions following a successful return from the * corresponding {@code await()} in other threads. * * @since 1.5 * @see CountDownLatch * * @author Doug Lea */ public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which <tt>count</tt> applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * Creates a new <tt>CyclicBarrier</tt> that will trip when the * given number of parties (threads) are waiting upon it, and * does not perform a predefined action when the barrier is tripped. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @throws IllegalArgumentException if {@code parties} is less than 1 */ public CyclicBarrier(int parties) { this(parties, null); } /** * Returns the number of parties required to trip this barrier. * * @return the number of parties required to trip this barrier */ public int getParties() { return parties; } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * [list] * <li>The last thread arrives; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * [/list] * * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, * then all other waiting threads will throw * {@link BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws BrokenBarrierException if [i]another[/i] thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was * broken when {@code await} was called, or the barrier * action (if present) failed due an exception. */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } } /** * Waits until all {@linkplain #getParties parties} have invoked * <tt>await</tt> on this barrier, or the specified waiting time elapses. * * <p>If the current thread is not the last to arrive then it is * disabled for thread scheduling purposes and lies dormant until * one of the following things happens: * [list] * <li>The last thread arrives; or * <li>The specified timeout elapses; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * one of the other waiting threads; or * <li>Some other thread times out while waiting for barrier; or * <li>Some other thread invokes {@link #reset} on this barrier. * [/list] * * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then {@link TimeoutException} * is thrown. If the time is less than or equal to zero, the * method will not wait at all. * * <p>If the barrier is {@link #reset} while any thread is waiting, * or if the barrier {@linkplain #isBroken is broken} when * <tt>await</tt> is invoked, or while any thread is waiting, then * {@link BrokenBarrierException} is thrown. * * <p>If any thread is {@linkplain Thread#interrupt interrupted} while * waiting, then all other waiting threads will throw {@link * BrokenBarrierException} and the barrier is placed in the broken * state. * * <p>If the current thread is the last thread to arrive, and a * non-null barrier action was supplied in the constructor, then the * current thread runs the action before allowing the other threads to * continue. * If an exception occurs during the barrier action then that exception * will be propagated in the current thread and the barrier is placed in * the broken state. * * @param timeout the time to wait for the barrier * @param unit the time unit of the timeout parameter * @return the arrival index of the current thread, where index * <tt>{@link #getParties()} - 1</tt> indicates the first * to arrive and zero indicates the last to arrive * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the specified timeout elapses * @throws BrokenBarrierException if [i]another[/i] thread was * interrupted or timed out while the current thread was * waiting, or the barrier was reset, or the barrier was broken * when {@code await} was called, or the barrier action (if * present) failed due an exception */ public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } /** * Queries if this barrier is in a broken state. * * @return {@code true} if one or more parties broke out of this * barrier due to interruption or timeout since * construction or the last reset, or a barrier action * failed due to an exception; {@code false} otherwise. */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets [i]after[/i] * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * Returns the number of parties currently waiting at the barrier. * This method is primarily useful for debugging and assertions. * * @return the number of parties currently blocked in {@link #await} */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
CyclicBarrier可讓全部線程都處於等待狀態(阻塞),而後在知足條件的狀況下繼續執行。測試
使用CyclicBarrier模擬幾個小組出去遊玩的場景,如:ui
幾個小組包一輛車去旅遊,一天行程包括上午小組自由活動和下午自由活動。
各個小組早上自由活動,可是11點半大巴車上集合,而後吃飯並趕赴下一個景區。
各個小組下午自由活動,可是要5點半大巴車上集合,而後一塊兒回去。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * * @author wangmengjun * */ public class TeamGroup implements Runnable { private final CyclicBarrier barrier; private int groupNumber; /** * @param barrier * @param groupNumber */ public TeamGroup(CyclicBarrier barrier, int groupNumber) { this.barrier = barrier; this.groupNumber = groupNumber; } public void run() { try { print(); barrier.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void print() { System.out.println(String.format("第%d組完成該地景點瀏覽,並回到集合點", groupNumber)); } }
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { private static final int THREAD_SLEEP_MILLIS = 6000; /** 旅遊小數的個數 */ private static final int NUMBER_OF_GROUPS = 6; /** 觀光是否結束的標識 */ private static boolean tourOver = false; public static void main(String[] args) { ExecutorService service = Executors .newFixedThreadPool(NUMBER_OF_GROUPS); CyclicBarrier cb = new CyclicBarrier(NUMBER_OF_GROUPS, new Runnable() { public void run() { /* * 若是一天的遊玩結束了,你們能夠坐大巴回去了... ... */ if (isTourOver()) { System.out.println("各個小組都集合到大巴上,準備回家.. ..."); } } }); System.out.println("用CyclicBarrier輔助工具類模擬旅遊過程當中小組集合::"); /** * 上午各個小組自由活動,而後在某個點,好比11點半集合到大巴上。 */ tourInTheMorning(service, cb); sleep(THREAD_SLEEP_MILLIS); /** * 調用reset方法,將barrier設置到初始化狀態。 * */ cb.reset(); /** * 下午各個小組自由活動,而後在某個點,好比11點半集合到大巴上。 */ tourInTheAfternoon(service, cb); /** * 下午小組集合完畢後,一天的觀光就結束了,將標誌位記爲true; */ tourOver = true; sleep(THREAD_SLEEP_MILLIS); service.shutdown(); } /** * @return the tourOver */ public static boolean isTourOver() { return tourOver; } /** * @param tourOver * the tourOver to set */ public static void setTourOver(boolean tourOver) { CyclicBarrierTest.tourOver = tourOver; } private static void tourInTheMorning(ExecutorService service, final CyclicBarrier cb) { System.out.println("早上自由玩... ... "); for (int groupNumber = 1; groupNumber <= NUMBER_OF_GROUPS; groupNumber++) { service.execute(new TeamGroup(cb, groupNumber)); } } private static void tourInTheAfternoon(ExecutorService service, final CyclicBarrier cb) { System.out.println("下午自由玩... ... "); for (int groupNumber = 1; groupNumber <= NUMBER_OF_GROUPS; groupNumber++) { service.execute(new TeamGroup(cb, groupNumber)); } } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
某次運行的結果以下:
用CyclicBarrier輔助工具類模擬旅遊過程當中小組集合:: 早上自由玩... ... 第2組完成該地景點瀏覽,並回到集合點 第1組完成該地景點瀏覽,並回到集合點 第5組完成該地景點瀏覽,並回到集合點 第3組完成該地景點瀏覽,並回到集合點 第6組完成該地景點瀏覽,並回到集合點 第4組完成該地景點瀏覽,並回到集合點 下午自由玩... ... 第2組完成該地景點瀏覽,並回到集合點 第4組完成該地景點瀏覽,並回到集合點 第3組完成該地景點瀏覽,並回到集合點 第6組完成該地景點瀏覽,並回到集合點 第1組完成該地景點瀏覽,並回到集合點 第5組完成該地景點瀏覽,並回到集合點 各個小組都集合到大巴上,準備回家.. ...
二者都是用於線程同步的輔助工具類,都提供了await方法來達到線程等待。
CountDownLatch經過一個繼承AbstractQueuedSynchronizer的內部類Sync來完成同步。
CyclicBarrier經過Condition和Lock來完成同步。
CountDownLatch: 一個或者是一部分線程,等待另一部線程都完成操做。
CyclicBarrier: 全部線程互相等待完成。
CountDownLatch中計數是不能被重置的。
若是須要一個能夠重置計數的版本,須要考慮使用CyclicBarrie。
CountDownLatch適用於一次同步。當使用CountDownLatch時,任何線程容許屢次調用countDown(). 那些調用了await()方法的線程將被阻塞,直到那些沒有被阻塞線程調用countDown()使計數到達0爲止。
相反,CyclicBarrier適用於多個同步點。
例如:一組正在運算的線程,在進入下一個階段計算以前須要同步。
與CountDownLatch不一樣,一個處於某個階段的線程調用了await()方法將會被阻塞,直到全部屬於這個階段的線程都調用了await()方法爲止。
在CyclicBarrier中,若是一個線程因爲中斷,失敗或者超時等緣由,過早地離開了柵欄點,那麼全部在柵欄點等待的其它線程也會經過BrokenBarrierException或者IterupedException異常地離開。
使用CountDownLatch時,它關注的一個線程或者多個線程須要在其它在一組線程完成操做以後,在去作一些事情。好比:服務的啓動等。
CyclicBarrier更加關注的是公共的柵欄點(Common Barrier point),關注的是這個點上的同步。這個點以前以後的事情並不須要太多的關注。好比:一個並行計算須要分幾個階段完成,在一個階段完成進入到下一個階段以前,須要同步,這時候CyclicBarrie很適合。
因爲知識的緣由,上述例子以及CountDownLatch和CyclicBarrier的比較上會存在不足,若是有問題請你們指正,也但願你們可以提供二者其它方面的不一樣之處,一塊兒學習分享。