同步輔助類,經過它能夠阻塞當前線程。也就是說,可以實現一個線程或者多個線程一直等待,直到其餘線程執行的操做完成。使用一個給定的計數器進行初始化,該計數器的操做是原子操做,即同時只能有一個線程操做該計數器。java
調用該類await()方法的線程會一直阻塞,直到其餘線程調用該類的countDown()方法,使當前計數器的值變爲0爲止。每次調用該類的countDown()方法,當前計數器的值就會減1。當計數器的值減爲0的時候,全部因調用await()方法而處於等待狀態的線程就會繼續往下執行。這種操做只能出現一次,由於該類中的計數器不能被重置。若是須要一個能夠重置計數次數的版本,能夠考慮使用CyclicBarrier類。數據庫
CountDownLatch支持給定時間的等待,超過必定的時間再也不等待,使用時只須要在countDown()方法中傳入須要等待的時間便可。此時,countDown()方法的方法簽名以下:多線程
public boolean await(long timeout, TimeUnit unit)
在某些業務場景中,程序執行須要等待某個條件完成後才能繼續執行後續的操做。典型的應用爲並行計算:當某個處理的運算量很大時,能夠將該運算任務拆分紅多個子任務,等待全部的子任務都完成以後,父任務再拿到全部子任務的運算結果進行彙總。併發
調用ExecutorService類的shutdown()方法,並不會第一時間內把全部線程所有都銷燬掉,而是讓當前已有的線程所有執行完,以後,再把線程池銷燬掉。ide
示例代碼以下:高併發
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CountDownLatchExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
支持給定時間等待的示例代碼以下:ui
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(10, TimeUnit.MICROSECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
控制同一時間併發線程的數目。可以完成對於信號量的控制,能夠控制某個資源可被同時訪問的個數。線程
提供了兩個核心方法——acquire()方法和release()方法。acquire()方法表示獲取一個許可,若是沒有則等待,release()方法則是在操做完成後釋放對應的許可。Semaphore維護了當前訪問的個數,經過提供同步機制來控制同時訪問的個數。Semaphore能夠實現有限大小的鏈表。code
Semaphore經常使用於僅能提供有限訪問的資源,好比:數據庫鏈接數。資源
每次獲取並釋放一個許可,示例代碼以下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); //獲取一個許可
test(threadNum);
semaphore.release(); //釋放一個許可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
每次獲取並釋放多個許可,示例代碼以下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(3); //獲取多個許可
test(threadNum);
semaphore.release(3); //釋放多個許可
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
假設有這樣一個場景,併發過高了,即便使用Semaphore進行控制,處理起來也比較棘手。假設系統當前容許的最高併發數是3,超過3後就須要丟棄,使用Semaphore也能實現這樣的場景,示例代碼以下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreExample {
private static final int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++){
final int threadNum = i;
exec.execute(() -> {
try {
//嘗試獲取一個許可,也能夠嘗試獲取多個許可,
//支持嘗試獲取許可超時設置,超時後再也不等待後續線程的執行
//具體能夠參見Semaphore的源碼
if (semaphore.tryAcquire()) {
test(threadNum);
semaphore.release(); //釋放一個許可
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
是一個同步輔助類,容許一組線程相互等待,直到到達某個公共的屏障點,經過它能夠完成多個線程之間相互等待,只有當每一個線程都準備就緒後,才能各自繼續往下執行後面的操做。
與CountDownLatch有類似的地方,都是使用計數器實現,當某個線程調用了CyclicBarrier的await()方法後,該線程就進入了等待狀態,並且計數器執行加1操做,當計數器的值達到了設置的初始值,調用await()方法進入等待狀態的線程會被喚醒,繼續執行各自後續的操做。CyclicBarrier在釋放等待線程後能夠重用,因此,CyclicBarrier又被稱爲循環屏障。
能夠用於多線程計算數據,最後合併計算結果的場景。
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可使用reset()方法進行重置,而且能夠循環使用
CountDownLatch主要實現1個或n個線程須要等待其餘線程完成某項操做以後,才能繼續往下執行,描述的是1個或n個線程等待其餘線程的關係。而CyclicBarrier主要實現了多個線程之間相互等待,直到全部的線程都知足了條件以後,才能繼續執行後續的操做,描述的是各個線程內部相互等待的關係。
CyclicBarrier可以處理更復雜的場景,若是計算髮生錯誤,能夠重置計數器讓線程從新執行一次。
CyclicBarrier中提供了不少有用的方法,好比:能夠經過getNumberWaiting()方法獲取阻塞的線程數量,經過isBroken()方法判斷阻塞的線程是否被中斷。
示例代碼以下。
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready", threadNum);
cyclicBarrier.await();
log.info("{} continue", threadNum);
}
}
設置等待超時示例代碼以下:
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void race(int threadNum) throws Exception{
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try{
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
}catch (BrokenBarrierException | TimeoutException e){
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
}
在聲明CyclicBarrier的時候,還能夠指定一個Runnable,當線程達到屏障的時候,能夠優先執行Runnable中的方法。
示例代碼以下:
package io.binghe.concurrency.example.aqs;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;@Slf4jpublic class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++){ final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception{ Thread.sleep(1000); log.info("{} is ready", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); }}