CountDownLatch容許一個或多個線程等待其餘線程完成操做。相似於join的操做,能夠進行類比:java
join用於讓當前執行線程等待join線程執行結束,如
A.join()
方法,將不停檢查A線程是否存活,若是A存活,則當前線程永遠等待。編程
public class JoinCDLT { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @SneakyThrows @Override public void run() { System.out.println("parser1 start"); Thread.sleep(5000); System.out.println("parser1 finish"); } }); Thread parser2 = new Thread(new Runnable() { @SneakyThrows @Override public void run() { System.out.println("parser2 start"); Thread.sleep(10000); System.out.println("parser2 finish"); } }); long start = System.currentTimeMillis(); parser1.start(); parser2.start(); //join用於讓當前執行線程等待join線程執行結束 parser1.join(); parser2.join(); long end = System.currentTimeMillis(); System.out.println("all parser finish , spend " + (end - start) + " ms"); } }
CountDownLatch使用AQS實現,經過AQS的狀態變量state來做爲計數器值,當多個線程調用countdown方法時實際是原子性遞減AQS的狀態值,當線程調用await方法後當前線程會被放入AQS阻塞隊列等待計數器爲0再返回。segmentfault
public class CountDownLatchTest { //傳入int參數做爲計數器 static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { Thread a = new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(3000); System.out.println(1); c.countDown();//每當調用該方法,計數器N - 1 } }); Thread b = new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(3000); System.out.println(2); c.countDown(); } }); long start = System.currentTimeMillis(); a.start(); b.start(); //await方法會阻塞當前線程 c.await(); long end = System.currentTimeMillis(); System.out.println(3 + " " + (end - start)); } }
計數器必須大於等於0,只是等於0時候,計數器就是0,調用await方法時不會阻塞當前線程。CountDownLatch不可能從新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另一個線程調用await方法。併發
CyclicBarrier能夠讓一組線程達到一個屏障【同步點】時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。app
CyclicBarrier默認的構造方法是
CyclicBarrier(int parties)
,其參數表示屏障攔截的線程數量,每一個線程調用await方法告訴CyclicBarrier我已經到達了屏障,而後當前線程被阻塞。ide
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { Thread thread = new Thread(new Runnable() { @Override public void run() { try { //線程調用await,告訴CyclicBarrier已經到達了屏障,而後當前線程被阻塞 c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(1); } }, "thread-1"); thread.start(); try { //主線程到達了屏障,由於設置了parties設置爲2,所以能夠繼續下去 c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println(2); } }
上述程序,輸出的結果多是先1後2的次序,也多是先2後1,緣由在於:主線程和子線程的調度由CPU決定,兩個線程均可能先執行。函數
可是,若是將屏障數量改成3,此時主線程和子線程會永遠等待,由於沒有第三個線程達到屏障了。ui
getNumberWaiting
方法能夠得到CyclicBarrier阻塞的線程數量。isBroken()
方法用來了解阻塞的線程是否被中斷。Phaser能夠替代CountDownLatch 和CyclicBarrier,但比二者更增強大,能夠動態調整須要的線程個數,能夠經過構造函數傳入父Phaser實現層次Phaser。線程
參考:http://www.javashuo.com/article/p-gholfgmo-de.htmlcode
Semaphore 能夠用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源,適用場景能夠是流量控制。
使用AQS實現,AQS的狀態變量state作爲許可證數量,每次經過acquire()/tryAcquire()
,許可證數量經過CAS原子性遞減,調用release()釋放許可證,原子性遞增,只要有許可證就能夠重複使用。
public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10);//10個許可證數量,最大併發數爲10 public static void main(String[] args) { for(int i = 0; i < THREAD_COUNT; i ++){ //執行30個線程 threadPool.execute(new Runnable() { @Override public void run() { s.tryAcquire(); //嘗試獲取一個許可證 System.out.println("save data"); s.release(); //使用完以後歸還許可證 } }); } threadPool.shutdown(); } }
用於進行線程間的數據交換,它提供一個同步點,在這個同步點兩個線程能夠交換彼此的數據。
若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當都達到同步點時,這兩個線程能夠交換數據。
public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @SneakyThrows @Override public void run() { String A = "銀行流水A"; exgr.exchange(A); } }); threadPool.execute(new Runnable() { @SneakyThrows @Override public void run() { String B = "銀行流水B"; String A = exgr.exchange(B); System.out.println("A 和 B 的數據是否一致 :" + A.equals(B) + " A錄入的數據爲 :" + A + " B錄入的數據爲 :" + B); } }); threadPool.shutdown(); } }
參考
《Java併發編程的藝術》 方騰飛